diff options
-rw-r--r-- | examples/demo1.rs | 12 | ||||
-rw-r--r-- | src/wal.rs | 53 | ||||
-rw-r--r-- | tests/common/mod.rs | 44 | ||||
-rw-r--r-- | tests/rand_fail.rs | 58 |
4 files changed, 114 insertions, 53 deletions
diff --git a/examples/demo1.rs b/examples/demo1.rs index 48d7c2c..5ee2b64 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -114,10 +114,10 @@ impl WALStore for WALStoreTest { Ok(logfiles.into_iter()) } - fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()> { - println!("apply_payload(payload={}, wal_off={})", + fn apply_payload(&mut self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()> { + println!("apply_payload(payload={}, ringid={:?})", std::str::from_utf8(&payload).unwrap(), - wal_off); + ringid); Ok(()) } } @@ -134,7 +134,7 @@ fn test(records: Vec<String>, wal: &mut WALWriter<WALStoreTest>) -> Box<[WALRing fn main() { let mut rng = rand::thread_rng(); let store = WALStoreTest::new("./wal_demo1", true); - let mut wal = WALLoader::new(store, 9, 8, 1000).recover().unwrap(); + let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::<Vec<String>>(), &mut wal); } @@ -143,13 +143,13 @@ fn main() { } let store = WALStoreTest::new("./wal_demo1", false); - let mut wal = WALLoader::new(store, 9, 8, 1000).recover().unwrap(); + let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { test(vec!["a".repeat(10), "b".repeat(100), "c".repeat(300), "d".repeat(400)], &mut wal); } let store = WALStoreTest::new("./wal_demo1", false); - let mut wal = WALLoader::new(store, 9, 8, 1000).recover().unwrap(); + let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { let mut ids = Vec::new(); for _ in 0..3 { @@ -84,7 +84,7 @@ pub trait WALStore { /// Apply the payload during recovery. An invocation of the callback waits the application for /// redoing the given operation to ensure its state is consistent. We assume the necessary /// changes by the payload has already been persistent when the callback returns. - fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()>; + fn apply_payload(&mut self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()>; } /// The middle layer that manages WAL file handles and invokes public trait functions to actually @@ -301,31 +301,33 @@ impl<F: WALStore> WALWriter<F> { } } -pub struct WALLoader<F: WALStore> { - file_pool: WALFilePool<F>, +pub struct WALLoader { + file_nbit: u8, + block_nbit: u8, + cache_size: usize, filename_fmt: regex::Regex } -impl<F: WALStore> WALLoader<F> { - pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { - let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size); +impl WALLoader { + pub fn new(file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap(); - WALLoader{ file_pool, filename_fmt } + WALLoader{ file_nbit, block_nbit, cache_size, filename_fmt } } /// Recover by reading the WAL log files. - pub fn recover(mut self) -> Result<WALWriter<F>, ()> { - let block_size = 1 << self.file_pool.block_nbit; + pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> { + let mut file_pool = WALFilePool::new(store, self.file_nbit, self.block_nbit, self.cache_size); + let block_size = 1 << file_pool.block_nbit; let msize = std::mem::size_of::<WALRingBlob>() as u32; - let mut logfiles: Vec<String> = self.file_pool.store + let mut logfiles: Vec<String> = file_pool.store .enumerate_files()? .filter(|f| self.filename_fmt.is_match(f)).collect(); // TODO: check for missing logfiles logfiles.sort(); let mut chunks = None; for fname in logfiles.iter() { - let fid = self.file_pool.get_fid(fname); - let f = self.file_pool.get_file(fid, false)?; + let fid = file_pool.get_fid(fname); + let f = file_pool.get_file(fid, false)?; let mut off = 0; while let Some(header_raw) = f.read(off, msize as usize)? { let block_remain = block_size - (off & (block_size - 1)); @@ -333,7 +335,7 @@ impl<F: WALStore> WALLoader<F> { off += block_remain; continue } - let ringid_start = (fid << self.file_pool.file_nbit) | off; + let ringid_start = (fid << file_pool.file_nbit) | off; off += msize as u64; let header = unsafe { std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; @@ -343,9 +345,12 @@ impl<F: WALStore> WALLoader<F> { assert!(chunks.is_none()); let payload = f.read(off, rsize as usize)?.ok_or(())?; off += rsize as u64; - self.file_pool.store.apply_payload( + file_pool.store.apply_payload( payload, - ringid_start)?; + WALRingId { + start: ringid_start, + end: (fid << file_pool.file_nbit) | off + })?; }, WALRingType::First => { assert!(chunks.is_none()); @@ -361,6 +366,7 @@ impl<F: WALStore> WALLoader<F> { WALRingType::Last => { if let Some((mut chunks, ringid_start)) = chunks.take() { chunks.push(f.read(off, rsize as usize)?.ok_or(())?); + off += rsize as u64; let mut payload = Vec::new(); payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0); let mut ps = &mut payload[..]; @@ -368,23 +374,26 @@ impl<F: WALStore> WALLoader<F> { ps[..c.len()].copy_from_slice(&*c); ps = &mut ps[c.len()..]; } - self.file_pool.store.apply_payload( + file_pool.store.apply_payload( payload.into_boxed_slice(), - ringid_start)?; + WALRingId { + start: ringid_start, + end: (fid << file_pool.file_nbit) | off + })?; } // otherwise ignore the leftover - off += rsize as u64; + else { off += rsize as u64; } }, WALRingType::Null => break, } } f.truncate(0)?; - self.file_pool.remove_file(fid)?; + file_pool.remove_file(fid)?; } - self.file_pool.reset(); + file_pool.reset(); Ok(WALWriter::new(WALState { first_fid: 0, next: 0, - file_nbit: self.file_pool.file_nbit, - }, self.file_pool)) + file_nbit: file_pool.file_nbit, + }, file_pool)) } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a8dab51..43b717a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -92,13 +92,13 @@ impl WALStoreEmulState { pub struct WALStoreEmul<'a, G, F> where G: FailGen, - F: Fn(WALBytes, WALPos) { + F: FnMut(WALBytes, WALRingId) { state: &'a mut WALStoreEmulState, fgen: Rc<G>, recover: F } -impl<'a, G: FailGen, F: Fn(WALBytes, WALPos)> WALStoreEmul<'a, G, F> { +impl<'a, G: FailGen, F: FnMut(WALBytes, WALRingId)> WALStoreEmul<'a, G, F> { pub fn new(state: &'a mut WALStoreEmulState, fail_gen: G, recover: F) -> Self { WALStoreEmul { @@ -111,7 +111,7 @@ impl<'a, G: FailGen, F: Fn(WALBytes, WALPos)> WALStoreEmul<'a, G, F> { impl<'a, G, F> WALStore for WALStoreEmul<'a, G, F> where - G: 'static + FailGen, F: Fn(WALBytes, WALPos) { + G: 'static + FailGen, F: FnMut(WALBytes, WALRingId) { type FileNameIter = std::vec::IntoIter<String>; fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()> { @@ -147,12 +147,12 @@ where Ok(logfiles.into_iter()) } - fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()> { + fn apply_payload(&mut self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()> { if self.fgen.next_fail() { return Err(()) } - println!("apply_payload(payload=0x{}, wal_off={})", + println!("apply_payload(payload=0x{}, ringid={:?})", hex::encode(&payload), - wal_off); - (self.recover)(payload, wal_off); + ringid); + (self.recover)(payload, ringid); Ok(()) } } @@ -278,6 +278,18 @@ impl Canvas { } } + pub fn new_reference(&self, ops: &[PaintStrokes]) -> Self { + let mut res = Self::new(self.canvas.len()); + for op in ops { + for (s, e, c) in op.0.iter() { + for i in *s..*e { + res.canvas[i as usize] = *c + } + } + } + res + } + fn get_waiting(&mut self, rid: WALRingId) -> &mut usize { match self.waiting.entry(rid) { hash_map::Entry::Occupied(e) => e.into_mut(), @@ -308,13 +320,27 @@ impl Canvas { /// Schedule to paint one position, randomly. It optionally returns a finished batch write /// identified by its start position of WALRingId. pub fn rand_paint<R: rand::Rng>(&mut self, rng: &mut R) -> Option<(Option<WALRingId>, u32)> { - if self.queue.is_empty() { return None } + if self.is_empty() { return None } let idx = rng.gen_range(0, self.queue.len()); let (pos, _) = self.queue.get_index_mut(idx).unwrap(); let pos = *pos; Some((self.paint(pos), pos)) } + pub fn clear_queued(&mut self) { + self.queue.clear(); + self.waiting.clear(); + } + + pub fn paint_all(&mut self) { + for (k, q) in self.queue.iter() { + self.canvas[*k as usize] = q.back().unwrap().0; + } + self.clear_queued() + } + + pub fn is_empty(&self) -> bool { self.queue.is_empty() } + pub fn paint(&mut self, pos: u32) -> Option<WALRingId> { let q = self.queue.get_mut(&pos).unwrap(); let (c, rid) = q.pop_front().unwrap(); @@ -332,12 +358,14 @@ impl Canvas { } pub fn print(&self, max_col: usize) { + println!("# begin canvas"); for r in self.canvas.chunks(max_col) { for c in r.iter() { print!("{:02x} ", c & 0xff); } println!(""); } + println!("# end canvas"); } } diff --git a/tests/rand_fail.rs b/tests/rand_fail.rs index 6c2c3c1..c630c07 100644 --- a/tests/rand_fail.rs +++ b/tests/rand_fail.rs @@ -3,11 +3,14 @@ mod common; use growthring::wal::{WALLoader, WALWriter, WALStore, WALRingId, WALBytes, WALPos}; use common::{FailGen, SingleFailGen, Canvas, WALStoreEmulState, WALStoreEmul, PaintStrokes}; +use std::collections::HashMap; -fn run<F: WALStore, R: rand::Rng>(n: usize, m: usize, k: usize, - canvas: &mut Canvas, wal: &mut WALWriter<F>, trace: &mut Vec<u32>, - rng: &mut R) -> Result<(), ()> { - for i in 0..n { +fn run<G: 'static + FailGen, R: rand::Rng>(n: usize, m: usize, k: usize, + state: &mut WALStoreEmulState, canvas: &mut Canvas, wal: WALLoader, + ops: &mut Vec<PaintStrokes>, ringid_map: &mut HashMap<WALRingId, usize>, + fgen: G, rng: &mut R) -> Result<(), ()> { + let mut wal = wal.recover(WALStoreEmul::new(state, fgen, |_, _|{})).unwrap(); + for _ in 0..n { let s = (0..m).map(|_| PaintStrokes::gen_rand(1000, 10, 256, 5, rng)).collect::<Vec<PaintStrokes>>(); let recs = s.iter().map(|e| e.to_bytes()).collect::<Vec<WALBytes>>(); @@ -18,8 +21,10 @@ fn run<F: WALStore, R: rand::Rng>(n: usize, m: usize, k: usize, } // WAL append done // prepare data writes - for (e, rid) in s.iter().zip(rids.iter()) { - canvas.prepaint(e, &*rid) + for (e, rid) in s.into_iter().zip(rids.iter()) { + canvas.prepaint(&e, &*rid); + ops.push(e); + ringid_map.insert(*rid, ops.len() - 1); } // run the scheduler for a bit for _ in 0..k { @@ -27,7 +32,7 @@ fn run<F: WALStore, R: rand::Rng>(n: usize, m: usize, k: usize, if let Some(rid) = fin_rid { wal.peel(&[rid])? } - trace.push(t); + //trace.push(t); } else { break } } } @@ -35,28 +40,47 @@ fn run<F: WALStore, R: rand::Rng>(n: usize, m: usize, k: usize, if let Some(rid) = fin_rid { wal.peel(&[rid])? } - trace.push(t); + //trace.push(t); } canvas.print(40); Ok(()) } -fn check<F: WALStore>(canvas: &mut Canvas, wal: &mut WALLoader<F>, trace: &Vec<u32>) -> bool { - true +fn check(state: &mut WALStoreEmulState, canvas: &mut Canvas, + wal: WALLoader, + ops: &Vec<PaintStrokes>, ringid_map: &HashMap<WALRingId, usize>) -> bool { + let mut last_idx = ops.len() - 1; + canvas.clear_queued(); + wal.recover(WALStoreEmul::new(state, common::ZeroFailGen, |payload, ringid| { + let s = PaintStrokes::from_bytes(&payload); + canvas.prepaint(&s, &ringid); + last_idx = *ringid_map.get(&ringid).unwrap(); + })).unwrap(); + println!("last = {}/{}", last_idx, ops.len() - 1); + canvas.paint_all(); + // recover complete + let canvas0 = canvas.new_reference(&ops[..last_idx + 1]); + let res = canvas.is_same(&canvas0); + if !res { + canvas.print(40); + canvas0.print(40); + } + res } #[test] fn test_rand_fail() { - let fgen = SingleFailGen::new(100); + let fgen = SingleFailGen::new(105); let n = 100; let m = 10; let k = 100; - let mut rng = rand::thread_rng(); + let mut rng = <rand::rngs::StdRng as rand::SeedableRng>::from_seed([0; 32]); //rand::thread_rng(); let mut state = WALStoreEmulState::new(); - let mut wal = WALLoader::new(WALStoreEmul::new(&mut state, fgen, |_, _|{}), 9, 8, 1000).recover().unwrap(); - let mut trace: Vec<u32> = Vec::new(); + let wal = WALLoader::new(9, 8, 1000); + let mut ops: Vec<PaintStrokes> = Vec::new(); + let mut ringid_map = HashMap::new(); let mut canvas = Canvas::new(1000); - run(n, m, k, &mut canvas, &mut wal, &mut trace, &mut rng); //.unwrap(); - let mut wal = WALLoader::new(WALStoreEmul::new(&mut state, common::ZeroFailGen, |payload, wal_off|{}), 9, 8, 1000); - assert!(check(&mut canvas, &mut wal, &trace)); + run(n, m, k, &mut state, &mut canvas, wal, &mut ops, &mut ringid_map, fgen, &mut rng); + let wal = WALLoader::new(9, 8, 1000); + assert!(check(&mut state, &mut canvas, wal, &ops, &ringid_map)); } |