aboutsummaryrefslogtreecommitdiff
path: root/src/wal.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wal.rs')
-rw-r--r--src/wal.rs114
1 files changed, 63 insertions, 51 deletions
diff --git a/src/wal.rs b/src/wal.rs
index 761ea82..6f3b424 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use futures::future::{self, FutureExt, TryFutureExt};
+use std::cell::RefCell;
use std::collections::BinaryHeap;
use std::future::Future;
use std::pin::Pin;
@@ -89,17 +90,18 @@ pub trait WALFile {
) -> Result<Option<WALBytes>, ()>;
}
+#[async_trait(?Send)]
pub trait WALStore {
type FileNameIter: Iterator<Item = String>;
/// Open a file given the filename, create the file if not exists when `touch` is `true`.
- fn open_file(
- &mut self,
+ async fn open_file(
+ &self,
filename: &str,
touch: bool,
) -> Result<Box<dyn WALFile>, ()>;
/// Unlink a file given the filename.
- fn remove_file(&mut self, filename: &str) -> Result<(), ()>;
+ fn remove_file(&self, filename: &str) -> Result<(), ()>;
/// Enumerate all WAL filenames. It should include all WAL files that are previously opened
/// (created) but not removed. The list could be unordered.
fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>;
@@ -107,7 +109,7 @@ pub trait WALStore {
/// redoing the given operation to ensure its state is consistent. We assume the necessary
/// changes by the payload has already been persistent when the callback returns.
fn apply_payload(
- &mut self,
+ &self,
payload: WALBytes,
ringid: WALRingId,
) -> Result<(), ()>;
@@ -117,7 +119,7 @@ pub trait WALStore {
/// manipulate files and their contents.
struct WALFilePool<F: WALStore> {
store: F,
- handles: lru::LruCache<WALFileId, Box<dyn WALFile>>,
+ handles: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>,
file_nbit: u64,
file_size: u64,
block_nbit: u64,
@@ -129,7 +131,7 @@ impl<F: WALStore> WALFilePool<F> {
let block_nbit = block_nbit as u64;
WALFilePool {
store,
- handles: lru::LruCache::new(cache_size),
+ handles: RefCell::new(lru::LruCache::new(cache_size)),
file_nbit,
file_size: 1 << (file_nbit as u64),
block_nbit,
@@ -140,22 +142,25 @@ impl<F: WALStore> WALFilePool<F> {
format!("{:08x}.log", fid)
}
- fn get_file(
- &mut self,
+ fn get_file<'a>(
+ &'a self,
fid: u64,
touch: bool,
- ) -> Result<&'static dyn WALFile, ()> {
- let h = match self.handles.get(&fid) {
- Some(h) => &**h,
- None => {
- self.handles.put(
+ ) -> impl Future<Output = Result<&'a dyn WALFile, ()>> {
+ async move {
+ if let Some(h) = self.handles.borrow_mut().get(&fid) {
+ return Ok(unsafe { &*(&**h as *const dyn WALFile) });
+ }
+ let h = {
+ let mut handles = self.handles.borrow_mut();
+ handles.put(
fid,
- self.store.open_file(&Self::get_fname(fid), touch)?,
+ self.store.open_file(&Self::get_fname(fid), touch).await?,
);
- &**self.handles.get(&fid).unwrap()
- }
- };
- Ok(unsafe { &*(h as *const dyn WALFile) })
+ &**handles.get(&fid).unwrap() as *const dyn WALFile
+ };
+ Ok(unsafe { &*(h) })
+ }
}
fn get_fid(&mut self, fname: &str) -> WALFileId {
@@ -166,59 +171,65 @@ impl<F: WALStore> WALFilePool<F> {
fn write<'a>(
&'a mut self,
writes: Vec<(WALPos, WALBytes)>,
- ) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>>>>> {
+ ) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>> + 'a>>> {
let file_size = self.file_size;
let file_nbit = self.file_nbit;
let meta: Vec<(u64, u64)> = writes
.iter()
.map(|(off, w)| ((*off) >> file_nbit, w.len() as u64))
.collect();
- let files = meta
- .iter()
- .map(|(fid, _)| self.get_file(*fid, true))
- .collect::<Result<Vec<_>, ()>>();
+ let mut files: Vec<Pin<Box<dyn Future<Output = _> + 'a>>> = Vec::new();
+ for &(fid, _) in meta.iter() {
+ files.push(Box::pin(self.get_file(fid, true))
+ as Pin<Box<dyn Future<Output = _> + 'a>>)
+ }
let mut fid = writes[0].0 >> file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
let mut alloc_end = alloc_start + writes[0].1.len() as u64;
// pre-allocate the file space
let alloc = async move {
- let files = files?;
- let mut last_h = files[0];
- for ((next_fid, wl), h) in meta[1..].iter().zip(files[1..].iter()) {
- let next_fid = *next_fid;
- let wl = *wl;
- if next_fid != fid {
- last_h
- .allocate(
- alloc_start,
- (alloc_end - alloc_start) as usize,
- )
- .await?;
- last_h = *h;
- alloc_start = 0;
- alloc_end = alloc_start + wl;
- fid = next_fid;
+ let mut last_h: Option<
+ Pin<Box<dyn Future<Output = Result<&'a dyn WALFile, ()>> + 'a>>,
+ > = None;
+ for ((next_fid, wl), h) in meta.into_iter().zip(files.into_iter()) {
+ if let Some(lh) = last_h.take() {
+ if next_fid != fid {
+ lh.await?
+ .allocate(
+ alloc_start,
+ (alloc_end - alloc_start) as usize,
+ )
+ .await?;
+ last_h = Some(h);
+ alloc_start = 0;
+ alloc_end = alloc_start + wl;
+ fid = next_fid;
+ } else {
+ last_h = Some(lh);
+ alloc_end += wl;
+ }
} else {
- alloc_end += wl;
+ last_h = Some(h);
}
}
- last_h
- .allocate(alloc_start, (alloc_end - alloc_start) as usize)
- .await?;
+ if let Some(lh) = last_h {
+ lh.await?
+ .allocate(alloc_start, (alloc_end - alloc_start) as usize)
+ .await?
+ }
Ok(())
};
let mut res = Vec::new();
- let mut prev = Box::pin(alloc) as Pin<Box<dyn Future<Output = _>>>;
+ let mut prev = Box::pin(alloc) as Pin<Box<dyn Future<Output = _> + 'a>>;
for (off, w) in writes.into_iter() {
- let w = future::ready(self.get_file(off >> file_nbit, true))
- .and_then(move |f| f.write(off & (file_size - 1), w));
- let w = (async {
+ let f = self.get_file(off >> file_nbit, true);
+ let w = (async move {
prev.await?;
- w.await
+ f.await?.write(off & (file_size - 1), w).await
})
.shared();
prev = Box::pin(w.clone());
- res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _>>>)
+ res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _> + 'a>>)
}
res
}
@@ -228,7 +239,7 @@ impl<F: WALStore> WALFilePool<F> {
}
fn reset(&mut self) {
- self.handles.clear()
+ self.handles.borrow_mut().clear()
}
}
@@ -474,7 +485,8 @@ impl WALLoader {
let mut chunks = None;
for fname in logfiles.iter() {
let fid = file_pool.get_fid(fname);
- let f = file_pool.get_file(fid, false)?;
+ let f =
+ futures::executor::block_on(file_pool.get_file(fid, false))?;
let mut off = 0;
while let Some(header_raw) = f.read(off, msize as usize)? {
let ringid_start = (fid << file_pool.file_nbit) + off;