diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/wal.rs | 79 |
1 files changed, 53 insertions, 26 deletions
@@ -69,6 +69,8 @@ struct WALState { next: WALPos, /// number of bits for a file file_nbit: u64, + next_complete: WALPos, + io_complete: BinaryHeap<WALRingId>, } #[async_trait(?Send)] @@ -101,7 +103,7 @@ pub trait WALStore { touch: bool, ) -> Result<Box<dyn WALFile>, ()>; /// Unlink a file given the filename. - fn remove_file(&self, filename: &str) -> Result<(), ()>; + async fn remove_file(&self, filename: String) -> Result<(), ()>; /// Enumerate all WAL filenames. It should include all WAL files that are previously opened /// (created) but not removed. The list could be unordered. fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>; @@ -122,6 +124,8 @@ struct WALFilePool<F: WALStore> { handles: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>, last_write: UnsafeCell<MaybeUninit<Pin<Box<dyn Future<Output = Result<(), ()>>>>>>, + last_peel: + UnsafeCell<MaybeUninit<Pin<Box<dyn Future<Output = Result<(), ()>>>>>>, file_nbit: u64, file_size: u64, block_nbit: u64, @@ -137,6 +141,9 @@ impl<F: WALStore> WALFilePool<F> { last_write: UnsafeCell::new(MaybeUninit::new(Box::pin( future::ready(Ok(())), ))), + last_peel: UnsafeCell::new(MaybeUninit::new(Box::pin( + future::ready(Ok(())), + ))), file_nbit, file_size: 1 << (file_nbit as u64), block_nbit, @@ -172,7 +179,6 @@ impl<F: WALStore> WALFilePool<F> { scan_fmt!(fname, "{x}.log", [hex WALFileId]).unwrap() } - // TODO: evict stale handles fn write<'a>( &'a mut self, writes: Vec<(WALPos, WALBytes)>, @@ -253,8 +259,33 @@ impl<F: WALStore> WALFilePool<F> { res } - fn remove_file(&mut self, fid: u64) -> Result<(), ()> { - self.store.remove_file(&Self::get_fname(fid)) + fn remove_files<'a>(&'a mut self, fid_s: u64, fid_e: u64) -> impl Future<Output = Result<(), ()>> + 'a { + let last_peel = unsafe { + std::mem::replace( + &mut *self.last_peel.get(), + std::mem::MaybeUninit::uninit(), + ) + .assume_init() + }; + + let mut removes = Vec::new(); + for fid in fid_s..fid_e { + removes.push(self.store.remove_file(Self::get_fname(fid)) + as Pin<Box<dyn Future<Output = _> + 'a>>) + } + let p = async move { + last_peel.await?; + for r in removes.into_iter() { + r.await? + } + Ok(()) + }.shared(); + unsafe { + (*self.last_peel.get()) = MaybeUninit::new( + std::mem::transmute( + Box::pin(p.clone()) as Pin<Box<dyn Future<Output = _> + 'a>>)) + } + p } fn reset(&mut self) { @@ -267,8 +298,6 @@ pub struct WALWriter<F: WALStore> { file_pool: WALFilePool<F>, block_buffer: WALBytes, block_size: u32, - next_complete: WALPos, - io_complete: BinaryHeap<WALRingId>, msize: usize, } @@ -283,8 +312,6 @@ impl<F: WALStore> WALWriter<F> { file_pool, block_buffer: b.into_boxed_slice(), block_size, - next_complete: 0, - io_complete: BinaryHeap::new(), msize, } } @@ -430,34 +457,32 @@ impl<F: WALStore> WALWriter<F> { /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are /// complete so that it could automatically remove obsolete WAL files. - pub fn peel<T: AsRef<[WALRingId]>>( - &mut self, + pub fn peel<'a, T: AsRef<[WALRingId]>>( + &'a mut self, records: T, - ) -> Result<(), ()> { + ) -> impl Future<Output = Result<(), ()>> + 'a { let msize = self.msize as u64; let block_size = self.block_size as u64; + let state = &mut self.state; for rec in records.as_ref() { - self.io_complete.push(*rec); + state.io_complete.push(*rec); } - let orig_fid = self.state.first_fid; - while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) + let orig_fid = state.first_fid; + while let Some(s) = state.io_complete.peek().and_then(|&e| Some(e.start)) { - if s != self.next_complete { + if s != state.next_complete { break; } - let mut m = self.io_complete.pop().unwrap(); + let mut m = state.io_complete.pop().unwrap(); let block_remain = block_size - (m.end & (block_size - 1)); if block_remain <= msize as u64 { m.end += block_remain } - self.next_complete = m.end - } - let next_fid = self.next_complete >> self.state.file_nbit; - for fid in orig_fid..next_fid { - self.file_pool.remove_file(fid)?; + state.next_complete = m.end } - self.state.first_fid = next_fid; - Ok(()) + let next_fid = state.next_complete >> state.file_nbit; + state.first_fid = next_fid; + self.file_pool.remove_files(orig_fid, next_fid) } } @@ -502,8 +527,8 @@ impl WALLoader { // TODO: check for missing logfiles logfiles.sort(); let mut chunks = None; - for fname in logfiles.iter() { - let fid = file_pool.get_fid(fname); + for fname in logfiles.into_iter() { + let fid = file_pool.get_fid(&fname); let f = futures::executor::block_on(file_pool.get_file(fid, false))?; let mut off = 0; @@ -581,7 +606,7 @@ impl WALLoader { } } f.truncate(0)?; - file_pool.remove_file(fid)?; + futures::executor::block_on(file_pool.store.remove_file(fname))?; } file_pool.reset(); Ok(WALWriter::new( @@ -589,6 +614,8 @@ impl WALLoader { first_fid: 0, next: 0, file_nbit: file_pool.file_nbit, + next_complete: 0, + io_complete: BinaryHeap::new(), }, file_pool, )) |