From 395bd19d51c5f5e0bfd3b5897ddce5f6bef5ec79 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 25 Aug 2020 17:35:17 -0400 Subject: simplify API by moving `recover_func` to `load()` --- Cargo.lock | 2 +- Cargo.toml | 2 +- README.rst | 2 +- examples/demo1.rs | 16 ++++---- src/lib.rs | 39 ++++++------------ src/wal.rs | 115 +++++++++++++++++++++++++++++++++------------------- tests/common/mod.rs | 70 ++++++++++++-------------------- 7 files changed, 122 insertions(+), 124 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 98369cd..4a278a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,7 +232,7 @@ dependencies = [ [[package]] name = "growth-ring" -version = "0.1.4" +version = "0.1.5" dependencies = [ "async-trait", "crc", diff --git a/Cargo.toml b/Cargo.toml index dd87bde..22b3232 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "growth-ring" -version = "0.1.4" +version = "0.1.5" authors = ["Determinant "] edition = "2018" homepage = "https://github.com/Determinant/growth-ring" diff --git a/README.rst b/README.rst index c963bf0..69f443d 100644 --- a/README.rst +++ b/README.rst @@ -8,4 +8,4 @@ Documentation ------------- - Latest_ -.. _Latest: https://docs.rs/growth-ring/0.1.2/growthring/ +.. _Latest: https://docs.rs/growth-ring/0.1.5/growthring/ diff --git a/examples/demo1.rs b/examples/demo1.rs index d51801d..6f029ab 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -4,9 +4,9 @@ use growthring::{ }; use rand::{seq::SliceRandom, Rng}; -fn test Result<(), ()>>( +fn test( records: Vec, - wal: &mut WALWriter>, + wal: &mut WALWriter, ) -> Vec { let mut res = Vec::new(); for r in wal.grow(records).into_iter() { @@ -32,8 +32,8 @@ fn main() { let mut loader = WALLoader::new(); loader.file_nbit(9).block_nbit(8); - let store = WALStoreAIO::new(&wal_dir, true, recover, None).unwrap(); - let mut wal = loader.load(store).unwrap(); + let store = WALStoreAIO::new(&wal_dir, true, None).unwrap(); + let mut wal = loader.load(store, recover).unwrap(); for _ in 0..3 { test( ["hi", "hello", "lol"] @@ -50,8 +50,8 @@ fn main() { ); } - let store = WALStoreAIO::new(&wal_dir, false, recover, None).unwrap(); - let mut wal = loader.load(store).unwrap(); + let store = WALStoreAIO::new(&wal_dir, false, None).unwrap(); + let mut wal = loader.load(store, recover).unwrap(); for _ in 0..3 { test( vec![ @@ -64,8 +64,8 @@ fn main() { ); } - let store = WALStoreAIO::new(&wal_dir, false, recover, None).unwrap(); - let mut wal = loader.load(store).unwrap(); + let store = WALStoreAIO::new(&wal_dir, false, None).unwrap(); + let mut wal = loader.load(store, recover).unwrap(); for _ in 0..3 { let mut ids = Vec::new(); for _ in 0..3 { diff --git a/src/lib.rs b/src/lib.rs index be8993b..2a67cd3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,8 @@ //! //! //! // Start with empty WAL (truncate = true). -//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}, None).unwrap(); -//! let mut wal = loader.load(store).unwrap(); +//! let store = WALStoreAIO::new("./walfiles", true, None).unwrap(); +//! let mut wal = loader.load(store, |_, _| {Ok(())}).unwrap(); //! // Write a vector of records to WAL. //! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() { //! let ring_id = block_on(f).unwrap().1; @@ -20,14 +20,14 @@ //! //! //! // Load from WAL (truncate = false). -//! let store = WALStoreAIO::new("./walfiles", false, |payload, ringid| { +//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap(); +//! let mut wal = loader.load(store, |payload, ringid| { //! // redo the operations in your application //! println!("recover(payload={}, ringid={:?})", //! std::str::from_utf8(&payload).unwrap(), //! ringid); //! Ok(()) -//! }, None).unwrap(); -//! let mut wal = loader.load(store).unwrap(); +//! }).unwrap(); //! // We saw some log playback, even there is no failure. //! // Let's try to grow the WAL to create many files. //! let ring_ids = wal.grow((0..100).into_iter().map(|i| "a".repeat(i)).collect::>()) @@ -37,11 +37,11 @@ //! block_on(wal.peel(ring_ids)).unwrap(); //! // There will only be one remaining file in ./walfiles. //! -//! let store = WALStoreAIO::new("./walfiles", false, |payload, _| { +//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap(); +//! let wal = loader.load(store, |payload, _| { //! println!("payload.len() = {}", payload.len()); //! Ok(()) -//! }, None).unwrap(); -//! let wal = loader.load(store).unwrap(); +//! }).unwrap(); //! // After each recovery, the ./walfiles is empty. //! ``` @@ -55,10 +55,9 @@ use libc::off_t; use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag}; use nix::sys::stat::Mode; use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags}; -use std::cell::RefCell; use std::os::unix::io::RawFd; use std::rc::Rc; -use wal::{WALBytes, WALFile, WALPos, WALRingId, WALStore}; +use wal::{WALBytes, WALFile, WALPos, WALStore}; pub struct WALFileAIO { fd: RawFd, @@ -130,21 +129,18 @@ impl WALFile for WALFileAIO { } } -pub struct WALStoreAIO Result<(), ()>> { +pub struct WALStoreAIO { rootfd: RawFd, rootpath: String, - recover_func: RefCell, aiomgr: Rc, } -impl Result<(), ()>> WALStoreAIO { +impl WALStoreAIO { pub fn new( wal_dir: &str, truncate: bool, - recover_func: F, aiomgr: Option, ) -> Result { - let recover_func = RefCell::new(recover_func); let rootpath = wal_dir.to_string(); let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else( |_: Result| { @@ -174,16 +170,13 @@ impl Result<(), ()>> WALStoreAIO { Ok(WALStoreAIO { rootfd, rootpath, - recover_func, aiomgr, }) } } #[async_trait(?Send)] -impl Result<(), ()>> WALStore - for WALStoreAIO -{ +impl WALStore for WALStoreAIO { type FileNameIter = std::vec::IntoIter; async fn open_file( @@ -212,12 +205,4 @@ impl Result<(), ()>> WALStore } Ok(logfiles.into_iter()) } - - fn apply_payload( - &self, - payload: WALBytes, - ringid: WALRingId, - ) -> Result<(), ()> { - (&mut *self.recover_func.borrow_mut())(payload, ringid) - } } diff --git a/src/wal.rs b/src/wal.rs index 28831cd..deae98d 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; -use futures::future::{self, FutureExt, TryFutureExt}; use futures::executor::block_on; +use futures::future::{self, FutureExt, TryFutureExt}; use std::cell::{RefCell, UnsafeCell}; -use std::collections::{BinaryHeap, HashMap, hash_map}; +use std::collections::{hash_map, BinaryHeap, HashMap}; use std::future::Future; use std::mem::MaybeUninit; use std::pin::Pin; @@ -67,15 +67,21 @@ pub trait Record { } impl Record for WALBytes { - fn serialize(&self) -> WALBytes { self[..].into() } + fn serialize(&self) -> WALBytes { + self[..].into() + } } impl Record for String { - fn serialize(&self) -> WALBytes { self.as_bytes().into() } + fn serialize(&self) -> WALBytes { + self.as_bytes().into() + } } impl Record for &str { - fn serialize(&self) -> WALBytes { self.as_bytes().into() } + fn serialize(&self) -> WALBytes { + self.as_bytes().into() + } } /// the state for a WAL writer @@ -124,14 +130,6 @@ pub trait WALStore { /// Enumerate all WAL filenames. It should include all WAL files that are previously opened /// (created) but not removed. The list could be unordered. fn enumerate_files(&self) -> Result; - /// Apply the payload during recovery. An invocation of the callback waits the application for - /// redoing the given operation to ensure its state is consistent. We assume the necessary - /// changes by the payload has already been persistent when the callback returns. - fn apply_payload( - &self, - payload: WALBytes, - ringid: WALRingId, - ) -> Result<(), ()>; } struct WALFileHandle<'a, F: WALStore> { @@ -142,7 +140,9 @@ struct WALFileHandle<'a, F: WALStore> { impl<'a, F: WALStore> std::ops::Deref for WALFileHandle<'a, F> { type Target = dyn WALFile + 'a; - fn deref(&self) -> &Self::Target { self.handle } + fn deref(&self) -> &Self::Target { + self.handle + } } impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> { @@ -158,7 +158,8 @@ impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> { struct WALFilePool { store: F, handle_cache: RefCell>>, - handle_used: RefCell, usize)>>>, + handle_used: + RefCell, usize)>>>, last_write: UnsafeCell>>>>>, last_peel: @@ -201,18 +202,33 @@ impl WALFilePool { let pool = self as *const WALFilePool; if let Some(h) = self.handle_cache.borrow_mut().pop(&fid) { let handle = match self.handle_used.borrow_mut().entry(fid) { - hash_map::Entry::Vacant(e) => unsafe {&*(*e.insert(UnsafeCell::new((h, 1))).get()).0}, + hash_map::Entry::Vacant(e) => unsafe { + &*(*e.insert(UnsafeCell::new((h, 1))).get()).0 + }, _ => unreachable!(), }; Ok(WALFileHandle { fid, handle, pool }) } else { - let v = unsafe{&mut *match self.handle_used.borrow_mut().entry(fid) { - hash_map::Entry::Occupied(e) => e.into_mut(), - hash_map::Entry::Vacant(e) => e.insert( - UnsafeCell::new((self.store.open_file(&Self::get_fname(fid), touch).await?, 0))) - }.get()}; + let v = unsafe { + &mut *match self.handle_used.borrow_mut().entry(fid) { + hash_map::Entry::Occupied(e) => e.into_mut(), + hash_map::Entry::Vacant(e) => { + e.insert(UnsafeCell::new(( + self.store + .open_file(&Self::get_fname(fid), touch) + .await?, + 0, + ))) + } + } + .get() + }; v.1 += 1; - Ok(WALFileHandle { fid, handle: &*v.0, pool }) + Ok(WALFileHandle { + fid, + handle: &*v.0, + pool, + }) } } } @@ -220,13 +236,15 @@ impl WALFilePool { fn release_file(&self, fid: WALFileId) { match self.handle_used.borrow_mut().entry(fid) { hash_map::Entry::Occupied(e) => { - let v = unsafe{&mut *e.get().get()}; + let v = unsafe { &mut *e.get().get() }; v.1 -= 1; if v.1 == 0 { - self.handle_cache.borrow_mut().put(fid, e.remove().into_inner().0); + self.handle_cache + .borrow_mut() + .put(fid, e.remove().into_inner().0); } - }, - _ => unreachable!() + } + _ => unreachable!(), } } @@ -263,7 +281,12 @@ impl WALFilePool { let alloc = async move { last_write.await?; let mut last_h: Option< - Pin, ()>> + 'a>>, + Pin< + Box< + dyn Future, ()>> + + 'a, + >, + >, > = None; for ((next_fid, wl), h) in meta.into_iter().zip(files.into_iter()) { if let Some(lh) = last_h.take() { @@ -349,7 +372,7 @@ impl WALFilePool { } fn in_use_len(&self) -> usize { - self.handle_used.borrow().len() + self.handle_used.borrow().len() } fn reset(&mut self) { @@ -556,7 +579,9 @@ impl WALWriter { self.file_pool.remove_files(orig_fid, next_fid) } - pub fn file_pool_in_use(&self) -> usize { self.file_pool.in_use_len() } + pub fn file_pool_in_use(&self) -> usize { + self.file_pool.in_use_len() + } } #[derive(Copy, Clone)] @@ -564,7 +589,7 @@ pub enum RecoverPolicy { /// all checksums must be correct, otherwise recovery fails Strict, /// stop recovering when hitting the first corrupted record - BestEffort + BestEffort, } pub struct WALLoader { @@ -577,10 +602,10 @@ pub struct WALLoader { impl Default for WALLoader { fn default() -> Self { WALLoader { - file_nbit: 22, // 4MB + file_nbit: 22, // 4MB block_nbit: 15, // 32KB, cache_size: 16, - recover_policy: RecoverPolicy::Strict + recover_policy: RecoverPolicy::Strict, } } } @@ -622,7 +647,11 @@ impl WALLoader { } /// Recover by reading the WAL files. - pub fn load(&self, store: F) -> Result, ()> { + pub fn load Result<(), ()>>( + &self, + store: S, + mut recover_func: F, + ) -> Result, ()> { let msize = std::mem::size_of::(); assert!(self.file_nbit > self.block_nbit); assert!(msize < 1 << self.block_nbit); @@ -650,7 +679,7 @@ impl WALLoader { if skip { f.truncate(0)?; block_on(file_pool.store.remove_file(fname))?; - continue + continue; } while let Some(header_raw) = f.read(off, msize as usize)? { let ringid_start = (fid << file_pool.file_nbit) + off; @@ -668,10 +697,10 @@ impl WALLoader { // TODO: improve the behavior when CRC32 fails if !self.verify_checksum(&payload, header.crc32)? { skip = true; - break + break; } off += rsize as u64; - file_pool.store.apply_payload( + recover_func( payload, WALRingId { start: ringid_start, @@ -684,17 +713,18 @@ impl WALLoader { let chunk = f.read(off, rsize as usize)?.ok_or(())?; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; - break + break; } chunks = Some((vec![chunk], ringid_start)); off += rsize as u64; } WALRingType::Middle => { if let Some((chunks, _)) = &mut chunks { - let chunk = f.read(off, rsize as usize)?.ok_or(())?; + let chunk = + f.read(off, rsize as usize)?.ok_or(())?; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; - break + break; } chunks.push(chunk); } // otherwise ignore the leftover @@ -703,11 +733,12 @@ impl WALLoader { WALRingType::Last => { if let Some((mut chunks, ringid_start)) = chunks.take() { - let chunk = f.read(off, rsize as usize)?.ok_or(())?; + let chunk = + f.read(off, rsize as usize)?.ok_or(())?; off += rsize as u64; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; - break + break; } chunks.push(chunk); let mut payload = Vec::new(); @@ -720,7 +751,7 @@ impl WALLoader { ps[..c.len()].copy_from_slice(&*c); ps = &mut ps[c.len()..]; } - file_pool.store.apply_payload( + recover_func( payload.into_boxed_slice(), WALRingId { start: ringid_start, diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 769f422..9ec9e8e 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,9 +1,8 @@ #[cfg(test)] - #[allow(dead_code)] use async_trait::async_trait; use growthring::wal::{ - WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore + WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, }; use indexmap::{map::Entry, IndexMap}; use rand::Rng; @@ -111,37 +110,25 @@ impl WALStoreEmulState { } /// Emulate the persistent storage state. -pub struct WALStoreEmul<'a, G, F> +pub struct WALStoreEmul<'a, G> where G: FailGen, - F: FnMut(WALBytes, WALRingId), { state: RefCell<&'a mut WALStoreEmulState>, fgen: Rc, - recover: RefCell, } -impl<'a, G: FailGen, F: FnMut(WALBytes, WALRingId)> WALStoreEmul<'a, G, F> { - pub fn new( - state: &'a mut WALStoreEmulState, - fgen: Rc, - recover: F, - ) -> Self { +impl<'a, G: FailGen> WALStoreEmul<'a, G> { + pub fn new(state: &'a mut WALStoreEmulState, fgen: Rc) -> Self { let state = RefCell::new(state); - let recover = RefCell::new(recover); - WALStoreEmul { - state, - fgen, - recover, - } + WALStoreEmul { state, fgen } } } #[async_trait(?Send)] -impl<'a, G, F> WALStore for WALStoreEmul<'a, G, F> +impl<'a, G> WALStore for WALStoreEmul<'a, G> where G: 'static + FailGen, - F: FnMut(WALBytes, WALRingId), { type FileNameIter = std::vec::IntoIter; @@ -194,23 +181,6 @@ where } Ok(logfiles.into_iter()) } - - fn apply_payload( - &self, - payload: WALBytes, - ringid: WALRingId, - ) -> Result<(), ()> { - if self.fgen.next_fail() { - return Err(()); - } - /* - println!("apply_payload(payload=0x{}, ringid={:?})", - hex::encode(&payload), - ringid); - */ - (&mut *self.recover.borrow_mut())(payload, ringid); - Ok(()) - } } pub struct SingleFailGen { @@ -336,7 +306,9 @@ impl PaintStrokes { } impl growthring::wal::Record for PaintStrokes { - fn serialize(&self) -> WALBytes { self.to_bytes() } + fn serialize(&self) -> WALBytes { + self.to_bytes() + } } #[test] @@ -461,7 +433,9 @@ impl Canvas { } else { None } - } else { None } + } else { + None + } } pub fn is_same(&self, other: &Canvas) -> bool { @@ -538,7 +512,14 @@ impl PaintingSim { ) -> Result<(), ()> { let mut rng = ::seed_from_u64(self.seed); - let mut wal = loader.load(WALStoreEmul::new(state, fgen.clone(), |_, _| {}))?; + let mut wal = + loader.load(WALStoreEmul::new(state, fgen.clone()), |_, _| { + if fgen.next_fail() { + Err(()) + } else { + Ok(()) + } + })?; for _ in 0..self.n { let pss = (0..self.m) .map(|_| { @@ -597,7 +578,8 @@ impl PaintingSim { pub fn get_walloader(&self) -> WALLoader { let mut loader = WALLoader::new(); - loader.file_nbit(self.file_nbit) + loader + .file_nbit(self.file_nbit) .block_nbit(self.block_nbit) .cache_size(self.file_cache); loader @@ -634,16 +616,16 @@ impl PaintingSim { let mut last_idx = 0; let mut napplied = 0; canvas.clear_queued(); - wal.load(WALStoreEmul::new( - state, - Rc::new(ZeroFailGen), + wal.load( + WALStoreEmul::new(state, Rc::new(ZeroFailGen)), |payload, ringid| { let s = PaintStrokes::from_bytes(&payload); canvas.prepaint(&s, &ringid); last_idx = *ringid_map.get(&ringid).unwrap() + 1; napplied += 1; + Ok(()) }, - )) + ) .unwrap(); println!("last = {}/{}, applied = {}", last_idx, ops.len(), napplied); canvas.paint_all(); -- cgit v1.2.3