aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/wal.rs79
1 files changed, 53 insertions, 26 deletions
diff --git a/src/wal.rs b/src/wal.rs
index 454fbd0..8d92d01 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -69,6 +69,8 @@ struct WALState {
next: WALPos,
/// number of bits for a file
file_nbit: u64,
+ next_complete: WALPos,
+ io_complete: BinaryHeap<WALRingId>,
}
#[async_trait(?Send)]
@@ -101,7 +103,7 @@ pub trait WALStore {
touch: bool,
) -> Result<Box<dyn WALFile>, ()>;
/// Unlink a file given the filename.
- fn remove_file(&self, filename: &str) -> Result<(), ()>;
+ async fn remove_file(&self, filename: String) -> Result<(), ()>;
/// Enumerate all WAL filenames. It should include all WAL files that are previously opened
/// (created) but not removed. The list could be unordered.
fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>;
@@ -122,6 +124,8 @@ struct WALFilePool<F: WALStore> {
handles: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>,
last_write:
UnsafeCell<MaybeUninit<Pin<Box<dyn Future<Output = Result<(), ()>>>>>>,
+ last_peel:
+ UnsafeCell<MaybeUninit<Pin<Box<dyn Future<Output = Result<(), ()>>>>>>,
file_nbit: u64,
file_size: u64,
block_nbit: u64,
@@ -137,6 +141,9 @@ impl<F: WALStore> WALFilePool<F> {
last_write: UnsafeCell::new(MaybeUninit::new(Box::pin(
future::ready(Ok(())),
))),
+ last_peel: UnsafeCell::new(MaybeUninit::new(Box::pin(
+ future::ready(Ok(())),
+ ))),
file_nbit,
file_size: 1 << (file_nbit as u64),
block_nbit,
@@ -172,7 +179,6 @@ impl<F: WALStore> WALFilePool<F> {
scan_fmt!(fname, "{x}.log", [hex WALFileId]).unwrap()
}
- // TODO: evict stale handles
fn write<'a>(
&'a mut self,
writes: Vec<(WALPos, WALBytes)>,
@@ -253,8 +259,33 @@ impl<F: WALStore> WALFilePool<F> {
res
}
- fn remove_file(&mut self, fid: u64) -> Result<(), ()> {
- self.store.remove_file(&Self::get_fname(fid))
+ fn remove_files<'a>(&'a mut self, fid_s: u64, fid_e: u64) -> impl Future<Output = Result<(), ()>> + 'a {
+ let last_peel = unsafe {
+ std::mem::replace(
+ &mut *self.last_peel.get(),
+ std::mem::MaybeUninit::uninit(),
+ )
+ .assume_init()
+ };
+
+ let mut removes = Vec::new();
+ for fid in fid_s..fid_e {
+ removes.push(self.store.remove_file(Self::get_fname(fid))
+ as Pin<Box<dyn Future<Output = _> + 'a>>)
+ }
+ let p = async move {
+ last_peel.await?;
+ for r in removes.into_iter() {
+ r.await?
+ }
+ Ok(())
+ }.shared();
+ unsafe {
+ (*self.last_peel.get()) = MaybeUninit::new(
+ std::mem::transmute(
+ Box::pin(p.clone()) as Pin<Box<dyn Future<Output = _> + 'a>>))
+ }
+ p
}
fn reset(&mut self) {
@@ -267,8 +298,6 @@ pub struct WALWriter<F: WALStore> {
file_pool: WALFilePool<F>,
block_buffer: WALBytes,
block_size: u32,
- next_complete: WALPos,
- io_complete: BinaryHeap<WALRingId>,
msize: usize,
}
@@ -283,8 +312,6 @@ impl<F: WALStore> WALWriter<F> {
file_pool,
block_buffer: b.into_boxed_slice(),
block_size,
- next_complete: 0,
- io_complete: BinaryHeap::new(),
msize,
}
}
@@ -430,34 +457,32 @@ impl<F: WALStore> WALWriter<F> {
/// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
/// complete so that it could automatically remove obsolete WAL files.
- pub fn peel<T: AsRef<[WALRingId]>>(
- &mut self,
+ pub fn peel<'a, T: AsRef<[WALRingId]>>(
+ &'a mut self,
records: T,
- ) -> Result<(), ()> {
+ ) -> impl Future<Output = Result<(), ()>> + 'a {
let msize = self.msize as u64;
let block_size = self.block_size as u64;
+ let state = &mut self.state;
for rec in records.as_ref() {
- self.io_complete.push(*rec);
+ state.io_complete.push(*rec);
}
- let orig_fid = self.state.first_fid;
- while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start))
+ let orig_fid = state.first_fid;
+ while let Some(s) = state.io_complete.peek().and_then(|&e| Some(e.start))
{
- if s != self.next_complete {
+ if s != state.next_complete {
break;
}
- let mut m = self.io_complete.pop().unwrap();
+ let mut m = state.io_complete.pop().unwrap();
let block_remain = block_size - (m.end & (block_size - 1));
if block_remain <= msize as u64 {
m.end += block_remain
}
- self.next_complete = m.end
- }
- let next_fid = self.next_complete >> self.state.file_nbit;
- for fid in orig_fid..next_fid {
- self.file_pool.remove_file(fid)?;
+ state.next_complete = m.end
}
- self.state.first_fid = next_fid;
- Ok(())
+ let next_fid = state.next_complete >> state.file_nbit;
+ state.first_fid = next_fid;
+ self.file_pool.remove_files(orig_fid, next_fid)
}
}
@@ -502,8 +527,8 @@ impl WALLoader {
// TODO: check for missing logfiles
logfiles.sort();
let mut chunks = None;
- for fname in logfiles.iter() {
- let fid = file_pool.get_fid(fname);
+ for fname in logfiles.into_iter() {
+ let fid = file_pool.get_fid(&fname);
let f =
futures::executor::block_on(file_pool.get_file(fid, false))?;
let mut off = 0;
@@ -581,7 +606,7 @@ impl WALLoader {
}
}
f.truncate(0)?;
- file_pool.remove_file(fid)?;
+ futures::executor::block_on(file_pool.store.remove_file(fname))?;
}
file_pool.reset();
Ok(WALWriter::new(
@@ -589,6 +614,8 @@ impl WALLoader {
first_fid: 0,
next: 0,
file_nbit: file_pool.file_nbit,
+ next_complete: 0,
+ io_complete: BinaryHeap::new(),
},
file_pool,
))