aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock2
-rw-r--r--Cargo.toml2
-rw-r--r--README.rst2
-rw-r--r--examples/demo1.rs16
-rw-r--r--src/lib.rs39
-rw-r--r--src/wal.rs115
-rw-r--r--tests/common/mod.rs70
7 files changed, 122 insertions, 124 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 98369cd..4a278a9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -232,7 +232,7 @@ dependencies = [
[[package]]
name = "growth-ring"
-version = "0.1.4"
+version = "0.1.5"
dependencies = [
"async-trait",
"crc",
diff --git a/Cargo.toml b/Cargo.toml
index dd87bde..22b3232 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "growth-ring"
-version = "0.1.4"
+version = "0.1.5"
authors = ["Determinant <[email protected]>"]
edition = "2018"
homepage = "https://github.com/Determinant/growth-ring"
diff --git a/README.rst b/README.rst
index c963bf0..69f443d 100644
--- a/README.rst
+++ b/README.rst
@@ -8,4 +8,4 @@ Documentation
-------------
- Latest_
-.. _Latest: https://docs.rs/growth-ring/0.1.2/growthring/
+.. _Latest: https://docs.rs/growth-ring/0.1.5/growthring/
diff --git a/examples/demo1.rs b/examples/demo1.rs
index d51801d..6f029ab 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -4,9 +4,9 @@ use growthring::{
};
use rand::{seq::SliceRandom, Rng};
-fn test<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>(
+fn test(
records: Vec<String>,
- wal: &mut WALWriter<WALStoreAIO<F>>,
+ wal: &mut WALWriter<WALStoreAIO>,
) -> Vec<WALRingId> {
let mut res = Vec::new();
for r in wal.grow(records).into_iter() {
@@ -32,8 +32,8 @@ fn main() {
let mut loader = WALLoader::new();
loader.file_nbit(9).block_nbit(8);
- let store = WALStoreAIO::new(&wal_dir, true, recover, None).unwrap();
- let mut wal = loader.load(store).unwrap();
+ let store = WALStoreAIO::new(&wal_dir, true, None).unwrap();
+ let mut wal = loader.load(store, recover).unwrap();
for _ in 0..3 {
test(
["hi", "hello", "lol"]
@@ -50,8 +50,8 @@ fn main() {
);
}
- let store = WALStoreAIO::new(&wal_dir, false, recover, None).unwrap();
- let mut wal = loader.load(store).unwrap();
+ let store = WALStoreAIO::new(&wal_dir, false, None).unwrap();
+ let mut wal = loader.load(store, recover).unwrap();
for _ in 0..3 {
test(
vec![
@@ -64,8 +64,8 @@ fn main() {
);
}
- let store = WALStoreAIO::new(&wal_dir, false, recover, None).unwrap();
- let mut wal = loader.load(store).unwrap();
+ let store = WALStoreAIO::new(&wal_dir, false, None).unwrap();
+ let mut wal = loader.load(store, recover).unwrap();
for _ in 0..3 {
let mut ids = Vec::new();
for _ in 0..3 {
diff --git a/src/lib.rs b/src/lib.rs
index be8993b..2a67cd3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,8 +10,8 @@
//!
//!
//! // Start with empty WAL (truncate = true).
-//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}, None).unwrap();
-//! let mut wal = loader.load(store).unwrap();
+//! let store = WALStoreAIO::new("./walfiles", true, None).unwrap();
+//! let mut wal = loader.load(store, |_, _| {Ok(())}).unwrap();
//! // Write a vector of records to WAL.
//! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() {
//! let ring_id = block_on(f).unwrap().1;
@@ -20,14 +20,14 @@
//!
//!
//! // Load from WAL (truncate = false).
-//! let store = WALStoreAIO::new("./walfiles", false, |payload, ringid| {
+//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap();
+//! let mut wal = loader.load(store, |payload, ringid| {
//! // redo the operations in your application
//! println!("recover(payload={}, ringid={:?})",
//! std::str::from_utf8(&payload).unwrap(),
//! ringid);
//! Ok(())
-//! }, None).unwrap();
-//! let mut wal = loader.load(store).unwrap();
+//! }).unwrap();
//! // We saw some log playback, even there is no failure.
//! // Let's try to grow the WAL to create many files.
//! let ring_ids = wal.grow((0..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>())
@@ -37,11 +37,11 @@
//! block_on(wal.peel(ring_ids)).unwrap();
//! // There will only be one remaining file in ./walfiles.
//!
-//! let store = WALStoreAIO::new("./walfiles", false, |payload, _| {
+//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap();
+//! let wal = loader.load(store, |payload, _| {
//! println!("payload.len() = {}", payload.len());
//! Ok(())
-//! }, None).unwrap();
-//! let wal = loader.load(store).unwrap();
+//! }).unwrap();
//! // After each recovery, the ./walfiles is empty.
//! ```
@@ -55,10 +55,9 @@ use libc::off_t;
use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag};
use nix::sys::stat::Mode;
use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags};
-use std::cell::RefCell;
use std::os::unix::io::RawFd;
use std::rc::Rc;
-use wal::{WALBytes, WALFile, WALPos, WALRingId, WALStore};
+use wal::{WALBytes, WALFile, WALPos, WALStore};
pub struct WALFileAIO {
fd: RawFd,
@@ -130,21 +129,18 @@ impl WALFile for WALFileAIO {
}
}
-pub struct WALStoreAIO<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> {
+pub struct WALStoreAIO {
rootfd: RawFd,
rootpath: String,
- recover_func: RefCell<F>,
aiomgr: Rc<AIOManager>,
}
-impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStoreAIO<F> {
+impl WALStoreAIO {
pub fn new(
wal_dir: &str,
truncate: bool,
- recover_func: F,
aiomgr: Option<AIOManager>,
) -> Result<Self, ()> {
- let recover_func = RefCell::new(recover_func);
let rootpath = wal_dir.to_string();
let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else(
|_: Result<AIOManager, ()>| {
@@ -174,16 +170,13 @@ impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStoreAIO<F> {
Ok(WALStoreAIO {
rootfd,
rootpath,
- recover_func,
aiomgr,
})
}
}
#[async_trait(?Send)]
-impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStore
- for WALStoreAIO<F>
-{
+impl WALStore for WALStoreAIO {
type FileNameIter = std::vec::IntoIter<String>;
async fn open_file(
@@ -212,12 +205,4 @@ impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStore
}
Ok(logfiles.into_iter())
}
-
- fn apply_payload(
- &self,
- payload: WALBytes,
- ringid: WALRingId,
- ) -> Result<(), ()> {
- (&mut *self.recover_func.borrow_mut())(payload, ringid)
- }
}
diff --git a/src/wal.rs b/src/wal.rs
index 28831cd..deae98d 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,8 +1,8 @@
use async_trait::async_trait;
-use futures::future::{self, FutureExt, TryFutureExt};
use futures::executor::block_on;
+use futures::future::{self, FutureExt, TryFutureExt};
use std::cell::{RefCell, UnsafeCell};
-use std::collections::{BinaryHeap, HashMap, hash_map};
+use std::collections::{hash_map, BinaryHeap, HashMap};
use std::future::Future;
use std::mem::MaybeUninit;
use std::pin::Pin;
@@ -67,15 +67,21 @@ pub trait Record {
}
impl Record for WALBytes {
- fn serialize(&self) -> WALBytes { self[..].into() }
+ fn serialize(&self) -> WALBytes {
+ self[..].into()
+ }
}
impl Record for String {
- fn serialize(&self) -> WALBytes { self.as_bytes().into() }
+ fn serialize(&self) -> WALBytes {
+ self.as_bytes().into()
+ }
}
impl Record for &str {
- fn serialize(&self) -> WALBytes { self.as_bytes().into() }
+ fn serialize(&self) -> WALBytes {
+ self.as_bytes().into()
+ }
}
/// the state for a WAL writer
@@ -124,14 +130,6 @@ pub trait WALStore {
/// Enumerate all WAL filenames. It should include all WAL files that are previously opened
/// (created) but not removed. The list could be unordered.
fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>;
- /// Apply the payload during recovery. An invocation of the callback waits the application for
- /// redoing the given operation to ensure its state is consistent. We assume the necessary
- /// changes by the payload has already been persistent when the callback returns.
- fn apply_payload(
- &self,
- payload: WALBytes,
- ringid: WALRingId,
- ) -> Result<(), ()>;
}
struct WALFileHandle<'a, F: WALStore> {
@@ -142,7 +140,9 @@ struct WALFileHandle<'a, F: WALStore> {
impl<'a, F: WALStore> std::ops::Deref for WALFileHandle<'a, F> {
type Target = dyn WALFile + 'a;
- fn deref(&self) -> &Self::Target { self.handle }
+ fn deref(&self) -> &Self::Target {
+ self.handle
+ }
}
impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> {
@@ -158,7 +158,8 @@ impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> {
struct WALFilePool<F: WALStore> {
store: F,
handle_cache: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>,
- handle_used: RefCell<HashMap<WALFileId, UnsafeCell<(Box<dyn WALFile>, usize)>>>,
+ handle_used:
+ RefCell<HashMap<WALFileId, UnsafeCell<(Box<dyn WALFile>, usize)>>>,
last_write:
UnsafeCell<MaybeUninit<Pin<Box<dyn Future<Output = Result<(), ()>>>>>>,
last_peel:
@@ -201,18 +202,33 @@ impl<F: WALStore> WALFilePool<F> {
let pool = self as *const WALFilePool<F>;
if let Some(h) = self.handle_cache.borrow_mut().pop(&fid) {
let handle = match self.handle_used.borrow_mut().entry(fid) {
- hash_map::Entry::Vacant(e) => unsafe {&*(*e.insert(UnsafeCell::new((h, 1))).get()).0},
+ hash_map::Entry::Vacant(e) => unsafe {
+ &*(*e.insert(UnsafeCell::new((h, 1))).get()).0
+ },
_ => unreachable!(),
};
Ok(WALFileHandle { fid, handle, pool })
} else {
- let v = unsafe{&mut *match self.handle_used.borrow_mut().entry(fid) {
- hash_map::Entry::Occupied(e) => e.into_mut(),
- hash_map::Entry::Vacant(e) => e.insert(
- UnsafeCell::new((self.store.open_file(&Self::get_fname(fid), touch).await?, 0)))
- }.get()};
+ let v = unsafe {
+ &mut *match self.handle_used.borrow_mut().entry(fid) {
+ hash_map::Entry::Occupied(e) => e.into_mut(),
+ hash_map::Entry::Vacant(e) => {
+ e.insert(UnsafeCell::new((
+ self.store
+ .open_file(&Self::get_fname(fid), touch)
+ .await?,
+ 0,
+ )))
+ }
+ }
+ .get()
+ };
v.1 += 1;
- Ok(WALFileHandle { fid, handle: &*v.0, pool })
+ Ok(WALFileHandle {
+ fid,
+ handle: &*v.0,
+ pool,
+ })
}
}
}
@@ -220,13 +236,15 @@ impl<F: WALStore> WALFilePool<F> {
fn release_file(&self, fid: WALFileId) {
match self.handle_used.borrow_mut().entry(fid) {
hash_map::Entry::Occupied(e) => {
- let v = unsafe{&mut *e.get().get()};
+ let v = unsafe { &mut *e.get().get() };
v.1 -= 1;
if v.1 == 0 {
- self.handle_cache.borrow_mut().put(fid, e.remove().into_inner().0);
+ self.handle_cache
+ .borrow_mut()
+ .put(fid, e.remove().into_inner().0);
}
- },
- _ => unreachable!()
+ }
+ _ => unreachable!(),
}
}
@@ -263,7 +281,12 @@ impl<F: WALStore> WALFilePool<F> {
let alloc = async move {
last_write.await?;
let mut last_h: Option<
- Pin<Box<dyn Future<Output = Result<WALFileHandle<'a, F>, ()>> + 'a>>,
+ Pin<
+ Box<
+ dyn Future<Output = Result<WALFileHandle<'a, F>, ()>>
+ + 'a,
+ >,
+ >,
> = None;
for ((next_fid, wl), h) in meta.into_iter().zip(files.into_iter()) {
if let Some(lh) = last_h.take() {
@@ -349,7 +372,7 @@ impl<F: WALStore> WALFilePool<F> {
}
fn in_use_len(&self) -> usize {
- self.handle_used.borrow().len()
+ self.handle_used.borrow().len()
}
fn reset(&mut self) {
@@ -556,7 +579,9 @@ impl<F: WALStore> WALWriter<F> {
self.file_pool.remove_files(orig_fid, next_fid)
}
- pub fn file_pool_in_use(&self) -> usize { self.file_pool.in_use_len() }
+ pub fn file_pool_in_use(&self) -> usize {
+ self.file_pool.in_use_len()
+ }
}
#[derive(Copy, Clone)]
@@ -564,7 +589,7 @@ pub enum RecoverPolicy {
/// all checksums must be correct, otherwise recovery fails
Strict,
/// stop recovering when hitting the first corrupted record
- BestEffort
+ BestEffort,
}
pub struct WALLoader {
@@ -577,10 +602,10 @@ pub struct WALLoader {
impl Default for WALLoader {
fn default() -> Self {
WALLoader {
- file_nbit: 22, // 4MB
+ file_nbit: 22, // 4MB
block_nbit: 15, // 32KB,
cache_size: 16,
- recover_policy: RecoverPolicy::Strict
+ recover_policy: RecoverPolicy::Strict,
}
}
}
@@ -622,7 +647,11 @@ impl WALLoader {
}
/// Recover by reading the WAL files.
- pub fn load<F: WALStore>(&self, store: F) -> Result<WALWriter<F>, ()> {
+ pub fn load<S: WALStore, F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>(
+ &self,
+ store: S,
+ mut recover_func: F,
+ ) -> Result<WALWriter<S>, ()> {
let msize = std::mem::size_of::<WALRingBlob>();
assert!(self.file_nbit > self.block_nbit);
assert!(msize < 1 << self.block_nbit);
@@ -650,7 +679,7 @@ impl WALLoader {
if skip {
f.truncate(0)?;
block_on(file_pool.store.remove_file(fname))?;
- continue
+ continue;
}
while let Some(header_raw) = f.read(off, msize as usize)? {
let ringid_start = (fid << file_pool.file_nbit) + off;
@@ -668,10 +697,10 @@ impl WALLoader {
// TODO: improve the behavior when CRC32 fails
if !self.verify_checksum(&payload, header.crc32)? {
skip = true;
- break
+ break;
}
off += rsize as u64;
- file_pool.store.apply_payload(
+ recover_func(
payload,
WALRingId {
start: ringid_start,
@@ -684,17 +713,18 @@ impl WALLoader {
let chunk = f.read(off, rsize as usize)?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
- break
+ break;
}
chunks = Some((vec![chunk], ringid_start));
off += rsize as u64;
}
WALRingType::Middle => {
if let Some((chunks, _)) = &mut chunks {
- let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ let chunk =
+ f.read(off, rsize as usize)?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
- break
+ break;
}
chunks.push(chunk);
} // otherwise ignore the leftover
@@ -703,11 +733,12 @@ impl WALLoader {
WALRingType::Last => {
if let Some((mut chunks, ringid_start)) = chunks.take()
{
- let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ let chunk =
+ f.read(off, rsize as usize)?.ok_or(())?;
off += rsize as u64;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
- break
+ break;
}
chunks.push(chunk);
let mut payload = Vec::new();
@@ -720,7 +751,7 @@ impl WALLoader {
ps[..c.len()].copy_from_slice(&*c);
ps = &mut ps[c.len()..];
}
- file_pool.store.apply_payload(
+ recover_func(
payload.into_boxed_slice(),
WALRingId {
start: ringid_start,
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 769f422..9ec9e8e 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -1,9 +1,8 @@
#[cfg(test)]
-
#[allow(dead_code)]
use async_trait::async_trait;
use growthring::wal::{
- WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore
+ WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore,
};
use indexmap::{map::Entry, IndexMap};
use rand::Rng;
@@ -111,37 +110,25 @@ impl WALStoreEmulState {
}
/// Emulate the persistent storage state.
-pub struct WALStoreEmul<'a, G, F>
+pub struct WALStoreEmul<'a, G>
where
G: FailGen,
- F: FnMut(WALBytes, WALRingId),
{
state: RefCell<&'a mut WALStoreEmulState>,
fgen: Rc<G>,
- recover: RefCell<F>,
}
-impl<'a, G: FailGen, F: FnMut(WALBytes, WALRingId)> WALStoreEmul<'a, G, F> {
- pub fn new(
- state: &'a mut WALStoreEmulState,
- fgen: Rc<G>,
- recover: F,
- ) -> Self {
+impl<'a, G: FailGen> WALStoreEmul<'a, G> {
+ pub fn new(state: &'a mut WALStoreEmulState, fgen: Rc<G>) -> Self {
let state = RefCell::new(state);
- let recover = RefCell::new(recover);
- WALStoreEmul {
- state,
- fgen,
- recover,
- }
+ WALStoreEmul { state, fgen }
}
}
#[async_trait(?Send)]
-impl<'a, G, F> WALStore for WALStoreEmul<'a, G, F>
+impl<'a, G> WALStore for WALStoreEmul<'a, G>
where
G: 'static + FailGen,
- F: FnMut(WALBytes, WALRingId),
{
type FileNameIter = std::vec::IntoIter<String>;
@@ -194,23 +181,6 @@ where
}
Ok(logfiles.into_iter())
}
-
- fn apply_payload(
- &self,
- payload: WALBytes,
- ringid: WALRingId,
- ) -> Result<(), ()> {
- if self.fgen.next_fail() {
- return Err(());
- }
- /*
- println!("apply_payload(payload=0x{}, ringid={:?})",
- hex::encode(&payload),
- ringid);
- */
- (&mut *self.recover.borrow_mut())(payload, ringid);
- Ok(())
- }
}
pub struct SingleFailGen {
@@ -336,7 +306,9 @@ impl PaintStrokes {
}
impl growthring::wal::Record for PaintStrokes {
- fn serialize(&self) -> WALBytes { self.to_bytes() }
+ fn serialize(&self) -> WALBytes {
+ self.to_bytes()
+ }
}
#[test]
@@ -461,7 +433,9 @@ impl Canvas {
} else {
None
}
- } else { None }
+ } else {
+ None
+ }
}
pub fn is_same(&self, other: &Canvas) -> bool {
@@ -538,7 +512,14 @@ impl PaintingSim {
) -> Result<(), ()> {
let mut rng =
<rand::rngs::StdRng as rand::SeedableRng>::seed_from_u64(self.seed);
- let mut wal = loader.load(WALStoreEmul::new(state, fgen.clone(), |_, _| {}))?;
+ let mut wal =
+ loader.load(WALStoreEmul::new(state, fgen.clone()), |_, _| {
+ if fgen.next_fail() {
+ Err(())
+ } else {
+ Ok(())
+ }
+ })?;
for _ in 0..self.n {
let pss = (0..self.m)
.map(|_| {
@@ -597,7 +578,8 @@ impl PaintingSim {
pub fn get_walloader(&self) -> WALLoader {
let mut loader = WALLoader::new();
- loader.file_nbit(self.file_nbit)
+ loader
+ .file_nbit(self.file_nbit)
.block_nbit(self.block_nbit)
.cache_size(self.file_cache);
loader
@@ -634,16 +616,16 @@ impl PaintingSim {
let mut last_idx = 0;
let mut napplied = 0;
canvas.clear_queued();
- wal.load(WALStoreEmul::new(
- state,
- Rc::new(ZeroFailGen),
+ wal.load(
+ WALStoreEmul::new(state, Rc::new(ZeroFailGen)),
|payload, ringid| {
let s = PaintStrokes::from_bytes(&payload);
canvas.prepaint(&s, &ringid);
last_idx = *ringid_map.get(&ringid).unwrap() + 1;
napplied += 1;
+ Ok(())
},
- ))
+ )
.unwrap();
println!("last = {}/{}, applied = {}", last_idx, ops.len(), napplied);
canvas.paint_all();