summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock175
-rw-r--r--Cargo.toml1
-rw-r--r--src/wal.rs30
-rw-r--r--tests/common/mod.rs137
-rw-r--r--tests/rand_fail.rs115
5 files changed, 346 insertions, 112 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 0ba1546..7e4e668 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -84,6 +84,101 @@ dependencies = [
]
[[package]]
+name = "futures"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-channel"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+]
+
+[[package]]
+name = "futures-core"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
+
+[[package]]
+name = "futures-executor"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789"
+
+[[package]]
+name = "futures-macro"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39"
+dependencies = [
+ "proc-macro-hack",
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "futures-sink"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc"
+
+[[package]]
+name = "futures-task"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626"
+dependencies = [
+ "once_cell",
+]
+
+[[package]]
+name = "futures-util"
+version = "0.3.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-io",
+ "futures-macro",
+ "futures-sink",
+ "futures-task",
+ "memchr",
+ "pin-project",
+ "pin-utils",
+ "proc-macro-hack",
+ "proc-macro-nested",
+ "slab",
+]
+
+[[package]]
name = "getrandom"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -99,6 +194,7 @@ name = "growth-ring"
version = "0.1.0"
dependencies = [
"crc",
+ "futures",
"hex",
"indexmap",
"libc",
@@ -175,6 +271,38 @@ dependencies = [
]
[[package]]
+name = "once_cell"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
+
+[[package]]
+name = "pin-project"
+version = "0.4.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e75373ff9037d112bb19bc61333a06a159eaeb217660dcfbea7d88e1db823919"
+dependencies = [
+ "pin-project-internal",
+]
+
+[[package]]
+name = "pin-project-internal"
+version = "0.4.20"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "10b4b44893d3c370407a1d6a5cfde7c41ae0478e31c516c85f67eb3adc51be6d"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
+name = "pin-utils"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
+
+[[package]]
name = "ppv-lite86"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -187,6 +315,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4"
[[package]]
+name = "proc-macro-nested"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0afe1bd463b9e9ed51d0e0f0b50b6b146aec855c56fd182bb242388710a9b6de"
+
+[[package]]
+name = "proc-macro2"
+version = "1.0.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa"
+dependencies = [
+ "unicode-xid",
+]
+
+[[package]]
+name = "quote"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37"
+dependencies = [
+ "proc-macro2",
+]
+
+[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -255,6 +407,23 @@ dependencies = [
]
[[package]]
+name = "slab"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
+
+[[package]]
+name = "syn"
+version = "1.0.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b5304cfdf27365b7585c25d4af91b35016ed21ef88f17ced89c7093b43dba8b6"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "unicode-xid",
+]
+
+[[package]]
name = "thread_local"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -264,6 +433,12 @@ dependencies = [
]
[[package]]
+name = "unicode-xid"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
+
+[[package]]
name = "void"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 7eae306..50f0376 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,6 +18,7 @@ libc = "0.2.44"
nix = "0.17.0"
rand = "0.7.3"
indexmap = "1.4.0"
+futures = "0.3"
[lib]
name = "growthring"
diff --git a/src/wal.rs b/src/wal.rs
index 46baeb8..fe85c41 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -171,13 +171,15 @@ pub struct WALWriter<F: WALStore> {
block_buffer: WALBytes,
block_size: u32,
next_complete: WALPos,
- io_complete: BinaryHeap<WALRingId>
+ io_complete: BinaryHeap<WALRingId>,
+ msize: usize
}
impl<F: WALStore> WALWriter<F> {
fn new(state: WALState, file_pool: WALFilePool<F>) -> Self {
let mut b = Vec::new();
let block_size = 1 << file_pool.block_nbit as u32;
+ let msize = std::mem::size_of::<WALRingBlob>();
b.resize(block_size as usize, 0);
WALWriter{
state,
@@ -186,6 +188,7 @@ impl<F: WALStore> WALWriter<F> {
block_size,
next_complete: 0,
io_complete: BinaryHeap::new(),
+ msize
}
}
@@ -195,7 +198,7 @@ impl<F: WALStore> WALWriter<F> {
pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> (Box<[WALRingId]>, Result<(), ()>) {
let mut res = Vec::new();
let mut writes = Vec::new();
- let msize = std::mem::size_of::<WALRingBlob>() as u32;
+ let msize = self.msize as u32;
// the global offest of the begining of the block
// the start of the unwritten data
let mut bbuff_start = self.state.next as u32 & (self.block_size - 1);
@@ -275,7 +278,7 @@ impl<F: WALStore> WALWriter<F> {
/// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
/// complete so that it could automatically remove obsolete WAL files.
pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) -> Result<(), ()> {
- let msize = std::mem::size_of::<WALRingBlob>() as u64;
+ let msize = self.msize as u64;
let block_size = self.block_size as u64;
for rec in records.as_ref() {
self.io_complete.push(*rec);
@@ -305,13 +308,17 @@ pub struct WALLoader {
file_nbit: u8,
block_nbit: u8,
cache_size: usize,
+ msize: usize,
filename_fmt: regex::Regex
}
impl WALLoader {
pub fn new(file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
+ let msize = std::mem::size_of::<WALRingBlob>();
+ assert!(file_nbit > block_nbit);
+ assert!(msize < 1 << block_nbit);
let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap();
- WALLoader{ file_nbit, block_nbit, cache_size, filename_fmt }
+ WALLoader{ file_nbit, block_nbit, cache_size, msize, filename_fmt }
}
/// Recover by reading the WAL log files.
@@ -330,12 +337,7 @@ impl WALLoader {
let f = file_pool.get_file(fid, false)?;
let mut off = 0;
while let Some(header_raw) = f.read(off, msize as usize)? {
- let block_remain = block_size - (off & (block_size - 1));
- if block_remain <= msize as u64 {
- off += block_remain;
- continue
- }
- let ringid_start = (fid << file_pool.file_nbit) | off;
+ let ringid_start = (fid << file_pool.file_nbit) + off;
off += msize as u64;
let header = unsafe {
std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
@@ -349,7 +351,7 @@ impl WALLoader {
payload,
WALRingId {
start: ringid_start,
- end: (fid << file_pool.file_nbit) | off
+ end: (fid << file_pool.file_nbit) + off
})?;
},
WALRingType::First => {
@@ -378,13 +380,17 @@ impl WALLoader {
payload.into_boxed_slice(),
WALRingId {
start: ringid_start,
- end: (fid << file_pool.file_nbit) | off
+ end: (fid << file_pool.file_nbit) + off
})?;
} // otherwise ignore the leftover
else { off += rsize as u64; }
},
WALRingType::Null => break,
}
+ let block_remain = block_size - (off & (block_size - 1));
+ if block_remain <= msize as u64 {
+ off += block_remain;
+ }
}
f.truncate(0)?;
file_pool.remove_file(fid)?;
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index a5a41d5..76533eb 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -2,7 +2,7 @@
#[allow(dead_code)]
extern crate growthring;
-use growthring::wal::{WALFile, WALStore, WALPos, WALBytes, WALRingId};
+use growthring::wal::{WALFile, WALStore, WALLoader, WALPos, WALBytes, WALRingId};
use indexmap::{IndexMap, map::Entry};
use rand::Rng;
use std::collections::{HashMap, hash_map};
@@ -241,12 +241,13 @@ impl PaintStrokes {
PaintStrokes(res)
}
- pub fn gen_rand<R: rand::Rng>(max_pos: u32, max_len: u32, max_col: u32, n: usize, rng: &mut R) -> PaintStrokes {
+ pub fn gen_rand<R: rand::Rng>(max_pos: u32, max_len: u32,
+ max_col: u32, n: usize, rng: &mut R) -> PaintStrokes {
assert!(max_pos > 0);
let mut strokes = Self::new();
for _ in 0..n {
let pos = rng.gen_range(0, max_pos);
- let len = rng.gen_range(1, max_pos - pos + 1);
+ let len = rng.gen_range(1, std::cmp::min(max_len, max_pos - pos + 1));
strokes.stroke(pos, pos + len, rng.gen_range(0, max_col))
}
strokes
@@ -409,3 +410,133 @@ fn test_canvas() {
assert!(canvas1.is_same(&canvas2));
canvas1.print(10);
}
+
+
+pub struct PaintingSim {
+ pub block_nbit: u8,
+ pub file_nbit: u8,
+ pub file_cache: usize,
+ /// number of PaintStrokes (WriteBatch)
+ pub n: usize,
+ /// number of strokes per PaintStrokes
+ pub m: usize,
+ /// number of scheduled ticks per PaintStroke submission
+ pub k: usize,
+ /// the size of canvas
+ pub csize: usize,
+ /// max length of a single stroke
+ pub stroke_max_len: u32,
+ /// max color value
+ pub stroke_max_col: u32,
+ /// max number of strokes per PaintStroke
+ pub stroke_max_n: usize,
+ /// random seed
+ pub seed: u64
+}
+
+
+impl PaintingSim {
+ fn run<G: 'static + FailGen>(
+ &self,
+ state: &mut WALStoreEmulState, canvas: &mut Canvas, wal: WALLoader,
+ ops: &mut Vec<PaintStrokes>, ringid_map: &mut HashMap<WALRingId, usize>,
+ fgen: Rc<G>) -> Result<(), ()> {
+ let mut rng = <rand::rngs::StdRng as rand::SeedableRng>::seed_from_u64(self.seed);
+ let mut wal = wal.recover(WALStoreEmul::new(state, fgen, |_, _|{}))?;
+ for _ in 0..self.n {
+ let pss = (0..self.m).map(|_|
+ PaintStrokes::gen_rand(
+ self.csize as u32,
+ self.stroke_max_len,
+ self.stroke_max_col,
+ rng.gen_range(1, self.stroke_max_n + 1), &mut rng))
+ .collect::<Vec<PaintStrokes>>();
+ let payloads = pss.iter().map(|e| e.to_bytes()).collect::<Vec<WALBytes>>();
+ // write ahead
+ let (rids, ok) = wal.grow(payloads);
+ // keep track of the operations
+ for (ps, rid) in pss.iter().zip(rids.iter()) {
+ ops.push(ps.clone());
+ ringid_map.insert(*rid, ops.len() - 1);
+ }
+ ok?;
+ // finish appending to WAL
+ /*
+ for rid in rids.iter() {
+ println!("got ringid: {:?}", rid);
+ }
+ */
+ // prepare data writes
+ for (ps, rid) in pss.into_iter().zip(rids.into_iter()) {
+ canvas.prepaint(&ps, &rid);
+ }
+ // run k ticks of the fine-grained scheduler
+ for _ in 0..self.k {
+ if let Some((fin_rid, _)) = canvas.rand_paint(&mut rng) {
+ if let Some(rid) = fin_rid {
+ wal.peel(&[rid])?
+ }
+ } else { break }
+ }
+ }
+ // keep running until all operations are finished
+ while let Some((fin_rid, _)) = canvas.rand_paint(&mut rng) {
+ if let Some(rid) = fin_rid {
+ wal.peel(&[rid])?
+ }
+ }
+ canvas.print(40);
+ Ok(())
+ }
+
+ fn get_walloader(&self) -> WALLoader {
+ WALLoader::new(self.file_nbit, self.block_nbit, self.file_cache)
+ }
+
+ pub fn get_nticks(&self) -> usize {
+ let mut state = WALStoreEmulState::new();
+ let mut canvas = Canvas::new(self.csize);
+ let mut ops: Vec<PaintStrokes> = Vec::new();
+ let mut ringid_map = HashMap::new();
+ let fgen = Rc::new(CountFailGen::new());
+ self.run(&mut state, &mut canvas, self.get_walloader(), &mut ops, &mut ringid_map, fgen.clone()).unwrap();
+ fgen.get_count()
+ }
+
+ fn check(state: &mut WALStoreEmulState, canvas: &mut Canvas,
+ wal: WALLoader,
+ ops: &Vec<PaintStrokes>, ringid_map: &HashMap<WALRingId, usize>) -> bool {
+ if ops.is_empty() { return true }
+ let mut last_idx = 0;
+ canvas.clear_queued();
+ wal.recover(WALStoreEmul::new(state, Rc::new(ZeroFailGen), |payload, ringid| {
+ let s = PaintStrokes::from_bytes(&payload);
+ canvas.prepaint(&s, &ringid);
+ if ringid_map.get(&ringid).is_none() { println!("{:?}", ringid) }
+ last_idx = *ringid_map.get(&ringid).unwrap() + 1;
+ })).unwrap();
+ println!("last = {}/{}", last_idx, ops.len());
+ canvas.paint_all();
+ // recover complete
+ let canvas0 = canvas.new_reference(&ops[..last_idx]);
+ let res = canvas.is_same(&canvas0);
+ if !res {
+ canvas.print(40);
+ canvas0.print(40);
+ }
+ res
+ }
+
+ pub fn test<G: 'static + FailGen>(&self, fgen: G) -> bool {
+ let mut state = WALStoreEmulState::new();
+ let mut canvas = Canvas::new(self.csize);
+ let mut ops: Vec<PaintStrokes> = Vec::new();
+ let mut ringid_map = HashMap::new();
+ if self.run(&mut state, &mut canvas, self.get_walloader(), &mut ops, &mut ringid_map, Rc::new(fgen)).is_err() {
+ if !Self::check(&mut state, &mut canvas, self.get_walloader(), &ops, &ringid_map) {
+ return false
+ }
+ }
+ true
+ }
+}
diff --git a/tests/rand_fail.rs b/tests/rand_fail.rs
index e01fe58..d129e4f 100644
--- a/tests/rand_fail.rs
+++ b/tests/rand_fail.rs
@@ -1,109 +1,30 @@
#[cfg(test)]
mod common;
-use std::collections::HashMap;
-use std::rc::Rc;
-use growthring::wal::{WALLoader, WALWriter, WALStore, WALRingId, WALBytes, WALPos};
-use common::{FailGen, SingleFailGen, CountFailGen, Canvas, WALStoreEmulState, WALStoreEmul, PaintStrokes};
-fn run<G: 'static + FailGen, R: rand::Rng>(n: usize, m: usize, k: usize,
- state: &mut WALStoreEmulState, canvas: &mut Canvas, wal: WALLoader,
- ops: &mut Vec<PaintStrokes>, ringid_map: &mut HashMap<WALRingId, usize>,
- fgen: Rc<G>, rng: &mut R) -> Result<(), ()> {
- let mut wal = wal.recover(WALStoreEmul::new(state, fgen, |_, _|{}))?;
- for _ in 0..n {
- let s = (0..m).map(|_|
- PaintStrokes::gen_rand(1000, 10, 256, 5, rng)).collect::<Vec<PaintStrokes>>();
- let recs = s.iter().map(|e| e.to_bytes()).collect::<Vec<WALBytes>>();
- // write ahead
- let (rids, ok) = wal.grow(recs);
- for (e, rid) in s.iter().zip(rids.iter()) {
- ops.push(e.clone());
- ringid_map.insert(*rid, ops.len() - 1);
- }
- ok?;
- //for rid in rids.iter() {
- // println!("got ring id: {:?}", rid);
- //}
- // WAL append done
- // prepare data writes
- for (e, rid) in s.into_iter().zip(rids.iter()) {
- canvas.prepaint(&e, &*rid);
- }
- // run the scheduler for a bit
- for _ in 0..k {
- if let Some((fin_rid, t)) = canvas.rand_paint(rng) {
- if let Some(rid) = fin_rid {
- wal.peel(&[rid])?
- }
- //trace.push(t);
- } else { break }
- }
- }
- while let Some((fin_rid, t)) = canvas.rand_paint(rng) {
- if let Some(rid) = fin_rid {
- wal.peel(&[rid])?
- }
- //trace.push(t);
- }
- canvas.print(40);
- Ok(())
-}
-
-fn check(state: &mut WALStoreEmulState, canvas: &mut Canvas,
- wal: WALLoader,
- ops: &Vec<PaintStrokes>, ringid_map: &HashMap<WALRingId, usize>) -> bool {
- if ops.is_empty() { return true }
- let mut last_idx = 0;
- canvas.clear_queued();
- wal.recover(WALStoreEmul::new(state, Rc::new(common::ZeroFailGen), |payload, ringid| {
- let s = PaintStrokes::from_bytes(&payload);
- canvas.prepaint(&s, &ringid);
- last_idx = *ringid_map.get(&ringid).unwrap() + 1;
- })).unwrap();
- println!("last = {}/{}", last_idx, ops.len() - 1);
- canvas.paint_all();
- // recover complete
- let canvas0 = canvas.new_reference(&ops[..last_idx]);
- let res = canvas.is_same(&canvas0);
- if !res {
- canvas.print(40);
- canvas0.print(40);
- }
- res
-}
-
-fn get_nticks(n: usize, m: usize, k: usize, csize: usize) -> usize {
- let mut rng = <rand::rngs::StdRng as rand::SeedableRng>::from_seed([0; 32]);
- let mut state = WALStoreEmulState::new();
- let wal = WALLoader::new(9, 8, 1000);
- let mut ops: Vec<PaintStrokes> = Vec::new();
- let mut ringid_map = HashMap::new();
- let mut canvas = Canvas::new(csize);
- let fgen = Rc::new(CountFailGen::new());
- run(n, m, k, &mut state, &mut canvas, wal, &mut ops, &mut ringid_map, fgen.clone(), &mut rng).unwrap();
- fgen.get_count()
-}
-
-fn run_(n: usize, m: usize, k: usize, csize: usize) {
- let nticks = get_nticks(n, m, k, csize);
+fn single_point_failure(sim: &common::PaintingSim) {
+ let nticks = sim.get_nticks();
println!("nticks = {}", nticks);
for i in 0..nticks {
- let mut rng = <rand::rngs::StdRng as rand::SeedableRng>::from_seed([0; 32]);
- let mut state = WALStoreEmulState::new();
- let wal = WALLoader::new(9, 8, 1000);
- let mut ops: Vec<PaintStrokes> = Vec::new();
- let mut ringid_map = HashMap::new();
- let mut canvas = Canvas::new(csize);
- let fgen = Rc::new(SingleFailGen::new(i));
- if run(n, m, k, &mut state, &mut canvas, wal, &mut ops, &mut ringid_map, fgen, &mut rng).is_err() {
- let wal = WALLoader::new(9, 8, 1000);
- assert!(check(&mut state, &mut canvas, wal, &ops, &ringid_map));
- }
+ print!("fail pos = {}, ", i);
+ assert!(sim.test(common::SingleFailGen::new(i)));
}
}
#[test]
fn test_rand_fail() {
- run_(100, 10, 100, 1000)
+ let sim = common::PaintingSim {
+ block_nbit: 8,
+ file_nbit: 9,
+ file_cache: 1000,
+ n: 100,
+ m: 10,
+ k: 100,
+ csize: 1000,
+ stroke_max_len: 10,
+ stroke_max_col: 256,
+ stroke_max_n: 5,
+ seed: 0
+ };
+ single_point_failure(&sim);
}