diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/wal.rs | 115 |
1 files changed, 71 insertions, 44 deletions
@@ -1,7 +1,7 @@ use async_trait::async_trait; +use futures::future::{self, FutureExt, TryFutureExt}; use std::collections::BinaryHeap; use std::future::Future; -use futures::future::{self, TryFutureExt, FutureExt}; use std::pin::Pin; #[repr(u8)] @@ -163,27 +163,37 @@ impl<F: WALStore> WALFilePool<F> { } // TODO: evict stale handles - fn write<'a>(&'a mut self, writes: Vec<(WALPos, WALBytes)>) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>>>>> { - - // pre-allocate the file space - let mut fid = writes[0].0 >> self.file_nbit; - let mut alloc_start = writes[0].0 & (self.file_size - 1); - let mut alloc_end = alloc_start + writes[0].1.len() as u64; + fn write<'a>( + &'a mut self, + writes: Vec<(WALPos, WALBytes)>, + ) -> Vec<Pin<Box<dyn Future<Output = Result<(), ()>>>>> { let file_size = self.file_size; let file_nbit = self.file_nbit; - // prepare file handles - let meta: Vec<(u64, u64)> = writes.iter().map(|(off, w)| ((*off) >> self.file_nbit, w.len() as u64)).collect(); - let files = meta.iter().map(|(fid, _)| self.get_file(*fid, true)).collect::<Result<Vec<&dyn WALFile>, ()>>(); - let prepare = async move { - let files: Vec<&dyn WALFile> = files?; + let meta: Vec<(u64, u64)> = writes + .iter() + .map(|(off, w)| ((*off) >> file_nbit, w.len() as u64)) + .collect(); + let files = meta + .iter() + .map(|(fid, _)| self.get_file(*fid, true)) + .collect::<Result<Vec<_>, ()>>(); + let mut fid = writes[0].0 >> file_nbit; + let mut alloc_start = writes[0].0 & (self.file_size - 1); + let mut alloc_end = alloc_start + writes[0].1.len() as u64; + // pre-allocate the file space + let alloc = async move { + let files = files?; let mut last_h = files[0]; - for ((off, wl), h) in meta[1..].iter().zip(files[1..].iter()) { - let next_fid = off >> file_nbit; + for ((next_fid, wl), h) in meta[1..].iter().zip(files[1..].iter()) { + let next_fid = *next_fid; + let wl = *wl; if next_fid != fid { - last_h.allocate( - alloc_start, - (alloc_end - alloc_start) as usize, - ).await?; + last_h + .allocate( + alloc_start, + (alloc_end - alloc_start) as usize, + ) + .await?; last_h = *h; alloc_start = 0; alloc_end = alloc_start + wl; @@ -192,18 +202,23 @@ impl<F: WALStore> WALFilePool<F> { alloc_end += wl; } } - last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize).await?; + last_h + .allocate(alloc_start, (alloc_end - alloc_start) as usize) + .await?; Ok(()) }; let mut res = Vec::new(); - let n = writes.len(); - let mut f = Box::pin(prepare) as Pin<Box<dyn Future<Output = Result<(), ()>>>>; + let mut prev = Box::pin(alloc) as Pin<Box<dyn Future<Output = _>>>; for (off, w) in writes.into_iter() { - let fr = future::ready(self.get_file(off >> self.file_nbit, true)) - .and_then(move |f| f.write(off & (file_size - 1), w)); - let g = (async {f.await?; fr.await}).shared(); - f = Box::pin(g.clone()); - res.push(Box::pin(g) as Pin<Box<dyn Future<Output = Result<(), ()>>>>) + let w = future::ready(self.get_file(off >> file_nbit, true)) + .and_then(move |f| f.write(off & (file_size - 1), w)); + let w = (async { + prev.await?; + w.await + }) + .shared(); + prev = Box::pin(w.clone()); + res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _>>>) } res } @@ -323,7 +338,7 @@ impl<F: WALStore> WALWriter<F> { self.state.next, self.block_buffer[bbuff_start as usize..] .to_vec() - .into_boxed_slice() + .into_boxed_slice(), )); self.state.next += (self.block_size - bbuff_start) as u64; bbuff_start = 0; @@ -344,31 +359,43 @@ impl<F: WALStore> WALWriter<F> { // mark the block info for each record let mut i = 0; 'outer: for (j, (off, w)) in writes.iter().enumerate() { - let off = *off; - let len = w.len() as u64; - while res[i].0.end <= off { + let blk_s = *off; + let blk_e = blk_s + w.len() as u64; + while res[i].0.end <= blk_s { i += 1; - if i >= res.len() { break 'outer } + if i >= res.len() { + break 'outer; + } } - while res[i].0.start < off + len { + while res[i].0.start < blk_e { res[i].1.push(j); - if res[i].0.end >= off + len { break } + if res[i].0.end >= blk_e { + break; + } i += 1; - if i >= res.len() { break 'outer } + if i >= res.len() { + break 'outer; + } } } - let futures: Vec<futures::future::Shared<_>> = self.file_pool.write(writes).into_iter() - .map(move |f| async move {f.await}.shared()).collect(); - let res = res.into_iter().map(|(ringid, blks)| { - futures::future::try_join_all(blks.into_iter().map(|idx| futures[idx].clone())) - .or_else(|_| future::ready(Err(()))).and_then(move |_| future::ready(Ok(ringid))) - }).collect(); + let writes: Vec<future::Shared<_>> = self + .file_pool + .write(writes) + .into_iter() + .map(move |f| async move { f.await }.shared()) + .collect(); + let res = res + .into_iter() + .map(|(ringid, blks)| { + future::try_join_all( + blks.into_iter().map(|idx| writes[idx].clone()), + ) + .or_else(|_| future::ready(Err(()))) + .and_then(move |_| future::ready(Ok(ringid))) + }) + .collect(); res - //.into_iter() - //.zip(res.into_iter()) - //.map(move |(f, ringid)| Box::pin(async move {f.await?; Ok(ringid)}) as Pin<Box<dyn Future<Output = Result<WALRingId, ()>>>>).collect() - //Vec::new().into_boxed_slice() } /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are |