diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lib.rs | 3 | ||||
-rw-r--r-- | src/wal.rs | 228 |
2 files changed, 161 insertions, 70 deletions
@@ -1,2 +1,3 @@ -#[macro_use] extern crate scan_fmt; +#[macro_use] +extern crate scan_fmt; pub mod wal; @@ -7,7 +7,7 @@ enum WALRingType { Full, First, Middle, - Last + Last, } #[repr(packed)] @@ -25,18 +25,27 @@ pub type WALPos = u64; #[derive(Eq, PartialEq, Copy, Clone, Debug, Hash)] pub struct WALRingId { start: WALPos, - end: WALPos + end: WALPos, } impl WALRingId { - pub fn empty_id() -> Self { WALRingId { start: 0, end: 0 } } - pub fn get_start(&self) -> WALPos { self.start } - pub fn get_end(&self) -> WALPos { self.end } + pub fn empty_id() -> Self { + WALRingId { start: 0, end: 0 } + } + pub fn get_start(&self) -> WALPos { + self.start + } + pub fn get_end(&self) -> WALPos { + self.end + } } impl Ord for WALRingId { fn cmp(&self, other: &WALRingId) -> std::cmp::Ordering { - other.start.cmp(&self.start).then_with(|| other.end.cmp(&self.end)) + other + .start + .cmp(&self.start) + .then_with(|| other.end.cmp(&self.end)) } } @@ -68,14 +77,22 @@ pub trait WALFile { /// should be _atomic_ (the entire single write should be all or nothing). fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>; /// Read data with offset. - fn read(&self, offset: WALPos, length: usize) -> Result<Option<WALBytes>, ()>; + fn read( + &self, + offset: WALPos, + length: usize, + ) -> Result<Option<WALBytes>, ()>; } pub trait WALStore { type FileNameIter: Iterator<Item = String>; /// Open a file given the filename, create the file if not exists when `touch` is `true`. - fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()>; + fn open_file( + &mut self, + filename: &str, + touch: bool, + ) -> Result<Box<dyn WALFile>, ()>; /// Unlink a file given the filename. fn remove_file(&mut self, filename: &str) -> Result<(), ()>; /// Enumerate all WAL filenames. It should include all WAL files that are previously opened @@ -84,7 +101,11 @@ pub trait WALStore { /// Apply the payload during recovery. An invocation of the callback waits the application for /// redoing the given operation to ensure its state is consistent. We assume the necessary /// changes by the payload has already been persistent when the callback returns. - fn apply_payload(&mut self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()>; + fn apply_payload( + &mut self, + payload: WALBytes, + ringid: WALRingId, + ) -> Result<(), ()>; } /// The middle layer that manages WAL file handles and invokes public trait functions to actually @@ -114,15 +135,22 @@ impl<F: WALStore> WALFilePool<F> { format!("{:08x}.log", fid) } - fn get_file(&mut self, fid: u64, touch: bool) -> Result<&'static dyn WALFile, ()> { + fn get_file( + &mut self, + fid: u64, + touch: bool, + ) -> Result<&'static dyn WALFile, ()> { let h = match self.handles.get(&fid) { Some(h) => &**h, None => { - self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch)?); + self.handles.put( + fid, + self.store.open_file(&Self::get_fname(fid), touch)?, + ); &**self.handles.get(&fid).unwrap() } }; - Ok(unsafe {&*(h as *const dyn WALFile)}) + Ok(unsafe { &*(h as *const dyn WALFile) }) } fn get_fid(&mut self, fname: &str) -> WALFileId { @@ -135,14 +163,19 @@ impl<F: WALStore> WALFilePool<F> { let mut fid = writes[0].0 >> self.file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); let mut alloc_end = alloc_start + writes[0].1.len() as u64; - let files = writes.iter().map(|(off, _)| - self.get_file((*off) >> self.file_nbit, true)).collect::<Result<Vec<&dyn WALFile>, ()>>()?; + let files = writes + .iter() + .map(|(off, _)| self.get_file((*off) >> self.file_nbit, true)) + .collect::<Result<Vec<&dyn WALFile>, ()>>()?; // prepare file handles let mut last_h = files[0]; for ((off, w), h) in writes[1..].iter().zip(files[1..].iter()) { let next_fid = off >> self.file_nbit; if next_fid != fid { - last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?; + last_h.allocate( + alloc_start, + (alloc_end - alloc_start) as usize, + )?; last_h = *h; alloc_start = 0; alloc_end = alloc_start + w.len() as u64; @@ -153,7 +186,8 @@ impl<F: WALStore> WALFilePool<F> { } last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?; for (off, w) in writes.into_iter() { - self.get_file(off >> self.file_nbit, true)?.write(off & (self.file_size - 1), w)?; + self.get_file(off >> self.file_nbit, true)? + .write(off & (self.file_size - 1), w)?; } Ok(()) } @@ -162,7 +196,9 @@ impl<F: WALStore> WALFilePool<F> { self.store.remove_file(&Self::get_fname(fid)) } - fn reset(&mut self) { self.handles.clear() } + fn reset(&mut self) { + self.handles.clear() + } } pub struct WALWriter<F: WALStore> { @@ -172,7 +208,7 @@ pub struct WALWriter<F: WALStore> { block_size: u32, next_complete: WALPos, io_complete: BinaryHeap<WALRingId>, - msize: usize + msize: usize, } impl<F: WALStore> WALWriter<F> { @@ -181,21 +217,24 @@ impl<F: WALStore> WALWriter<F> { let block_size = 1 << file_pool.block_nbit as u32; let msize = std::mem::size_of::<WALRingBlob>(); b.resize(block_size as usize, 0); - WALWriter{ + WALWriter { state, file_pool, block_buffer: b.into_boxed_slice(), block_size, next_complete: 0, io_complete: BinaryHeap::new(), - msize + msize, } } /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the /// function returns. The caller then has the knowledge of WAL writes so it should defer /// actual data writes after WAL writes. - pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> (Box<[WALRingId]>, Result<(), ()>) { + pub fn grow<T: AsRef<[WALBytes]>>( + &mut self, + records: T, + ) -> (Box<[WALRingId]>, Result<(), ()>) { let mut res = Vec::new(); let mut writes = Vec::new(); let msize = self.msize as u32; @@ -213,9 +252,14 @@ impl<F: WALStore> WALWriter<F> { let remain = self.block_size - bbuff_cur; if remain > msize { let d = remain - msize; - let rs0 = self.state.next + (bbuff_cur - bbuff_start) as u64; - let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>( - (&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())}; + let rs0 = + self.state.next + (bbuff_cur - bbuff_start) as u64; + let blob = unsafe { + std::mem::transmute::<*mut u8, &mut WALRingBlob>( + (&mut self.block_buffer[bbuff_cur as usize..]) + .as_mut_ptr(), + ) + }; bbuff_cur += msize; if d >= rsize { // the remaining rec fits in the block @@ -228,25 +272,28 @@ impl<F: WALStore> WALWriter<F> { (rs0, WALRingType::Full) }; blob.rtype = rt; - &mut self.block_buffer[ - bbuff_cur as usize.. - bbuff_cur as usize + payload.len()].copy_from_slice(payload); + &mut self.block_buffer[bbuff_cur as usize.. + bbuff_cur as usize + payload.len()] + .copy_from_slice(payload); bbuff_cur += rsize; rsize = 0; - let end = self.state.next + (bbuff_cur - bbuff_start) as u64; - res.push(WALRingId{start: rs, end }); + let end = + self.state.next + (bbuff_cur - bbuff_start) as u64; + res.push(WALRingId { start: rs, end }); } else { // the remaining block can only accommodate partial rec let payload = &rec[..d as usize]; blob.crc32 = crc::crc32::checksum_ieee(payload); blob.rsize = d; - blob.rtype = if ring_start.is_some() {WALRingType::Middle} else { + blob.rtype = if ring_start.is_some() { + WALRingType::Middle + } else { ring_start = Some(rs0); WALRingType::First }; - &mut self.block_buffer[ - bbuff_cur as usize.. - bbuff_cur as usize + payload.len()].copy_from_slice(payload); + &mut self.block_buffer[bbuff_cur as usize.. + bbuff_cur as usize + payload.len()] + .copy_from_slice(payload); bbuff_cur += d; rsize -= d; rec = &rec[d as usize..]; @@ -256,9 +303,12 @@ impl<F: WALStore> WALWriter<F> { bbuff_cur = self.block_size; } if bbuff_cur == self.block_size { - writes.push((self.state.next, - self.block_buffer[bbuff_start as usize..] - .to_vec().into_boxed_slice())); + writes.push(( + self.state.next, + self.block_buffer[bbuff_start as usize..] + .to_vec() + .into_boxed_slice(), + )); self.state.next += (self.block_size - bbuff_start) as u64; bbuff_start = 0; bbuff_cur = 0; @@ -266,9 +316,12 @@ impl<F: WALStore> WALWriter<F> { } } if bbuff_cur > bbuff_start { - writes.push((self.state.next, - self.block_buffer[bbuff_start as usize..bbuff_cur as usize] - .to_vec().into_boxed_slice())); + writes.push(( + self.state.next, + self.block_buffer[bbuff_start as usize..bbuff_cur as usize] + .to_vec() + .into_boxed_slice(), + )); self.state.next += (bbuff_cur - bbuff_start) as u64; } @@ -277,16 +330,20 @@ impl<F: WALStore> WALWriter<F> { /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are /// complete so that it could automatically remove obsolete WAL files. - pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) -> Result<(), ()> { + pub fn peel<T: AsRef<[WALRingId]>>( + &mut self, + records: T, + ) -> Result<(), ()> { let msize = self.msize as u64; let block_size = self.block_size as u64; for rec in records.as_ref() { self.io_complete.push(*rec); } let orig_fid = self.state.first_fid; - while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) { + while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) + { if s != self.next_complete { - break + break; } let mut m = self.io_complete.pop().unwrap(); let block_remain = block_size - (m.end & (block_size - 1)); @@ -309,7 +366,7 @@ pub struct WALLoader { block_nbit: u8, cache_size: usize, msize: usize, - filename_fmt: regex::Regex + filename_fmt: regex::Regex, } impl WALLoader { @@ -318,17 +375,30 @@ impl WALLoader { assert!(file_nbit > block_nbit); assert!(msize < 1 << block_nbit); let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap(); - WALLoader{ file_nbit, block_nbit, cache_size, msize, filename_fmt } + WALLoader { + file_nbit, + block_nbit, + cache_size, + msize, + filename_fmt, + } } /// Recover by reading the WAL log files. pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> { - let mut file_pool = WALFilePool::new(store, self.file_nbit, self.block_nbit, self.cache_size); + let mut file_pool = WALFilePool::new( + store, + self.file_nbit, + self.block_nbit, + self.cache_size, + ); let block_size = 1 << file_pool.block_nbit; - let msize = std::mem::size_of::<WALRingBlob>() as u32; - let mut logfiles: Vec<String> = file_pool.store + let msize = self.msize as u32; + let mut logfiles: Vec<String> = file_pool + .store .enumerate_files()? - .filter(|f| self.filename_fmt.is_match(f)).collect(); + .filter(|f| self.filename_fmt.is_match(f)) + .collect(); // TODO: check for missing logfiles logfiles.sort(); let mut chunks = None; @@ -340,7 +410,10 @@ impl WALLoader { let ringid_start = (fid << file_pool.file_nbit) + off; off += msize as u64; let header = unsafe { - std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; + std::mem::transmute::<*const u8, &WALRingBlob>( + header_raw.as_ptr(), + ) + }; let rsize = header.rsize; match header.rtype { WALRingType::Full => { @@ -351,26 +424,36 @@ impl WALLoader { payload, WALRingId { start: ringid_start, - end: (fid << file_pool.file_nbit) + off - })?; - }, + end: (fid << file_pool.file_nbit) + off, + }, + )?; + } WALRingType::First => { assert!(chunks.is_none()); - chunks = Some((vec![f.read(off, rsize as usize)?.ok_or(())?], ringid_start)); + chunks = Some(( + vec![f.read(off, rsize as usize)?.ok_or(())?], + ringid_start, + )); off += rsize as u64; - }, + } WALRingType::Middle => { if let Some((chunks, _)) = &mut chunks { - chunks.push(f.read(off, rsize as usize)?.ok_or(())?); + chunks + .push(f.read(off, rsize as usize)?.ok_or(())?); } // otherwise ignore the leftover off += rsize as u64; - }, + } WALRingType::Last => { - if let Some((mut chunks, ringid_start)) = chunks.take() { - chunks.push(f.read(off, rsize as usize)?.ok_or(())?); + if let Some((mut chunks, ringid_start)) = chunks.take() + { + chunks + .push(f.read(off, rsize as usize)?.ok_or(())?); off += rsize as u64; let mut payload = Vec::new(); - payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0); + payload.resize( + chunks.iter().fold(0, |acc, v| acc + v.len()), + 0, + ); let mut ps = &mut payload[..]; for c in chunks { ps[..c.len()].copy_from_slice(&*c); @@ -380,11 +463,15 @@ impl WALLoader { payload.into_boxed_slice(), WALRingId { start: ringid_start, - end: (fid << file_pool.file_nbit) + off - })?; - } // otherwise ignore the leftover - else { off += rsize as u64; } - }, + end: (fid << file_pool.file_nbit) + off, + }, + )?; + } + // otherwise ignore the leftover + else { + off += rsize as u64; + } + } WALRingType::Null => break, } let block_remain = block_size - (off & (block_size - 1)); @@ -396,10 +483,13 @@ impl WALLoader { file_pool.remove_file(fid)?; } file_pool.reset(); - Ok(WALWriter::new(WALState { - first_fid: 0, - next: 0, - file_nbit: file_pool.file_nbit, - }, file_pool)) + Ok(WALWriter::new( + WALState { + first_fid: 0, + next: 0, + file_nbit: file_pool.file_nbit, + }, + file_pool, + )) } } |