diff options
Diffstat (limited to 'src/wal.rs')
-rw-r--r-- | src/wal.rs | 96 |
1 files changed, 68 insertions, 28 deletions
@@ -1,6 +1,8 @@ use async_trait::async_trait; use std::collections::BinaryHeap; use std::future::Future; +use futures::future::{self, TryFutureExt, FutureExt}; +use std::pin::Pin; #[repr(u8)] enum WALRingType { @@ -161,38 +163,49 @@ impl<F: WALStore> WALFilePool<F> { } // TODO: evict stale handles - async fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> { + fn write<'a>(&'a mut self, writes: Vec<(WALPos, WALBytes)>) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>>>>> { + // pre-allocate the file space let mut fid = writes[0].0 >> self.file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); let mut alloc_end = alloc_start + writes[0].1.len() as u64; - let files = writes - .iter() - .map(|(off, _)| self.get_file((*off) >> self.file_nbit, true)) - .collect::<Result<Vec<&dyn WALFile>, ()>>()?; + let file_size = self.file_size; + let file_nbit = self.file_nbit; // prepare file handles - let mut last_h = files[0]; - for ((off, w), h) in writes[1..].iter().zip(files[1..].iter()) { - let next_fid = off >> self.file_nbit; - if next_fid != fid { - last_h.allocate( - alloc_start, - (alloc_end - alloc_start) as usize, - ).await?; - last_h = *h; - alloc_start = 0; - alloc_end = alloc_start + w.len() as u64; - fid = next_fid; - } else { - alloc_end += w.len() as u64; + let meta: Vec<(u64, u64)> = writes.iter().map(|(off, w)| ((*off) >> self.file_nbit, w.len() as u64)).collect(); + let files = meta.iter().map(|(fid, _)| self.get_file(*fid, true)).collect::<Result<Vec<&dyn WALFile>, ()>>(); + let prepare = async move { + let files: Vec<&dyn WALFile> = files?; + let mut last_h = files[0]; + for ((off, wl), h) in meta[1..].iter().zip(files[1..].iter()) { + let next_fid = off >> file_nbit; + if next_fid != fid { + last_h.allocate( + alloc_start, + (alloc_end - alloc_start) as usize, + ).await?; + last_h = *h; + alloc_start = 0; + alloc_end = alloc_start + wl; + fid = next_fid; + } else { + alloc_end += wl; + } } - } - last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize).await?; + last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize).await?; + Ok(()) + }; + let mut res = Vec::new(); + let n = writes.len(); + let mut f = Box::pin(prepare) as Pin<Box<dyn Future<Output = Result<(), ()>>>>; for (off, w) in writes.into_iter() { - self.get_file(off >> self.file_nbit, true)? - .write(off & (self.file_size - 1), w).await?; + let fr = future::ready(self.get_file(off >> self.file_nbit, true)) + .and_then(move |f| f.write(off & (file_size - 1), w)); + let g = (async {f.await?; fr.await}).shared(); + f = Box::pin(g.clone()); + res.push(Box::pin(g) as Pin<Box<dyn Future<Output = Result<(), ()>>>>) } - Ok(()) + res } fn remove_file(&mut self, fid: u64) -> Result<(), ()> { @@ -237,7 +250,7 @@ impl<F: WALStore> WALWriter<F> { pub fn grow<'a, T: AsRef<[WALBytes]>>( &'a mut self, records: T, - ) -> (Box<[WALRingId]>, impl Future<Output = Result<(), ()>> + 'a) { + ) -> Vec<impl Future<Output = Result<WALRingId, ()>> + 'a> { let mut res = Vec::new(); let mut writes = Vec::new(); let msize = self.msize as u32; @@ -282,7 +295,7 @@ impl<F: WALStore> WALWriter<F> { rsize = 0; let end = self.state.next + (bbuff_cur - bbuff_start) as u64; - res.push(WALRingId { start: rs, end }); + res.push((WALRingId { start: rs, end }, Vec::new())); } else { // the remaining block can only accommodate partial rec let payload = &rec[..d as usize]; @@ -310,7 +323,7 @@ impl<F: WALStore> WALWriter<F> { self.state.next, self.block_buffer[bbuff_start as usize..] .to_vec() - .into_boxed_slice(), + .into_boxed_slice() )); self.state.next += (self.block_size - bbuff_start) as u64; bbuff_start = 0; @@ -328,7 +341,34 @@ impl<F: WALStore> WALWriter<F> { self.state.next += (bbuff_cur - bbuff_start) as u64; } - (res.into_boxed_slice(), self.file_pool.write(writes)) + // mark the block info for each record + let mut i = 0; + 'outer: for (j, (off, w)) in writes.iter().enumerate() { + let off = *off; + let len = w.len() as u64; + while res[i].0.end <= off { + i += 1; + if i >= res.len() { break 'outer } + } + while res[i].0.start < off + len { + res[i].1.push(j); + if res[i].0.end >= off + len { break } + i += 1; + if i >= res.len() { break 'outer } + } + } + + let futures: Vec<futures::future::Shared<_>> = self.file_pool.write(writes).into_iter() + .map(move |f| async move {f.await}.shared()).collect(); + let res = res.into_iter().map(|(ringid, blks)| { + futures::future::try_join_all(blks.into_iter().map(|idx| futures[idx].clone())) + .or_else(|_| future::ready(Err(()))).and_then(move |_| future::ready(Ok(ringid))) + }).collect(); + res + //.into_iter() + //.zip(res.into_iter()) + //.map(move |(f, ringid)| Box::pin(async move {f.await?; Ok(ringid)}) as Pin<Box<dyn Future<Output = Result<WALRingId, ()>>>>).collect() + //Vec::new().into_boxed_slice() } /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are |