From 895c761cc46f48907dc8442297f84c8959692b49 Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 10 Jun 2020 01:32:16 -0400 Subject: basic WAL functionality works --- src/lib.rs | 70 ++++++++++++++++++++++++++++++++------------------------------ 1 file changed, 36 insertions(+), 34 deletions(-) (limited to 'src') diff --git a/src/lib.rs b/src/lib.rs index 1e1b653..5dcb78a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,7 +55,9 @@ struct WALState { pub trait WALFile { /// Initialize the file space in [offset, offset + length) to zero. - fn allocate(&self, offset: WALPos, length: usize); + fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>; + /// Truncate a file to a specified length. + fn truncate(&self, length: usize) -> Result<(), ()>; /// Write data with offset. fn write(&self, offset: WALPos, data: WALBytes); /// Read data with offset. @@ -67,7 +69,7 @@ pub trait WALStore { fn open_file(&self, filename: &str, touch: bool) -> Option>; /// Unlink a file given the filename. fn remove_file(&self, filename: &str) -> bool; - /// Enumerate all WAL files, ordered by their filenames. + /// Enumerate all WAL files. fn enumerate_files(&self) -> Box<[String]>; /// Apply (redo) the payload during recovery. fn apply_payload(&self, payload: WALBytes); @@ -112,7 +114,7 @@ impl WALFilePool { } fn get_fid(&mut self, fname: &str) -> WALFileId { - scan_fmt!(fname, "{:x}.log", [hex WALFileId]).unwrap() + scan_fmt!(fname, "{x}.log", [hex WALFileId]).unwrap() } // TODO: evict stale handles @@ -125,7 +127,7 @@ impl WALFilePool { for (off, w) in &writes[1..] { let next_fid = off >> self.file_nbit; if next_fid != fid { - h.allocate(alloc_start, (alloc_end - alloc_start) as usize); + h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap(); h = self.get_file(next_fid, true); alloc_start = 0; alloc_end = alloc_start + w.len() as u64; @@ -134,7 +136,7 @@ impl WALFilePool { alloc_end += w.len() as u64; } } - h.allocate(alloc_start, (alloc_end - alloc_start) as usize); + h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap(); for (off, w) in writes.into_iter() { self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w); } @@ -143,6 +145,8 @@ impl WALFilePool { fn remove_file(&self, fid: u64) -> bool { self.store.remove_file(&Self::get_fname(fid)) } + + fn reset(&mut self) { self.handles.clear() } } pub struct WALWriter { @@ -201,7 +205,6 @@ impl WALWriter { if d >= rsize { // the remaining rec fits in the block let payload = rec; - println!("rsize {} {}", rsize, d); blob.crc32 = crc::crc32::checksum_ieee(payload); blob.rsize = rsize; blob.rtype = if started {WALRingType::Last} else {WALRingType::Full}; @@ -285,61 +288,60 @@ impl WALLoader { } pub fn recover(mut self) -> WALWriter { + let block_size = 1 << self.file_pool.block_nbit; let msize = std::mem::size_of::() as u32; - let logfiles = self.file_pool.store.enumerate_files(); + let mut logfiles = self.file_pool.store.enumerate_files(); + // TODO: use regex to filter out invalid files + // TODO: check for missing logfiles + logfiles.sort(); + let mut chunks = None; for fname in logfiles.iter() { let fid = self.file_pool.get_fid(fname); let f = self.file_pool.get_file(fid, false); let mut off = 0; - let mut end = false; - while !end { + while block_size - (off & (block_size - 1)) > msize as u64 { let header_raw = f.read(off, msize as usize); + off += msize as u64; let header = unsafe { std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; let rsize = header.rsize; - off += msize as u64; match header.rtype { WALRingType::Full => { + assert!(chunks.is_none()); let payload = f.read(off, rsize as usize); - self.file_pool.store.apply_payload(payload); off += rsize as u64; + self.file_pool.store.apply_payload(payload); }, WALRingType::First => { - let mut chunks = vec![f.read(off, rsize as usize)]; + assert!(chunks.is_none()); + chunks = Some(vec![f.read(off, rsize as usize)]); off += rsize as u64; - loop { - let header_raw = f.read(off, msize as usize); - let header = unsafe { - std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; - if let WALRingType::Null = header.rtype { - end = true; - break - } - let rsize = header.rsize; - let payload = f.read(off, rsize as usize); - off += msize as u64; - chunks.push(payload); - match header.rtype { - WALRingType::Middle => (), - WALRingType::Last => break, - _ => unreachable!() - } - } + }, + WALRingType::Middle => { + chunks.as_mut().unwrap().push(f.read(off, rsize as usize)); + off += rsize as u64; + }, + WALRingType::Last => { + chunks.as_mut().unwrap().push(f.read(off, rsize as usize)); + off += rsize as u64; + + let _chunks = chunks.take().unwrap(); let mut payload = Vec::new(); - payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0); + payload.resize(_chunks.iter().fold(0, |acc, v| acc + v.len()), 0); let mut ps = &mut payload[..]; - for c in chunks { + for c in _chunks { ps[..c.len()].copy_from_slice(&*c); ps = &mut ps[c.len()..]; } self.file_pool.store.apply_payload(payload.into_boxed_slice()); }, - WALRingType::Null => end = true, - _ => unreachable!() + WALRingType::Null => break, } } + f.truncate(0).unwrap(); self.file_pool.remove_file(fid); } + self.file_pool.reset(); WALWriter::new(WALState { first_fid: 0, next: 0, -- cgit v1.2.3-70-g09d2