diff options
author | Determinant <[email protected]> | 2020-06-09 21:46:12 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-09 21:46:12 -0400 |
commit | 56bc442c4da0e6e75401d50f206fe56e1bf8adbb (patch) | |
tree | d2fb87c5001d17cf14dda1701a93b085fb2a6402 /src | |
parent | 61efb65b72a1d8d02a2c1615ae4a498eef7540c6 (diff) |
...
Diffstat (limited to 'src')
-rw-r--r-- | src/lib.rs | 148 |
1 files changed, 119 insertions, 29 deletions
@@ -1,7 +1,9 @@ +#[macro_use] extern crate scan_fmt; use std::collections::BinaryHeap; #[repr(u8)] enum WALRingType { + #[allow(dead_code)] Null = 0x0, Full, First, @@ -17,6 +19,7 @@ struct WALRingBlob { // payload follows } +type WALBytes = Box<[u8]>; type WALFileId = u64; type WALPos = u64; @@ -45,21 +48,29 @@ struct WALState { /// the next position for a record, addressed in the entire WAL space pub next: WALPos, /// number of bits for a block - pub block_nbit: u8, + pub block_nbit: u64, /// number of bits for a file - pub file_nbit: u8, + pub file_nbit: u64, } pub trait WALFile { + /// Initialize the file space in [offset, offset + length) to zero. fn allocate(&self, offset: WALPos, length: usize); - fn write(&self, offset: WALPos, data: Box<[u8]>); - fn read(&self, offset: WALPos, length: usize) -> Box<[u8]>; + /// Write data with offset. + fn write(&self, offset: WALPos, data: WALBytes); + /// Read data with offset. + fn read(&self, offset: WALPos, length: usize) -> WALBytes; } pub trait WALStore { + /// Open a file given the filename, create the file if not exists when `touch` is `true`. fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>; + /// Unlink a file given the filename. fn remove_file(&self, filename: &str) -> bool; - fn scan_files(&self) -> Box<[&str]>; + /// Enumerate all WAL files, ordered by their filenames. + fn enumerate_files(&self) -> Box<[String]>; + /// Apply (redo) the payload during recovery. + fn apply_payload(&self, payload: WALBytes); } /// The middle layer that manages WAL file handles and invokes public trait functions to actually @@ -69,18 +80,19 @@ struct WALFilePool<F: WALStore> { handles: lru::LruCache<WALFileId, Box<dyn WALFile>>, file_nbit: u64, file_size: u64, - block_size: u64, + block_nbit: u64, } impl<F: WALStore> WALFilePool<F> { fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { let file_nbit = file_nbit as u64; + let block_nbit = block_nbit as u64; WALFilePool { store, handles: lru::LruCache::new(cache_size), file_nbit, file_size: 1 << (file_nbit as u64), - block_size: 1 << (block_nbit as u64) + block_nbit, } } @@ -88,65 +100,71 @@ impl<F: WALStore> WALFilePool<F> { format!("{:08x}.log", fid) } - fn get_file(&mut self, fid: u64) -> &'static dyn WALFile { + fn get_file(&mut self, fid: u64, touch: bool) -> &'static dyn WALFile { let h = match self.handles.get(&fid) { Some(h) => &**h, None => { - self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), true).unwrap()); + self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch).unwrap()); &**self.handles.get(&fid).unwrap() } }; unsafe {&*(h as *const dyn WALFile)} } - fn write(&mut self, writes: Vec<(WALPos, Box<[u8]>)>) { - // pre-allocate the blocks + fn get_fid(&mut self, fname: &str) -> WALFileId { + scan_fmt!(fname, "{:x}.log", [hex WALFileId]).unwrap() + } + + // TODO: evict stale handles + fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) { + // pre-allocate the file space let fid = writes[0].0 >> self.file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); - let mut alloc_end = alloc_start + self.block_size; - let mut h = self.get_file(fid); - for (off, _) in &writes[1..] { + let mut alloc_end = alloc_start + writes[0].1.len() as u64; + let mut h = self.get_file(fid, true); + for (off, w) in &writes[1..] { let next_fid = off >> self.file_nbit; if next_fid != fid { h.allocate(alloc_start, (alloc_end - alloc_start) as usize); - h = self.get_file(next_fid); + h = self.get_file(next_fid, true); alloc_start = 0; - alloc_end = alloc_start + self.block_size; + alloc_end = alloc_start + w.len() as u64; } else { - alloc_end += self.block_size; + alloc_end += w.len() as u64; } } h.allocate(alloc_start, (alloc_end - alloc_start) as usize); for (off, w) in writes.into_iter() { - self.get_file(off >> self.file_nbit).write(off, w); + self.get_file(off >> self.file_nbit, true).write(off, w); } } fn remove_file(&self, fid: u64) -> bool { - true + self.store.remove_file(&Self::get_fname(fid)) } } pub struct WALWriter<F: WALStore> { state: WALState, file_pool: WALFilePool<F>, - block_buffer: Box<[u8]>, + block_buffer: WALBytes, block_size: u32, next_complete: WALPos, io_complete: BinaryHeap<WALRingId> } impl<F: WALStore> WALWriter<F> { - fn new(state: WALState, wal_store: F, cache_size: usize) -> Self { + fn new(state: WALState, file_pool: WALFilePool<F>) -> Self { let mut b = Vec::new(); - let block_nbit = state.block_nbit; - let block_size = 1 << (block_nbit as u32); - let file_nbit = state.file_nbit; - let file_size = 1 << (file_nbit as u64); + let block_size = 1 << file_pool.block_nbit as u32; + //let block_nbit = state.block_nbit; + //let block_size = 1 << (block_nbit as u32); + //let file_nbit = state.file_nbit; + //let file_size = 1 << (file_nbit as u64); b.resize(block_size as usize, 0); WALWriter{ state, - file_pool: WALFilePool::new(wal_store, file_nbit, block_nbit, cache_size), + file_pool, block_buffer: b.into_boxed_slice(), block_size, next_complete: 0, @@ -157,7 +175,7 @@ impl<F: WALStore> WALWriter<F> { /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the /// function returns. The caller then has the knowledge of WAL writes so it should defer /// actual data writes after WAL writes. - pub fn grow(&mut self, records: &[Box<[u8]>]) -> Box<[WALRingId]> { + pub fn grow(&mut self, records: &[WALBytes]) -> Box<[WALRingId]> { let mut res = Vec::new(); let mut writes = Vec::new(); let msize = std::mem::size_of::<WALRingBlob>() as u32; @@ -176,7 +194,7 @@ impl<F: WALStore> WALWriter<F> { if remain > msize { let d = remain - msize; let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>( - &mut self.block_buffer[bbuff_cur as usize] as *mut u8)}; + (&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())}; let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64; if d >= rsize { // the remaining rec fits in the block @@ -249,5 +267,77 @@ impl<F: WALStore> WALWriter<F> { } } -struct WALReader { +pub struct WALReader<F: WALStore> { + file_pool: WALFilePool<F>, +} + +impl<F: WALStore> WALReader<F> { + pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { + let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size); + WALReader{ file_pool } + } + + pub fn recover(mut self) -> WALWriter<F> { + let msize = std::mem::size_of::<WALRingBlob>() as u32; + let logfiles = self.file_pool.store.enumerate_files(); + for fname in logfiles.iter() { + let fid = self.file_pool.get_fid(fname); + let f = self.file_pool.get_file(fid, false); + let mut off = 0; + let mut end = false; + while !end { + let header_raw = f.read(off, msize as usize); + let header = unsafe { + std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; + let rsize = header.rsize; + off += msize as u64; + match header.rtype { + WALRingType::Full => { + let payload = f.read(off, rsize as usize); + self.file_pool.store.apply_payload(payload); + off += rsize as u64; + }, + WALRingType::First => { + let mut chunks = vec![f.read(off, rsize as usize)]; + off += rsize as u64; + loop { + let header_raw = f.read(off, msize as usize); + let header = unsafe { + std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; + if let WALRingType::Null = header.rtype { + end = true; + break + } + let rsize = header.rsize; + let payload = f.read(off, rsize as usize); + off += msize as u64; + chunks.push(payload); + match header.rtype { + WALRingType::Middle => (), + WALRingType::Last => break, + _ => unreachable!() + } + } + let mut payload = Vec::new(); + payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0); + let mut ps = &mut payload[..]; + for c in chunks { + ps[..c.len()].copy_from_slice(&*c); + ps = &mut ps[c.len()..]; + } + self.file_pool.store.apply_payload(payload.into_boxed_slice()); + }, + WALRingType::Null => end = true, + _ => unreachable!() + } + } + self.file_pool.remove_file(fid); + } + WALWriter::new(WALState { + first_fid: 0, + next: 0, + block_nbit: self.file_pool.block_nbit, + file_nbit: self.file_pool.file_nbit, + }, self.file_pool) + } } |