diff options
-rw-r--r-- | examples/demo1.rs | 6 | ||||
-rw-r--r-- | src/wal.rs | 66 | ||||
-rw-r--r-- | tests/common/mod.rs | 64 | ||||
-rw-r--r-- | tests/rand_fail.rs | 65 |
4 files changed, 128 insertions, 73 deletions
diff --git a/examples/demo1.rs b/examples/demo1.rs index 40f9562..48d7c2c 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -114,8 +114,10 @@ impl WALStore for WALStoreTest { Ok(logfiles.into_iter()) } - fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()> { - println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap()); + fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()> { + println!("apply_payload(payload={}, wal_off={})", + std::str::from_utf8(&payload).unwrap(), + wal_off); Ok(()) } } @@ -29,6 +29,7 @@ pub struct WALRingId { } impl WALRingId { + pub fn empty_id() -> Self { WALRingId { start: 0, end: 0 } } pub fn get_start(&self) -> WALPos { self.start } pub fn get_end(&self) -> WALPos { self.end } } @@ -83,7 +84,7 @@ pub trait WALStore { /// Apply the payload during recovery. An invocation of the callback waits the application for /// redoing the given operation to ensure its state is consistent. We assume the necessary /// changes by the payload has already been persistent when the callback returns. - fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()>; + fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()>; } /// The middle layer that manages WAL file handles and invokes public trait functions to actually @@ -204,12 +205,12 @@ impl<F: WALStore> WALWriter<F> { for _rec in records.as_ref() { let mut rec = &_rec[..]; let mut rsize = rec.len() as u32; - let mut started = false; + let mut ring_start = None; while rsize > 0 { let remain = self.block_size - bbuff_cur; if remain > msize { let d = remain - msize; - let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64; + let rs0 = self.state.next + (bbuff_cur - bbuff_start) as u64; let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>( (&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())}; bbuff_cur += msize; @@ -218,19 +219,26 @@ impl<F: WALStore> WALWriter<F> { let payload = rec; blob.crc32 = crc::crc32::checksum_ieee(payload); blob.rsize = rsize; - blob.rtype = if started {WALRingType::Last} else {WALRingType::Full}; + let (rs, rt) = if let Some(rs) = ring_start.take() { + (rs, WALRingType::Last) + } else { + (rs0, WALRingType::Full) + }; + blob.rtype = rt; &mut self.block_buffer[ bbuff_cur as usize.. bbuff_cur as usize + payload.len()].copy_from_slice(payload); bbuff_cur += rsize; rsize = 0; + let end = self.state.next + (bbuff_cur - bbuff_start) as u64; + res.push(WALRingId{start: rs, end }); } else { // the remaining block can only accommodate partial rec let payload = &rec[..d as usize]; blob.crc32 = crc::crc32::checksum_ieee(payload); blob.rsize = d; - blob.rtype = if started {WALRingType::Middle} else { - started = true; + blob.rtype = if ring_start.is_some() {WALRingType::Middle} else { + ring_start = Some(rs0); WALRingType::First }; &mut self.block_buffer[ @@ -240,8 +248,6 @@ impl<F: WALStore> WALWriter<F> { rsize -= d; rec = &rec[d as usize..]; } - let ring_end = self.state.next + (bbuff_cur - bbuff_start) as u64; - res.push(WALRingId{start: ring_start, end: ring_end}); } else { // add padding space by moving the point to the end of the block bbuff_cur = self.block_size; @@ -272,7 +278,7 @@ impl<F: WALStore> WALWriter<F> { let msize = std::mem::size_of::<WALRingBlob>() as u64; let block_size = self.block_size as u64; for rec in records.as_ref() { - self.io_complete.push(*rec) + self.io_complete.push(*rec); } let orig_fid = self.state.first_fid; while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) { @@ -307,6 +313,7 @@ impl<F: WALStore> WALLoader<F> { WALLoader{ file_pool, filename_fmt } } + /// Recover by reading the WAL log files. pub fn recover(mut self) -> Result<WALWriter<F>, ()> { let block_size = 1 << self.file_pool.block_nbit; let msize = std::mem::size_of::<WALRingBlob>() as u32; @@ -321,7 +328,12 @@ impl<F: WALStore> WALLoader<F> { let f = self.file_pool.get_file(fid, false)?; let mut off = 0; while let Some(header_raw) = f.read(off, msize as usize)? { - if block_size - (off & (block_size - 1)) <= msize as u64 { break } + let block_remain = block_size - (off & (block_size - 1)); + if block_remain <= msize as u64 { + off += block_remain; + continue + } + let ringid_start = (fid << self.file_pool.file_nbit) | off; off += msize as u64; let header = unsafe { std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; @@ -331,30 +343,36 @@ impl<F: WALStore> WALLoader<F> { assert!(chunks.is_none()); let payload = f.read(off, rsize as usize)?.ok_or(())?; off += rsize as u64; - self.file_pool.store.apply_payload(payload)?; + self.file_pool.store.apply_payload( + payload, + ringid_start)?; }, WALRingType::First => { assert!(chunks.is_none()); - chunks = Some(vec![f.read(off, rsize as usize)?.ok_or(())?]); + chunks = Some((vec![f.read(off, rsize as usize)?.ok_or(())?], ringid_start)); off += rsize as u64; }, WALRingType::Middle => { - chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?); + if let Some((chunks, _)) = &mut chunks { + chunks.push(f.read(off, rsize as usize)?.ok_or(())?); + } // otherwise ignore the leftover off += rsize as u64; }, WALRingType::Last => { - chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?); + if let Some((mut chunks, ringid_start)) = chunks.take() { + chunks.push(f.read(off, rsize as usize)?.ok_or(())?); + let mut payload = Vec::new(); + payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0); + let mut ps = &mut payload[..]; + for c in chunks { + ps[..c.len()].copy_from_slice(&*c); + ps = &mut ps[c.len()..]; + } + self.file_pool.store.apply_payload( + payload.into_boxed_slice(), + ringid_start)?; + } // otherwise ignore the leftover off += rsize as u64; - - let _chunks = chunks.take().unwrap(); - let mut payload = Vec::new(); - payload.resize(_chunks.iter().fold(0, |acc, v| acc + v.len()), 0); - let mut ps = &mut payload[..]; - for c in _chunks { - ps[..c.len()].copy_from_slice(&*c); - ps = &mut ps[c.len()..]; - } - self.file_pool.store.apply_payload(payload.into_boxed_slice())?; }, WALRingType::Null => break, } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 9572c5c..ce27e16 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -2,7 +2,7 @@ #[allow(dead_code)] extern crate growthring; -use growthring::wal::{WALFile, WALStore, WALPos, WALBytes}; +use growthring::wal::{WALFile, WALStore, WALPos, WALBytes, WALRingId}; use indexmap::{IndexMap, map::Entry}; use rand::Rng; use std::collections::{HashMap, hash_map}; @@ -122,6 +122,7 @@ impl<'a, G: 'static + FailGen> WALStore for WALStoreEmul<'a, G> { } fn remove_file(&mut self, filename: &str) -> Result<(), ()> { + println!("remove {}", filename); if self.fgen.next_fail() { return Err(()) } self.state.files.remove(filename).ok_or(()).and_then(|_| Ok(())) } @@ -135,9 +136,11 @@ impl<'a, G: 'static + FailGen> WALStore for WALStoreEmul<'a, G> { Ok(logfiles.into_iter()) } - fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()> { + fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()> { if self.fgen.next_fail() { return Err(()) } - println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap()); + println!("apply_payload(payload=0x{}, wal_off={})", + hex::encode(&payload), + wal_off); Ok(()) } } @@ -211,17 +214,13 @@ impl PaintStrokes { PaintStrokes(res) } - pub fn gen_rand<R: rand::Rng>(min_pos: u32, max_pos: u32, max_col: u32, n: usize, rng: &mut R) -> PaintStrokes { + pub fn gen_rand<R: rand::Rng>(max_pos: u32, max_len: u32, max_col: u32, n: usize, rng: &mut R) -> PaintStrokes { + assert!(max_pos > 0); let mut strokes = Self::new(); for _ in 0..n { - let mut s = 0; - let mut e = 0; - while s == e { - s = rng.gen_range(min_pos, max_pos); - e = rng.gen_range(min_pos, max_pos); - } - if s > e { std::mem::swap(&mut s, &mut e) } - strokes.stroke(s, e, rng.gen_range(0, max_col)) + let pos = rng.gen_range(0, max_pos); + let len = rng.gen_range(1, max_pos - pos + 1); + strokes.stroke(pos, pos + len, rng.gen_range(0, max_col)) } strokes } @@ -249,8 +248,8 @@ fn test_paint_strokes() { } pub struct Canvas { - waiting: HashMap<WALPos, usize>, - queue: IndexMap<u32, VecDeque<(u32, WALPos)>>, + waiting: HashMap<WALRingId, usize>, + queue: IndexMap<u32, VecDeque<(u32, WALRingId)>>, canvas: Box<[u32]> } @@ -267,36 +266,36 @@ impl Canvas { } } - fn get_waiting(&mut self, sid: WALPos) -> &mut usize { - match self.waiting.entry(sid) { + fn get_waiting(&mut self, rid: WALRingId) -> &mut usize { + match self.waiting.entry(rid) { hash_map::Entry::Occupied(e) => e.into_mut(), hash_map::Entry::Vacant(e) => e.insert(0) } } - fn get_queued(&mut self, pos: u32) -> &mut VecDeque<(u32, WALPos)> { + fn get_queued(&mut self, pos: u32) -> &mut VecDeque<(u32, WALRingId)> { match self.queue.entry(pos) { Entry::Occupied(e) => e.into_mut(), Entry::Vacant(e) => e.insert(VecDeque::new()) } } - pub fn prepaint(&mut self, strokes: &PaintStrokes, sid: &WALPos) { - let sid = *sid; + pub fn prepaint(&mut self, strokes: &PaintStrokes, rid: &WALRingId) { + let rid = *rid; let mut nwait = 0; for (s, e, c) in strokes.0.iter() { for i in *s..*e { nwait += 1; - self.get_queued(i).push_back((*c, sid)) + self.get_queued(i).push_back((*c, rid)) } } - *self.get_waiting(sid) += nwait + *self.get_waiting(rid) = nwait } // TODO: allow customized scheduler /// Schedule to paint one position, randomly. It optionally returns a finished batch write /// identified by its start position of WALRingId. - pub fn rand_paint<R: rand::Rng>(&mut self, rng: &mut R) -> Option<(Option<WALPos>, u32)> { + pub fn rand_paint<R: rand::Rng>(&mut self, rng: &mut R) -> Option<(Option<WALRingId>, u32)> { if self.queue.is_empty() { return None } let idx = rng.gen_range(0, self.queue.len()); let (pos, _) = self.queue.get_index_mut(idx).unwrap(); @@ -304,15 +303,15 @@ impl Canvas { Some((self.paint(pos), pos)) } - pub fn paint(&mut self, pos: u32) -> Option<WALPos> { + pub fn paint(&mut self, pos: u32) -> Option<WALRingId> { let q = self.queue.get_mut(&pos).unwrap(); - let (c, sid) = q.pop_front().unwrap(); + let (c, rid) = q.pop_front().unwrap(); if q.is_empty() { self.queue.remove(&pos); } self.canvas[pos as usize] = c; - let cnt = self.waiting.get_mut(&sid).unwrap(); + let cnt = self.waiting.get_mut(&rid).unwrap(); *cnt -= 1; if *cnt == 0 { - Some(sid) + Some(rid) } else { None } } @@ -335,17 +334,18 @@ fn test_canvas() { let mut canvas1 = Canvas::new(100); let mut canvas2 = Canvas::new(100); let canvas3 = Canvas::new(101); + let dummy = WALRingId::empty_id(); let (s1, s2) = RNG.with(|rng| { let rng = &mut *rng.borrow_mut(); - (PaintStrokes::gen_rand(0, 100, 256, 2, rng), - PaintStrokes::gen_rand(0, 100, 256, 2, rng)) + (PaintStrokes::gen_rand(100, 10, 256, 2, rng), + PaintStrokes::gen_rand(100, 10, 256, 2, rng)) }); assert!(canvas1.is_same(&canvas2)); assert!(!canvas2.is_same(&canvas3)); - canvas1.prepaint(&s1, &0); - canvas1.prepaint(&s2, &0); - canvas2.prepaint(&s1, &0); - canvas2.prepaint(&s2, &0); + canvas1.prepaint(&s1, &dummy); + canvas1.prepaint(&s2, &dummy); + canvas2.prepaint(&s1, &dummy); + canvas2.prepaint(&s2, &dummy); assert!(canvas1.is_same(&canvas2)); RNG.with(|rng| canvas1.rand_paint(&mut *rng.borrow_mut())); assert!(!canvas1.is_same(&canvas2)); diff --git a/tests/rand_fail.rs b/tests/rand_fail.rs index b4e30ac..0cd519c 100644 --- a/tests/rand_fail.rs +++ b/tests/rand_fail.rs @@ -1,26 +1,61 @@ #[cfg(test)] -extern crate growthring; -use growthring::wal::{WALLoader, WALWriter, WALStore, WALRingId, WALBytes}; - mod common; +use growthring::wal::{WALLoader, WALWriter, WALStore, WALRingId, WALBytes, WALPos}; +use common::{FailGen, SingleFailGen, Canvas, WALStoreEmulState, WALStoreEmul, PaintStrokes}; -fn test<S: WALStore>(records: Vec<String>, wal: &mut WALWriter<S>) -> Box<[WALRingId]> { - let records: Vec<WALBytes> = records.into_iter().map(|s| s.into_bytes().into_boxed_slice()).collect(); - let ret = wal.grow(&records).unwrap(); - for ring_id in ret.iter() { - println!("got ring id: {:?}", ring_id); +fn run<F: WALStore, R: rand::Rng>(n: usize, m: usize, k: usize, + canvas: &mut Canvas, wal: &mut WALWriter<F>, trace: &mut Vec<u32>, + rng: &mut R) -> Result<(), ()> { + for i in 0..n { + let s = (0..m).map(|_| + PaintStrokes::gen_rand(1000, 10, 256, 5, rng)).collect::<Vec<PaintStrokes>>(); + let recs = s.iter().map(|e| e.to_bytes()).collect::<Vec<WALBytes>>(); + // write ahead + let rids = wal.grow(recs)?; + for rid in rids.iter() { + println!("got ring id: {:?}", rid); + } + // WAL append done + // prepare data writes + for (e, rid) in s.iter().zip(rids.iter()) { + canvas.prepaint(e, &*rid) + } + // run the scheduler for a bit + for _ in 0..k { + if let Some((fin_rid, t)) = canvas.rand_paint(rng) { + if let Some(rid) = fin_rid { + wal.peel(&[rid])? + } + trace.push(t); + } else { break } + } + } + while let Some((fin_rid, t)) = canvas.rand_paint(rng) { + if let Some(rid) = fin_rid { + wal.peel(&[rid])? + } + trace.push(t); } - ret + canvas.print(40); + Ok(()) +} + +fn check<F: WALStore>(canvas: &mut Canvas, wal: &mut WALLoader<F>, trace: &Vec<u8>) -> bool { + true } #[test] fn test_rand_fail() { - let fgen = common::SingleFailGen::new(100); - let mut state = common::WALStoreEmulState::new(); - let mut wal = WALLoader::new(common::WALStoreEmul::new(&mut state, fgen), 9, 8, 1000).recover().unwrap(); - for _ in 0..3 { - test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::<Vec<String>>(), &mut wal); - } + let fgen = SingleFailGen::new(100000); + let n = 100; + let m = 10; + let k = 100; + let mut rng = rand::thread_rng(); + let mut state = WALStoreEmulState::new(); + let mut wal = WALLoader::new(WALStoreEmul::new(&mut state, fgen), 9, 8, 1000).recover().unwrap(); + let mut trace: Vec<u32> = Vec::new(); + let mut canvas = Canvas::new(1000); + run(n, m, k, &mut canvas, &mut wal, &mut trace, &mut rng).unwrap(); WALLoader::new(common::WALStoreEmul::new(&mut state, common::ZeroFailGen), 9, 8, 1000).recover().unwrap(); } |