From 75578432bb61c09715f3389734f9cdbf285cb9d5 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 9 Jun 2020 14:54:00 -0400 Subject: ... --- src/lib.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e4ae0ea..2716cb5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ +use std::collections::BinaryHeap; + #[repr(u8)] enum WALRingType { Null = 0x0, @@ -16,10 +18,27 @@ struct WALRingBlob { type WALFileId = u32; type WALPos = u64; -type WALWrite = (WALPos, Box<[u8]>); + +#[derive(Eq, PartialEq, Copy, Clone)] +pub struct WALRingId { + start: WALPos, + end: WALPos } + +impl Ord for WALRingId { + fn cmp(&self, other: &WALRingId) -> std::cmp::Ordering { + other.start.cmp(&self.start).then_with(|| other.end.cmp(&self.end)) + } } + +impl PartialOrd for WALRingId { + fn partial_cmp(&self, other: &WALRingId) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } } pub struct WALState { - pub base: WALPos, + pub first_fid: u64, pub last: WALPos, pub block_nbit: u8, pub file_nbit: u8, @@ -53,6 +72,9 @@ impl WALFilePool { } fn write(&mut self, offset: u64, data: Box<[u8]>) { } + fn remove_file(&self, fid: u64) -> bool { + true + } } pub struct WALWriter { @@ -60,6 +82,8 @@ pub struct WALWriter { file_pool: WALFilePool, block_buffer: Box<[u8]>, block_size: u32, + next_complete: WALPos, + io_complete: BinaryHeap<WALRingId> } impl WALWriter { @@ -73,10 +97,16 @@ impl WALWriter { file_pool: WALFilePool::new(wal_store, file_size, cache_size), block_buffer: b.into_boxed_slice(), block_size, + next_complete: 0, + io_complete: BinaryHeap::new(), } } - pub fn grow(&mut self, records: &[Box<[u8]>]) { + /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the + /// function returns. The caller then has the knowledge of WAL writes so it should defer + /// actual data writes after WAL writes. 
+ pub fn grow(&mut self, records: &[Box<[u8]>]) -> Box<[WALRingId]> { + let mut res = Vec::new(); let mut writes = Vec::new(); let msize = std::mem::size_of::<WALRingBlob>() as u32; // the global offset of the beginning of the block @@ -95,6 +125,7 @@ impl WALWriter { let d = remain - msize; let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>( &mut self.block_buffer[bbuff_cur as usize] as *mut u8)}; + let ring_start = self.state.last + (bbuff_cur - bbuff_start) as u64; if d >= rsize { // the remaining rec fits in the block let payload = rec; @@ -118,6 +149,8 @@ impl WALWriter { bbuff_cur += d; rec = &rec[d as usize..]; } + let ring_end = self.state.last + (bbuff_cur - bbuff_start) as u64; + res.push(WALRingId{start: ring_start, end: ring_end}); } else { // add padding space by moving the point to the end of the block bbuff_cur = self.block_size; @@ -139,7 +172,28 @@ impl WALWriter { self.state.last += (bbuff_cur - bbuff_start) as u64; } for (off, w) in writes.into_iter() { - self.file_pool.write(off, w) + self.file_pool.write(off, w); + } + res.into_boxed_slice() + } + + /// Inform the WALWriter that data writes (specified by a slice of WALRingId ranges) are + /// complete so that it can automatically remove obsolete WAL files. + pub fn peel(&mut self, records: &[WALRingId]) { + for rec in records { + self.io_complete.push(*rec) + } + let orig_fid = self.state.first_fid; + while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) { + if s != self.next_complete { + break + } + let m = self.io_complete.pop().unwrap(); + self.next_complete = m.end + } + let next_fid = self.next_complete >> self.state.file_nbit; + for fid in orig_fid..next_fid { + self.file_pool.remove_file(fid); } } } -- cgit v1.2.3-70-g09d2