summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-08-25 17:35:17 -0400
committerDeterminant <[email protected]>2020-08-25 17:35:17 -0400
commit395bd19d51c5f5e0bfd3b5897ddce5f6bef5ec79 (patch)
tree416234ba859ac761e12b15182adbc6c67a3875d1 /src
parent5a578e4dac2721b324aff1665b7d2afbe00b5fb3 (diff)
simplify API by moving `recover_func` to `load()`v0.1.5
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs39
-rw-r--r--src/wal.rs115
2 files changed, 85 insertions, 69 deletions
diff --git a/src/lib.rs b/src/lib.rs
index be8993b..2a67cd3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,8 +10,8 @@
//!
//!
//! // Start with empty WAL (truncate = true).
-//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}, None).unwrap();
-//! let mut wal = loader.load(store).unwrap();
+//! let store = WALStoreAIO::new("./walfiles", true, None).unwrap();
+//! let mut wal = loader.load(store, |_, _| {Ok(())}).unwrap();
//! // Write a vector of records to WAL.
//! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() {
//! let ring_id = block_on(f).unwrap().1;
@@ -20,14 +20,14 @@
//!
//!
//! // Load from WAL (truncate = false).
-//! let store = WALStoreAIO::new("./walfiles", false, |payload, ringid| {
+//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap();
+//! let mut wal = loader.load(store, |payload, ringid| {
//! // redo the operations in your application
//! println!("recover(payload={}, ringid={:?})",
//! std::str::from_utf8(&payload).unwrap(),
//! ringid);
//! Ok(())
-//! }, None).unwrap();
-//! let mut wal = loader.load(store).unwrap();
+//! }).unwrap();
//! // We saw some log playback, even there is no failure.
//! // Let's try to grow the WAL to create many files.
//! let ring_ids = wal.grow((0..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>())
@@ -37,11 +37,11 @@
//! block_on(wal.peel(ring_ids)).unwrap();
//! // There will only be one remaining file in ./walfiles.
//!
-//! let store = WALStoreAIO::new("./walfiles", false, |payload, _| {
+//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap();
+//! let wal = loader.load(store, |payload, _| {
//! println!("payload.len() = {}", payload.len());
//! Ok(())
-//! }, None).unwrap();
-//! let wal = loader.load(store).unwrap();
+//! }).unwrap();
//! // After each recovery, the ./walfiles is empty.
//! ```
@@ -55,10 +55,9 @@ use libc::off_t;
use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag};
use nix::sys::stat::Mode;
use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags};
-use std::cell::RefCell;
use std::os::unix::io::RawFd;
use std::rc::Rc;
-use wal::{WALBytes, WALFile, WALPos, WALRingId, WALStore};
+use wal::{WALBytes, WALFile, WALPos, WALStore};
pub struct WALFileAIO {
fd: RawFd,
@@ -130,21 +129,18 @@ impl WALFile for WALFileAIO {
}
}
-pub struct WALStoreAIO<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> {
+pub struct WALStoreAIO {
rootfd: RawFd,
rootpath: String,
- recover_func: RefCell<F>,
aiomgr: Rc<AIOManager>,
}
-impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStoreAIO<F> {
+impl WALStoreAIO {
pub fn new(
wal_dir: &str,
truncate: bool,
- recover_func: F,
aiomgr: Option<AIOManager>,
) -> Result<Self, ()> {
- let recover_func = RefCell::new(recover_func);
let rootpath = wal_dir.to_string();
let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else(
|_: Result<AIOManager, ()>| {
@@ -174,16 +170,13 @@ impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStoreAIO<F> {
Ok(WALStoreAIO {
rootfd,
rootpath,
- recover_func,
aiomgr,
})
}
}
#[async_trait(?Send)]
-impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStore
- for WALStoreAIO<F>
-{
+impl WALStore for WALStoreAIO {
type FileNameIter = std::vec::IntoIter<String>;
async fn open_file(
@@ -212,12 +205,4 @@ impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStore
}
Ok(logfiles.into_iter())
}
-
- fn apply_payload(
- &self,
- payload: WALBytes,
- ringid: WALRingId,
- ) -> Result<(), ()> {
- (&mut *self.recover_func.borrow_mut())(payload, ringid)
- }
}
diff --git a/src/wal.rs b/src/wal.rs
index 28831cd..deae98d 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,8 +1,8 @@
use async_trait::async_trait;
-use futures::future::{self, FutureExt, TryFutureExt};
use futures::executor::block_on;
+use futures::future::{self, FutureExt, TryFutureExt};
use std::cell::{RefCell, UnsafeCell};
-use std::collections::{BinaryHeap, HashMap, hash_map};
+use std::collections::{hash_map, BinaryHeap, HashMap};
use std::future::Future;
use std::mem::MaybeUninit;
use std::pin::Pin;
@@ -67,15 +67,21 @@ pub trait Record {
}
impl Record for WALBytes {
- fn serialize(&self) -> WALBytes { self[..].into() }
+ fn serialize(&self) -> WALBytes {
+ self[..].into()
+ }
}
impl Record for String {
- fn serialize(&self) -> WALBytes { self.as_bytes().into() }
+ fn serialize(&self) -> WALBytes {
+ self.as_bytes().into()
+ }
}
impl Record for &str {
- fn serialize(&self) -> WALBytes { self.as_bytes().into() }
+ fn serialize(&self) -> WALBytes {
+ self.as_bytes().into()
+ }
}
/// the state for a WAL writer
@@ -124,14 +130,6 @@ pub trait WALStore {
/// Enumerate all WAL filenames. It should include all WAL files that are previously opened
/// (created) but not removed. The list could be unordered.
fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>;
- /// Apply the payload during recovery. An invocation of the callback waits the application for
- /// redoing the given operation to ensure its state is consistent. We assume the necessary
- /// changes by the payload has already been persistent when the callback returns.
- fn apply_payload(
- &self,
- payload: WALBytes,
- ringid: WALRingId,
- ) -> Result<(), ()>;
}
struct WALFileHandle<'a, F: WALStore> {
@@ -142,7 +140,9 @@ struct WALFileHandle<'a, F: WALStore> {
impl<'a, F: WALStore> std::ops::Deref for WALFileHandle<'a, F> {
type Target = dyn WALFile + 'a;
- fn deref(&self) -> &Self::Target { self.handle }
+ fn deref(&self) -> &Self::Target {
+ self.handle
+ }
}
impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> {
@@ -158,7 +158,8 @@ impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> {
struct WALFilePool<F: WALStore> {
store: F,
handle_cache: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>,
- handle_used: RefCell<HashMap<WALFileId, UnsafeCell<(Box<dyn WALFile>, usize)>>>,
+ handle_used:
+ RefCell<HashMap<WALFileId, UnsafeCell<(Box<dyn WALFile>, usize)>>>,
last_write:
UnsafeCell<MaybeUninit<Pin<Box<dyn Future<Output = Result<(), ()>>>>>>,
last_peel:
@@ -201,18 +202,33 @@ impl<F: WALStore> WALFilePool<F> {
let pool = self as *const WALFilePool<F>;
if let Some(h) = self.handle_cache.borrow_mut().pop(&fid) {
let handle = match self.handle_used.borrow_mut().entry(fid) {
- hash_map::Entry::Vacant(e) => unsafe {&*(*e.insert(UnsafeCell::new((h, 1))).get()).0},
+ hash_map::Entry::Vacant(e) => unsafe {
+ &*(*e.insert(UnsafeCell::new((h, 1))).get()).0
+ },
_ => unreachable!(),
};
Ok(WALFileHandle { fid, handle, pool })
} else {
- let v = unsafe{&mut *match self.handle_used.borrow_mut().entry(fid) {
- hash_map::Entry::Occupied(e) => e.into_mut(),
- hash_map::Entry::Vacant(e) => e.insert(
- UnsafeCell::new((self.store.open_file(&Self::get_fname(fid), touch).await?, 0)))
- }.get()};
+ let v = unsafe {
+ &mut *match self.handle_used.borrow_mut().entry(fid) {
+ hash_map::Entry::Occupied(e) => e.into_mut(),
+ hash_map::Entry::Vacant(e) => {
+ e.insert(UnsafeCell::new((
+ self.store
+ .open_file(&Self::get_fname(fid), touch)
+ .await?,
+ 0,
+ )))
+ }
+ }
+ .get()
+ };
v.1 += 1;
- Ok(WALFileHandle { fid, handle: &*v.0, pool })
+ Ok(WALFileHandle {
+ fid,
+ handle: &*v.0,
+ pool,
+ })
}
}
}
@@ -220,13 +236,15 @@ impl<F: WALStore> WALFilePool<F> {
fn release_file(&self, fid: WALFileId) {
match self.handle_used.borrow_mut().entry(fid) {
hash_map::Entry::Occupied(e) => {
- let v = unsafe{&mut *e.get().get()};
+ let v = unsafe { &mut *e.get().get() };
v.1 -= 1;
if v.1 == 0 {
- self.handle_cache.borrow_mut().put(fid, e.remove().into_inner().0);
+ self.handle_cache
+ .borrow_mut()
+ .put(fid, e.remove().into_inner().0);
}
- },
- _ => unreachable!()
+ }
+ _ => unreachable!(),
}
}
@@ -263,7 +281,12 @@ impl<F: WALStore> WALFilePool<F> {
let alloc = async move {
last_write.await?;
let mut last_h: Option<
- Pin<Box<dyn Future<Output = Result<WALFileHandle<'a, F>, ()>> + 'a>>,
+ Pin<
+ Box<
+ dyn Future<Output = Result<WALFileHandle<'a, F>, ()>>
+ + 'a,
+ >,
+ >,
> = None;
for ((next_fid, wl), h) in meta.into_iter().zip(files.into_iter()) {
if let Some(lh) = last_h.take() {
@@ -349,7 +372,7 @@ impl<F: WALStore> WALFilePool<F> {
}
fn in_use_len(&self) -> usize {
- self.handle_used.borrow().len()
+ self.handle_used.borrow().len()
}
fn reset(&mut self) {
@@ -556,7 +579,9 @@ impl<F: WALStore> WALWriter<F> {
self.file_pool.remove_files(orig_fid, next_fid)
}
- pub fn file_pool_in_use(&self) -> usize { self.file_pool.in_use_len() }
+ pub fn file_pool_in_use(&self) -> usize {
+ self.file_pool.in_use_len()
+ }
}
#[derive(Copy, Clone)]
@@ -564,7 +589,7 @@ pub enum RecoverPolicy {
/// all checksums must be correct, otherwise recovery fails
Strict,
/// stop recovering when hitting the first corrupted record
- BestEffort
+ BestEffort,
}
pub struct WALLoader {
@@ -577,10 +602,10 @@ pub struct WALLoader {
impl Default for WALLoader {
fn default() -> Self {
WALLoader {
- file_nbit: 22, // 4MB
+ file_nbit: 22, // 4MB
block_nbit: 15, // 32KB,
cache_size: 16,
- recover_policy: RecoverPolicy::Strict
+ recover_policy: RecoverPolicy::Strict,
}
}
}
@@ -622,7 +647,11 @@ impl WALLoader {
}
/// Recover by reading the WAL files.
- pub fn load<F: WALStore>(&self, store: F) -> Result<WALWriter<F>, ()> {
+ pub fn load<S: WALStore, F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>(
+ &self,
+ store: S,
+ mut recover_func: F,
+ ) -> Result<WALWriter<S>, ()> {
let msize = std::mem::size_of::<WALRingBlob>();
assert!(self.file_nbit > self.block_nbit);
assert!(msize < 1 << self.block_nbit);
@@ -650,7 +679,7 @@ impl WALLoader {
if skip {
f.truncate(0)?;
block_on(file_pool.store.remove_file(fname))?;
- continue
+ continue;
}
while let Some(header_raw) = f.read(off, msize as usize)? {
let ringid_start = (fid << file_pool.file_nbit) + off;
@@ -668,10 +697,10 @@ impl WALLoader {
// TODO: improve the behavior when CRC32 fails
if !self.verify_checksum(&payload, header.crc32)? {
skip = true;
- break
+ break;
}
off += rsize as u64;
- file_pool.store.apply_payload(
+ recover_func(
payload,
WALRingId {
start: ringid_start,
@@ -684,17 +713,18 @@ impl WALLoader {
let chunk = f.read(off, rsize as usize)?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
- break
+ break;
}
chunks = Some((vec![chunk], ringid_start));
off += rsize as u64;
}
WALRingType::Middle => {
if let Some((chunks, _)) = &mut chunks {
- let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ let chunk =
+ f.read(off, rsize as usize)?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
- break
+ break;
}
chunks.push(chunk);
} // otherwise ignore the leftover
@@ -703,11 +733,12 @@ impl WALLoader {
WALRingType::Last => {
if let Some((mut chunks, ringid_start)) = chunks.take()
{
- let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ let chunk =
+ f.read(off, rsize as usize)?.ok_or(())?;
off += rsize as u64;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
- break
+ break;
}
chunks.push(chunk);
let mut payload = Vec::new();
@@ -720,7 +751,7 @@ impl WALLoader {
ps[..c.len()].copy_from_slice(&*c);
ps = &mut ps[c.len()..];
}
- file_pool.store.apply_payload(
+ recover_func(
payload.into_boxed_slice(),
WALRingId {
start: ringid_start,