From 56bc442c4da0e6e75401d50f206fe56e1bf8adbb Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 9 Jun 2020 21:46:12 -0400 Subject: ... --- Cargo.lock | 58 ++++++++++++++++++++++++ Cargo.toml | 1 + src/lib.rs | 148 +++++++++++++++++++++++++++++++++++++++++++++++++------------ 3 files changed, 178 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f9caa86..d6e8d74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9,6 +9,15 @@ dependencies = [ "const-random", ] +[[package]] +name = "aho-corasick" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada" +dependencies = [ + "memchr", +] + [[package]] name = "autocfg" version = "0.1.7" @@ -73,6 +82,7 @@ version = "0.1.0" dependencies = [ "crc", "lru", + "scan_fmt", ] [[package]] @@ -85,6 +95,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + [[package]] name = "libc" version = "0.2.71" @@ -100,12 +116,54 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "memchr" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" + [[package]] name = "proc-macro-hack" version = "0.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4" +[[package]] +name = "regex" +version = "1.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", + "thread_local", +] + +[[package]] +name = "regex-syntax" +version = "0.6.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8" + +[[package]] +name = "scan_fmt" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248286eec0f55678879ef1caec3d76276643ebcb5460d8cb6e732ef40f50aabe" +dependencies = [ + "regex", +] + +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index fb77a6a..ee8d867 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ edition = "2018" [dependencies] crc = "1.8.1" lru = "0.5.1" +scan_fmt = "0.2.5" [lib] name = "growthring" diff --git a/src/lib.rs b/src/lib.rs index 29f7835..53f4e0f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,9 @@ +#[macro_use] extern crate scan_fmt; use std::collections::BinaryHeap; #[repr(u8)] enum WALRingType { + #[allow(dead_code)] Null = 0x0, Full, First, @@ -17,6 +19,7 @@ struct WALRingBlob { // payload follows } +type WALBytes = Box<[u8]>; type WALFileId = u64; type WALPos = u64; @@ -45,21 +48,29 @@ struct WALState { /// the next position for a record, addressed in the entire WAL space pub next: WALPos, /// number of bits for a block - pub block_nbit: u8, + pub block_nbit: u64, /// number of bits for a file - pub file_nbit: u8, + pub file_nbit: u64, } pub trait WALFile { + /// Initialize the file space in [offset, offset + length) to zero. fn allocate(&self, offset: WALPos, length: usize); - fn write(&self, offset: WALPos, data: Box<[u8]>); - fn read(&self, offset: WALPos, length: usize) -> Box<[u8]>; + /// Write data with offset. + fn write(&self, offset: WALPos, data: WALBytes); + /// Read data with offset. + fn read(&self, offset: WALPos, length: usize) -> WALBytes; } pub trait WALStore { + /// Open a file given the filename, create the file if not exists when `touch` is `true`. fn open_file(&self, filename: &str, touch: bool) -> Option>; + /// Unlink a file given the filename. fn remove_file(&self, filename: &str) -> bool; - fn scan_files(&self) -> Box<[&str]>; + /// Enumerate all WAL files, ordered by their filenames. + fn enumerate_files(&self) -> Box<[String]>; + /// Apply (redo) the payload during recovery. + fn apply_payload(&self, payload: WALBytes); } /// The middle layer that manages WAL file handles and invokes public trait functions to actually @@ -69,18 +80,19 @@ struct WALFilePool { handles: lru::LruCache>, file_nbit: u64, file_size: u64, - block_size: u64, + block_nbit: u64, } impl WALFilePool { fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { let file_nbit = file_nbit as u64; + let block_nbit = block_nbit as u64; WALFilePool { store, handles: lru::LruCache::new(cache_size), file_nbit, file_size: 1 << (file_nbit as u64), - block_size: 1 << (block_nbit as u64) + block_nbit, } } @@ -88,65 +100,71 @@ impl WALFilePool { format!("{:08x}.log", fid) } - fn get_file(&mut self, fid: u64) -> &'static dyn WALFile { + fn get_file(&mut self, fid: u64, touch: bool) -> &'static dyn WALFile { let h = match self.handles.get(&fid) { Some(h) => &**h, None => { - self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), true).unwrap()); + self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch).unwrap()); &**self.handles.get(&fid).unwrap() } }; unsafe {&*(h as *const dyn WALFile)} } - fn write(&mut self, writes: Vec<(WALPos, Box<[u8]>)>) { - // pre-allocate the blocks + fn get_fid(&mut self, fname: &str) -> WALFileId { + scan_fmt!(fname, "{:x}.log", [hex WALFileId]).unwrap() + } + + // TODO: evict stale handles + fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) { + // pre-allocate the file space let fid = writes[0].0 >> self.file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); - let mut alloc_end = alloc_start + self.block_size; - let mut h = self.get_file(fid); - for (off, _) in &writes[1..] { + let mut alloc_end = alloc_start + writes[0].1.len() as u64; + let mut h = self.get_file(fid, true); + for (off, w) in &writes[1..] { let next_fid = off >> self.file_nbit; if next_fid != fid { h.allocate(alloc_start, (alloc_end - alloc_start) as usize); - h = self.get_file(next_fid); + h = self.get_file(next_fid, true); alloc_start = 0; - alloc_end = alloc_start + self.block_size; + alloc_end = alloc_start + w.len() as u64; } else { - alloc_end += self.block_size; + alloc_end += w.len() as u64; } } h.allocate(alloc_start, (alloc_end - alloc_start) as usize); for (off, w) in writes.into_iter() { - self.get_file(off >> self.file_nbit).write(off, w); + self.get_file(off >> self.file_nbit, true).write(off, w); } } fn remove_file(&self, fid: u64) -> bool { - true + self.store.remove_file(&Self::get_fname(fid)) } } pub struct WALWriter { state: WALState, file_pool: WALFilePool, - block_buffer: Box<[u8]>, + block_buffer: WALBytes, block_size: u32, next_complete: WALPos, io_complete: BinaryHeap } impl WALWriter { - fn new(state: WALState, wal_store: F, cache_size: usize) -> Self { + fn new(state: WALState, file_pool: WALFilePool) -> Self { let mut b = Vec::new(); - let block_nbit = state.block_nbit; - let block_size = 1 << (block_nbit as u32); - let file_nbit = state.file_nbit; - let file_size = 1 << (file_nbit as u64); + let block_size = 1 << file_pool.block_nbit as u32; + //let block_nbit = state.block_nbit; + //let block_size = 1 << (block_nbit as u32); + //let file_nbit = state.file_nbit; + //let file_size = 1 << (file_nbit as u64); b.resize(block_size as usize, 0); WALWriter{ state, - file_pool: WALFilePool::new(wal_store, file_nbit, block_nbit, cache_size), + file_pool, block_buffer: b.into_boxed_slice(), block_size, next_complete: 0, @@ -157,7 +175,7 @@ impl WALWriter { /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the /// function returns. The caller then has the knowledge of WAL writes so it should defer /// actual data writes after WAL writes. - pub fn grow(&mut self, records: &[Box<[u8]>]) -> Box<[WALRingId]> { + pub fn grow(&mut self, records: &[WALBytes]) -> Box<[WALRingId]> { let mut res = Vec::new(); let mut writes = Vec::new(); let msize = std::mem::size_of::() as u32; @@ -176,7 +194,7 @@ impl WALWriter { if remain > msize { let d = remain - msize; let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>( - &mut self.block_buffer[bbuff_cur as usize] as *mut u8)}; + (&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())}; let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64; if d >= rsize { // the remaining rec fits in the block @@ -249,5 +267,77 @@ impl WALWriter { } } -struct WALReader { +pub struct WALReader { + file_pool: WALFilePool, +} + +impl WALReader { + pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { + let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size); + WALReader{ file_pool } + } + + pub fn recover(mut self) -> WALWriter { + let msize = std::mem::size_of::() as u32; + let logfiles = self.file_pool.store.enumerate_files(); + for fname in logfiles.iter() { + let fid = self.file_pool.get_fid(fname); + let f = self.file_pool.get_file(fid, false); + let mut off = 0; + let mut end = false; + while !end { + let header_raw = f.read(off, msize as usize); + let header = unsafe { + std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; + let rsize = header.rsize; + off += msize as u64; + match header.rtype { + WALRingType::Full => { + let payload = f.read(off, rsize as usize); + self.file_pool.store.apply_payload(payload); + off += rsize as u64; + }, + WALRingType::First => { + let mut chunks = vec![f.read(off, rsize as usize)]; + off += rsize as u64; + loop { + let header_raw = f.read(off, msize as usize); + let header = unsafe { + std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; + if let WALRingType::Null = header.rtype { + end = true; + break + } + let rsize = header.rsize; + let payload = f.read(off, rsize as usize); + off += msize as u64; + chunks.push(payload); + match header.rtype { + WALRingType::Middle => (), + WALRingType::Last => break, + _ => unreachable!() + } + } + let mut payload = Vec::new(); + payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0); + let mut ps = &mut payload[..]; + for c in chunks { + ps[..c.len()].copy_from_slice(&*c); + ps = &mut ps[c.len()..]; + } + self.file_pool.store.apply_payload(payload.into_boxed_slice()); + }, + WALRingType::Null => end = true, + _ => unreachable!() + } + } + self.file_pool.remove_file(fid); + } + WALWriter::new(WALState { + first_fid: 0, + next: 0, + block_nbit: self.file_pool.block_nbit, + file_nbit: self.file_pool.file_nbit, + }, self.file_pool) + } } -- cgit v1.2.3-70-g09d2