diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/wal.rs | 114 |
1 files changed, 63 insertions, 51 deletions
@@ -1,5 +1,6 @@ use async_trait::async_trait; use futures::future::{self, FutureExt, TryFutureExt}; +use std::cell::RefCell; use std::collections::BinaryHeap; use std::future::Future; use std::pin::Pin; @@ -89,17 +90,18 @@ pub trait WALFile { ) -> Result<Option<WALBytes>, ()>; } +#[async_trait(?Send)] pub trait WALStore { type FileNameIter: Iterator<Item = String>; /// Open a file given the filename, create the file if not exists when `touch` is `true`. - fn open_file( - &mut self, + async fn open_file( + &self, filename: &str, touch: bool, ) -> Result<Box<dyn WALFile>, ()>; /// Unlink a file given the filename. - fn remove_file(&mut self, filename: &str) -> Result<(), ()>; + fn remove_file(&self, filename: &str) -> Result<(), ()>; /// Enumerate all WAL filenames. It should include all WAL files that are previously opened /// (created) but not removed. The list could be unordered. fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>; @@ -107,7 +109,7 @@ pub trait WALStore { /// redoing the given operation to ensure its state is consistent. We assume the necessary /// changes by the payload has already been persistent when the callback returns. fn apply_payload( - &mut self, + &self, payload: WALBytes, ringid: WALRingId, ) -> Result<(), ()>; @@ -117,7 +119,7 @@ pub trait WALStore { /// manipulate files and their contents. struct WALFilePool<F: WALStore> { store: F, - handles: lru::LruCache<WALFileId, Box<dyn WALFile>>, + handles: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>, file_nbit: u64, file_size: u64, block_nbit: u64, @@ -129,7 +131,7 @@ impl<F: WALStore> WALFilePool<F> { let block_nbit = block_nbit as u64; WALFilePool { store, - handles: lru::LruCache::new(cache_size), + handles: RefCell::new(lru::LruCache::new(cache_size)), file_nbit, file_size: 1 << (file_nbit as u64), block_nbit, @@ -140,22 +142,25 @@ impl<F: WALStore> WALFilePool<F> { format!("{:08x}.log", fid) } - fn get_file( - &mut self, + fn get_file<'a>( + &'a self, fid: u64, touch: bool, - ) -> Result<&'static dyn WALFile, ()> { - let h = match self.handles.get(&fid) { - Some(h) => &**h, - None => { - self.handles.put( + ) -> impl Future<Output = Result<&'a dyn WALFile, ()>> { + async move { + if let Some(h) = self.handles.borrow_mut().get(&fid) { + return Ok(unsafe { &*(&**h as *const dyn WALFile) }); + } + let h = { + let mut handles = self.handles.borrow_mut(); + handles.put( fid, - self.store.open_file(&Self::get_fname(fid), touch)?, + self.store.open_file(&Self::get_fname(fid), touch).await?, ); - &**self.handles.get(&fid).unwrap() - } - }; - Ok(unsafe { &*(h as *const dyn WALFile) }) + &**handles.get(&fid).unwrap() as *const dyn WALFile + }; + Ok(unsafe { &*(h) }) + } } fn get_fid(&mut self, fname: &str) -> WALFileId { @@ -166,59 +171,65 @@ impl<F: WALStore> WALFilePool<F> { fn write<'a>( &'a mut self, writes: Vec<(WALPos, WALBytes)>, - ) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>>>>> { + ) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>> + 'a>>> { let file_size = self.file_size; let file_nbit = self.file_nbit; let meta: Vec<(u64, u64)> = writes .iter() .map(|(off, w)| ((*off) >> file_nbit, w.len() as u64)) .collect(); - let files = meta - .iter() - .map(|(fid, _)| self.get_file(*fid, true)) - .collect::<Result<Vec<_>, ()>>(); + let mut files: Vec<Pin<Box<dyn Future<Output = _> + 'a>>> = Vec::new(); + for &(fid, _) in meta.iter() { + files.push(Box::pin(self.get_file(fid, true)) + as Pin<Box<dyn Future<Output = _> + 'a>>) + } let mut fid = writes[0].0 >> file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); let mut alloc_end = alloc_start + writes[0].1.len() as u64; // pre-allocate the file space let alloc = async move { - let files = files?; - let mut last_h = files[0]; - for ((next_fid, wl), h) in meta[1..].iter().zip(files[1..].iter()) { - let next_fid = *next_fid; - let wl = *wl; - if next_fid != fid { - last_h - .allocate( - alloc_start, - (alloc_end - alloc_start) as usize, - ) - .await?; - last_h = *h; - alloc_start = 0; - alloc_end = alloc_start + wl; - fid = next_fid; + let mut last_h: Option< + Pin<Box<dyn Future<Output = Result<&'a dyn WALFile, ()>> + 'a>>, + > = None; + for ((next_fid, wl), h) in meta.into_iter().zip(files.into_iter()) { + if let Some(lh) = last_h.take() { + if next_fid != fid { + lh.await? + .allocate( + alloc_start, + (alloc_end - alloc_start) as usize, + ) + .await?; + last_h = Some(h); + alloc_start = 0; + alloc_end = alloc_start + wl; + fid = next_fid; + } else { + last_h = Some(lh); + alloc_end += wl; + } } else { - alloc_end += wl; + last_h = Some(h); } } - last_h - .allocate(alloc_start, (alloc_end - alloc_start) as usize) - .await?; + if let Some(lh) = last_h { + lh.await? + .allocate(alloc_start, (alloc_end - alloc_start) as usize) + .await? + } Ok(()) }; let mut res = Vec::new(); - let mut prev = Box::pin(alloc) as Pin<Box<dyn Future<Output = _>>>; + let mut prev = Box::pin(alloc) as Pin<Box<dyn Future<Output = _> + 'a>>; for (off, w) in writes.into_iter() { - let w = future::ready(self.get_file(off >> file_nbit, true)) - .and_then(move |f| f.write(off & (file_size - 1), w)); - let w = (async { + let f = self.get_file(off >> file_nbit, true); + let w = (async move { prev.await?; - w.await + f.await?.write(off & (file_size - 1), w).await }) .shared(); prev = Box::pin(w.clone()); - res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _>>>) + res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _> + 'a>>) } res } @@ -228,7 +239,7 @@ impl<F: WALStore> WALFilePool<F> { } fn reset(&mut self) { - self.handles.clear() + self.handles.borrow_mut().clear() } } @@ -474,7 +485,8 @@ impl WALLoader { let mut chunks = None; for fname in logfiles.iter() { let fid = file_pool.get_fid(fname); - let f = file_pool.get_file(fid, false)?; + let f = + futures::executor::block_on(file_pool.get_file(fid, false))?; let mut off = 0; while let Some(header_raw) = f.read(off, msize as usize)? { let ringid_start = (fid << file_pool.file_nbit) + off; |