diff options
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | examples/demo1.rs | 11 | ||||
-rw-r--r-- | src/lib.rs | 10 | ||||
-rw-r--r-- | src/wal.rs | 109 | ||||
-rw-r--r-- | tests/common/mod.rs | 15 |
5 files changed, 106 insertions, 41 deletions
@@ -23,8 +23,6 @@ libc = "0.2.71" [dev-dependencies] hex = "0.4.2" -libc = "0.2.44" -nix = "0.17.0" rand = "0.7.3" indexmap = "1.4.0" diff --git a/examples/demo1.rs b/examples/demo1.rs index 4a923c6..e214177 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -1,5 +1,5 @@ use growthring::{ - wal::{WALBytes, WALLoader, WALRingId, WALWriter}, + wal::{WALBytes, WALRingId, WALLoader, WALWriter}, WALStoreAIO, }; use rand::{seq::SliceRandom, Rng}; @@ -29,8 +29,11 @@ fn recover(payload: WALBytes, ringid: WALRingId) -> Result<(), ()> { fn main() { let wal_dir = "./wal_demo1"; let mut rng = rand::thread_rng(); + let mut loader = WALLoader::new(); + loader.file_nbit(9).block_nbit(8); + let store = WALStoreAIO::new(&wal_dir, true, recover); - let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); + let mut wal = loader.load(store).unwrap(); for _ in 0..3 { test( ["hi", "hello", "lol"] @@ -48,7 +51,7 @@ fn main() { } let store = WALStoreAIO::new(&wal_dir, false, recover); - let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); + let mut wal = loader.load(store).unwrap(); for _ in 0..3 { test( vec![ @@ -62,7 +65,7 @@ fn main() { } let store = WALStoreAIO::new(&wal_dir, false, recover); - let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); + let mut wal = loader.load(store).unwrap(); for _ in 0..3 { let mut ids = Vec::new(); for _ in 0..3 { @@ -5,9 +5,13 @@ //! ``` //! use growthring::{WALStoreAIO, wal::WALLoader}; //! use futures::executor::block_on; +//! let mut loader = WALLoader::new(); +//! loader.file_nbit(9).block_nbit(8); +//! +//! //! // Start with empty WAL (truncate = true). //! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}); -//! let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); +//! let mut wal = loader.load(store).unwrap(); //! // Write a vector of records to WAL. //! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() { //! let ring_id = block_on(f).unwrap().1; @@ -23,7 +27,7 @@ //! ringid); //! Ok(()) //! }); -//! let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); +//! let mut wal = loader.load(store).unwrap(); //! // We saw some log playback, even there is no failure. //! // Let's try to grow the WAL to create many files. //! let ring_ids = wal.grow((0..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>()) @@ -37,7 +41,7 @@ //! println!("payload.len() = {}", payload.len()); //! Ok(()) //! }); -//! let wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); +//! let wal = loader.load(store).unwrap(); //! // After each recovery, the ./walfiles is empty. //! ``` @@ -1,5 +1,6 @@ use async_trait::async_trait; use futures::future::{self, FutureExt, TryFutureExt}; +use futures::executor::block_on; use std::cell::{RefCell, UnsafeCell}; use std::collections::{BinaryHeap, HashMap, hash_map}; use std::future::Future; @@ -558,31 +559,68 @@ impl<F: WALStore> WALWriter<F> { pub fn file_pool_in_use(&self) -> usize { self.file_pool.in_use_len() } } +#[derive(Copy, Clone)] +pub enum RecoverPolicy { + /// all checksums must be correct, otherwise recovery fails + Strict, + /// stop recovering when hitting the first corrupted record + BestEffort +} + pub struct WALLoader { file_nbit: u8, block_nbit: u8, cache_size: usize, - msize: usize, - filename_fmt: regex::Regex, + recover_policy: RecoverPolicy, } impl WALLoader { - pub fn new(file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { - let msize = std::mem::size_of::<WALRingBlob>(); - assert!(file_nbit > block_nbit); - assert!(msize < 1 << block_nbit); - let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap(); + pub fn new() -> Self { WALLoader { - file_nbit, - block_nbit, - cache_size, - msize, - filename_fmt, + file_nbit: 22, // 4MB + block_nbit: 15, // 32KB, + cache_size: 16, + recover_policy: RecoverPolicy::Strict + } + } + + pub fn file_nbit(&mut self, v: u8) -> &mut Self { + self.file_nbit = v; + self + } + + pub fn block_nbit(&mut self, v: u8) -> &mut Self { + self.block_nbit = v; + self + } + + pub fn cache_size(&mut self, v: usize) -> &mut Self { + self.cache_size = v; + self + } + + pub fn recover_policy(&mut self, p: RecoverPolicy) -> &mut Self { + self.recover_policy = p; + self + } + + fn verify_checksum(&self, data: &[u8], checksum: u32) -> Result<bool, ()> { + if checksum == crc::crc32::checksum_ieee(data) { + Ok(true) + } else { + match self.recover_policy { + RecoverPolicy::Strict => Err(()), + RecoverPolicy::BestEffort => Ok(false), + } } } /// Recover by reading the WAL files. - pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> { + pub fn load<F: WALStore>(&self, store: F) -> Result<WALWriter<F>, ()> { + let msize = std::mem::size_of::<WALRingBlob>(); + assert!(self.file_nbit > self.block_nbit); + assert!(msize < 1 << self.block_nbit); + let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap(); let mut file_pool = WALFilePool::new( store, self.file_nbit, @@ -590,20 +628,24 @@ impl WALLoader { self.cache_size, ); let block_size = 1 << file_pool.block_nbit; - let msize = self.msize as u32; let mut logfiles: Vec<String> = file_pool .store .enumerate_files()? - .filter(|f| self.filename_fmt.is_match(f)) + .filter(|f| filename_fmt.is_match(f)) .collect(); // TODO: check for missing logfiles logfiles.sort(); let mut chunks = None; + let mut skip = false; for fname in logfiles.into_iter() { let fid = file_pool.get_fid(&fname); - let f = - futures::executor::block_on(file_pool.get_file(fid, false))?; + let f = block_on(file_pool.get_file(fid, false))?; let mut off = 0; + if skip { + f.truncate(0)?; + block_on(file_pool.store.remove_file(fname))?; + continue + } while let Some(header_raw) = f.read(off, msize as usize)? { let ringid_start = (fid << file_pool.file_nbit) + off; off += msize as u64; @@ -617,6 +659,11 @@ impl WALLoader { WALRingType::Full => { assert!(chunks.is_none()); let payload = f.read(off, rsize as usize)?.ok_or(())?; + // TODO: improve the behavior when CRC32 fails + if !self.verify_checksum(&payload, header.crc32)? { + skip = true; + break + } off += rsize as u64; file_pool.store.apply_payload( payload, @@ -628,25 +675,35 @@ impl WALLoader { } WALRingType::First => { assert!(chunks.is_none()); - chunks = Some(( - vec![f.read(off, rsize as usize)?.ok_or(())?], - ringid_start, - )); + let chunk = f.read(off, rsize as usize)?.ok_or(())?; + if !self.verify_checksum(&chunk, header.crc32)? { + skip = true; + break + } + chunks = Some((vec![chunk], ringid_start)); off += rsize as u64; } WALRingType::Middle => { if let Some((chunks, _)) = &mut chunks { - chunks - .push(f.read(off, rsize as usize)?.ok_or(())?); + let chunk = f.read(off, rsize as usize)?.ok_or(())?; + if !self.verify_checksum(&chunk, header.crc32)? { + skip = true; + break + } + chunks.push(chunk); } // otherwise ignore the leftover off += rsize as u64; } WALRingType::Last => { if let Some((mut chunks, ringid_start)) = chunks.take() { - chunks - .push(f.read(off, rsize as usize)?.ok_or(())?); + let chunk = f.read(off, rsize as usize)?.ok_or(())?; off += rsize as u64; + if !self.verify_checksum(&chunk, header.crc32)? { + skip = true; + break + } + chunks.push(chunk); let mut payload = Vec::new(); payload.resize( chunks.iter().fold(0, |acc, v| acc + v.len()), @@ -678,7 +735,7 @@ impl WALLoader { } } f.truncate(0)?; - futures::executor::block_on(file_pool.store.remove_file(fname))?; + block_on(file_pool.store.remove_file(fname))?; } file_pool.reset(); Ok(WALWriter::new( diff --git a/tests/common/mod.rs b/tests/common/mod.rs index b61233b..247b2ee 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -3,7 +3,7 @@ #[allow(dead_code)] use async_trait::async_trait; use growthring::wal::{ - WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, + WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, RecoverPolicy }; use indexmap::{map::Entry, IndexMap}; use rand::Rng; @@ -531,15 +531,14 @@ impl PaintingSim { &self, state: &mut WALStoreEmulState, canvas: &mut Canvas, - wal: WALLoader, + loader: WALLoader, ops: &mut Vec<PaintStrokes>, ringid_map: &mut HashMap<WALRingId, usize>, fgen: Rc<G>, ) -> Result<(), ()> { let mut rng = <rand::rngs::StdRng as rand::SeedableRng>::seed_from_u64(self.seed); - let mut wal = - wal.recover(WALStoreEmul::new(state, fgen.clone(), |_, _| {}))?; + let mut wal = loader.load(WALStoreEmul::new(state, fgen.clone(), |_, _| {}))?; for _ in 0..self.n { let pss = (0..self.m) .map(|_| { @@ -597,7 +596,11 @@ impl PaintingSim { } pub fn get_walloader(&self) -> WALLoader { - WALLoader::new(self.file_nbit, self.block_nbit, self.file_cache) + let mut loader = WALLoader::new(); + loader.file_nbit(self.file_nbit) + .block_nbit(self.block_nbit) + .cache_size(self.file_cache); + loader } pub fn get_nticks(&self, state: &mut WALStoreEmulState) -> usize { @@ -631,7 +634,7 @@ impl PaintingSim { let mut last_idx = 0; let mut napplied = 0; canvas.clear_queued(); - wal.recover(WALStoreEmul::new( + wal.load(WALStoreEmul::new( state, Rc::new(ZeroFailGen), |payload, ringid| { |