aboutsummaryrefslogtreecommitdiff
path: root/src/wal.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wal.rs')
-rw-r--r--src/wal.rs115
1 files changed, 73 insertions, 42 deletions
diff --git a/src/wal.rs b/src/wal.rs
index 28831cd..deae98d 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,8 +1,8 @@
use async_trait::async_trait;
-use futures::future::{self, FutureExt, TryFutureExt};
use futures::executor::block_on;
+use futures::future::{self, FutureExt, TryFutureExt};
use std::cell::{RefCell, UnsafeCell};
-use std::collections::{BinaryHeap, HashMap, hash_map};
+use std::collections::{hash_map, BinaryHeap, HashMap};
use std::future::Future;
use std::mem::MaybeUninit;
use std::pin::Pin;
@@ -67,15 +67,21 @@ pub trait Record {
}
impl Record for WALBytes {
- fn serialize(&self) -> WALBytes { self[..].into() }
+ fn serialize(&self) -> WALBytes {
+ self[..].into()
+ }
}
impl Record for String {
- fn serialize(&self) -> WALBytes { self.as_bytes().into() }
+ fn serialize(&self) -> WALBytes {
+ self.as_bytes().into()
+ }
}
impl Record for &str {
- fn serialize(&self) -> WALBytes { self.as_bytes().into() }
+ fn serialize(&self) -> WALBytes {
+ self.as_bytes().into()
+ }
}
/// the state for a WAL writer
@@ -124,14 +130,6 @@ pub trait WALStore {
/// Enumerate all WAL filenames. It should include all WAL files that are previously opened
/// (created) but not removed. The list could be unordered.
fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>;
- /// Apply the payload during recovery. An invocation of the callback waits the application for
- /// redoing the given operation to ensure its state is consistent. We assume the necessary
- /// changes by the payload has already been persistent when the callback returns.
- fn apply_payload(
- &self,
- payload: WALBytes,
- ringid: WALRingId,
- ) -> Result<(), ()>;
}
struct WALFileHandle<'a, F: WALStore> {
@@ -142,7 +140,9 @@ struct WALFileHandle<'a, F: WALStore> {
impl<'a, F: WALStore> std::ops::Deref for WALFileHandle<'a, F> {
type Target = dyn WALFile + 'a;
- fn deref(&self) -> &Self::Target { self.handle }
+ fn deref(&self) -> &Self::Target {
+ self.handle
+ }
}
impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> {
@@ -158,7 +158,8 @@ impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> {
struct WALFilePool<F: WALStore> {
store: F,
handle_cache: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>,
- handle_used: RefCell<HashMap<WALFileId, UnsafeCell<(Box<dyn WALFile>, usize)>>>,
+ handle_used:
+ RefCell<HashMap<WALFileId, UnsafeCell<(Box<dyn WALFile>, usize)>>>,
last_write:
UnsafeCell<MaybeUninit<Pin<Box<dyn Future<Output = Result<(), ()>>>>>>,
last_peel:
@@ -201,18 +202,33 @@ impl<F: WALStore> WALFilePool<F> {
let pool = self as *const WALFilePool<F>;
if let Some(h) = self.handle_cache.borrow_mut().pop(&fid) {
let handle = match self.handle_used.borrow_mut().entry(fid) {
- hash_map::Entry::Vacant(e) => unsafe {&*(*e.insert(UnsafeCell::new((h, 1))).get()).0},
+ hash_map::Entry::Vacant(e) => unsafe {
+ &*(*e.insert(UnsafeCell::new((h, 1))).get()).0
+ },
_ => unreachable!(),
};
Ok(WALFileHandle { fid, handle, pool })
} else {
- let v = unsafe{&mut *match self.handle_used.borrow_mut().entry(fid) {
- hash_map::Entry::Occupied(e) => e.into_mut(),
- hash_map::Entry::Vacant(e) => e.insert(
- UnsafeCell::new((self.store.open_file(&Self::get_fname(fid), touch).await?, 0)))
- }.get()};
+ let v = unsafe {
+ &mut *match self.handle_used.borrow_mut().entry(fid) {
+ hash_map::Entry::Occupied(e) => e.into_mut(),
+ hash_map::Entry::Vacant(e) => {
+ e.insert(UnsafeCell::new((
+ self.store
+ .open_file(&Self::get_fname(fid), touch)
+ .await?,
+ 0,
+ )))
+ }
+ }
+ .get()
+ };
v.1 += 1;
- Ok(WALFileHandle { fid, handle: &*v.0, pool })
+ Ok(WALFileHandle {
+ fid,
+ handle: &*v.0,
+ pool,
+ })
}
}
}
@@ -220,13 +236,15 @@ impl<F: WALStore> WALFilePool<F> {
fn release_file(&self, fid: WALFileId) {
match self.handle_used.borrow_mut().entry(fid) {
hash_map::Entry::Occupied(e) => {
- let v = unsafe{&mut *e.get().get()};
+ let v = unsafe { &mut *e.get().get() };
v.1 -= 1;
if v.1 == 0 {
- self.handle_cache.borrow_mut().put(fid, e.remove().into_inner().0);
+ self.handle_cache
+ .borrow_mut()
+ .put(fid, e.remove().into_inner().0);
}
- },
- _ => unreachable!()
+ }
+ _ => unreachable!(),
}
}
@@ -263,7 +281,12 @@ impl<F: WALStore> WALFilePool<F> {
let alloc = async move {
last_write.await?;
let mut last_h: Option<
- Pin<Box<dyn Future<Output = Result<WALFileHandle<'a, F>, ()>> + 'a>>,
+ Pin<
+ Box<
+ dyn Future<Output = Result<WALFileHandle<'a, F>, ()>>
+ + 'a,
+ >,
+ >,
> = None;
for ((next_fid, wl), h) in meta.into_iter().zip(files.into_iter()) {
if let Some(lh) = last_h.take() {
@@ -349,7 +372,7 @@ impl<F: WALStore> WALFilePool<F> {
}
fn in_use_len(&self) -> usize {
- self.handle_used.borrow().len()
+ self.handle_used.borrow().len()
}
fn reset(&mut self) {
@@ -556,7 +579,9 @@ impl<F: WALStore> WALWriter<F> {
self.file_pool.remove_files(orig_fid, next_fid)
}
- pub fn file_pool_in_use(&self) -> usize { self.file_pool.in_use_len() }
+ pub fn file_pool_in_use(&self) -> usize {
+ self.file_pool.in_use_len()
+ }
}
#[derive(Copy, Clone)]
@@ -564,7 +589,7 @@ pub enum RecoverPolicy {
/// all checksums must be correct, otherwise recovery fails
Strict,
/// stop recovering when hitting the first corrupted record
- BestEffort
+ BestEffort,
}
pub struct WALLoader {
@@ -577,10 +602,10 @@ pub struct WALLoader {
impl Default for WALLoader {
fn default() -> Self {
WALLoader {
- file_nbit: 22, // 4MB
+ file_nbit: 22, // 4MB
block_nbit: 15, // 32KB,
cache_size: 16,
- recover_policy: RecoverPolicy::Strict
+ recover_policy: RecoverPolicy::Strict,
}
}
}
@@ -622,7 +647,11 @@ impl WALLoader {
}
/// Recover by reading the WAL files.
- pub fn load<F: WALStore>(&self, store: F) -> Result<WALWriter<F>, ()> {
+ pub fn load<S: WALStore, F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>(
+ &self,
+ store: S,
+ mut recover_func: F,
+ ) -> Result<WALWriter<S>, ()> {
let msize = std::mem::size_of::<WALRingBlob>();
assert!(self.file_nbit > self.block_nbit);
assert!(msize < 1 << self.block_nbit);
@@ -650,7 +679,7 @@ impl WALLoader {
if skip {
f.truncate(0)?;
block_on(file_pool.store.remove_file(fname))?;
- continue
+ continue;
}
while let Some(header_raw) = f.read(off, msize as usize)? {
let ringid_start = (fid << file_pool.file_nbit) + off;
@@ -668,10 +697,10 @@ impl WALLoader {
// TODO: improve the behavior when CRC32 fails
if !self.verify_checksum(&payload, header.crc32)? {
skip = true;
- break
+ break;
}
off += rsize as u64;
- file_pool.store.apply_payload(
+ recover_func(
payload,
WALRingId {
start: ringid_start,
@@ -684,17 +713,18 @@ impl WALLoader {
let chunk = f.read(off, rsize as usize)?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
- break
+ break;
}
chunks = Some((vec![chunk], ringid_start));
off += rsize as u64;
}
WALRingType::Middle => {
if let Some((chunks, _)) = &mut chunks {
- let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ let chunk =
+ f.read(off, rsize as usize)?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
- break
+ break;
}
chunks.push(chunk);
} // otherwise ignore the leftover
@@ -703,11 +733,12 @@ impl WALLoader {
WALRingType::Last => {
if let Some((mut chunks, ringid_start)) = chunks.take()
{
- let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ let chunk =
+ f.read(off, rsize as usize)?.ok_or(())?;
off += rsize as u64;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
- break
+ break;
}
chunks.push(chunk);
let mut payload = Vec::new();
@@ -720,7 +751,7 @@ impl WALLoader {
ps[..c.len()].copy_from_slice(&*c);
ps = &mut ps[c.len()..];
}
- file_pool.store.apply_payload(
+ recover_func(
payload.into_boxed_slice(),
WALRingId {
start: ringid_start,