summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/wal.rs96
1 files changed, 68 insertions, 28 deletions
diff --git a/src/wal.rs b/src/wal.rs
index 85b144b..5cd5a84 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,6 +1,8 @@
use async_trait::async_trait;
use std::collections::BinaryHeap;
use std::future::Future;
+use futures::future::{self, TryFutureExt, FutureExt};
+use std::pin::Pin;
#[repr(u8)]
enum WALRingType {
@@ -161,38 +163,49 @@ impl<F: WALStore> WALFilePool<F> {
}
// TODO: evict stale handles
- async fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> {
+ fn write<'a>(&'a mut self, writes: Vec<(WALPos, WALBytes)>) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>>>>> {
+
// pre-allocate the file space
let mut fid = writes[0].0 >> self.file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
let mut alloc_end = alloc_start + writes[0].1.len() as u64;
- let files = writes
- .iter()
- .map(|(off, _)| self.get_file((*off) >> self.file_nbit, true))
- .collect::<Result<Vec<&dyn WALFile>, ()>>()?;
+ let file_size = self.file_size;
+ let file_nbit = self.file_nbit;
// prepare file handles
- let mut last_h = files[0];
- for ((off, w), h) in writes[1..].iter().zip(files[1..].iter()) {
- let next_fid = off >> self.file_nbit;
- if next_fid != fid {
- last_h.allocate(
- alloc_start,
- (alloc_end - alloc_start) as usize,
- ).await?;
- last_h = *h;
- alloc_start = 0;
- alloc_end = alloc_start + w.len() as u64;
- fid = next_fid;
- } else {
- alloc_end += w.len() as u64;
+ let meta: Vec<(u64, u64)> = writes.iter().map(|(off, w)| ((*off) >> self.file_nbit, w.len() as u64)).collect();
+ let files = meta.iter().map(|(fid, _)| self.get_file(*fid, true)).collect::<Result<Vec<&dyn WALFile>, ()>>();
+ let prepare = async move {
+ let files: Vec<&dyn WALFile> = files?;
+ let mut last_h = files[0];
+ for ((off, wl), h) in meta[1..].iter().zip(files[1..].iter()) {
+ let next_fid = off >> file_nbit;
+ if next_fid != fid {
+ last_h.allocate(
+ alloc_start,
+ (alloc_end - alloc_start) as usize,
+ ).await?;
+ last_h = *h;
+ alloc_start = 0;
+ alloc_end = alloc_start + wl;
+ fid = next_fid;
+ } else {
+ alloc_end += wl;
+ }
}
- }
- last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize).await?;
+ last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize).await?;
+ Ok(())
+ };
+ let mut res = Vec::new();
+ let n = writes.len();
+ let mut f = Box::pin(prepare) as Pin<Box<dyn Future<Output = Result<(), ()>>>>;
for (off, w) in writes.into_iter() {
- self.get_file(off >> self.file_nbit, true)?
- .write(off & (self.file_size - 1), w).await?;
+ let fr = future::ready(self.get_file(off >> self.file_nbit, true))
+ .and_then(move |f| f.write(off & (file_size - 1), w));
+ let g = (async {f.await?; fr.await}).shared();
+ f = Box::pin(g.clone());
+ res.push(Box::pin(g) as Pin<Box<dyn Future<Output = Result<(), ()>>>>)
}
- Ok(())
+ res
}
fn remove_file(&mut self, fid: u64) -> Result<(), ()> {
@@ -237,7 +250,7 @@ impl<F: WALStore> WALWriter<F> {
pub fn grow<'a, T: AsRef<[WALBytes]>>(
&'a mut self,
records: T,
- ) -> (Box<[WALRingId]>, impl Future<Output = Result<(), ()>> + 'a) {
+ ) -> Vec<impl Future<Output = Result<WALRingId, ()>> + 'a> {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = self.msize as u32;
@@ -282,7 +295,7 @@ impl<F: WALStore> WALWriter<F> {
rsize = 0;
let end =
self.state.next + (bbuff_cur - bbuff_start) as u64;
- res.push(WALRingId { start: rs, end });
+ res.push((WALRingId { start: rs, end }, Vec::new()));
} else {
// the remaining block can only accommodate partial rec
let payload = &rec[..d as usize];
@@ -310,7 +323,7 @@ impl<F: WALStore> WALWriter<F> {
self.state.next,
self.block_buffer[bbuff_start as usize..]
.to_vec()
- .into_boxed_slice(),
+ .into_boxed_slice()
));
self.state.next += (self.block_size - bbuff_start) as u64;
bbuff_start = 0;
@@ -328,7 +341,34 @@ impl<F: WALStore> WALWriter<F> {
self.state.next += (bbuff_cur - bbuff_start) as u64;
}
- (res.into_boxed_slice(), self.file_pool.write(writes))
+ // mark the block info for each record
+ let mut i = 0;
+ 'outer: for (j, (off, w)) in writes.iter().enumerate() {
+ let off = *off;
+ let len = w.len() as u64;
+ while res[i].0.end <= off {
+ i += 1;
+ if i >= res.len() { break 'outer }
+ }
+ while res[i].0.start < off + len {
+ res[i].1.push(j);
+ if res[i].0.end >= off + len { break }
+ i += 1;
+ if i >= res.len() { break 'outer }
+ }
+ }
+
+ let futures: Vec<futures::future::Shared<_>> = self.file_pool.write(writes).into_iter()
+ .map(move |f| async move {f.await}.shared()).collect();
+ let res = res.into_iter().map(|(ringid, blks)| {
+ futures::future::try_join_all(blks.into_iter().map(|idx| futures[idx].clone()))
+ .or_else(|_| future::ready(Err(()))).and_then(move |_| future::ready(Ok(ringid)))
+ }).collect();
+ res
+ //.into_iter()
+ //.zip(res.into_iter())
+ //.map(move |(f, ringid)| Box::pin(async move {f.await?; Ok(ringid)}) as Pin<Box<dyn Future<Output = Result<WALRingId, ()>>>>).collect()
+ //Vec::new().into_boxed_slice()
}
/// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are