diff options
Diffstat (limited to 'src/wal.rs')
-rw-r--r-- | src/wal.rs | 115 |
1 files changed, 73 insertions, 42 deletions
@@ -1,8 +1,8 @@ use async_trait::async_trait; -use futures::future::{self, FutureExt, TryFutureExt}; use futures::executor::block_on; +use futures::future::{self, FutureExt, TryFutureExt}; use std::cell::{RefCell, UnsafeCell}; -use std::collections::{BinaryHeap, HashMap, hash_map}; +use std::collections::{hash_map, BinaryHeap, HashMap}; use std::future::Future; use std::mem::MaybeUninit; use std::pin::Pin; @@ -67,15 +67,21 @@ pub trait Record { } impl Record for WALBytes { - fn serialize(&self) -> WALBytes { self[..].into() } + fn serialize(&self) -> WALBytes { + self[..].into() + } } impl Record for String { - fn serialize(&self) -> WALBytes { self.as_bytes().into() } + fn serialize(&self) -> WALBytes { + self.as_bytes().into() + } } impl Record for &str { - fn serialize(&self) -> WALBytes { self.as_bytes().into() } + fn serialize(&self) -> WALBytes { + self.as_bytes().into() + } } /// the state for a WAL writer @@ -124,14 +130,6 @@ pub trait WALStore { /// Enumerate all WAL filenames. It should include all WAL files that are previously opened /// (created) but not removed. The list could be unordered. fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>; - /// Apply the payload during recovery. An invocation of the callback waits the application for - /// redoing the given operation to ensure its state is consistent. We assume the necessary - /// changes by the payload has already been persistent when the callback returns. - fn apply_payload( - &self, - payload: WALBytes, - ringid: WALRingId, - ) -> Result<(), ()>; } struct WALFileHandle<'a, F: WALStore> { @@ -142,7 +140,9 @@ struct WALFileHandle<'a, F: WALStore> { impl<'a, F: WALStore> std::ops::Deref for WALFileHandle<'a, F> { type Target = dyn WALFile + 'a; - fn deref(&self) -> &Self::Target { self.handle } + fn deref(&self) -> &Self::Target { + self.handle + } } impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> { @@ -158,7 +158,8 @@ impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> { struct WALFilePool<F: WALStore> { store: F, handle_cache: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>, - handle_used: RefCell<HashMap<WALFileId, UnsafeCell<(Box<dyn WALFile>, usize)>>>, + handle_used: + RefCell<HashMap<WALFileId, UnsafeCell<(Box<dyn WALFile>, usize)>>>, last_write: UnsafeCell<MaybeUninit<Pin<Box<dyn Future<Output = Result<(), ()>>>>>>, last_peel: @@ -201,18 +202,33 @@ impl<F: WALStore> WALFilePool<F> { let pool = self as *const WALFilePool<F>; if let Some(h) = self.handle_cache.borrow_mut().pop(&fid) { let handle = match self.handle_used.borrow_mut().entry(fid) { - hash_map::Entry::Vacant(e) => unsafe {&*(*e.insert(UnsafeCell::new((h, 1))).get()).0}, + hash_map::Entry::Vacant(e) => unsafe { + &*(*e.insert(UnsafeCell::new((h, 1))).get()).0 + }, _ => unreachable!(), }; Ok(WALFileHandle { fid, handle, pool }) } else { - let v = unsafe{&mut *match self.handle_used.borrow_mut().entry(fid) { - hash_map::Entry::Occupied(e) => e.into_mut(), - hash_map::Entry::Vacant(e) => e.insert( - UnsafeCell::new((self.store.open_file(&Self::get_fname(fid), touch).await?, 0))) - }.get()}; + let v = unsafe { + &mut *match self.handle_used.borrow_mut().entry(fid) { + hash_map::Entry::Occupied(e) => e.into_mut(), + hash_map::Entry::Vacant(e) => { + e.insert(UnsafeCell::new(( + self.store + .open_file(&Self::get_fname(fid), touch) + .await?, + 0, + ))) + } + } + .get() + }; v.1 += 1; - Ok(WALFileHandle { fid, handle: &*v.0, pool }) + Ok(WALFileHandle { + fid, + handle: &*v.0, + pool, + }) } } } @@ -220,13 +236,15 @@ impl<F: WALStore> WALFilePool<F> { fn release_file(&self, fid: WALFileId) { match self.handle_used.borrow_mut().entry(fid) { hash_map::Entry::Occupied(e) => { - let v = unsafe{&mut *e.get().get()}; + let v = unsafe { &mut *e.get().get() }; v.1 -= 1; if v.1 == 0 { - self.handle_cache.borrow_mut().put(fid, e.remove().into_inner().0); + self.handle_cache + .borrow_mut() + .put(fid, e.remove().into_inner().0); } - }, - _ => unreachable!() + } + _ => unreachable!(), } } @@ -263,7 +281,12 @@ impl<F: WALStore> WALFilePool<F> { let alloc = async move { last_write.await?; let mut last_h: Option< - Pin<Box<dyn Future<Output = Result<WALFileHandle<'a, F>, ()>> + 'a>>, + Pin< + Box< + dyn Future<Output = Result<WALFileHandle<'a, F>, ()>> + + 'a, + >, + >, > = None; for ((next_fid, wl), h) in meta.into_iter().zip(files.into_iter()) { if let Some(lh) = last_h.take() { @@ -349,7 +372,7 @@ impl<F: WALStore> WALFilePool<F> { } fn in_use_len(&self) -> usize { - self.handle_used.borrow().len() + self.handle_used.borrow().len() } fn reset(&mut self) { @@ -556,7 +579,9 @@ impl<F: WALStore> WALWriter<F> { self.file_pool.remove_files(orig_fid, next_fid) } - pub fn file_pool_in_use(&self) -> usize { self.file_pool.in_use_len() } + pub fn file_pool_in_use(&self) -> usize { + self.file_pool.in_use_len() + } } #[derive(Copy, Clone)] @@ -564,7 +589,7 @@ pub enum RecoverPolicy { /// all checksums must be correct, otherwise recovery fails Strict, /// stop recovering when hitting the first corrupted record - BestEffort + BestEffort, } pub struct WALLoader { @@ -577,10 +602,10 @@ pub struct WALLoader { impl Default for WALLoader { fn default() -> Self { WALLoader { - file_nbit: 22, // 4MB + file_nbit: 22, // 4MB block_nbit: 15, // 32KB, cache_size: 16, - recover_policy: RecoverPolicy::Strict + recover_policy: RecoverPolicy::Strict, } } } @@ -622,7 +647,11 @@ impl WALLoader { } /// Recover by reading the WAL files. - pub fn load<F: WALStore>(&self, store: F) -> Result<WALWriter<F>, ()> { + pub fn load<S: WALStore, F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>( + &self, + store: S, + mut recover_func: F, + ) -> Result<WALWriter<S>, ()> { let msize = std::mem::size_of::<WALRingBlob>(); assert!(self.file_nbit > self.block_nbit); assert!(msize < 1 << self.block_nbit); @@ -650,7 +679,7 @@ impl WALLoader { if skip { f.truncate(0)?; block_on(file_pool.store.remove_file(fname))?; - continue + continue; } while let Some(header_raw) = f.read(off, msize as usize)? { let ringid_start = (fid << file_pool.file_nbit) + off; @@ -668,10 +697,10 @@ impl WALLoader { // TODO: improve the behavior when CRC32 fails if !self.verify_checksum(&payload, header.crc32)? { skip = true; - break + break; } off += rsize as u64; - file_pool.store.apply_payload( + recover_func( payload, WALRingId { start: ringid_start, @@ -684,17 +713,18 @@ impl WALLoader { let chunk = f.read(off, rsize as usize)?.ok_or(())?; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; - break + break; } chunks = Some((vec![chunk], ringid_start)); off += rsize as u64; } WALRingType::Middle => { if let Some((chunks, _)) = &mut chunks { - let chunk = f.read(off, rsize as usize)?.ok_or(())?; + let chunk = + f.read(off, rsize as usize)?.ok_or(())?; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; - break + break; } chunks.push(chunk); } // otherwise ignore the leftover @@ -703,11 +733,12 @@ impl WALLoader { WALRingType::Last => { if let Some((mut chunks, ringid_start)) = chunks.take() { - let chunk = f.read(off, rsize as usize)?.ok_or(())?; + let chunk = + f.read(off, rsize as usize)?.ok_or(())?; off += rsize as u64; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; - break + break; } chunks.push(chunk); let mut payload = Vec::new(); @@ -720,7 +751,7 @@ impl WALLoader { ps[..c.len()].copy_from_slice(&*c); ps = &mut ps[c.len()..]; } - file_pool.store.apply_payload( + recover_func( payload.into_boxed_slice(), WALRingId { start: ringid_start, |