From 395bd19d51c5f5e0bfd3b5897ddce5f6bef5ec79 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 25 Aug 2020 17:35:17 -0400 Subject: simplify API by moving `recover_func` to `load()` --- src/lib.rs | 39 +++++++-------------- src/wal.rs | 115 +++++++++++++++++++++++++++++++++++++++---------------------- 2 files changed, 85 insertions(+), 69 deletions(-) (limited to 'src') diff --git a/src/lib.rs b/src/lib.rs index be8993b..2a67cd3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,8 @@ //! //! //! // Start with empty WAL (truncate = true). -//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}, None).unwrap(); -//! let mut wal = loader.load(store).unwrap(); +//! let store = WALStoreAIO::new("./walfiles", true, None).unwrap(); +//! let mut wal = loader.load(store, |_, _| {Ok(())}).unwrap(); //! // Write a vector of records to WAL. //! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() { //! let ring_id = block_on(f).unwrap().1; @@ -20,14 +20,14 @@ //! //! //! // Load from WAL (truncate = false). -//! let store = WALStoreAIO::new("./walfiles", false, |payload, ringid| { +//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap(); +//! let mut wal = loader.load(store, |payload, ringid| { //! // redo the operations in your application //! println!("recover(payload={}, ringid={:?})", //! std::str::from_utf8(&payload).unwrap(), //! ringid); //! Ok(()) -//! }, None).unwrap(); -//! let mut wal = loader.load(store).unwrap(); +//! }).unwrap(); //! // We saw some log playback, even there is no failure. //! // Let's try to grow the WAL to create many files. //! let ring_ids = wal.grow((0..100).into_iter().map(|i| "a".repeat(i)).collect::>()) @@ -37,11 +37,11 @@ //! block_on(wal.peel(ring_ids)).unwrap(); //! // There will only be one remaining file in ./walfiles. //! -//! let store = WALStoreAIO::new("./walfiles", false, |payload, _| { +//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap(); +//! let wal = loader.load(store, |payload, _| { //! println!("payload.len() = {}", payload.len()); //! Ok(()) -//! }, None).unwrap(); -//! let wal = loader.load(store).unwrap(); +//! }).unwrap(); //! // After each recovery, the ./walfiles is empty. //! ``` @@ -55,10 +55,9 @@ use libc::off_t; use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag}; use nix::sys::stat::Mode; use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags}; -use std::cell::RefCell; use std::os::unix::io::RawFd; use std::rc::Rc; -use wal::{WALBytes, WALFile, WALPos, WALRingId, WALStore}; +use wal::{WALBytes, WALFile, WALPos, WALStore}; pub struct WALFileAIO { fd: RawFd, @@ -130,21 +129,18 @@ impl WALFile for WALFileAIO { } } -pub struct WALStoreAIO Result<(), ()>> { +pub struct WALStoreAIO { rootfd: RawFd, rootpath: String, - recover_func: RefCell, aiomgr: Rc, } -impl Result<(), ()>> WALStoreAIO { +impl WALStoreAIO { pub fn new( wal_dir: &str, truncate: bool, - recover_func: F, aiomgr: Option, ) -> Result { - let recover_func = RefCell::new(recover_func); let rootpath = wal_dir.to_string(); let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else( |_: Result| { @@ -174,16 +170,13 @@ impl Result<(), ()>> WALStoreAIO { Ok(WALStoreAIO { rootfd, rootpath, - recover_func, aiomgr, }) } } #[async_trait(?Send)] -impl Result<(), ()>> WALStore - for WALStoreAIO -{ +impl WALStore for WALStoreAIO { type FileNameIter = std::vec::IntoIter; async fn open_file( @@ -212,12 +205,4 @@ impl Result<(), ()>> WALStore } Ok(logfiles.into_iter()) } - - fn apply_payload( - &self, - payload: WALBytes, - ringid: WALRingId, - ) -> Result<(), ()> { - (&mut *self.recover_func.borrow_mut())(payload, ringid) - } } diff --git a/src/wal.rs b/src/wal.rs index 28831cd..deae98d 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; -use futures::future::{self, FutureExt, TryFutureExt}; use futures::executor::block_on; +use futures::future::{self, FutureExt, TryFutureExt}; use std::cell::{RefCell, UnsafeCell}; -use std::collections::{BinaryHeap, HashMap, hash_map}; +use std::collections::{hash_map, BinaryHeap, HashMap}; use std::future::Future; use std::mem::MaybeUninit; use std::pin::Pin; @@ -67,15 +67,21 @@ pub trait Record { } impl Record for WALBytes { - fn serialize(&self) -> WALBytes { self[..].into() } + fn serialize(&self) -> WALBytes { + self[..].into() + } } impl Record for String { - fn serialize(&self) -> WALBytes { self.as_bytes().into() } + fn serialize(&self) -> WALBytes { + self.as_bytes().into() + } } impl Record for &str { - fn serialize(&self) -> WALBytes { self.as_bytes().into() } + fn serialize(&self) -> WALBytes { + self.as_bytes().into() + } } /// the state for a WAL writer @@ -124,14 +130,6 @@ pub trait WALStore { /// Enumerate all WAL filenames. It should include all WAL files that are previously opened /// (created) but not removed. The list could be unordered. fn enumerate_files(&self) -> Result; - /// Apply the payload during recovery. An invocation of the callback waits the application for - /// redoing the given operation to ensure its state is consistent. We assume the necessary - /// changes by the payload has already been persistent when the callback returns. - fn apply_payload( - &self, - payload: WALBytes, - ringid: WALRingId, - ) -> Result<(), ()>; } struct WALFileHandle<'a, F: WALStore> { @@ -142,7 +140,9 @@ struct WALFileHandle<'a, F: WALStore> { impl<'a, F: WALStore> std::ops::Deref for WALFileHandle<'a, F> { type Target = dyn WALFile + 'a; - fn deref(&self) -> &Self::Target { self.handle } + fn deref(&self) -> &Self::Target { + self.handle + } } impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> { @@ -158,7 +158,8 @@ impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> { struct WALFilePool { store: F, handle_cache: RefCell>>, - handle_used: RefCell, usize)>>>, + handle_used: + RefCell, usize)>>>, last_write: UnsafeCell>>>>>, last_peel: @@ -201,18 +202,33 @@ impl WALFilePool { let pool = self as *const WALFilePool; if let Some(h) = self.handle_cache.borrow_mut().pop(&fid) { let handle = match self.handle_used.borrow_mut().entry(fid) { - hash_map::Entry::Vacant(e) => unsafe {&*(*e.insert(UnsafeCell::new((h, 1))).get()).0}, + hash_map::Entry::Vacant(e) => unsafe { + &*(*e.insert(UnsafeCell::new((h, 1))).get()).0 + }, _ => unreachable!(), }; Ok(WALFileHandle { fid, handle, pool }) } else { - let v = unsafe{&mut *match self.handle_used.borrow_mut().entry(fid) { - hash_map::Entry::Occupied(e) => e.into_mut(), - hash_map::Entry::Vacant(e) => e.insert( - UnsafeCell::new((self.store.open_file(&Self::get_fname(fid), touch).await?, 0))) - }.get()}; + let v = unsafe { + &mut *match self.handle_used.borrow_mut().entry(fid) { + hash_map::Entry::Occupied(e) => e.into_mut(), + hash_map::Entry::Vacant(e) => { + e.insert(UnsafeCell::new(( + self.store + .open_file(&Self::get_fname(fid), touch) + .await?, + 0, + ))) + } + } + .get() + }; v.1 += 1; - Ok(WALFileHandle { fid, handle: &*v.0, pool }) + Ok(WALFileHandle { + fid, + handle: &*v.0, + pool, + }) } } } @@ -220,13 +236,15 @@ impl WALFilePool { fn release_file(&self, fid: WALFileId) { match self.handle_used.borrow_mut().entry(fid) { hash_map::Entry::Occupied(e) => { - let v = unsafe{&mut *e.get().get()}; + let v = unsafe { &mut *e.get().get() }; v.1 -= 1; if v.1 == 0 { - self.handle_cache.borrow_mut().put(fid, e.remove().into_inner().0); + self.handle_cache + .borrow_mut() + .put(fid, e.remove().into_inner().0); } - }, - _ => unreachable!() + } + _ => unreachable!(), } } @@ -263,7 +281,12 @@ impl WALFilePool { let alloc = async move { last_write.await?; let mut last_h: Option< - Pin, ()>> + 'a>>, + Pin< + Box< + dyn Future, ()>> + + 'a, + >, + >, > = None; for ((next_fid, wl), h) in meta.into_iter().zip(files.into_iter()) { if let Some(lh) = last_h.take() { @@ -349,7 +372,7 @@ impl WALFilePool { } fn in_use_len(&self) -> usize { - self.handle_used.borrow().len() + self.handle_used.borrow().len() } fn reset(&mut self) { @@ -556,7 +579,9 @@ impl WALWriter { self.file_pool.remove_files(orig_fid, next_fid) } - pub fn file_pool_in_use(&self) -> usize { self.file_pool.in_use_len() } + pub fn file_pool_in_use(&self) -> usize { + self.file_pool.in_use_len() + } } #[derive(Copy, Clone)] @@ -564,7 +589,7 @@ pub enum RecoverPolicy { /// all checksums must be correct, otherwise recovery fails Strict, /// stop recovering when hitting the first corrupted record - BestEffort + BestEffort, } pub struct WALLoader { @@ -577,10 +602,10 @@ pub struct WALLoader { impl Default for WALLoader { fn default() -> Self { WALLoader { - file_nbit: 22, // 4MB + file_nbit: 22, // 4MB block_nbit: 15, // 32KB, cache_size: 16, - recover_policy: RecoverPolicy::Strict + recover_policy: RecoverPolicy::Strict, } } } @@ -622,7 +647,11 @@ impl WALLoader { } /// Recover by reading the WAL files. - pub fn load(&self, store: F) -> Result, ()> { + pub fn load Result<(), ()>>( + &self, + store: S, + mut recover_func: F, + ) -> Result, ()> { let msize = std::mem::size_of::(); assert!(self.file_nbit > self.block_nbit); assert!(msize < 1 << self.block_nbit); @@ -650,7 +679,7 @@ impl WALLoader { if skip { f.truncate(0)?; block_on(file_pool.store.remove_file(fname))?; - continue + continue; } while let Some(header_raw) = f.read(off, msize as usize)? { let ringid_start = (fid << file_pool.file_nbit) + off; @@ -668,10 +697,10 @@ impl WALLoader { // TODO: improve the behavior when CRC32 fails if !self.verify_checksum(&payload, header.crc32)? { skip = true; - break + break; } off += rsize as u64; - file_pool.store.apply_payload( + recover_func( payload, WALRingId { start: ringid_start, @@ -684,17 +713,18 @@ impl WALLoader { let chunk = f.read(off, rsize as usize)?.ok_or(())?; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; - break + break; } chunks = Some((vec![chunk], ringid_start)); off += rsize as u64; } WALRingType::Middle => { if let Some((chunks, _)) = &mut chunks { - let chunk = f.read(off, rsize as usize)?.ok_or(())?; + let chunk = + f.read(off, rsize as usize)?.ok_or(())?; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; - break + break; } chunks.push(chunk); } // otherwise ignore the leftover @@ -703,11 +733,12 @@ impl WALLoader { WALRingType::Last => { if let Some((mut chunks, ringid_start)) = chunks.take() { - let chunk = f.read(off, rsize as usize)?.ok_or(())?; + let chunk = + f.read(off, rsize as usize)?.ok_or(())?; off += rsize as u64; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; - break + break; } chunks.push(chunk); let mut payload = Vec::new(); @@ -720,7 +751,7 @@ impl WALLoader { ps[..c.len()].copy_from_slice(&*c); ps = &mut ps[c.len()..]; } - file_pool.store.apply_payload( + recover_func( payload.into_boxed_slice(), WALRingId { start: ringid_start, -- cgit v1.2.3-70-g09d2