aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-06-12 11:28:09 -0400
committerDeterminant <[email protected]>2020-06-12 11:28:09 -0400
commit4b2c4cdf5de3e7bd1df954dccc320806b18fd1fb (patch)
treefe5664998e8d2882e261b8ebb76eb682cd432c2c
parentf69fd448feec51bee8cb579ac35b1f93d2a80e14 (diff)
clean up
-rw-r--r--src/wal.rs115
-rw-r--r--tests/common/mod.rs9
2 files changed, 76 insertions, 48 deletions
diff --git a/src/wal.rs b/src/wal.rs
index 5cd5a84..761ea82 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,7 +1,7 @@
use async_trait::async_trait;
+use futures::future::{self, FutureExt, TryFutureExt};
use std::collections::BinaryHeap;
use std::future::Future;
-use futures::future::{self, TryFutureExt, FutureExt};
use std::pin::Pin;
#[repr(u8)]
@@ -163,27 +163,37 @@ impl<F: WALStore> WALFilePool<F> {
}
// TODO: evict stale handles
- fn write<'a>(&'a mut self, writes: Vec<(WALPos, WALBytes)>) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>>>>> {
-
- // pre-allocate the file space
- let mut fid = writes[0].0 >> self.file_nbit;
- let mut alloc_start = writes[0].0 & (self.file_size - 1);
- let mut alloc_end = alloc_start + writes[0].1.len() as u64;
+ fn write<'a>(
+ &'a mut self,
+ writes: Vec<(WALPos, WALBytes)>,
+ ) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>>>>> {
let file_size = self.file_size;
let file_nbit = self.file_nbit;
- // prepare file handles
- let meta: Vec<(u64, u64)> = writes.iter().map(|(off, w)| ((*off) >> self.file_nbit, w.len() as u64)).collect();
- let files = meta.iter().map(|(fid, _)| self.get_file(*fid, true)).collect::<Result<Vec<&dyn WALFile>, ()>>();
- let prepare = async move {
- let files: Vec<&dyn WALFile> = files?;
+ let meta: Vec<(u64, u64)> = writes
+ .iter()
+ .map(|(off, w)| ((*off) >> file_nbit, w.len() as u64))
+ .collect();
+ let files = meta
+ .iter()
+ .map(|(fid, _)| self.get_file(*fid, true))
+ .collect::<Result<Vec<_>, ()>>();
+ let mut fid = writes[0].0 >> file_nbit;
+ let mut alloc_start = writes[0].0 & (self.file_size - 1);
+ let mut alloc_end = alloc_start + writes[0].1.len() as u64;
+ // pre-allocate the file space
+ let alloc = async move {
+ let files = files?;
let mut last_h = files[0];
- for ((off, wl), h) in meta[1..].iter().zip(files[1..].iter()) {
- let next_fid = off >> file_nbit;
+ for ((next_fid, wl), h) in meta[1..].iter().zip(files[1..].iter()) {
+ let next_fid = *next_fid;
+ let wl = *wl;
if next_fid != fid {
- last_h.allocate(
- alloc_start,
- (alloc_end - alloc_start) as usize,
- ).await?;
+ last_h
+ .allocate(
+ alloc_start,
+ (alloc_end - alloc_start) as usize,
+ )
+ .await?;
last_h = *h;
alloc_start = 0;
alloc_end = alloc_start + wl;
@@ -192,18 +202,23 @@ impl<F: WALStore> WALFilePool<F> {
alloc_end += wl;
}
}
- last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize).await?;
+ last_h
+ .allocate(alloc_start, (alloc_end - alloc_start) as usize)
+ .await?;
Ok(())
};
let mut res = Vec::new();
- let n = writes.len();
- let mut f = Box::pin(prepare) as Pin<Box<dyn Future<Output = Result<(), ()>>>>;
+ let mut prev = Box::pin(alloc) as Pin<Box<dyn Future<Output = _>>>;
for (off, w) in writes.into_iter() {
- let fr = future::ready(self.get_file(off >> self.file_nbit, true))
- .and_then(move |f| f.write(off & (file_size - 1), w));
- let g = (async {f.await?; fr.await}).shared();
- f = Box::pin(g.clone());
- res.push(Box::pin(g) as Pin<Box<dyn Future<Output = Result<(), ()>>>>)
+ let w = future::ready(self.get_file(off >> file_nbit, true))
+ .and_then(move |f| f.write(off & (file_size - 1), w));
+ let w = (async {
+ prev.await?;
+ w.await
+ })
+ .shared();
+ prev = Box::pin(w.clone());
+ res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _>>>)
}
res
}
@@ -323,7 +338,7 @@ impl<F: WALStore> WALWriter<F> {
self.state.next,
self.block_buffer[bbuff_start as usize..]
.to_vec()
- .into_boxed_slice()
+ .into_boxed_slice(),
));
self.state.next += (self.block_size - bbuff_start) as u64;
bbuff_start = 0;
@@ -344,31 +359,43 @@ impl<F: WALStore> WALWriter<F> {
// mark the block info for each record
let mut i = 0;
'outer: for (j, (off, w)) in writes.iter().enumerate() {
- let off = *off;
- let len = w.len() as u64;
- while res[i].0.end <= off {
+ let blk_s = *off;
+ let blk_e = blk_s + w.len() as u64;
+ while res[i].0.end <= blk_s {
i += 1;
- if i >= res.len() { break 'outer }
+ if i >= res.len() {
+ break 'outer;
+ }
}
- while res[i].0.start < off + len {
+ while res[i].0.start < blk_e {
res[i].1.push(j);
- if res[i].0.end >= off + len { break }
+ if res[i].0.end >= blk_e {
+ break;
+ }
i += 1;
- if i >= res.len() { break 'outer }
+ if i >= res.len() {
+ break 'outer;
+ }
}
}
- let futures: Vec<futures::future::Shared<_>> = self.file_pool.write(writes).into_iter()
- .map(move |f| async move {f.await}.shared()).collect();
- let res = res.into_iter().map(|(ringid, blks)| {
- futures::future::try_join_all(blks.into_iter().map(|idx| futures[idx].clone()))
- .or_else(|_| future::ready(Err(()))).and_then(move |_| future::ready(Ok(ringid)))
- }).collect();
+ let writes: Vec<future::Shared<_>> = self
+ .file_pool
+ .write(writes)
+ .into_iter()
+ .map(move |f| async move { f.await }.shared())
+ .collect();
+ let res = res
+ .into_iter()
+ .map(|(ringid, blks)| {
+ future::try_join_all(
+ blks.into_iter().map(|idx| writes[idx].clone()),
+ )
+ .or_else(|_| future::ready(Err(())))
+ .and_then(move |_| future::ready(Ok(ringid)))
+ })
+ .collect();
res
- //.into_iter()
- //.zip(res.into_iter())
- //.map(move |(f, ringid)| Box::pin(async move {f.await?; Ok(ringid)}) as Pin<Box<dyn Future<Output = Result<WALRingId, ()>>>>).collect()
- //Vec::new().into_boxed_slice()
}
/// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index ba302f9..70c8d8e 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -542,15 +542,16 @@ impl PaintingSim {
let payloads =
pss.iter().map(|e| e.to_bytes()).collect::<Vec<WALBytes>>();
// write ahead
- let (rids, ok) = wal.grow(payloads);
+ let rids = wal.grow(payloads);
assert_eq!(pss.len(), rids.len());
+ let rids = rids.into_iter().map(|r| futures::executor::block_on(r)).collect::<Vec<_>>();
// keep track of the operations
+ // grow could fail
for (ps, rid) in pss.iter().zip(rids.iter()) {
ops.push(ps.clone());
- ringid_map.insert(*rid, ops.len() - 1);
+ ringid_map.insert((*rid)?, ops.len() - 1);
}
- // grow could fail
- futures::executor::block_on(ok)?;
+ let rids = rids.into_iter().map(|r| r.unwrap());
// finish appending to WAL
/*
for rid in rids.iter() {