summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-06-12 14:30:12 -0400
committerDeterminant <[email protected]>2020-06-12 14:30:12 -0400
commit5b6bb306bb1d20bdab0e1d7752f2b7b7f16a2b7e (patch)
tree6527068a490257de43f4c7a6148b44d8031bcb4b
parent4b2c4cdf5de3e7bd1df954dccc320806b18fd1fb (diff)
async write works
-rw-r--r--examples/demo1.rs7
-rw-r--r--src/wal.rs114
-rw-r--r--tests/common/mod.rs33
3 files changed, 89 insertions, 65 deletions
diff --git a/examples/demo1.rs b/examples/demo1.rs
index 5ee2b64..a360885 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -91,16 +91,17 @@ impl Drop for WALStoreTest {
}
}
+#[async_trait(?Send)]
impl WALStore for WALStoreTest {
type FileNameIter = std::vec::IntoIter<String>;
- fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()> {
+ async fn open_file(&self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()> {
println!("open_file(filename={}, touch={})", filename, touch);
let filename = filename.to_string();
WALFileTest::new(self.rootfd, &filename).and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>))
}
- fn remove_file(&mut self, filename: &str) -> Result<(), ()> {
+ fn remove_file(&self, filename: &str) -> Result<(), ()> {
println!("remove_file(filename={})", filename);
unlinkat(Some(self.rootfd), filename, UnlinkatFlags::NoRemoveDir).or_else(|_| Err(()))
}
@@ -114,7 +115,7 @@ impl WALStore for WALStoreTest {
Ok(logfiles.into_iter())
}
- fn apply_payload(&mut self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()> {
+ fn apply_payload(&self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()> {
println!("apply_payload(payload={}, ringid={:?})",
std::str::from_utf8(&payload).unwrap(),
ringid);
diff --git a/src/wal.rs b/src/wal.rs
index 761ea82..6f3b424 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use futures::future::{self, FutureExt, TryFutureExt};
+use std::cell::RefCell;
use std::collections::BinaryHeap;
use std::future::Future;
use std::pin::Pin;
@@ -89,17 +90,18 @@ pub trait WALFile {
) -> Result<Option<WALBytes>, ()>;
}
+#[async_trait(?Send)]
pub trait WALStore {
type FileNameIter: Iterator<Item = String>;
/// Open a file given the filename, create the file if not exists when `touch` is `true`.
- fn open_file(
- &mut self,
+ async fn open_file(
+ &self,
filename: &str,
touch: bool,
) -> Result<Box<dyn WALFile>, ()>;
/// Unlink a file given the filename.
- fn remove_file(&mut self, filename: &str) -> Result<(), ()>;
+ fn remove_file(&self, filename: &str) -> Result<(), ()>;
/// Enumerate all WAL filenames. It should include all WAL files that are previously opened
/// (created) but not removed. The list could be unordered.
fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>;
@@ -107,7 +109,7 @@ pub trait WALStore {
/// redoing the given operation to ensure its state is consistent. We assume the necessary
/// changes by the payload has already been persistent when the callback returns.
fn apply_payload(
- &mut self,
+ &self,
payload: WALBytes,
ringid: WALRingId,
) -> Result<(), ()>;
@@ -117,7 +119,7 @@ pub trait WALStore {
/// manipulate files and their contents.
struct WALFilePool<F: WALStore> {
store: F,
- handles: lru::LruCache<WALFileId, Box<dyn WALFile>>,
+ handles: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>,
file_nbit: u64,
file_size: u64,
block_nbit: u64,
@@ -129,7 +131,7 @@ impl<F: WALStore> WALFilePool<F> {
let block_nbit = block_nbit as u64;
WALFilePool {
store,
- handles: lru::LruCache::new(cache_size),
+ handles: RefCell::new(lru::LruCache::new(cache_size)),
file_nbit,
file_size: 1 << (file_nbit as u64),
block_nbit,
@@ -140,22 +142,25 @@ impl<F: WALStore> WALFilePool<F> {
format!("{:08x}.log", fid)
}
- fn get_file(
- &mut self,
+ fn get_file<'a>(
+ &'a self,
fid: u64,
touch: bool,
- ) -> Result<&'static dyn WALFile, ()> {
- let h = match self.handles.get(&fid) {
- Some(h) => &**h,
- None => {
- self.handles.put(
+ ) -> impl Future<Output = Result<&'a dyn WALFile, ()>> {
+ async move {
+ if let Some(h) = self.handles.borrow_mut().get(&fid) {
+ return Ok(unsafe { &*(&**h as *const dyn WALFile) });
+ }
+ let h = {
+ let mut handles = self.handles.borrow_mut();
+ handles.put(
fid,
- self.store.open_file(&Self::get_fname(fid), touch)?,
+ self.store.open_file(&Self::get_fname(fid), touch).await?,
);
- &**self.handles.get(&fid).unwrap()
- }
- };
- Ok(unsafe { &*(h as *const dyn WALFile) })
+ &**handles.get(&fid).unwrap() as *const dyn WALFile
+ };
+ Ok(unsafe { &*(h) })
+ }
}
fn get_fid(&mut self, fname: &str) -> WALFileId {
@@ -166,59 +171,65 @@ impl<F: WALStore> WALFilePool<F> {
fn write<'a>(
&'a mut self,
writes: Vec<(WALPos, WALBytes)>,
- ) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>>>>> {
+ ) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>> + 'a>>> {
let file_size = self.file_size;
let file_nbit = self.file_nbit;
let meta: Vec<(u64, u64)> = writes
.iter()
.map(|(off, w)| ((*off) >> file_nbit, w.len() as u64))
.collect();
- let files = meta
- .iter()
- .map(|(fid, _)| self.get_file(*fid, true))
- .collect::<Result<Vec<_>, ()>>();
+ let mut files: Vec<Pin<Box<dyn Future<Output = _> + 'a>>> = Vec::new();
+ for &(fid, _) in meta.iter() {
+ files.push(Box::pin(self.get_file(fid, true))
+ as Pin<Box<dyn Future<Output = _> + 'a>>)
+ }
let mut fid = writes[0].0 >> file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
let mut alloc_end = alloc_start + writes[0].1.len() as u64;
// pre-allocate the file space
let alloc = async move {
- let files = files?;
- let mut last_h = files[0];
- for ((next_fid, wl), h) in meta[1..].iter().zip(files[1..].iter()) {
- let next_fid = *next_fid;
- let wl = *wl;
- if next_fid != fid {
- last_h
- .allocate(
- alloc_start,
- (alloc_end - alloc_start) as usize,
- )
- .await?;
- last_h = *h;
- alloc_start = 0;
- alloc_end = alloc_start + wl;
- fid = next_fid;
+ let mut last_h: Option<
+ Pin<Box<dyn Future<Output = Result<&'a dyn WALFile, ()>> + 'a>>,
+ > = None;
+ for ((next_fid, wl), h) in meta.into_iter().zip(files.into_iter()) {
+ if let Some(lh) = last_h.take() {
+ if next_fid != fid {
+ lh.await?
+ .allocate(
+ alloc_start,
+ (alloc_end - alloc_start) as usize,
+ )
+ .await?;
+ last_h = Some(h);
+ alloc_start = 0;
+ alloc_end = alloc_start + wl;
+ fid = next_fid;
+ } else {
+ last_h = Some(lh);
+ alloc_end += wl;
+ }
} else {
- alloc_end += wl;
+ last_h = Some(h);
}
}
- last_h
- .allocate(alloc_start, (alloc_end - alloc_start) as usize)
- .await?;
+ if let Some(lh) = last_h {
+ lh.await?
+ .allocate(alloc_start, (alloc_end - alloc_start) as usize)
+ .await?
+ }
Ok(())
};
let mut res = Vec::new();
- let mut prev = Box::pin(alloc) as Pin<Box<dyn Future<Output = _>>>;
+ let mut prev = Box::pin(alloc) as Pin<Box<dyn Future<Output = _> + 'a>>;
for (off, w) in writes.into_iter() {
- let w = future::ready(self.get_file(off >> file_nbit, true))
- .and_then(move |f| f.write(off & (file_size - 1), w));
- let w = (async {
+ let f = self.get_file(off >> file_nbit, true);
+ let w = (async move {
prev.await?;
- w.await
+ f.await?.write(off & (file_size - 1), w).await
})
.shared();
prev = Box::pin(w.clone());
- res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _>>>)
+ res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _> + 'a>>)
}
res
}
@@ -228,7 +239,7 @@ impl<F: WALStore> WALFilePool<F> {
}
fn reset(&mut self) {
- self.handles.clear()
+ self.handles.borrow_mut().clear()
}
}
@@ -474,7 +485,8 @@ impl WALLoader {
let mut chunks = None;
for fname in logfiles.iter() {
let fid = file_pool.get_fid(fname);
- let f = file_pool.get_file(fid, false)?;
+ let f =
+ futures::executor::block_on(file_pool.get_file(fid, false))?;
let mut off = 0;
while let Some(header_raw) = f.read(off, msize as usize)? {
let ringid_start = (fid << file_pool.file_nbit) + off;
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 70c8d8e..5fc422f 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -103,7 +103,11 @@ impl WALStoreEmulState {
files: HashMap::new(),
}
}
- pub fn clone(&self) -> Self { WALStoreEmulState{ files: self.files.clone() }}
+ pub fn clone(&self) -> Self {
+ WALStoreEmulState {
+ files: self.files.clone(),
+ }
+ }
}
/// Emulate the persistent storage state.
@@ -112,9 +116,9 @@ where
G: FailGen,
F: FnMut(WALBytes, WALRingId),
{
- state: &'a mut WALStoreEmulState,
+ state: RefCell<&'a mut WALStoreEmulState>,
fgen: Rc<G>,
- recover: F,
+ recover: RefCell<F>,
}
impl<'a, G: FailGen, F: FnMut(WALBytes, WALRingId)> WALStoreEmul<'a, G, F> {
@@ -123,6 +127,8 @@ impl<'a, G: FailGen, F: FnMut(WALBytes, WALRingId)> WALStoreEmul<'a, G, F> {
fgen: Rc<G>,
recover: F,
) -> Self {
+ let state = RefCell::new(state);
+ let recover = RefCell::new(recover);
WALStoreEmul {
state,
fgen,
@@ -131,6 +137,7 @@ impl<'a, G: FailGen, F: FnMut(WALBytes, WALRingId)> WALStoreEmul<'a, G, F> {
}
}
+#[async_trait(?Send)]
impl<'a, G, F> WALStore for WALStoreEmul<'a, G, F>
where
G: 'static + FailGen,
@@ -138,15 +145,15 @@ where
{
type FileNameIter = std::vec::IntoIter<String>;
- fn open_file(
- &mut self,
+ async fn open_file(
+ &self,
filename: &str,
touch: bool,
) -> Result<Box<dyn WALFile>, ()> {
if self.fgen.next_fail() {
return Err(());
}
- match self.state.files.entry(filename.to_string()) {
+ match self.state.borrow_mut().files.entry(filename.to_string()) {
hash_map::Entry::Occupied(e) => Ok(Box::new(WALFileEmul {
file: e.get().clone(),
fgen: self.fgen.clone(),
@@ -164,12 +171,13 @@ where
}
}
- fn remove_file(&mut self, filename: &str) -> Result<(), ()> {
+ fn remove_file(&self, filename: &str) -> Result<(), ()> {
//println!("remove_file(filename={})", filename);
if self.fgen.next_fail() {
return Err(());
}
self.state
+ .borrow_mut()
.files
.remove(filename)
.ok_or(())
@@ -181,14 +189,14 @@ where
return Err(());
}
let mut logfiles = Vec::new();
- for (fname, _) in self.state.files.iter() {
+ for (fname, _) in self.state.borrow().files.iter() {
logfiles.push(fname.clone())
}
Ok(logfiles.into_iter())
}
fn apply_payload(
- &mut self,
+ &self,
payload: WALBytes,
ringid: WALRingId,
) -> Result<(), ()> {
@@ -200,7 +208,7 @@ where
hex::encode(&payload),
ringid);
*/
- (self.recover)(payload, ringid);
+ (&mut *self.recover.borrow_mut())(payload, ringid);
Ok(())
}
}
@@ -544,7 +552,10 @@ impl PaintingSim {
// write ahead
let rids = wal.grow(payloads);
assert_eq!(pss.len(), rids.len());
- let rids = rids.into_iter().map(|r| futures::executor::block_on(r)).collect::<Vec<_>>();
+ let rids = rids
+ .into_iter()
+ .map(|r| futures::executor::block_on(r))
+ .collect::<Vec<_>>();
// keep track of the operations
// grow could fail
for (ps, rid) in pss.iter().zip(rids.iter()) {