 rustfmt.toml        |   8
 src/lib.rs          |   3
 src/wal.rs          | 228
 tests/common/mod.rs | 374
 tests/rand_fail.rs  |   3
 5 files changed, 419 insertions(+), 197 deletions(-)
diff --git a/rustfmt.toml b/rustfmt.toml
new file mode 100644
index 0000000..768d2e3
--- /dev/null
+++ b/rustfmt.toml
@@ -0,0 +1,8 @@
+edition = "2018"
+unstable_features = true
+max_width = 80
+binop_separator = "Back"
+inline_attribute_width = 80
+fn_params_layout = "Compressed"
+hard_tabs = false
+tab_spaces = 4
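Several of the options above (e.g. `binop_separator`, `inline_attribute_width`) are unstable rustfmt options, which is presumably why `unstable_features = true` is set. An illustrative sketch, not part of the commit, of the two settings that most visibly shape the rest of this diff, `max_width = 80` and `binop_separator = "Back"`:

```rust
// Illustrative only: with max_width = 80 and binop_separator = "Back", a
// binary expression that exceeds the width limit wraps with its operator
// kept at the end of the line rather than starting the next one.
fn combine(first_long_component: u64, second_long_component: u64) -> u64 {
    first_long_component.wrapping_mul(31).wrapping_add(17) +
        second_long_component.wrapping_mul(131).wrapping_add(1009)
}

fn main() {
    println!("{}", combine(3, 4));
}
```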
diff --git a/src/lib.rs b/src/lib.rs
index 7635dab..1db80e8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,2 +1,3 @@
-#[macro_use] extern crate scan_fmt;
+#[macro_use]
+extern crate scan_fmt;
pub mod wal;
diff --git a/src/wal.rs b/src/wal.rs
index fe85c41..39c4390 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -7,7 +7,7 @@ enum WALRingType {
Full,
First,
Middle,
- Last
+ Last,
}
#[repr(packed)]
@@ -25,18 +25,27 @@ pub type WALPos = u64;
#[derive(Eq, PartialEq, Copy, Clone, Debug, Hash)]
pub struct WALRingId {
start: WALPos,
- end: WALPos
+ end: WALPos,
}
impl WALRingId {
- pub fn empty_id() -> Self { WALRingId { start: 0, end: 0 } }
- pub fn get_start(&self) -> WALPos { self.start }
- pub fn get_end(&self) -> WALPos { self.end }
+ pub fn empty_id() -> Self {
+ WALRingId { start: 0, end: 0 }
+ }
+ pub fn get_start(&self) -> WALPos {
+ self.start
+ }
+ pub fn get_end(&self) -> WALPos {
+ self.end
+ }
}
impl Ord for WALRingId {
fn cmp(&self, other: &WALRingId) -> std::cmp::Ordering {
- other.start.cmp(&self.start).then_with(|| other.end.cmp(&self.end))
+ other
+ .start
+ .cmp(&self.start)
+ .then_with(|| other.end.cmp(&self.end))
}
}
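The reversed comparison here (comparing `other` against `self`) is deliberate: `std::collections::BinaryHeap` is a max-heap, so ordering `WALRingId` by descending `start` makes the heap yield the smallest start offset first, which is what `peel` relies on when advancing `next_complete`. A minimal sketch with a simplified ring id (not the crate's own type):

```rust
use std::collections::BinaryHeap;

// Minimal sketch: reversing the comparison turns the standard max-heap into
// a min-heap keyed on `start`.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
struct RingId {
    start: u64,
    end: u64,
}

impl Ord for RingId {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Operands are swapped on purpose: a smaller `start` compares as greater.
        other.start.cmp(&self.start).then_with(|| other.end.cmp(&self.end))
    }
}

impl PartialOrd for RingId {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    heap.push(RingId { start: 30, end: 40 });
    heap.push(RingId { start: 10, end: 20 });
    // The max-heap's "largest" element is now the smallest start offset.
    assert_eq!(heap.pop().unwrap().start, 10);
    assert_eq!(heap.pop().unwrap().start, 30);
}
```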
@@ -68,14 +77,22 @@ pub trait WALFile {
/// should be _atomic_ (the entire single write should be all or nothing).
fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
/// Read data with offset.
- fn read(&self, offset: WALPos, length: usize) -> Result<Option<WALBytes>, ()>;
+ fn read(
+ &self,
+ offset: WALPos,
+ length: usize,
+ ) -> Result<Option<WALBytes>, ()>;
}
pub trait WALStore {
type FileNameIter: Iterator<Item = String>;
/// Open a file given the filename; create the file if it does not exist when `touch` is `true`.
- fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()>;
+ fn open_file(
+ &mut self,
+ filename: &str,
+ touch: bool,
+ ) -> Result<Box<dyn WALFile>, ()>;
/// Unlink a file given the filename.
fn remove_file(&mut self, filename: &str) -> Result<(), ()>;
/// Enumerate all WAL filenames. It should include all WAL files that are previously opened
@@ -84,7 +101,11 @@ pub trait WALStore {
/// Apply the payload during recovery. An invocation of the callback waits for the application
/// to redo the given operation so that its state stays consistent. We assume the necessary
/// changes made by the payload are already persistent when the callback returns.
- fn apply_payload(&mut self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()>;
+ fn apply_payload(
+ &mut self,
+ payload: WALBytes,
+ ringid: WALRingId,
+ ) -> Result<(), ()>;
}
/// The middle layer that manages WAL file handles and invokes public trait functions to actually
@@ -114,15 +135,22 @@ impl<F: WALStore> WALFilePool<F> {
format!("{:08x}.log", fid)
}
- fn get_file(&mut self, fid: u64, touch: bool) -> Result<&'static dyn WALFile, ()> {
+ fn get_file(
+ &mut self,
+ fid: u64,
+ touch: bool,
+ ) -> Result<&'static dyn WALFile, ()> {
let h = match self.handles.get(&fid) {
Some(h) => &**h,
None => {
- self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch)?);
+ self.handles.put(
+ fid,
+ self.store.open_file(&Self::get_fname(fid), touch)?,
+ );
&**self.handles.get(&fid).unwrap()
}
};
- Ok(unsafe {&*(h as *const dyn WALFile)})
+ Ok(unsafe { &*(h as *const dyn WALFile) })
}
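Reformatting aside, `get_file` still relies on an `unsafe` pointer round-trip to detach the returned `&dyn WALFile` from the mutable borrow of the LRU cache; the reference is only valid while the handle stays cached, so the pool must not evict it before the caller is done with it. A hedged, simplified sketch of that pattern (illustrative types, not the crate's):

```rust
// Simplified sketch of the lifetime-extension trick used above: a reference
// borrowed out of a cache is cast through a raw pointer so it no longer
// borrows `self`. This is only sound if the cached entry outlives every use
// of the returned reference.
struct Cache {
    entries: Vec<Box<u32>>,
}

impl Cache {
    fn get(&mut self, idx: usize) -> &'static u32 {
        let r: &u32 = &self.entries[idx];
        // SAFETY (assumed): the caller never evicts or drops entries[idx]
        // while the returned reference is still in use.
        unsafe { &*(r as *const u32) }
    }
}

fn main() {
    let mut cache = Cache { entries: vec![Box::new(7)] };
    let v = cache.get(0);
    assert_eq!(*v, 7);
}
```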
fn get_fid(&mut self, fname: &str) -> WALFileId {
@@ -135,14 +163,19 @@ impl<F: WALStore> WALFilePool<F> {
let mut fid = writes[0].0 >> self.file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
let mut alloc_end = alloc_start + writes[0].1.len() as u64;
- let files = writes.iter().map(|(off, _)|
- self.get_file((*off) >> self.file_nbit, true)).collect::<Result<Vec<&dyn WALFile>, ()>>()?;
+ let files = writes
+ .iter()
+ .map(|(off, _)| self.get_file((*off) >> self.file_nbit, true))
+ .collect::<Result<Vec<&dyn WALFile>, ()>>()?;
// prepare file handles
let mut last_h = files[0];
for ((off, w), h) in writes[1..].iter().zip(files[1..].iter()) {
let next_fid = off >> self.file_nbit;
if next_fid != fid {
- last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?;
+ last_h.allocate(
+ alloc_start,
+ (alloc_end - alloc_start) as usize,
+ )?;
last_h = *h;
alloc_start = 0;
alloc_end = alloc_start + w.len() as u64;
@@ -153,7 +186,8 @@ impl<F: WALStore> WALFilePool<F> {
}
last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?;
for (off, w) in writes.into_iter() {
- self.get_file(off >> self.file_nbit, true)?.write(off & (self.file_size - 1), w)?;
+ self.get_file(off >> self.file_nbit, true)?
+ .write(off & (self.file_size - 1), w)?;
}
Ok(())
}
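The wrapped calls above make the addressing scheme easier to see: a global WAL position splits into a file id (`off >> self.file_nbit`, the high bits) and an offset inside that file (`off & (self.file_size - 1)`, the low bits), which works because `file_size` is exactly `1 << file_nbit`. A small worked example with illustrative values:

```rust
// Minimal sketch: splitting a global WAL position into a file id and an
// in-file offset, as write()/get_file() do above.
fn main() {
    let file_nbit: u64 = 15;             // e.g. 32 KiB log files
    let file_size: u64 = 1 << file_nbit; // 0x8000
    let off: u64 = 0x0002_3456;          // some global WAL position

    let fid = off >> file_nbit;          // which log file: 0x4
    let in_file = off & (file_size - 1); // offset inside that file: 0x3456
    assert_eq!(fid, 0x4);
    assert_eq!(in_file, 0x3456);
    assert_eq!((fid << file_nbit) + in_file, off); // the split round-trips
}
```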
@@ -162,7 +196,9 @@ impl<F: WALStore> WALFilePool<F> {
self.store.remove_file(&Self::get_fname(fid))
}
- fn reset(&mut self) { self.handles.clear() }
+ fn reset(&mut self) {
+ self.handles.clear()
+ }
}
pub struct WALWriter<F: WALStore> {
@@ -172,7 +208,7 @@ pub struct WALWriter<F: WALStore> {
block_size: u32,
next_complete: WALPos,
io_complete: BinaryHeap<WALRingId>,
- msize: usize
+ msize: usize,
}
impl<F: WALStore> WALWriter<F> {
@@ -181,21 +217,24 @@ impl<F: WALStore> WALWriter<F> {
let block_size = 1 << file_pool.block_nbit as u32;
let msize = std::mem::size_of::<WALRingBlob>();
b.resize(block_size as usize, 0);
- WALWriter{
+ WALWriter {
state,
file_pool,
block_buffer: b.into_boxed_slice(),
block_size,
next_complete: 0,
io_complete: BinaryHeap::new(),
- msize
+ msize,
}
}
/// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
/// function returns. The caller then knows which WAL writes were made, so it should defer the
/// actual data writes until after the WAL writes.
- pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> (Box<[WALRingId]>, Result<(), ()>) {
+ pub fn grow<T: AsRef<[WALBytes]>>(
+ &mut self,
+ records: T,
+ ) -> (Box<[WALRingId]>, Result<(), ()>) {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = self.msize as u32;
@@ -213,9 +252,14 @@ impl<F: WALStore> WALWriter<F> {
let remain = self.block_size - bbuff_cur;
if remain > msize {
let d = remain - msize;
- let rs0 = self.state.next + (bbuff_cur - bbuff_start) as u64;
- let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>(
- (&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())};
+ let rs0 =
+ self.state.next + (bbuff_cur - bbuff_start) as u64;
+ let blob = unsafe {
+ std::mem::transmute::<*mut u8, &mut WALRingBlob>(
+ (&mut self.block_buffer[bbuff_cur as usize..])
+ .as_mut_ptr(),
+ )
+ };
bbuff_cur += msize;
if d >= rsize {
// the remaining rec fits in the block
@@ -228,25 +272,28 @@ impl<F: WALStore> WALWriter<F> {
(rs0, WALRingType::Full)
};
blob.rtype = rt;
- &mut self.block_buffer[
- bbuff_cur as usize..
- bbuff_cur as usize + payload.len()].copy_from_slice(payload);
+ &mut self.block_buffer[bbuff_cur as usize..
+ bbuff_cur as usize + payload.len()]
+ .copy_from_slice(payload);
bbuff_cur += rsize;
rsize = 0;
- let end = self.state.next + (bbuff_cur - bbuff_start) as u64;
- res.push(WALRingId{start: rs, end });
+ let end =
+ self.state.next + (bbuff_cur - bbuff_start) as u64;
+ res.push(WALRingId { start: rs, end });
} else {
// the remaining block can only accommodate partial rec
let payload = &rec[..d as usize];
blob.crc32 = crc::crc32::checksum_ieee(payload);
blob.rsize = d;
- blob.rtype = if ring_start.is_some() {WALRingType::Middle} else {
+ blob.rtype = if ring_start.is_some() {
+ WALRingType::Middle
+ } else {
ring_start = Some(rs0);
WALRingType::First
};
- &mut self.block_buffer[
- bbuff_cur as usize..
- bbuff_cur as usize + payload.len()].copy_from_slice(payload);
+ &mut self.block_buffer[bbuff_cur as usize..
+ bbuff_cur as usize + payload.len()]
+ .copy_from_slice(payload);
bbuff_cur += d;
rsize -= d;
rec = &rec[d as usize..];
@@ -256,9 +303,12 @@ impl<F: WALStore> WALWriter<F> {
bbuff_cur = self.block_size;
}
if bbuff_cur == self.block_size {
- writes.push((self.state.next,
- self.block_buffer[bbuff_start as usize..]
- .to_vec().into_boxed_slice()));
+ writes.push((
+ self.state.next,
+ self.block_buffer[bbuff_start as usize..]
+ .to_vec()
+ .into_boxed_slice(),
+ ));
self.state.next += (self.block_size - bbuff_start) as u64;
bbuff_start = 0;
bbuff_cur = 0;
@@ -266,9 +316,12 @@ impl<F: WALStore> WALWriter<F> {
}
}
if bbuff_cur > bbuff_start {
- writes.push((self.state.next,
- self.block_buffer[bbuff_start as usize..bbuff_cur as usize]
- .to_vec().into_boxed_slice()));
+ writes.push((
+ self.state.next,
+ self.block_buffer[bbuff_start as usize..bbuff_cur as usize]
+ .to_vec()
+ .into_boxed_slice(),
+ ));
self.state.next += (bbuff_cur - bbuff_start) as u64;
}
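These hunks reformat the core packing loop of `grow`: a record that fits in the space left in the current block is written as a single `Full` ring, while a record that crosses block boundaries becomes a `First` fragment, any number of `Middle` fragments, and a `Last` fragment, each preceded by its own `WALRingBlob` header; a block tail too small to hold even a header is zero-padded and skipped. A hedged sketch of that fragmentation rule (header and block sizes are illustrative, not the crate's constants):

```rust
// Hedged sketch of the fragmentation rule implemented by grow() above.
fn fragments(block_size: u32, header: u32, mut cursor: u32, mut rsize: u32) -> u32 {
    let mut n = 0;
    while rsize > 0 {
        let remain = block_size - cursor;
        if remain > header {
            let payload_room = remain - header;
            let take = payload_room.min(rsize);
            n += 1;                  // one Full/First/Middle/Last fragment
            rsize -= take;
            cursor += header + take;
        } else {
            cursor = 0;              // skip the unusable tail of this block
        }
        if cursor == block_size {
            cursor = 0;              // block is full, flush and start a new one
        }
    }
    n
}

fn main() {
    // A 100-byte record starting 20 bytes before the end of a 4096-byte block,
    // with a 12-byte ring header, needs a First fragment and a Last fragment.
    assert_eq!(fragments(4096, 12, 4096 - 20, 100), 2);
    // A small record that fits in the current block is one Full fragment.
    assert_eq!(fragments(4096, 12, 0, 100), 1);
}
```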
@@ -277,16 +330,20 @@ impl<F: WALStore> WALWriter<F> {
/// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
/// complete so that it can automatically remove obsolete WAL files.
- pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) -> Result<(), ()> {
+ pub fn peel<T: AsRef<[WALRingId]>>(
+ &mut self,
+ records: T,
+ ) -> Result<(), ()> {
let msize = self.msize as u64;
let block_size = self.block_size as u64;
for rec in records.as_ref() {
self.io_complete.push(*rec);
}
let orig_fid = self.state.first_fid;
- while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) {
+ while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start))
+ {
if s != self.next_complete {
- break
+ break;
}
let mut m = self.io_complete.pop().unwrap();
let block_remain = block_size - (m.end & (block_size - 1));
@@ -309,7 +366,7 @@ pub struct WALLoader {
block_nbit: u8,
cache_size: usize,
msize: usize,
- filename_fmt: regex::Regex
+ filename_fmt: regex::Regex,
}
impl WALLoader {
@@ -318,17 +375,30 @@ impl WALLoader {
assert!(file_nbit > block_nbit);
assert!(msize < 1 << block_nbit);
let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap();
- WALLoader{ file_nbit, block_nbit, cache_size, msize, filename_fmt }
+ WALLoader {
+ file_nbit,
+ block_nbit,
+ cache_size,
+ msize,
+ filename_fmt,
+ }
}
/// Recover by reading the WAL log files.
pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> {
- let mut file_pool = WALFilePool::new(store, self.file_nbit, self.block_nbit, self.cache_size);
+ let mut file_pool = WALFilePool::new(
+ store,
+ self.file_nbit,
+ self.block_nbit,
+ self.cache_size,
+ );
let block_size = 1 << file_pool.block_nbit;
- let msize = std::mem::size_of::<WALRingBlob>() as u32;
- let mut logfiles: Vec<String> = file_pool.store
+ let msize = self.msize as u32;
+ let mut logfiles: Vec<String> = file_pool
+ .store
.enumerate_files()?
- .filter(|f| self.filename_fmt.is_match(f)).collect();
+ .filter(|f| self.filename_fmt.is_match(f))
+ .collect();
// TODO: check for missing logfiles
logfiles.sort();
let mut chunks = None;
@@ -340,7 +410,10 @@ impl WALLoader {
let ringid_start = (fid << file_pool.file_nbit) + off;
off += msize as u64;
let header = unsafe {
- std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
+ std::mem::transmute::<*const u8, &WALRingBlob>(
+ header_raw.as_ptr(),
+ )
+ };
let rsize = header.rsize;
match header.rtype {
WALRingType::Full => {
@@ -351,26 +424,36 @@ impl WALLoader {
payload,
WALRingId {
start: ringid_start,
- end: (fid << file_pool.file_nbit) + off
- })?;
- },
+ end: (fid << file_pool.file_nbit) + off,
+ },
+ )?;
+ }
WALRingType::First => {
assert!(chunks.is_none());
- chunks = Some((vec![f.read(off, rsize as usize)?.ok_or(())?], ringid_start));
+ chunks = Some((
+ vec![f.read(off, rsize as usize)?.ok_or(())?],
+ ringid_start,
+ ));
off += rsize as u64;
- },
+ }
WALRingType::Middle => {
if let Some((chunks, _)) = &mut chunks {
- chunks.push(f.read(off, rsize as usize)?.ok_or(())?);
+ chunks
+ .push(f.read(off, rsize as usize)?.ok_or(())?);
} // otherwise ignore the leftover
off += rsize as u64;
- },
+ }
WALRingType::Last => {
- if let Some((mut chunks, ringid_start)) = chunks.take() {
- chunks.push(f.read(off, rsize as usize)?.ok_or(())?);
+ if let Some((mut chunks, ringid_start)) = chunks.take()
+ {
+ chunks
+ .push(f.read(off, rsize as usize)?.ok_or(())?);
off += rsize as u64;
let mut payload = Vec::new();
- payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
+ payload.resize(
+ chunks.iter().fold(0, |acc, v| acc + v.len()),
+ 0,
+ );
let mut ps = &mut payload[..];
for c in chunks {
ps[..c.len()].copy_from_slice(&*c);
@@ -380,11 +463,15 @@ impl WALLoader {
payload.into_boxed_slice(),
WALRingId {
start: ringid_start,
- end: (fid << file_pool.file_nbit) + off
- })?;
- } // otherwise ignore the leftover
- else { off += rsize as u64; }
- },
+ end: (fid << file_pool.file_nbit) + off,
+ },
+ )?;
+ }
+ // otherwise ignore the leftover
+ else {
+ off += rsize as u64;
+ }
+ }
WALRingType::Null => break,
}
let block_remain = block_size - (off & (block_size - 1));
@@ -396,10 +483,13 @@ impl WALLoader {
file_pool.remove_file(fid)?;
}
file_pool.reset();
- Ok(WALWriter::new(WALState {
- first_fid: 0,
- next: 0,
- file_nbit: file_pool.file_nbit,
- }, file_pool))
+ Ok(WALWriter::new(
+ WALState {
+ first_fid: 0,
+ next: 0,
+ file_nbit: file_pool.file_nbit,
+ },
+ file_pool,
+ ))
}
}
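One change in this file is behavioral rather than cosmetic: `recover` now takes the ring-header size from `self.msize` instead of recomputing `std::mem::size_of::<WALRingBlob>()`. The recovery loop itself keeps reinterpreting each header's raw bytes as the `#[repr(packed)]` `WALRingBlob`. A hedged, self-contained sketch of that decoding pattern (field names mirror the diff; the header size and enum discriminants are assumptions, not the crate's actual layout):

```rust
// Illustrative stand-ins for the crate's private types.
#[allow(dead_code)]
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum RingType {
    Null = 0,
    Full,
    First,
    Middle,
    Last,
}

#[allow(dead_code)]
#[repr(packed)]
struct RingBlob {
    crc32: u32,
    rsize: u32,
    rtype: RingType,
}

fn main() {
    // A zeroed buffer decodes as crc32 = 0, rsize = 0, rtype = Null here.
    let bytes = [0u8; std::mem::size_of::<RingBlob>()];
    // Same idea as the transmute in recover(): reinterpret raw header bytes.
    // The packed struct has alignment 1, so any byte pointer is acceptable.
    let header = unsafe { &*(bytes.as_ptr() as *const RingBlob) };
    // Copy fields out before use; taking references into a packed struct can
    // produce unaligned references.
    let rsize = header.rsize;
    let rtype = header.rtype;
    assert_eq!(rsize, 0);
    assert_eq!(rtype, RingType::Null);
}
```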
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 76533eb..f230edb 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -1,81 +1,92 @@
#[cfg(test)]
#[allow(dead_code)]
-
extern crate growthring;
-use growthring::wal::{WALFile, WALStore, WALLoader, WALPos, WALBytes, WALRingId};
-use indexmap::{IndexMap, map::Entry};
+use growthring::wal::{
+ WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore,
+};
+use indexmap::{map::Entry, IndexMap};
use rand::Rng;
-use std::collections::{HashMap, hash_map};
use std::cell::RefCell;
-use std::rc::Rc;
-use std::convert::TryInto;
use std::collections::VecDeque;
+use std::collections::{hash_map, HashMap};
+use std::convert::TryInto;
+use std::rc::Rc;
-thread_local! {
- //pub static RNG: RefCell<rand::rngs::StdRng> = RefCell::new(<rand::rngs::StdRng as rand::SeedableRng>::from_seed([0; 32]));
- pub static RNG: RefCell<rand::rngs::ThreadRng> = RefCell::new(rand::thread_rng());
-}
-
-/*
-pub fn gen_rand_letters(i: usize) -> String {
- RNG.with(|rng| {
- (0..i).map(|_| (rng.borrow_mut().gen_range(0, 26) + 'a' as u8) as char).collect()
- })
+pub trait FailGen {
+ fn next_fail(&self) -> bool;
}
-*/
struct FileContentEmul(RefCell<Vec<u8>>);
impl FileContentEmul {
- pub fn new() -> Self { FileContentEmul(RefCell::new(Vec::new())) }
+ pub fn new() -> Self {
+ FileContentEmul(RefCell::new(Vec::new()))
+ }
}
impl std::ops::Deref for FileContentEmul {
type Target = RefCell<Vec<u8>>;
- fn deref(&self) -> &Self::Target {&self.0}
-}
-
-pub trait FailGen {
- fn next_fail(&self) -> bool;
+ fn deref(&self) -> &Self::Target {
+ &self.0
+ }
}
/// Emulate a virtual file handle.
pub struct WALFileEmul<G: FailGen> {
file: Rc<FileContentEmul>,
- fgen: Rc<G>
+ fgen: Rc<G>,
}
impl<G: FailGen> WALFile for WALFileEmul<G> {
fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> {
- if self.fgen.next_fail() { return Err(()) }
+ if self.fgen.next_fail() {
+ return Err(());
+ }
let offset = offset as usize;
if offset + length > self.file.borrow().len() {
self.file.borrow_mut().resize(offset + length, 0)
}
- for v in &mut self.file.borrow_mut()[offset..offset + length] { *v = 0 }
+ for v in &mut self.file.borrow_mut()[offset..offset + length] {
+ *v = 0
+ }
Ok(())
}
fn truncate(&self, length: usize) -> Result<(), ()> {
- if self.fgen.next_fail() { return Err(()) }
+ if self.fgen.next_fail() {
+ return Err(());
+ }
self.file.borrow_mut().resize(length, 0);
Ok(())
}
fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
- if self.fgen.next_fail() { return Err(()) }
+ if self.fgen.next_fail() {
+ return Err(());
+ }
let offset = offset as usize;
- &self.file.borrow_mut()[offset..offset + data.len()].copy_from_slice(&data);
+ &self.file.borrow_mut()[offset..offset + data.len()]
+ .copy_from_slice(&data);
Ok(())
}
- fn read(&self, offset: WALPos, length: usize) -> Result<Option<WALBytes>, ()> {
- if self.fgen.next_fail() { return Err(()) }
+ fn read(
+ &self,
+ offset: WALPos,
+ length: usize,
+ ) -> Result<Option<WALBytes>, ()> {
+ if self.fgen.next_fail() {
+ return Err(());
+ }
+
let offset = offset as usize;
let file = self.file.borrow();
- if offset + length > file.len() { Ok(None) }
- else {
- Ok(Some((&file[offset..offset + length]).to_vec().into_boxed_slice()))
+ if offset + length > file.len() {
+ Ok(None)
+ } else {
+ Ok(Some(
+ (&file[offset..offset + length]).to_vec().into_boxed_slice(),
+ ))
}
}
}
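Throughout `WALFileEmul`, every operation first consults `fgen.next_fail()` and bails out with `Err(())` when the generator fires; that is how the tests emulate a crash at an arbitrary I/O step, while a read past the end of the emulated file is reported as `Ok(None)` rather than an error. A minimal sketch of that read contract (simplified standalone function, not the test harness itself):

```rust
use std::cell::RefCell;

// Minimal sketch of the emulated read contract shown above: an injected
// failure maps to Err(()), reading past the end maps to Ok(None), and an
// in-range read returns the bytes.
fn read_emul(
    file: &RefCell<Vec<u8>>,
    fail: bool,
    offset: usize,
    length: usize,
) -> Result<Option<Box<[u8]>>, ()> {
    if fail {
        return Err(()); // emulated crash at this I/O step
    }
    let file = file.borrow();
    if offset + length > file.len() {
        Ok(None) // not an error: the data simply is not there
    } else {
        Ok(Some(file[offset..offset + length].to_vec().into_boxed_slice()))
    }
}

fn main() {
    let file = RefCell::new(vec![1u8, 2, 3, 4]);
    // An injected failure surfaces as Err(()).
    assert!(read_emul(&file, true, 0, 2).is_err());
    // An in-range read returns the requested bytes.
    let got = read_emul(&file, false, 0, 2).unwrap().unwrap();
    assert_eq!(&*got, &[1u8, 2][..]);
    // Reading past the end is Ok(None), not an error.
    assert!(read_emul(&file, false, 3, 2).unwrap().is_none());
}
```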
@@ -85,61 +96,87 @@ pub struct WALStoreEmulState {
}
impl WALStoreEmulState {
- pub fn new() -> Self { WALStoreEmulState { files: HashMap::new() } }
+ pub fn new() -> Self {
+ WALStoreEmulState {
+ files: HashMap::new(),
+ }
+ }
}
/// Emulate the persistent storage state.
pub struct WALStoreEmul<'a, G, F>
where
G: FailGen,
- F: FnMut(WALBytes, WALRingId) {
+ F: FnMut(WALBytes, WALRingId),
+{
state: &'a mut WALStoreEmulState,
fgen: Rc<G>,
- recover: F
+ recover: F,
}
impl<'a, G: FailGen, F: FnMut(WALBytes, WALRingId)> WALStoreEmul<'a, G, F> {
- pub fn new(state: &'a mut WALStoreEmulState, fgen: Rc<G>,
- recover: F) -> Self {
+ pub fn new(
+ state: &'a mut WALStoreEmulState,
+ fgen: Rc<G>,
+ recover: F,
+ ) -> Self {
WALStoreEmul {
state,
fgen,
- recover
+ recover,
}
}
}
-impl<'a, G, F> WALStore for WALStoreEmul<'a, G, F>
+impl<'a, G, F> WALStore for WALStoreEmul<'a, G, F>
where
- G: 'static + FailGen, F: FnMut(WALBytes, WALRingId) {
+ G: 'static + FailGen,
+ F: FnMut(WALBytes, WALRingId),
+{
type FileNameIter = std::vec::IntoIter<String>;
- fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()> {
- if self.fgen.next_fail() { return Err(()) }
+ fn open_file(
+ &mut self,
+ filename: &str,
+ touch: bool,
+ ) -> Result<Box<dyn WALFile>, ()> {
+ if self.fgen.next_fail() {
+ return Err(());
+ }
match self.state.files.entry(filename.to_string()) {
hash_map::Entry::Occupied(e) => Ok(Box::new(WALFileEmul {
file: e.get().clone(),
- fgen: self.fgen.clone()
+ fgen: self.fgen.clone(),
})),
- hash_map::Entry::Vacant(e) => if touch {
- Ok(Box::new(WALFileEmul {
- file: e.insert(Rc::new(FileContentEmul::new())).clone(),
- fgen: self.fgen.clone()
- }))
- } else {
- Err(())
+ hash_map::Entry::Vacant(e) => {
+ if touch {
+ Ok(Box::new(WALFileEmul {
+ file: e.insert(Rc::new(FileContentEmul::new())).clone(),
+ fgen: self.fgen.clone(),
+ }))
+ } else {
+ Err(())
+ }
}
}
}
fn remove_file(&mut self, filename: &str) -> Result<(), ()> {
//println!("remove_file(filename={})", filename);
- if self.fgen.next_fail() { return Err(()) }
- self.state.files.remove(filename).ok_or(()).and_then(|_| Ok(()))
+ if self.fgen.next_fail() {
+ return Err(());
+ }
+ self.state
+ .files
+ .remove(filename)
+ .ok_or(())
+ .and_then(|_| Ok(()))
}
fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
- if self.fgen.next_fail() { return Err(()) }
+ if self.fgen.next_fail() {
+ return Err(());
+ }
let mut logfiles = Vec::new();
for (fname, _) in self.state.files.iter() {
logfiles.push(fname.clone())
@@ -147,11 +184,19 @@ where
Ok(logfiles.into_iter())
}
- fn apply_payload(&mut self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()> {
- if self.fgen.next_fail() { return Err(()) }
- //println!("apply_payload(payload=0x{}, ringid={:?})",
- // hex::encode(&payload),
- // ringid);
+ fn apply_payload(
+ &mut self,
+ payload: WALBytes,
+ ringid: WALRingId,
+ ) -> Result<(), ()> {
+ if self.fgen.next_fail() {
+ return Err(());
+ }
+ /*
+ println!("apply_payload(payload=0x{}, ringid={:?})",
+ hex::encode(&payload),
+ ringid);
+ */
(self.recover)(payload, ringid);
Ok(())
}
@@ -159,14 +204,14 @@ where
pub struct SingleFailGen {
cnt: std::cell::Cell<usize>,
- fail_point: usize
+ fail_point: usize,
}
impl SingleFailGen {
pub fn new(fail_point: usize) -> Self {
SingleFailGen {
cnt: std::cell::Cell::new(0),
- fail_point
+ fail_point,
}
}
}
@@ -182,14 +227,20 @@ impl FailGen for SingleFailGen {
pub struct ZeroFailGen;
impl FailGen for ZeroFailGen {
- fn next_fail(&self) -> bool { false }
+ fn next_fail(&self) -> bool {
+ false
+ }
}
pub struct CountFailGen(std::cell::Cell<usize>);
impl CountFailGen {
- pub fn new() -> Self { CountFailGen(std::cell::Cell::new(0)) }
- pub fn get_count(&self) -> usize { self.0.get() }
+ pub fn new() -> Self {
+ CountFailGen(std::cell::Cell::new(0))
+ }
+ pub fn get_count(&self) -> usize {
+ self.0.get()
+ }
}
impl FailGen for CountFailGen {
@@ -203,8 +254,12 @@ impl FailGen for CountFailGen {
pub struct PaintStrokes(Vec<(u32, u32, u32)>);
impl PaintStrokes {
- pub fn new() -> Self { PaintStrokes(Vec::new()) }
- pub fn clone(&self) -> Self { PaintStrokes(self.0.clone()) }
+ pub fn new() -> Self {
+ PaintStrokes(Vec::new())
+ }
+ pub fn clone(&self) -> Self {
+ PaintStrokes(self.0.clone())
+ }
pub fn to_bytes(&self) -> WALBytes {
let mut res: Vec<u8> = Vec::new();
let is = std::mem::size_of::<u32>();
@@ -233,31 +288,41 @@ impl PaintStrokes {
let (s_raw, rest1) = rest.split_at(is);
let (e_raw, rest2) = rest1.split_at(is);
let (c_raw, rest3) = rest2.split_at(is);
- res.push((u32::from_le_bytes(s_raw.try_into().unwrap()),
- u32::from_le_bytes(e_raw.try_into().unwrap()),
- u32::from_le_bytes(c_raw.try_into().unwrap())));
+ res.push((
+ u32::from_le_bytes(s_raw.try_into().unwrap()),
+ u32::from_le_bytes(e_raw.try_into().unwrap()),
+ u32::from_le_bytes(c_raw.try_into().unwrap()),
+ ));
rest = rest3
}
PaintStrokes(res)
}
- pub fn gen_rand<R: rand::Rng>(max_pos: u32, max_len: u32,
- max_col: u32, n: usize, rng: &mut R) -> PaintStrokes {
+ pub fn gen_rand<R: rand::Rng>(
+ max_pos: u32,
+ max_len: u32,
+ max_col: u32,
+ n: usize,
+ rng: &mut R,
+ ) -> PaintStrokes {
assert!(max_pos > 0);
let mut strokes = Self::new();
for _ in 0..n {
let pos = rng.gen_range(0, max_pos);
- let len = rng.gen_range(1, std::cmp::min(max_len, max_pos - pos + 1));
+ let len =
+ rng.gen_range(1, std::cmp::min(max_len, max_pos - pos + 1));
strokes.stroke(pos, pos + len, rng.gen_range(0, max_col))
}
strokes
}
-
+
pub fn stroke(&mut self, start: u32, end: u32, color: u32) {
self.0.push((start, end, color))
}
- pub fn into_vec(self) -> Vec<(u32, u32, u32)> { self.0 }
+ pub fn into_vec(self) -> Vec<(u32, u32, u32)> {
+ self.0
+ }
}
#[test]
@@ -268,7 +333,10 @@ fn test_paint_strokes() {
}
let pr = p.to_bytes();
for ((s, e, c), i) in PaintStrokes::from_bytes(&pr)
- .into_vec().into_iter().zip(0..) {
+ .into_vec()
+ .into_iter()
+ .zip(0..)
+ {
assert_eq!(s, i);
assert_eq!(e, i + 3);
assert_eq!(c, i + 10);
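The reformatted `from_bytes` above walks the payload twelve bytes at a time: each stroke is serialized by `to_bytes` as three little-endian `u32` values (start, end, color). A small self-contained sketch of that wire format (standalone functions, not the crate's methods):

```rust
use std::convert::TryInto;

// Sketch of the stroke wire format handled by to_bytes()/from_bytes():
// each stroke is three little-endian u32 values (start, end, color).
fn encode(strokes: &[(u32, u32, u32)]) -> Vec<u8> {
    let mut out = Vec::new();
    for (s, e, c) in strokes {
        out.extend_from_slice(&s.to_le_bytes());
        out.extend_from_slice(&e.to_le_bytes());
        out.extend_from_slice(&c.to_le_bytes());
    }
    out
}

fn decode(mut raw: &[u8]) -> Vec<(u32, u32, u32)> {
    let is = std::mem::size_of::<u32>();
    let mut out = Vec::new();
    while raw.len() >= 3 * is {
        let (s, rest) = raw.split_at(is);
        let (e, rest) = rest.split_at(is);
        let (c, rest) = rest.split_at(is);
        out.push((
            u32::from_le_bytes(s.try_into().unwrap()),
            u32::from_le_bytes(e.try_into().unwrap()),
            u32::from_le_bytes(c.try_into().unwrap()),
        ));
        raw = rest;
    }
    out
}

fn main() {
    let strokes = vec![(0, 3, 10), (1, 4, 11)];
    assert_eq!(decode(&encode(&strokes)), strokes);
}
```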
@@ -278,7 +346,7 @@ fn test_paint_strokes() {
pub struct Canvas {
waiting: HashMap<WALRingId, usize>,
queue: IndexMap<u32, VecDeque<(u32, WALRingId)>>,
- canvas: Box<[u32]>
+ canvas: Box<[u32]>,
}
impl Canvas {
@@ -290,7 +358,7 @@ impl Canvas {
Canvas {
waiting: HashMap::new(),
queue: IndexMap::new(),
- canvas
+ canvas,
}
}
@@ -309,14 +377,14 @@ impl Canvas {
fn get_waiting(&mut self, rid: WALRingId) -> &mut usize {
match self.waiting.entry(rid) {
hash_map::Entry::Occupied(e) => e.into_mut(),
- hash_map::Entry::Vacant(e) => e.insert(0)
+ hash_map::Entry::Vacant(e) => e.insert(0),
}
}
fn get_queued(&mut self, pos: u32) -> &mut VecDeque<(u32, WALRingId)> {
match self.queue.entry(pos) {
Entry::Occupied(e) => e.into_mut(),
- Entry::Vacant(e) => e.insert(VecDeque::new())
+ Entry::Vacant(e) => e.insert(VecDeque::new()),
}
}
@@ -335,8 +403,13 @@ impl Canvas {
// TODO: allow customized scheduler
/// Randomly schedule one position to be painted. It optionally returns a finished batch write,
/// identified by the start position of its WALRingId.
- pub fn rand_paint<R: rand::Rng>(&mut self, rng: &mut R) -> Option<(Option<WALRingId>, u32)> {
- if self.is_empty() { return None }
+ pub fn rand_paint<R: rand::Rng>(
+ &mut self,
+ rng: &mut R,
+ ) -> Option<(Option<WALRingId>, u32)> {
+ if self.is_empty() {
+ return None;
+ }
let idx = rng.gen_range(0, self.queue.len());
let (pos, _) = self.queue.get_index_mut(idx).unwrap();
let pos = *pos;
@@ -355,18 +428,24 @@ impl Canvas {
self.clear_queued()
}
- pub fn is_empty(&self) -> bool { self.queue.is_empty() }
+ pub fn is_empty(&self) -> bool {
+ self.queue.is_empty()
+ }
pub fn paint(&mut self, pos: u32) -> Option<WALRingId> {
let q = self.queue.get_mut(&pos).unwrap();
let (c, rid) = q.pop_front().unwrap();
- if q.is_empty() { self.queue.remove(&pos); }
+ if q.is_empty() {
+ self.queue.remove(&pos);
+ }
self.canvas[pos as usize] = c;
let cnt = self.waiting.get_mut(&rid).unwrap();
*cnt -= 1;
if *cnt == 0 {
Some(rid)
- } else { None }
+ } else {
+ None
+ }
}
pub fn is_same(&self, other: &Canvas) -> bool {
@@ -387,15 +466,13 @@ impl Canvas {
#[test]
fn test_canvas() {
+ let mut rng = <rand::rngs::StdRng as rand::SeedableRng>::seed_from_u64(42);
let mut canvas1 = Canvas::new(100);
let mut canvas2 = Canvas::new(100);
let canvas3 = Canvas::new(101);
let dummy = WALRingId::empty_id();
- let (s1, s2) = RNG.with(|rng| {
- let rng = &mut *rng.borrow_mut();
- (PaintStrokes::gen_rand(100, 10, 256, 2, rng),
- PaintStrokes::gen_rand(100, 10, 256, 2, rng))
- });
+ let s1 = PaintStrokes::gen_rand(100, 10, 256, 2, &mut rng);
+ let s2 = PaintStrokes::gen_rand(100, 10, 256, 2, &mut rng);
assert!(canvas1.is_same(&canvas2));
assert!(!canvas2.is_same(&canvas3));
canvas1.prepaint(&s1, &dummy);
@@ -403,15 +480,14 @@ fn test_canvas() {
canvas2.prepaint(&s1, &dummy);
canvas2.prepaint(&s2, &dummy);
assert!(canvas1.is_same(&canvas2));
- RNG.with(|rng| canvas1.rand_paint(&mut *rng.borrow_mut()));
+ canvas1.rand_paint(&mut rng);
assert!(!canvas1.is_same(&canvas2));
- RNG.with(|rng| while let Some(_) = canvas1.rand_paint(&mut *rng.borrow_mut()) {});
- RNG.with(|rng| while let Some(_) = canvas2.rand_paint(&mut *rng.borrow_mut()) {});
+ while let Some(_) = canvas1.rand_paint(&mut rng) {}
+ while let Some(_) = canvas2.rand_paint(&mut rng) {}
assert!(canvas1.is_same(&canvas2));
canvas1.print(10);
}
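Replacing the `thread_local!` `ThreadRng` with a `StdRng` seeded from a constant makes `test_canvas` deterministic: every run draws the same stroke sequence. A tiny sketch of why, assuming the same rand version as the tests (the two-argument `gen_range` API):

```rust
use rand::{Rng, SeedableRng};

// Minimal sketch: seeding StdRng makes the sequence reproducible, unlike
// thread_rng(), which is re-seeded from the OS on every run.
fn main() {
    let mut a = rand::rngs::StdRng::seed_from_u64(42);
    let mut b = rand::rngs::StdRng::seed_from_u64(42);
    let xs: Vec<u32> = (0..4).map(|_| a.gen_range(0, 100)).collect();
    let ys: Vec<u32> = (0..4).map(|_| b.gen_range(0, 100)).collect();
    assert_eq!(xs, ys); // same seed, same sequence
}
```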
-
pub struct PaintingSim {
pub block_nbit: u8,
pub file_nbit: u8,
@@ -431,27 +507,36 @@ pub struct PaintingSim {
/// max number of strokes per PaintStroke
pub stroke_max_n: usize,
/// random seed
- pub seed: u64
+ pub seed: u64,
}
-
impl PaintingSim {
fn run<G: 'static + FailGen>(
- &self,
- state: &mut WALStoreEmulState, canvas: &mut Canvas, wal: WALLoader,
- ops: &mut Vec<PaintStrokes>, ringid_map: &mut HashMap<WALRingId, usize>,
- fgen: Rc<G>) -> Result<(), ()> {
- let mut rng = <rand::rngs::StdRng as rand::SeedableRng>::seed_from_u64(self.seed);
- let mut wal = wal.recover(WALStoreEmul::new(state, fgen, |_, _|{}))?;
+ &self,
+ state: &mut WALStoreEmulState,
+ canvas: &mut Canvas,
+ wal: WALLoader,
+ ops: &mut Vec<PaintStrokes>,
+ ringid_map: &mut HashMap<WALRingId, usize>,
+ fgen: Rc<G>,
+ ) -> Result<(), ()> {
+ let mut rng =
+ <rand::rngs::StdRng as rand::SeedableRng>::seed_from_u64(self.seed);
+ let mut wal = wal.recover(WALStoreEmul::new(state, fgen, |_, _| {}))?;
for _ in 0..self.n {
- let pss = (0..self.m).map(|_|
- PaintStrokes::gen_rand(
- self.csize as u32,
- self.stroke_max_len,
- self.stroke_max_col,
- rng.gen_range(1, self.stroke_max_n + 1), &mut rng))
- .collect::<Vec<PaintStrokes>>();
- let payloads = pss.iter().map(|e| e.to_bytes()).collect::<Vec<WALBytes>>();
+ let pss = (0..self.m)
+ .map(|_| {
+ PaintStrokes::gen_rand(
+ self.csize as u32,
+ self.stroke_max_len,
+ self.stroke_max_col,
+ rng.gen_range(1, self.stroke_max_n + 1),
+ &mut rng,
+ )
+ })
+ .collect::<Vec<PaintStrokes>>();
+ let payloads =
+ pss.iter().map(|e| e.to_bytes()).collect::<Vec<WALBytes>>();
// write ahead
let (rids, ok) = wal.grow(payloads);
// keep track of the operations
@@ -476,7 +561,9 @@ impl PaintingSim {
if let Some(rid) = fin_rid {
wal.peel(&[rid])?
}
- } else { break }
+ } else {
+ break;
+ }
}
}
// keep running until all operations are finished
@@ -499,22 +586,43 @@ impl PaintingSim {
let mut ops: Vec<PaintStrokes> = Vec::new();
let mut ringid_map = HashMap::new();
let fgen = Rc::new(CountFailGen::new());
- self.run(&mut state, &mut canvas, self.get_walloader(), &mut ops, &mut ringid_map, fgen.clone()).unwrap();
+ self.run(
+ &mut state,
+ &mut canvas,
+ self.get_walloader(),
+ &mut ops,
+ &mut ringid_map,
+ fgen.clone(),
+ )
+ .unwrap();
fgen.get_count()
}
- fn check(state: &mut WALStoreEmulState, canvas: &mut Canvas,
- wal: WALLoader,
- ops: &Vec<PaintStrokes>, ringid_map: &HashMap<WALRingId, usize>) -> bool {
- if ops.is_empty() { return true }
+ fn check(
+ state: &mut WALStoreEmulState,
+ canvas: &mut Canvas,
+ wal: WALLoader,
+ ops: &Vec<PaintStrokes>,
+ ringid_map: &HashMap<WALRingId, usize>,
+ ) -> bool {
+ if ops.is_empty() {
+ return true;
+ }
let mut last_idx = 0;
canvas.clear_queued();
- wal.recover(WALStoreEmul::new(state, Rc::new(ZeroFailGen), |payload, ringid| {
- let s = PaintStrokes::from_bytes(&payload);
- canvas.prepaint(&s, &ringid);
- if ringid_map.get(&ringid).is_none() { println!("{:?}", ringid) }
- last_idx = *ringid_map.get(&ringid).unwrap() + 1;
- })).unwrap();
+ wal.recover(WALStoreEmul::new(
+ state,
+ Rc::new(ZeroFailGen),
+ |payload, ringid| {
+ let s = PaintStrokes::from_bytes(&payload);
+ canvas.prepaint(&s, &ringid);
+ if ringid_map.get(&ringid).is_none() {
+ println!("{:?}", ringid)
+ }
+ last_idx = *ringid_map.get(&ringid).unwrap() + 1;
+ },
+ ))
+ .unwrap();
println!("last = {}/{}", last_idx, ops.len());
canvas.paint_all();
// recover complete
@@ -532,9 +640,25 @@ impl PaintingSim {
let mut canvas = Canvas::new(self.csize);
let mut ops: Vec<PaintStrokes> = Vec::new();
let mut ringid_map = HashMap::new();
- if self.run(&mut state, &mut canvas, self.get_walloader(), &mut ops, &mut ringid_map, Rc::new(fgen)).is_err() {
- if !Self::check(&mut state, &mut canvas, self.get_walloader(), &ops, &ringid_map) {
- return false
+ if self
+ .run(
+ &mut state,
+ &mut canvas,
+ self.get_walloader(),
+ &mut ops,
+ &mut ringid_map,
+ Rc::new(fgen),
+ )
+ .is_err()
+ {
+ if !Self::check(
+ &mut state,
+ &mut canvas,
+ self.get_walloader(),
+ &ops,
+ &ringid_map,
+ ) {
+ return false;
}
}
true
diff --git a/tests/rand_fail.rs b/tests/rand_fail.rs
index d129e4f..07edb39 100644
--- a/tests/rand_fail.rs
+++ b/tests/rand_fail.rs
@@ -1,5 +1,4 @@
#[cfg(test)]
-
mod common;
fn single_point_failure(sim: &common::PaintingSim) {
@@ -24,7 +23,7 @@ fn test_rand_fail() {
stroke_max_len: 10,
stroke_max_col: 256,
stroke_max_n: 5,
- seed: 0
+ seed: 0,
};
single_point_failure(&sim);
}
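The body of `single_point_failure` is unchanged by this diff, so the following is only an assumption about how the helpers in `tests/common/mod.rs` fit together: a failure-free run is first executed with `CountFailGen` to count every fallible I/O callback, and the simulation is then replayed once per callback with `SingleFailGen` injecting exactly one failure at that point, checking that WAL recovery still reconstructs a consistent canvas. A self-contained toy version of that sweep (not the crate's test code):

```rust
use std::cell::Cell;

// Toy version of the single-point-failure sweep: count the fallible steps of
// a clean run, then replay once per step with exactly one injected failure.
trait FailGen {
    fn next_fail(&self) -> bool;
}

// Counts calls and never fails (mirrors CountFailGen).
struct CountGen(Cell<usize>);
impl FailGen for CountGen {
    fn next_fail(&self) -> bool {
        self.0.set(self.0.get() + 1);
        false
    }
}

// Fails exactly once, at the chosen call index (mirrors SingleFailGen).
struct OneShot {
    cnt: Cell<usize>,
    fail_point: usize,
}
impl FailGen for OneShot {
    fn next_fail(&self) -> bool {
        let now = self.cnt.get();
        self.cnt.set(now + 1);
        now == self.fail_point
    }
}

// Stand-in "workload" with three fallible steps; Err if any step fails.
fn workload(fgen: &dyn FailGen) -> Result<u32, ()> {
    let mut sum = 0;
    for step in 1..=3u32 {
        if fgen.next_fail() {
            return Err(());
        }
        sum += step;
    }
    Ok(sum)
}

fn main() {
    // Count the fallible steps of a clean run.
    let counter = CountGen(Cell::new(0));
    assert_eq!(workload(&counter), Ok(6));
    let nticks = counter.0.get();
    // Sweep: inject one failure at every possible point.
    for i in 0..nticks {
        let g = OneShot { cnt: Cell::new(0), fail_point: i };
        assert!(workload(&g).is_err());
        // In the real tests, recovery would be verified here (PaintingSim::check)
        // after the injected failure.
    }
}
```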