diff options
author | Determinant <[email protected]> | 2020-06-16 18:03:12 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-16 18:03:12 -0400 |
commit | 6ee65d83d983ecc35f596f516d2739e7a91b9efa (patch) | |
tree | 59c5912f7f6fa73151c2f8a98de72e1167d64eeb /src/wal.rs | |
parent | 1089cf85a74887430d5d21c42886a4a05bac2be9 (diff) |
improve WALLoader; verify CRC32
Diffstat (limited to 'src/wal.rs')
-rw-r--r-- | src/wal.rs | 109 |
1 files changed, 83 insertions, 26 deletions
@@ -1,5 +1,6 @@ use async_trait::async_trait; use futures::future::{self, FutureExt, TryFutureExt}; +use futures::executor::block_on; use std::cell::{RefCell, UnsafeCell}; use std::collections::{BinaryHeap, HashMap, hash_map}; use std::future::Future; @@ -558,31 +559,68 @@ impl<F: WALStore> WALWriter<F> { pub fn file_pool_in_use(&self) -> usize { self.file_pool.in_use_len() } } +#[derive(Copy, Clone)] +pub enum RecoverPolicy { + /// all checksums must be correct, otherwise recovery fails + Strict, + /// stop recovering when hitting the first corrupted record + BestEffort +} + pub struct WALLoader { file_nbit: u8, block_nbit: u8, cache_size: usize, - msize: usize, - filename_fmt: regex::Regex, + recover_policy: RecoverPolicy, } impl WALLoader { - pub fn new(file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { - let msize = std::mem::size_of::<WALRingBlob>(); - assert!(file_nbit > block_nbit); - assert!(msize < 1 << block_nbit); - let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap(); + pub fn new() -> Self { WALLoader { - file_nbit, - block_nbit, - cache_size, - msize, - filename_fmt, + file_nbit: 22, // 4MB + block_nbit: 15, // 32KB, + cache_size: 16, + recover_policy: RecoverPolicy::Strict + } + } + + pub fn file_nbit(&mut self, v: u8) -> &mut Self { + self.file_nbit = v; + self + } + + pub fn block_nbit(&mut self, v: u8) -> &mut Self { + self.block_nbit = v; + self + } + + pub fn cache_size(&mut self, v: usize) -> &mut Self { + self.cache_size = v; + self + } + + pub fn recover_policy(&mut self, p: RecoverPolicy) -> &mut Self { + self.recover_policy = p; + self + } + + fn verify_checksum(&self, data: &[u8], checksum: u32) -> Result<bool, ()> { + if checksum == crc::crc32::checksum_ieee(data) { + Ok(true) + } else { + match self.recover_policy { + RecoverPolicy::Strict => Err(()), + RecoverPolicy::BestEffort => Ok(false), + } } } /// Recover by reading the WAL files. - pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> { + pub fn load<F: WALStore>(&self, store: F) -> Result<WALWriter<F>, ()> { + let msize = std::mem::size_of::<WALRingBlob>(); + assert!(self.file_nbit > self.block_nbit); + assert!(msize < 1 << self.block_nbit); + let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap(); let mut file_pool = WALFilePool::new( store, self.file_nbit, @@ -590,20 +628,24 @@ impl WALLoader { self.cache_size, ); let block_size = 1 << file_pool.block_nbit; - let msize = self.msize as u32; let mut logfiles: Vec<String> = file_pool .store .enumerate_files()? - .filter(|f| self.filename_fmt.is_match(f)) + .filter(|f| filename_fmt.is_match(f)) .collect(); // TODO: check for missing logfiles logfiles.sort(); let mut chunks = None; + let mut skip = false; for fname in logfiles.into_iter() { let fid = file_pool.get_fid(&fname); - let f = - futures::executor::block_on(file_pool.get_file(fid, false))?; + let f = block_on(file_pool.get_file(fid, false))?; let mut off = 0; + if skip { + f.truncate(0)?; + block_on(file_pool.store.remove_file(fname))?; + continue + } while let Some(header_raw) = f.read(off, msize as usize)? { let ringid_start = (fid << file_pool.file_nbit) + off; off += msize as u64; @@ -617,6 +659,11 @@ impl WALLoader { WALRingType::Full => { assert!(chunks.is_none()); let payload = f.read(off, rsize as usize)?.ok_or(())?; + // TODO: improve the behavior when CRC32 fails + if !self.verify_checksum(&payload, header.crc32)? { + skip = true; + break + } off += rsize as u64; file_pool.store.apply_payload( payload, @@ -628,25 +675,35 @@ impl WALLoader { } WALRingType::First => { assert!(chunks.is_none()); - chunks = Some(( - vec![f.read(off, rsize as usize)?.ok_or(())?], - ringid_start, - )); + let chunk = f.read(off, rsize as usize)?.ok_or(())?; + if !self.verify_checksum(&chunk, header.crc32)? { + skip = true; + break + } + chunks = Some((vec![chunk], ringid_start)); off += rsize as u64; } WALRingType::Middle => { if let Some((chunks, _)) = &mut chunks { - chunks - .push(f.read(off, rsize as usize)?.ok_or(())?); + let chunk = f.read(off, rsize as usize)?.ok_or(())?; + if !self.verify_checksum(&chunk, header.crc32)? { + skip = true; + break + } + chunks.push(chunk); } // otherwise ignore the leftover off += rsize as u64; } WALRingType::Last => { if let Some((mut chunks, ringid_start)) = chunks.take() { - chunks - .push(f.read(off, rsize as usize)?.ok_or(())?); + let chunk = f.read(off, rsize as usize)?.ok_or(())?; off += rsize as u64; + if !self.verify_checksum(&chunk, header.crc32)? { + skip = true; + break + } + chunks.push(chunk); let mut payload = Vec::new(); payload.resize( chunks.iter().fold(0, |acc, v| acc + v.len()), @@ -678,7 +735,7 @@ impl WALLoader { } } f.truncate(0)?; - futures::executor::block_on(file_pool.store.remove_file(fname))?; + block_on(file_pool.store.remove_file(fname))?; } file_pool.reset(); Ok(WALWriter::new( |