From 4b2c4cdf5de3e7bd1df954dccc320806b18fd1fb Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 12 Jun 2020 11:28:09 -0400 Subject: clean up --- src/wal.rs | 115 ++++++++++++++++++++++++++++++++-------------------- tests/common/mod.rs | 9 ++-- 2 files changed, 76 insertions(+), 48 deletions(-) diff --git a/src/wal.rs b/src/wal.rs index 5cd5a84..761ea82 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; +use futures::future::{self, FutureExt, TryFutureExt}; use std::collections::BinaryHeap; use std::future::Future; -use futures::future::{self, TryFutureExt, FutureExt}; use std::pin::Pin; #[repr(u8)] @@ -163,27 +163,37 @@ impl WALFilePool { } // TODO: evict stale handles - fn write<'a>(&'a mut self, writes: Vec<(WALPos, WALBytes)>) -> Vec>>>> { - - // pre-allocate the file space - let mut fid = writes[0].0 >> self.file_nbit; - let mut alloc_start = writes[0].0 & (self.file_size - 1); - let mut alloc_end = alloc_start + writes[0].1.len() as u64; + fn write<'a>( + &'a mut self, + writes: Vec<(WALPos, WALBytes)>, + ) -> Vec>>>> { let file_size = self.file_size; let file_nbit = self.file_nbit; - // prepare file handles - let meta: Vec<(u64, u64)> = writes.iter().map(|(off, w)| ((*off) >> self.file_nbit, w.len() as u64)).collect(); - let files = meta.iter().map(|(fid, _)| self.get_file(*fid, true)).collect::, ()>>(); - let prepare = async move { - let files: Vec<&dyn WALFile> = files?; + let meta: Vec<(u64, u64)> = writes + .iter() + .map(|(off, w)| ((*off) >> file_nbit, w.len() as u64)) + .collect(); + let files = meta + .iter() + .map(|(fid, _)| self.get_file(*fid, true)) + .collect::, ()>>(); + let mut fid = writes[0].0 >> file_nbit; + let mut alloc_start = writes[0].0 & (self.file_size - 1); + let mut alloc_end = alloc_start + writes[0].1.len() as u64; + // pre-allocate the file space + let alloc = async move { + let files = files?; let mut last_h = files[0]; - for ((off, wl), h) in meta[1..].iter().zip(files[1..].iter()) { - let next_fid = off >> file_nbit; + for ((next_fid, wl), h) in meta[1..].iter().zip(files[1..].iter()) { + let next_fid = *next_fid; + let wl = *wl; if next_fid != fid { - last_h.allocate( - alloc_start, - (alloc_end - alloc_start) as usize, - ).await?; + last_h + .allocate( + alloc_start, + (alloc_end - alloc_start) as usize, + ) + .await?; last_h = *h; alloc_start = 0; alloc_end = alloc_start + wl; @@ -192,18 +202,23 @@ impl WALFilePool { alloc_end += wl; } } - last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize).await?; + last_h + .allocate(alloc_start, (alloc_end - alloc_start) as usize) + .await?; Ok(()) }; let mut res = Vec::new(); - let n = writes.len(); - let mut f = Box::pin(prepare) as Pin>>>; + let mut prev = Box::pin(alloc) as Pin>>; for (off, w) in writes.into_iter() { - let fr = future::ready(self.get_file(off >> self.file_nbit, true)) - .and_then(move |f| f.write(off & (file_size - 1), w)); - let g = (async {f.await?; fr.await}).shared(); - f = Box::pin(g.clone()); - res.push(Box::pin(g) as Pin>>>) + let w = future::ready(self.get_file(off >> file_nbit, true)) + .and_then(move |f| f.write(off & (file_size - 1), w)); + let w = (async { + prev.await?; + w.await + }) + .shared(); + prev = Box::pin(w.clone()); + res.push(Box::pin(w) as Pin>>) } res } @@ -323,7 +338,7 @@ impl WALWriter { self.state.next, self.block_buffer[bbuff_start as usize..] .to_vec() - .into_boxed_slice() + .into_boxed_slice(), )); self.state.next += (self.block_size - bbuff_start) as u64; bbuff_start = 0; @@ -344,31 +359,43 @@ impl WALWriter { // mark the block info for each record let mut i = 0; 'outer: for (j, (off, w)) in writes.iter().enumerate() { - let off = *off; - let len = w.len() as u64; - while res[i].0.end <= off { + let blk_s = *off; + let blk_e = blk_s + w.len() as u64; + while res[i].0.end <= blk_s { i += 1; - if i >= res.len() { break 'outer } + if i >= res.len() { + break 'outer; + } } - while res[i].0.start < off + len { + while res[i].0.start < blk_e { res[i].1.push(j); - if res[i].0.end >= off + len { break } + if res[i].0.end >= blk_e { + break; + } i += 1; - if i >= res.len() { break 'outer } + if i >= res.len() { + break 'outer; + } } } - let futures: Vec> = self.file_pool.write(writes).into_iter() - .map(move |f| async move {f.await}.shared()).collect(); - let res = res.into_iter().map(|(ringid, blks)| { - futures::future::try_join_all(blks.into_iter().map(|idx| futures[idx].clone())) - .or_else(|_| future::ready(Err(()))).and_then(move |_| future::ready(Ok(ringid))) - }).collect(); + let writes: Vec> = self + .file_pool + .write(writes) + .into_iter() + .map(move |f| async move { f.await }.shared()) + .collect(); + let res = res + .into_iter() + .map(|(ringid, blks)| { + future::try_join_all( + blks.into_iter().map(|idx| writes[idx].clone()), + ) + .or_else(|_| future::ready(Err(()))) + .and_then(move |_| future::ready(Ok(ringid))) + }) + .collect(); res - //.into_iter() - //.zip(res.into_iter()) - //.map(move |(f, ringid)| Box::pin(async move {f.await?; Ok(ringid)}) as Pin>>>).collect() - //Vec::new().into_boxed_slice() } /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are diff --git a/tests/common/mod.rs b/tests/common/mod.rs index ba302f9..70c8d8e 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -542,15 +542,16 @@ impl PaintingSim { let payloads = pss.iter().map(|e| e.to_bytes()).collect::>(); // write ahead - let (rids, ok) = wal.grow(payloads); + let rids = wal.grow(payloads); assert_eq!(pss.len(), rids.len()); + let rids = rids.into_iter().map(|r| futures::executor::block_on(r)).collect::>(); // keep track of the operations + // grow could fail for (ps, rid) in pss.iter().zip(rids.iter()) { ops.push(ps.clone()); - ringid_map.insert(*rid, ops.len() - 1); + ringid_map.insert((*rid)?, ops.len() - 1); } - // grow could fail - futures::executor::block_on(ok)?; + let rids = rids.into_iter().map(|r| r.unwrap()); // finish appending to WAL /* for rid in rids.iter() { -- cgit v1.2.3-70-g09d2