diff options
author | Determinant <[email protected]> | 2020-06-12 14:30:12 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-12 14:30:12 -0400 |
commit | 5b6bb306bb1d20bdab0e1d7752f2b7b7f16a2b7e (patch) | |
tree | 6527068a490257de43f4c7a6148b44d8031bcb4b | |
parent | 4b2c4cdf5de3e7bd1df954dccc320806b18fd1fb (diff) |
async write works
-rw-r--r-- | examples/demo1.rs | 7 | ||||
-rw-r--r-- | src/wal.rs | 114 | ||||
-rw-r--r-- | tests/common/mod.rs | 33 |
3 files changed, 89 insertions, 65 deletions
diff --git a/examples/demo1.rs b/examples/demo1.rs index 5ee2b64..a360885 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -91,16 +91,17 @@ impl Drop for WALStoreTest { } } +#[async_trait(?Send)] impl WALStore for WALStoreTest { type FileNameIter = std::vec::IntoIter<String>; - fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()> { + async fn open_file(&self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()> { println!("open_file(filename={}, touch={})", filename, touch); let filename = filename.to_string(); WALFileTest::new(self.rootfd, &filename).and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>)) } - fn remove_file(&mut self, filename: &str) -> Result<(), ()> { + fn remove_file(&self, filename: &str) -> Result<(), ()> { println!("remove_file(filename={})", filename); unlinkat(Some(self.rootfd), filename, UnlinkatFlags::NoRemoveDir).or_else(|_| Err(())) } @@ -114,7 +115,7 @@ impl WALStore for WALStoreTest { Ok(logfiles.into_iter()) } - fn apply_payload(&mut self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()> { + fn apply_payload(&self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()> { println!("apply_payload(payload={}, ringid={:?})", std::str::from_utf8(&payload).unwrap(), ringid); @@ -1,5 +1,6 @@ use async_trait::async_trait; use futures::future::{self, FutureExt, TryFutureExt}; +use std::cell::RefCell; use std::collections::BinaryHeap; use std::future::Future; use std::pin::Pin; @@ -89,17 +90,18 @@ pub trait WALFile { ) -> Result<Option<WALBytes>, ()>; } +#[async_trait(?Send)] pub trait WALStore { type FileNameIter: Iterator<Item = String>; /// Open a file given the filename, create the file if not exists when `touch` is `true`. - fn open_file( - &mut self, + async fn open_file( + &self, filename: &str, touch: bool, ) -> Result<Box<dyn WALFile>, ()>; /// Unlink a file given the filename. - fn remove_file(&mut self, filename: &str) -> Result<(), ()>; + fn remove_file(&self, filename: &str) -> Result<(), ()>; /// Enumerate all WAL filenames. It should include all WAL files that are previously opened /// (created) but not removed. The list could be unordered. fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>; @@ -107,7 +109,7 @@ pub trait WALStore { /// redoing the given operation to ensure its state is consistent. We assume the necessary /// changes by the payload has already been persistent when the callback returns. fn apply_payload( - &mut self, + &self, payload: WALBytes, ringid: WALRingId, ) -> Result<(), ()>; @@ -117,7 +119,7 @@ pub trait WALStore { /// manipulate files and their contents. struct WALFilePool<F: WALStore> { store: F, - handles: lru::LruCache<WALFileId, Box<dyn WALFile>>, + handles: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>, file_nbit: u64, file_size: u64, block_nbit: u64, @@ -129,7 +131,7 @@ impl<F: WALStore> WALFilePool<F> { let block_nbit = block_nbit as u64; WALFilePool { store, - handles: lru::LruCache::new(cache_size), + handles: RefCell::new(lru::LruCache::new(cache_size)), file_nbit, file_size: 1 << (file_nbit as u64), block_nbit, @@ -140,22 +142,25 @@ impl<F: WALStore> WALFilePool<F> { format!("{:08x}.log", fid) } - fn get_file( - &mut self, + fn get_file<'a>( + &'a self, fid: u64, touch: bool, - ) -> Result<&'static dyn WALFile, ()> { - let h = match self.handles.get(&fid) { - Some(h) => &**h, - None => { - self.handles.put( + ) -> impl Future<Output = Result<&'a dyn WALFile, ()>> { + async move { + if let Some(h) = self.handles.borrow_mut().get(&fid) { + return Ok(unsafe { &*(&**h as *const dyn WALFile) }); + } + let h = { + let mut handles = self.handles.borrow_mut(); + handles.put( fid, - self.store.open_file(&Self::get_fname(fid), touch)?, + self.store.open_file(&Self::get_fname(fid), touch).await?, ); - &**self.handles.get(&fid).unwrap() - } - }; - Ok(unsafe { &*(h as *const dyn WALFile) }) + &**handles.get(&fid).unwrap() as *const dyn WALFile + }; + Ok(unsafe { &*(h) }) + } } fn get_fid(&mut self, fname: &str) -> WALFileId { @@ -166,59 +171,65 @@ impl<F: WALStore> WALFilePool<F> { fn write<'a>( &'a mut self, writes: Vec<(WALPos, WALBytes)>, - ) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>>>>> { + ) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>> + 'a>>> { let file_size = self.file_size; let file_nbit = self.file_nbit; let meta: Vec<(u64, u64)> = writes .iter() .map(|(off, w)| ((*off) >> file_nbit, w.len() as u64)) .collect(); - let files = meta - .iter() - .map(|(fid, _)| self.get_file(*fid, true)) - .collect::<Result<Vec<_>, ()>>(); + let mut files: Vec<Pin<Box<dyn Future<Output = _> + 'a>>> = Vec::new(); + for &(fid, _) in meta.iter() { + files.push(Box::pin(self.get_file(fid, true)) + as Pin<Box<dyn Future<Output = _> + 'a>>) + } let mut fid = writes[0].0 >> file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); let mut alloc_end = alloc_start + writes[0].1.len() as u64; // pre-allocate the file space let alloc = async move { - let files = files?; - let mut last_h = files[0]; - for ((next_fid, wl), h) in meta[1..].iter().zip(files[1..].iter()) { - let next_fid = *next_fid; - let wl = *wl; - if next_fid != fid { - last_h - .allocate( - alloc_start, - (alloc_end - alloc_start) as usize, - ) - .await?; - last_h = *h; - alloc_start = 0; - alloc_end = alloc_start + wl; - fid = next_fid; + let mut last_h: Option< + Pin<Box<dyn Future<Output = Result<&'a dyn WALFile, ()>> + 'a>>, + > = None; + for ((next_fid, wl), h) in meta.into_iter().zip(files.into_iter()) { + if let Some(lh) = last_h.take() { + if next_fid != fid { + lh.await? + .allocate( + alloc_start, + (alloc_end - alloc_start) as usize, + ) + .await?; + last_h = Some(h); + alloc_start = 0; + alloc_end = alloc_start + wl; + fid = next_fid; + } else { + last_h = Some(lh); + alloc_end += wl; + } } else { - alloc_end += wl; + last_h = Some(h); } } - last_h - .allocate(alloc_start, (alloc_end - alloc_start) as usize) - .await?; + if let Some(lh) = last_h { + lh.await? + .allocate(alloc_start, (alloc_end - alloc_start) as usize) + .await? + } Ok(()) }; let mut res = Vec::new(); - let mut prev = Box::pin(alloc) as Pin<Box<dyn Future<Output = _>>>; + let mut prev = Box::pin(alloc) as Pin<Box<dyn Future<Output = _> + 'a>>; for (off, w) in writes.into_iter() { - let w = future::ready(self.get_file(off >> file_nbit, true)) - .and_then(move |f| f.write(off & (file_size - 1), w)); - let w = (async { + let f = self.get_file(off >> file_nbit, true); + let w = (async move { prev.await?; - w.await + f.await?.write(off & (file_size - 1), w).await }) .shared(); prev = Box::pin(w.clone()); - res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _>>>) + res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _> + 'a>>) } res } @@ -228,7 +239,7 @@ impl<F: WALStore> WALFilePool<F> { } fn reset(&mut self) { - self.handles.clear() + self.handles.borrow_mut().clear() } } @@ -474,7 +485,8 @@ impl WALLoader { let mut chunks = None; for fname in logfiles.iter() { let fid = file_pool.get_fid(fname); - let f = file_pool.get_file(fid, false)?; + let f = + futures::executor::block_on(file_pool.get_file(fid, false))?; let mut off = 0; while let Some(header_raw) = f.read(off, msize as usize)? { let ringid_start = (fid << file_pool.file_nbit) + off; diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 70c8d8e..5fc422f 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -103,7 +103,11 @@ impl WALStoreEmulState { files: HashMap::new(), } } - pub fn clone(&self) -> Self { WALStoreEmulState{ files: self.files.clone() }} + pub fn clone(&self) -> Self { + WALStoreEmulState { + files: self.files.clone(), + } + } } /// Emulate the persistent storage state. @@ -112,9 +116,9 @@ where G: FailGen, F: FnMut(WALBytes, WALRingId), { - state: &'a mut WALStoreEmulState, + state: RefCell<&'a mut WALStoreEmulState>, fgen: Rc<G>, - recover: F, + recover: RefCell<F>, } impl<'a, G: FailGen, F: FnMut(WALBytes, WALRingId)> WALStoreEmul<'a, G, F> { @@ -123,6 +127,8 @@ impl<'a, G: FailGen, F: FnMut(WALBytes, WALRingId)> WALStoreEmul<'a, G, F> { fgen: Rc<G>, recover: F, ) -> Self { + let state = RefCell::new(state); + let recover = RefCell::new(recover); WALStoreEmul { state, fgen, @@ -131,6 +137,7 @@ impl<'a, G: FailGen, F: FnMut(WALBytes, WALRingId)> WALStoreEmul<'a, G, F> { } } +#[async_trait(?Send)] impl<'a, G, F> WALStore for WALStoreEmul<'a, G, F> where G: 'static + FailGen, @@ -138,15 +145,15 @@ where { type FileNameIter = std::vec::IntoIter<String>; - fn open_file( - &mut self, + async fn open_file( + &self, filename: &str, touch: bool, ) -> Result<Box<dyn WALFile>, ()> { if self.fgen.next_fail() { return Err(()); } - match self.state.files.entry(filename.to_string()) { + match self.state.borrow_mut().files.entry(filename.to_string()) { hash_map::Entry::Occupied(e) => Ok(Box::new(WALFileEmul { file: e.get().clone(), fgen: self.fgen.clone(), @@ -164,12 +171,13 @@ where } } - fn remove_file(&mut self, filename: &str) -> Result<(), ()> { + fn remove_file(&self, filename: &str) -> Result<(), ()> { //println!("remove_file(filename={})", filename); if self.fgen.next_fail() { return Err(()); } self.state + .borrow_mut() .files .remove(filename) .ok_or(()) @@ -181,14 +189,14 @@ where return Err(()); } let mut logfiles = Vec::new(); - for (fname, _) in self.state.files.iter() { + for (fname, _) in self.state.borrow().files.iter() { logfiles.push(fname.clone()) } Ok(logfiles.into_iter()) } fn apply_payload( - &mut self, + &self, payload: WALBytes, ringid: WALRingId, ) -> Result<(), ()> { @@ -200,7 +208,7 @@ where hex::encode(&payload), ringid); */ - (self.recover)(payload, ringid); + (&mut *self.recover.borrow_mut())(payload, ringid); Ok(()) } } @@ -544,7 +552,10 @@ impl PaintingSim { // write ahead let rids = wal.grow(payloads); assert_eq!(pss.len(), rids.len()); - let rids = rids.into_iter().map(|r| futures::executor::block_on(r)).collect::<Vec<_>>(); + let rids = rids + .into_iter() + .map(|r| futures::executor::block_on(r)) + .collect::<Vec<_>>(); // keep track of the operations // grow could fail for (ps, rid) in pss.iter().zip(rids.iter()) { |