summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/wal.rs83
-rw-r--r--tests/common/mod.rs1
2 files changed, 65 insertions, 19 deletions
diff --git a/src/wal.rs b/src/wal.rs
index ff76bdb..fe4f11d 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use futures::future::{self, FutureExt, TryFutureExt};
use std::cell::{RefCell, UnsafeCell};
-use std::collections::BinaryHeap;
+use std::collections::{BinaryHeap, HashMap, hash_map};
use std::future::Future;
use std::mem::MaybeUninit;
use std::pin::Pin;
@@ -117,11 +117,31 @@ pub trait WALStore {
) -> Result<(), ()>;
}
+struct WALFileHandle<'a, F: WALStore> {
+ fid: WALFileId,
+ handle: &'a dyn WALFile,
+ pool: *const WALFilePool<F>,
+}
+
+impl<'a, F: WALStore> std::ops::Deref for WALFileHandle<'a, F> {
+ type Target = dyn WALFile + 'a;
+ fn deref(&self) -> &Self::Target { self.handle }
+}
+
+impl<'a, F: WALStore> Drop for WALFileHandle<'a, F> {
+ fn drop(&mut self) {
+ unsafe {
+ (&*self.pool).release_file(self.fid);
+ }
+ }
+}
+
/// The middle layer that manages WAL file handles and invokes public trait functions to actually
/// manipulate files and their contents.
struct WALFilePool<F: WALStore> {
store: F,
- handles: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>,
+ handle_cache: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>,
+ handle_used: RefCell<HashMap<WALFileId, UnsafeCell<(Box<dyn WALFile>, usize)>>>,
last_write:
UnsafeCell<MaybeUninit<Pin<Box<dyn Future<Output = Result<(), ()>>>>>>,
last_peel:
@@ -137,7 +157,8 @@ impl<F: WALStore> WALFilePool<F> {
let block_nbit = block_nbit as u64;
WALFilePool {
store,
- handles: RefCell::new(lru::LruCache::new(cache_size)),
+ handle_cache: RefCell::new(lru::LruCache::new(cache_size)),
+ handle_used: RefCell::new(HashMap::new()),
last_write: UnsafeCell::new(MaybeUninit::new(Box::pin(
future::ready(Ok(())),
))),
@@ -154,24 +175,41 @@ impl<F: WALStore> WALFilePool<F> {
format!("{:08x}.log", fid)
}
- fn get_file(
- &self,
+ fn get_file<'a>(
+ &'a self,
fid: u64,
touch: bool,
- ) -> impl Future<Output = Result<&dyn WALFile, ()>> {
+ ) -> impl Future<Output = Result<WALFileHandle<'a, F>, ()>> {
async move {
- if let Some(h) = self.handles.borrow_mut().get(&fid) {
- return Ok(unsafe { &*(&**h as *const dyn WALFile) });
+ let pool = self as *const WALFilePool<F>;
+ if let Some(h) = self.handle_cache.borrow_mut().pop(&fid) {
+ let handle = match self.handle_used.borrow_mut().entry(fid) {
+ hash_map::Entry::Vacant(e) => unsafe {&*(*e.insert(UnsafeCell::new((h, 1))).get()).0},
+ _ => unreachable!(),
+ };
+ Ok(WALFileHandle { fid, handle, pool })
+ } else {
+ let v = unsafe{&mut *match self.handle_used.borrow_mut().entry(fid) {
+ hash_map::Entry::Occupied(e) => e.into_mut(),
+ hash_map::Entry::Vacant(e) => e.insert(
+ UnsafeCell::new((self.store.open_file(&Self::get_fname(fid), touch).await?, 0)))
+ }.get()};
+ v.1 += 1;
+ Ok(WALFileHandle { fid, handle: &*v.0, pool })
}
- let h = {
- let mut handles = self.handles.borrow_mut();
- handles.put(
- fid,
- self.store.open_file(&Self::get_fname(fid), touch).await?,
- );
- &**handles.get(&fid).unwrap() as *const dyn WALFile
- };
- Ok(unsafe { &*(h) })
+ }
+ }
+
+ fn release_file(&self, fid: WALFileId) {
+ match self.handle_used.borrow_mut().entry(fid) {
+ hash_map::Entry::Occupied(e) => {
+ let v = unsafe{&mut *e.get().get()};
+ v.1 -= 1;
+ if v.1 == 0 {
+ self.handle_cache.borrow_mut().put(fid, e.remove().into_inner().0);
+ }
+ },
+ _ => unreachable!()
}
}
@@ -208,7 +246,7 @@ impl<F: WALStore> WALFilePool<F> {
let alloc = async move {
last_write.await?;
let mut last_h: Option<
- Pin<Box<dyn Future<Output = Result<&'a dyn WALFile, ()>> + 'a>>,
+ Pin<Box<dyn Future<Output = Result<WALFileHandle<'a, F>, ()>> + 'a>>,
> = None;
for ((next_fid, wl), h) in meta.into_iter().zip(files.into_iter()) {
if let Some(lh) = last_h.take() {
@@ -293,8 +331,13 @@ impl<F: WALStore> WALFilePool<F> {
p
}
+ fn in_use_len(&self) -> usize {
+ self.handle_used.borrow().len()
+ }
+
fn reset(&mut self) {
- self.handles.borrow_mut().clear()
+ self.handle_cache.borrow_mut().clear();
+ self.handle_used.borrow_mut().clear()
}
}
@@ -493,6 +536,8 @@ impl<F: WALStore> WALWriter<F> {
state.first_fid = next_fid;
self.file_pool.remove_files(orig_fid, next_fid)
}
+
+ pub fn file_pool_in_use(&self) -> usize { self.file_pool.in_use_len() }
}
pub struct WALLoader {
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index ba35654..d4645f9 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -590,6 +590,7 @@ impl PaintingSim {
}
}
//canvas.print(40);
+ assert_eq!(wal.file_pool_in_use(), 0);
Ok(())
}