diff options
-rw-r--r-- | Cargo.lock | 175 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/wal.rs | 30 | ||||
-rw-r--r-- | tests/common/mod.rs | 137 | ||||
-rw-r--r-- | tests/rand_fail.rs | 115 |
5 files changed, 346 insertions, 112 deletions
@@ -84,6 +84,101 @@ dependencies = [ ] [[package]] +name = "futures" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399" + +[[package]] +name = "futures-executor" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789" + +[[package]] +name = "futures-macro" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc" + +[[package]] +name = "futures-task" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" +dependencies = [ + "once_cell", +] + +[[package]] +name = "futures-util" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + +[[package]] name = "getrandom" version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -99,6 +194,7 @@ name = "growth-ring" version = "0.1.0" dependencies = [ "crc", + "futures", "hex", "indexmap", "libc", @@ -175,6 +271,38 @@ dependencies = [ ] [[package]] +name = "once_cell" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" + +[[package]] +name = "pin-project" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e75373ff9037d112bb19bc61333a06a159eaeb217660dcfbea7d88e1db823919" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10b4b44893d3c370407a1d6a5cfde7c41ae0478e31c516c85f67eb3adc51be6d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] name = "ppv-lite86" version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -187,6 +315,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4" [[package]] +name = "proc-macro-nested" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0afe1bd463b9e9ed51d0e0f0b50b6b146aec855c56fd182bb242388710a9b6de" + +[[package]] +name = "proc-macro2" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quote" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" +dependencies = [ + "proc-macro2", +] + +[[package]] name = "rand" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -255,6 +407,23 @@ dependencies = [ ] [[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + +[[package]] +name = "syn" +version = "1.0.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5304cfdf27365b7585c25d4af91b35016ed21ef88f17ced89c7093b43dba8b6" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] name = "thread_local" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -264,6 +433,12 @@ dependencies = [ ] [[package]] +name = "unicode-xid" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c" + +[[package]] name = "void" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -18,6 +18,7 @@ libc = "0.2.44" nix = "0.17.0" rand = "0.7.3" indexmap = "1.4.0" +futures = "0.3" [lib] name = "growthring" @@ -171,13 +171,15 @@ pub struct WALWriter<F: WALStore> { block_buffer: WALBytes, block_size: u32, next_complete: WALPos, - io_complete: BinaryHeap<WALRingId> + io_complete: BinaryHeap<WALRingId>, + msize: usize } impl<F: WALStore> WALWriter<F> { fn new(state: WALState, file_pool: WALFilePool<F>) -> Self { let mut b = Vec::new(); let block_size = 1 << file_pool.block_nbit as u32; + let msize = std::mem::size_of::<WALRingBlob>(); b.resize(block_size as usize, 0); WALWriter{ state, @@ -186,6 +188,7 @@ impl<F: WALStore> WALWriter<F> { block_size, next_complete: 0, io_complete: BinaryHeap::new(), + msize } } @@ -195,7 +198,7 @@ impl<F: WALStore> WALWriter<F> { pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> (Box<[WALRingId]>, Result<(), ()>) { let mut res = Vec::new(); let mut writes = Vec::new(); - let msize = std::mem::size_of::<WALRingBlob>() as u32; + let msize = self.msize as u32; // the global offest of the begining of the block // the start of the unwritten data let mut bbuff_start = self.state.next as u32 & (self.block_size - 1); @@ -275,7 +278,7 @@ impl<F: WALStore> WALWriter<F> { /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are /// complete so that it could automatically remove obsolete WAL files. pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) -> Result<(), ()> { - let msize = std::mem::size_of::<WALRingBlob>() as u64; + let msize = self.msize as u64; let block_size = self.block_size as u64; for rec in records.as_ref() { self.io_complete.push(*rec); @@ -305,13 +308,17 @@ pub struct WALLoader { file_nbit: u8, block_nbit: u8, cache_size: usize, + msize: usize, filename_fmt: regex::Regex } impl WALLoader { pub fn new(file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { + let msize = std::mem::size_of::<WALRingBlob>(); + assert!(file_nbit > block_nbit); + assert!(msize < 1 << block_nbit); let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap(); - WALLoader{ file_nbit, block_nbit, cache_size, filename_fmt } + WALLoader{ file_nbit, block_nbit, cache_size, msize, filename_fmt } } /// Recover by reading the WAL log files. @@ -330,12 +337,7 @@ impl WALLoader { let f = file_pool.get_file(fid, false)?; let mut off = 0; while let Some(header_raw) = f.read(off, msize as usize)? { - let block_remain = block_size - (off & (block_size - 1)); - if block_remain <= msize as u64 { - off += block_remain; - continue - } - let ringid_start = (fid << file_pool.file_nbit) | off; + let ringid_start = (fid << file_pool.file_nbit) + off; off += msize as u64; let header = unsafe { std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; @@ -349,7 +351,7 @@ impl WALLoader { payload, WALRingId { start: ringid_start, - end: (fid << file_pool.file_nbit) | off + end: (fid << file_pool.file_nbit) + off })?; }, WALRingType::First => { @@ -378,13 +380,17 @@ impl WALLoader { payload.into_boxed_slice(), WALRingId { start: ringid_start, - end: (fid << file_pool.file_nbit) | off + end: (fid << file_pool.file_nbit) + off })?; } // otherwise ignore the leftover else { off += rsize as u64; } }, WALRingType::Null => break, } + let block_remain = block_size - (off & (block_size - 1)); + if block_remain <= msize as u64 { + off += block_remain; + } } f.truncate(0)?; file_pool.remove_file(fid)?; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index a5a41d5..76533eb 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -2,7 +2,7 @@ #[allow(dead_code)] extern crate growthring; -use growthring::wal::{WALFile, WALStore, WALPos, WALBytes, WALRingId}; +use growthring::wal::{WALFile, WALStore, WALLoader, WALPos, WALBytes, WALRingId}; use indexmap::{IndexMap, map::Entry}; use rand::Rng; use std::collections::{HashMap, hash_map}; @@ -241,12 +241,13 @@ impl PaintStrokes { PaintStrokes(res) } - pub fn gen_rand<R: rand::Rng>(max_pos: u32, max_len: u32, max_col: u32, n: usize, rng: &mut R) -> PaintStrokes { + pub fn gen_rand<R: rand::Rng>(max_pos: u32, max_len: u32, + max_col: u32, n: usize, rng: &mut R) -> PaintStrokes { assert!(max_pos > 0); let mut strokes = Self::new(); for _ in 0..n { let pos = rng.gen_range(0, max_pos); - let len = rng.gen_range(1, max_pos - pos + 1); + let len = rng.gen_range(1, std::cmp::min(max_len, max_pos - pos + 1)); strokes.stroke(pos, pos + len, rng.gen_range(0, max_col)) } strokes @@ -409,3 +410,133 @@ fn test_canvas() { assert!(canvas1.is_same(&canvas2)); canvas1.print(10); } + + +pub struct PaintingSim { + pub block_nbit: u8, + pub file_nbit: u8, + pub file_cache: usize, + /// number of PaintStrokes (WriteBatch) + pub n: usize, + /// number of strokes per PaintStrokes + pub m: usize, + /// number of scheduled ticks per PaintStroke submission + pub k: usize, + /// the size of canvas + pub csize: usize, + /// max length of a single stroke + pub stroke_max_len: u32, + /// max color value + pub stroke_max_col: u32, + /// max number of strokes per PaintStroke + pub stroke_max_n: usize, + /// random seed + pub seed: u64 +} + + +impl PaintingSim { + fn run<G: 'static + FailGen>( + &self, + state: &mut WALStoreEmulState, canvas: &mut Canvas, wal: WALLoader, + ops: &mut Vec<PaintStrokes>, ringid_map: &mut HashMap<WALRingId, usize>, + fgen: Rc<G>) -> Result<(), ()> { + let mut rng = <rand::rngs::StdRng as rand::SeedableRng>::seed_from_u64(self.seed); + let mut wal = wal.recover(WALStoreEmul::new(state, fgen, |_, _|{}))?; + for _ in 0..self.n { + let pss = (0..self.m).map(|_| + PaintStrokes::gen_rand( + self.csize as u32, + self.stroke_max_len, + self.stroke_max_col, + rng.gen_range(1, self.stroke_max_n + 1), &mut rng)) + .collect::<Vec<PaintStrokes>>(); + let payloads = pss.iter().map(|e| e.to_bytes()).collect::<Vec<WALBytes>>(); + // write ahead + let (rids, ok) = wal.grow(payloads); + // keep track of the operations + for (ps, rid) in pss.iter().zip(rids.iter()) { + ops.push(ps.clone()); + ringid_map.insert(*rid, ops.len() - 1); + } + ok?; + // finish appending to WAL + /* + for rid in rids.iter() { + println!("got ringid: {:?}", rid); + } + */ + // prepare data writes + for (ps, rid) in pss.into_iter().zip(rids.into_iter()) { + canvas.prepaint(&ps, &rid); + } + // run k ticks of the fine-grained scheduler + for _ in 0..self.k { + if let Some((fin_rid, _)) = canvas.rand_paint(&mut rng) { + if let Some(rid) = fin_rid { + wal.peel(&[rid])? + } + } else { break } + } + } + // keep running until all operations are finished + while let Some((fin_rid, _)) = canvas.rand_paint(&mut rng) { + if let Some(rid) = fin_rid { + wal.peel(&[rid])? + } + } + canvas.print(40); + Ok(()) + } + + fn get_walloader(&self) -> WALLoader { + WALLoader::new(self.file_nbit, self.block_nbit, self.file_cache) + } + + pub fn get_nticks(&self) -> usize { + let mut state = WALStoreEmulState::new(); + let mut canvas = Canvas::new(self.csize); + let mut ops: Vec<PaintStrokes> = Vec::new(); + let mut ringid_map = HashMap::new(); + let fgen = Rc::new(CountFailGen::new()); + self.run(&mut state, &mut canvas, self.get_walloader(), &mut ops, &mut ringid_map, fgen.clone()).unwrap(); + fgen.get_count() + } + + fn check(state: &mut WALStoreEmulState, canvas: &mut Canvas, + wal: WALLoader, + ops: &Vec<PaintStrokes>, ringid_map: &HashMap<WALRingId, usize>) -> bool { + if ops.is_empty() { return true } + let mut last_idx = 0; + canvas.clear_queued(); + wal.recover(WALStoreEmul::new(state, Rc::new(ZeroFailGen), |payload, ringid| { + let s = PaintStrokes::from_bytes(&payload); + canvas.prepaint(&s, &ringid); + if ringid_map.get(&ringid).is_none() { println!("{:?}", ringid) } + last_idx = *ringid_map.get(&ringid).unwrap() + 1; + })).unwrap(); + println!("last = {}/{}", last_idx, ops.len()); + canvas.paint_all(); + // recover complete + let canvas0 = canvas.new_reference(&ops[..last_idx]); + let res = canvas.is_same(&canvas0); + if !res { + canvas.print(40); + canvas0.print(40); + } + res + } + + pub fn test<G: 'static + FailGen>(&self, fgen: G) -> bool { + let mut state = WALStoreEmulState::new(); + let mut canvas = Canvas::new(self.csize); + let mut ops: Vec<PaintStrokes> = Vec::new(); + let mut ringid_map = HashMap::new(); + if self.run(&mut state, &mut canvas, self.get_walloader(), &mut ops, &mut ringid_map, Rc::new(fgen)).is_err() { + if !Self::check(&mut state, &mut canvas, self.get_walloader(), &ops, &ringid_map) { + return false + } + } + true + } +} diff --git a/tests/rand_fail.rs b/tests/rand_fail.rs index e01fe58..d129e4f 100644 --- a/tests/rand_fail.rs +++ b/tests/rand_fail.rs @@ -1,109 +1,30 @@ #[cfg(test)] mod common; -use std::collections::HashMap; -use std::rc::Rc; -use growthring::wal::{WALLoader, WALWriter, WALStore, WALRingId, WALBytes, WALPos}; -use common::{FailGen, SingleFailGen, CountFailGen, Canvas, WALStoreEmulState, WALStoreEmul, PaintStrokes}; -fn run<G: 'static + FailGen, R: rand::Rng>(n: usize, m: usize, k: usize, - state: &mut WALStoreEmulState, canvas: &mut Canvas, wal: WALLoader, - ops: &mut Vec<PaintStrokes>, ringid_map: &mut HashMap<WALRingId, usize>, - fgen: Rc<G>, rng: &mut R) -> Result<(), ()> { - let mut wal = wal.recover(WALStoreEmul::new(state, fgen, |_, _|{}))?; - for _ in 0..n { - let s = (0..m).map(|_| - PaintStrokes::gen_rand(1000, 10, 256, 5, rng)).collect::<Vec<PaintStrokes>>(); - let recs = s.iter().map(|e| e.to_bytes()).collect::<Vec<WALBytes>>(); - // write ahead - let (rids, ok) = wal.grow(recs); - for (e, rid) in s.iter().zip(rids.iter()) { - ops.push(e.clone()); - ringid_map.insert(*rid, ops.len() - 1); - } - ok?; - //for rid in rids.iter() { - // println!("got ring id: {:?}", rid); - //} - // WAL append done - // prepare data writes - for (e, rid) in s.into_iter().zip(rids.iter()) { - canvas.prepaint(&e, &*rid); - } - // run the scheduler for a bit - for _ in 0..k { - if let Some((fin_rid, t)) = canvas.rand_paint(rng) { - if let Some(rid) = fin_rid { - wal.peel(&[rid])? - } - //trace.push(t); - } else { break } - } - } - while let Some((fin_rid, t)) = canvas.rand_paint(rng) { - if let Some(rid) = fin_rid { - wal.peel(&[rid])? - } - //trace.push(t); - } - canvas.print(40); - Ok(()) -} - -fn check(state: &mut WALStoreEmulState, canvas: &mut Canvas, - wal: WALLoader, - ops: &Vec<PaintStrokes>, ringid_map: &HashMap<WALRingId, usize>) -> bool { - if ops.is_empty() { return true } - let mut last_idx = 0; - canvas.clear_queued(); - wal.recover(WALStoreEmul::new(state, Rc::new(common::ZeroFailGen), |payload, ringid| { - let s = PaintStrokes::from_bytes(&payload); - canvas.prepaint(&s, &ringid); - last_idx = *ringid_map.get(&ringid).unwrap() + 1; - })).unwrap(); - println!("last = {}/{}", last_idx, ops.len() - 1); - canvas.paint_all(); - // recover complete - let canvas0 = canvas.new_reference(&ops[..last_idx]); - let res = canvas.is_same(&canvas0); - if !res { - canvas.print(40); - canvas0.print(40); - } - res -} - -fn get_nticks(n: usize, m: usize, k: usize, csize: usize) -> usize { - let mut rng = <rand::rngs::StdRng as rand::SeedableRng>::from_seed([0; 32]); - let mut state = WALStoreEmulState::new(); - let wal = WALLoader::new(9, 8, 1000); - let mut ops: Vec<PaintStrokes> = Vec::new(); - let mut ringid_map = HashMap::new(); - let mut canvas = Canvas::new(csize); - let fgen = Rc::new(CountFailGen::new()); - run(n, m, k, &mut state, &mut canvas, wal, &mut ops, &mut ringid_map, fgen.clone(), &mut rng).unwrap(); - fgen.get_count() -} - -fn run_(n: usize, m: usize, k: usize, csize: usize) { - let nticks = get_nticks(n, m, k, csize); +fn single_point_failure(sim: &common::PaintingSim) { + let nticks = sim.get_nticks(); println!("nticks = {}", nticks); for i in 0..nticks { - let mut rng = <rand::rngs::StdRng as rand::SeedableRng>::from_seed([0; 32]); - let mut state = WALStoreEmulState::new(); - let wal = WALLoader::new(9, 8, 1000); - let mut ops: Vec<PaintStrokes> = Vec::new(); - let mut ringid_map = HashMap::new(); - let mut canvas = Canvas::new(csize); - let fgen = Rc::new(SingleFailGen::new(i)); - if run(n, m, k, &mut state, &mut canvas, wal, &mut ops, &mut ringid_map, fgen, &mut rng).is_err() { - let wal = WALLoader::new(9, 8, 1000); - assert!(check(&mut state, &mut canvas, wal, &ops, &ringid_map)); - } + print!("fail pos = {}, ", i); + assert!(sim.test(common::SingleFailGen::new(i))); } } #[test] fn test_rand_fail() { - run_(100, 10, 100, 1000) + let sim = common::PaintingSim { + block_nbit: 8, + file_nbit: 9, + file_cache: 1000, + n: 100, + m: 10, + k: 100, + csize: 1000, + stroke_max_len: 10, + stroke_max_col: 256, + stroke_max_n: 5, + seed: 0 + }; + single_point_failure(&sim); } |