aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2020-06-16 18:03:12 -0400
committerDeterminant <tederminant@gmail.com>2020-06-16 18:03:12 -0400
commit6ee65d83d983ecc35f596f516d2739e7a91b9efa (patch)
tree59c5912f7f6fa73151c2f8a98de72e1167d64eeb
parent1089cf85a74887430d5d21c42886a4a05bac2be9 (diff)
improve WALLoader; verify CRC32
-rw-r--r--Cargo.toml2
-rw-r--r--examples/demo1.rs11
-rw-r--r--src/lib.rs10
-rw-r--r--src/wal.rs109
-rw-r--r--tests/common/mod.rs15
5 files changed, 106 insertions, 41 deletions
diff --git a/Cargo.toml b/Cargo.toml
index ca270ef..2c7f57e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -23,8 +23,6 @@ libc = "0.2.71"
[dev-dependencies]
hex = "0.4.2"
-libc = "0.2.44"
-nix = "0.17.0"
rand = "0.7.3"
indexmap = "1.4.0"
diff --git a/examples/demo1.rs b/examples/demo1.rs
index 4a923c6..e214177 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -1,5 +1,5 @@
use growthring::{
- wal::{WALBytes, WALLoader, WALRingId, WALWriter},
+ wal::{WALBytes, WALRingId, WALLoader, WALWriter},
WALStoreAIO,
};
use rand::{seq::SliceRandom, Rng};
@@ -29,8 +29,11 @@ fn recover(payload: WALBytes, ringid: WALRingId) -> Result<(), ()> {
fn main() {
let wal_dir = "./wal_demo1";
let mut rng = rand::thread_rng();
+ let mut loader = WALLoader::new();
+ loader.file_nbit(9).block_nbit(8);
+
let store = WALStoreAIO::new(&wal_dir, true, recover);
- let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
+ let mut wal = loader.load(store).unwrap();
for _ in 0..3 {
test(
["hi", "hello", "lol"]
@@ -48,7 +51,7 @@ fn main() {
}
let store = WALStoreAIO::new(&wal_dir, false, recover);
- let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
+ let mut wal = loader.load(store).unwrap();
for _ in 0..3 {
test(
vec![
@@ -62,7 +65,7 @@ fn main() {
}
let store = WALStoreAIO::new(&wal_dir, false, recover);
- let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
+ let mut wal = loader.load(store).unwrap();
for _ in 0..3 {
let mut ids = Vec::new();
for _ in 0..3 {
diff --git a/src/lib.rs b/src/lib.rs
index 61aeb72..447da08 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -5,9 +5,13 @@
//! ```
//! use growthring::{WALStoreAIO, wal::WALLoader};
//! use futures::executor::block_on;
+//! let mut loader = WALLoader::new();
+//! loader.file_nbit(9).block_nbit(8);
+//!
+//!
//! // Start with empty WAL (truncate = true).
//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())});
-//! let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
+//! let mut wal = loader.load(store).unwrap();
//! // Write a vector of records to WAL.
//! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() {
//! let ring_id = block_on(f).unwrap().1;
@@ -23,7 +27,7 @@
//! ringid);
//! Ok(())
//! });
-//! let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
+//! let mut wal = loader.load(store).unwrap();
//! // We saw some log playback, even there is no failure.
//! // Let's try to grow the WAL to create many files.
//! let ring_ids = wal.grow((0..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>())
@@ -37,7 +41,7 @@
//! println!("payload.len() = {}", payload.len());
//! Ok(())
//! });
-//! let wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
+//! let wal = loader.load(store).unwrap();
//! // After each recovery, the ./walfiles is empty.
//! ```
diff --git a/src/wal.rs b/src/wal.rs
index aefec36..f1a6e85 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use futures::future::{self, FutureExt, TryFutureExt};
+use futures::executor::block_on;
use std::cell::{RefCell, UnsafeCell};
use std::collections::{BinaryHeap, HashMap, hash_map};
use std::future::Future;
@@ -558,31 +559,68 @@ impl<F: WALStore> WALWriter<F> {
pub fn file_pool_in_use(&self) -> usize { self.file_pool.in_use_len() }
}
+#[derive(Copy, Clone)]
+pub enum RecoverPolicy {
+ /// all checksums must be correct, otherwise recovery fails
+ Strict,
+ /// stop recovering when hitting the first corrupted record
+ BestEffort
+}
+
pub struct WALLoader {
file_nbit: u8,
block_nbit: u8,
cache_size: usize,
- msize: usize,
- filename_fmt: regex::Regex,
+ recover_policy: RecoverPolicy,
}
impl WALLoader {
- pub fn new(file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
- let msize = std::mem::size_of::<WALRingBlob>();
- assert!(file_nbit > block_nbit);
- assert!(msize < 1 << block_nbit);
- let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap();
+ pub fn new() -> Self {
WALLoader {
- file_nbit,
- block_nbit,
- cache_size,
- msize,
- filename_fmt,
+ file_nbit: 22, // 4MB
+ block_nbit: 15, // 32KB,
+ cache_size: 16,
+ recover_policy: RecoverPolicy::Strict
+ }
+ }
+
+ pub fn file_nbit(&mut self, v: u8) -> &mut Self {
+ self.file_nbit = v;
+ self
+ }
+
+ pub fn block_nbit(&mut self, v: u8) -> &mut Self {
+ self.block_nbit = v;
+ self
+ }
+
+ pub fn cache_size(&mut self, v: usize) -> &mut Self {
+ self.cache_size = v;
+ self
+ }
+
+ pub fn recover_policy(&mut self, p: RecoverPolicy) -> &mut Self {
+ self.recover_policy = p;
+ self
+ }
+
+ fn verify_checksum(&self, data: &[u8], checksum: u32) -> Result<bool, ()> {
+ if checksum == crc::crc32::checksum_ieee(data) {
+ Ok(true)
+ } else {
+ match self.recover_policy {
+ RecoverPolicy::Strict => Err(()),
+ RecoverPolicy::BestEffort => Ok(false),
+ }
}
}
/// Recover by reading the WAL files.
- pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> {
+ pub fn load<F: WALStore>(&self, store: F) -> Result<WALWriter<F>, ()> {
+ let msize = std::mem::size_of::<WALRingBlob>();
+ assert!(self.file_nbit > self.block_nbit);
+ assert!(msize < 1 << self.block_nbit);
+ let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap();
let mut file_pool = WALFilePool::new(
store,
self.file_nbit,
@@ -590,20 +628,24 @@ impl WALLoader {
self.cache_size,
);
let block_size = 1 << file_pool.block_nbit;
- let msize = self.msize as u32;
let mut logfiles: Vec<String> = file_pool
.store
.enumerate_files()?
- .filter(|f| self.filename_fmt.is_match(f))
+ .filter(|f| filename_fmt.is_match(f))
.collect();
// TODO: check for missing logfiles
logfiles.sort();
let mut chunks = None;
+ let mut skip = false;
for fname in logfiles.into_iter() {
let fid = file_pool.get_fid(&fname);
- let f =
- futures::executor::block_on(file_pool.get_file(fid, false))?;
+ let f = block_on(file_pool.get_file(fid, false))?;
let mut off = 0;
+ if skip {
+ f.truncate(0)?;
+ block_on(file_pool.store.remove_file(fname))?;
+ continue
+ }
while let Some(header_raw) = f.read(off, msize as usize)? {
let ringid_start = (fid << file_pool.file_nbit) + off;
off += msize as u64;
@@ -617,6 +659,11 @@ impl WALLoader {
WALRingType::Full => {
assert!(chunks.is_none());
let payload = f.read(off, rsize as usize)?.ok_or(())?;
+ // TODO: improve the behavior when CRC32 fails
+ if !self.verify_checksum(&payload, header.crc32)? {
+ skip = true;
+ break
+ }
off += rsize as u64;
file_pool.store.apply_payload(
payload,
@@ -628,25 +675,35 @@ impl WALLoader {
}
WALRingType::First => {
assert!(chunks.is_none());
- chunks = Some((
- vec![f.read(off, rsize as usize)?.ok_or(())?],
- ringid_start,
- ));
+ let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ if !self.verify_checksum(&chunk, header.crc32)? {
+ skip = true;
+ break
+ }
+ chunks = Some((vec![chunk], ringid_start));
off += rsize as u64;
}
WALRingType::Middle => {
if let Some((chunks, _)) = &mut chunks {
- chunks
- .push(f.read(off, rsize as usize)?.ok_or(())?);
+ let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ if !self.verify_checksum(&chunk, header.crc32)? {
+ skip = true;
+ break
+ }
+ chunks.push(chunk);
} // otherwise ignore the leftover
off += rsize as u64;
}
WALRingType::Last => {
if let Some((mut chunks, ringid_start)) = chunks.take()
{
- chunks
- .push(f.read(off, rsize as usize)?.ok_or(())?);
+ let chunk = f.read(off, rsize as usize)?.ok_or(())?;
off += rsize as u64;
+ if !self.verify_checksum(&chunk, header.crc32)? {
+ skip = true;
+ break
+ }
+ chunks.push(chunk);
let mut payload = Vec::new();
payload.resize(
chunks.iter().fold(0, |acc, v| acc + v.len()),
@@ -678,7 +735,7 @@ impl WALLoader {
}
}
f.truncate(0)?;
- futures::executor::block_on(file_pool.store.remove_file(fname))?;
+ block_on(file_pool.store.remove_file(fname))?;
}
file_pool.reset();
Ok(WALWriter::new(
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index b61233b..247b2ee 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -3,7 +3,7 @@
#[allow(dead_code)]
use async_trait::async_trait;
use growthring::wal::{
- WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore,
+ WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, RecoverPolicy
};
use indexmap::{map::Entry, IndexMap};
use rand::Rng;
@@ -531,15 +531,14 @@ impl PaintingSim {
&self,
state: &mut WALStoreEmulState,
canvas: &mut Canvas,
- wal: WALLoader,
+ loader: WALLoader,
ops: &mut Vec<PaintStrokes>,
ringid_map: &mut HashMap<WALRingId, usize>,
fgen: Rc<G>,
) -> Result<(), ()> {
let mut rng =
<rand::rngs::StdRng as rand::SeedableRng>::seed_from_u64(self.seed);
- let mut wal =
- wal.recover(WALStoreEmul::new(state, fgen.clone(), |_, _| {}))?;
+ let mut wal = loader.load(WALStoreEmul::new(state, fgen.clone(), |_, _| {}))?;
for _ in 0..self.n {
let pss = (0..self.m)
.map(|_| {
@@ -597,7 +596,11 @@ impl PaintingSim {
}
pub fn get_walloader(&self) -> WALLoader {
- WALLoader::new(self.file_nbit, self.block_nbit, self.file_cache)
+ let mut loader = WALLoader::new();
+ loader.file_nbit(self.file_nbit)
+ .block_nbit(self.block_nbit)
+ .cache_size(self.file_cache);
+ loader
}
pub fn get_nticks(&self, state: &mut WALStoreEmulState) -> usize {
@@ -631,7 +634,7 @@ impl PaintingSim {
let mut last_idx = 0;
let mut napplied = 0;
canvas.clear_queued();
- wal.recover(WALStoreEmul::new(
+ wal.load(WALStoreEmul::new(
state,
Rc::new(ZeroFailGen),
|payload, ringid| {