aboutsummaryrefslogtreecommitdiff
path: root/src/wal.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wal.rs')
-rw-r--r--src/wal.rs109
1 files changed, 83 insertions, 26 deletions
diff --git a/src/wal.rs b/src/wal.rs
index aefec36..f1a6e85 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use futures::future::{self, FutureExt, TryFutureExt};
+use futures::executor::block_on;
use std::cell::{RefCell, UnsafeCell};
use std::collections::{BinaryHeap, HashMap, hash_map};
use std::future::Future;
@@ -558,31 +559,68 @@ impl<F: WALStore> WALWriter<F> {
pub fn file_pool_in_use(&self) -> usize { self.file_pool.in_use_len() }
}
+#[derive(Copy, Clone)]
+pub enum RecoverPolicy {
+ /// all checksums must be correct, otherwise recovery fails
+ Strict,
+ /// stop recovering when hitting the first corrupted record
+ BestEffort
+}
+
pub struct WALLoader {
file_nbit: u8,
block_nbit: u8,
cache_size: usize,
- msize: usize,
- filename_fmt: regex::Regex,
+ recover_policy: RecoverPolicy,
}
impl WALLoader {
- pub fn new(file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
- let msize = std::mem::size_of::<WALRingBlob>();
- assert!(file_nbit > block_nbit);
- assert!(msize < 1 << block_nbit);
- let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap();
+ pub fn new() -> Self {
WALLoader {
- file_nbit,
- block_nbit,
- cache_size,
- msize,
- filename_fmt,
+ file_nbit: 22, // 4MB
+ block_nbit: 15, // 32KB,
+ cache_size: 16,
+ recover_policy: RecoverPolicy::Strict
+ }
+ }
+
+ pub fn file_nbit(&mut self, v: u8) -> &mut Self {
+ self.file_nbit = v;
+ self
+ }
+
+ pub fn block_nbit(&mut self, v: u8) -> &mut Self {
+ self.block_nbit = v;
+ self
+ }
+
+ pub fn cache_size(&mut self, v: usize) -> &mut Self {
+ self.cache_size = v;
+ self
+ }
+
+ pub fn recover_policy(&mut self, p: RecoverPolicy) -> &mut Self {
+ self.recover_policy = p;
+ self
+ }
+
+ fn verify_checksum(&self, data: &[u8], checksum: u32) -> Result<bool, ()> {
+ if checksum == crc::crc32::checksum_ieee(data) {
+ Ok(true)
+ } else {
+ match self.recover_policy {
+ RecoverPolicy::Strict => Err(()),
+ RecoverPolicy::BestEffort => Ok(false),
+ }
}
}
/// Recover by reading the WAL files.
- pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> {
+ pub fn load<F: WALStore>(&self, store: F) -> Result<WALWriter<F>, ()> {
+ let msize = std::mem::size_of::<WALRingBlob>();
+ assert!(self.file_nbit > self.block_nbit);
+ assert!(msize < 1 << self.block_nbit);
+ let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap();
let mut file_pool = WALFilePool::new(
store,
self.file_nbit,
@@ -590,20 +628,24 @@ impl WALLoader {
self.cache_size,
);
let block_size = 1 << file_pool.block_nbit;
- let msize = self.msize as u32;
let mut logfiles: Vec<String> = file_pool
.store
.enumerate_files()?
- .filter(|f| self.filename_fmt.is_match(f))
+ .filter(|f| filename_fmt.is_match(f))
.collect();
// TODO: check for missing logfiles
logfiles.sort();
let mut chunks = None;
+ let mut skip = false;
for fname in logfiles.into_iter() {
let fid = file_pool.get_fid(&fname);
- let f =
- futures::executor::block_on(file_pool.get_file(fid, false))?;
+ let f = block_on(file_pool.get_file(fid, false))?;
let mut off = 0;
+ if skip {
+ f.truncate(0)?;
+ block_on(file_pool.store.remove_file(fname))?;
+ continue
+ }
while let Some(header_raw) = f.read(off, msize as usize)? {
let ringid_start = (fid << file_pool.file_nbit) + off;
off += msize as u64;
@@ -617,6 +659,11 @@ impl WALLoader {
WALRingType::Full => {
assert!(chunks.is_none());
let payload = f.read(off, rsize as usize)?.ok_or(())?;
+ // TODO: improve the behavior when CRC32 fails
+ if !self.verify_checksum(&payload, header.crc32)? {
+ skip = true;
+ break
+ }
off += rsize as u64;
file_pool.store.apply_payload(
payload,
@@ -628,25 +675,35 @@ impl WALLoader {
}
WALRingType::First => {
assert!(chunks.is_none());
- chunks = Some((
- vec![f.read(off, rsize as usize)?.ok_or(())?],
- ringid_start,
- ));
+ let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ if !self.verify_checksum(&chunk, header.crc32)? {
+ skip = true;
+ break
+ }
+ chunks = Some((vec![chunk], ringid_start));
off += rsize as u64;
}
WALRingType::Middle => {
if let Some((chunks, _)) = &mut chunks {
- chunks
- .push(f.read(off, rsize as usize)?.ok_or(())?);
+ let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ if !self.verify_checksum(&chunk, header.crc32)? {
+ skip = true;
+ break
+ }
+ chunks.push(chunk);
} // otherwise ignore the leftover
off += rsize as u64;
}
WALRingType::Last => {
if let Some((mut chunks, ringid_start)) = chunks.take()
{
- chunks
- .push(f.read(off, rsize as usize)?.ok_or(())?);
+ let chunk = f.read(off, rsize as usize)?.ok_or(())?;
off += rsize as u64;
+ if !self.verify_checksum(&chunk, header.crc32)? {
+ skip = true;
+ break
+ }
+ chunks.push(chunk);
let mut payload = Vec::new();
payload.resize(
chunks.iter().fold(0, |acc, v| acc + v.len()),
@@ -678,7 +735,7 @@ impl WALLoader {
}
}
f.truncate(0)?;
- futures::executor::block_on(file_pool.store.remove_file(fname))?;
+ block_on(file_pool.store.remove_file(fname))?;
}
file_pool.reset();
Ok(WALWriter::new(