diff options
author | Determinant <[email protected]> | 2020-06-09 14:54:00 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-09 14:54:00 -0400 |
commit | 75578432bb61c09715f3389734f9cdbf285cb9d5 (patch) | |
tree | 6a3b37721427ea3669e7312e681bd774943b4b9e | |
parent | 5296c8b4fe4214784fc76cff76589fcbd3a060c3 (diff) |
...
-rw-r--r-- | src/lib.rs | 62 |
1 files changed, 58 insertions, 4 deletions
@@ -1,3 +1,5 @@ +use std::collections::BinaryHeap; + #[repr(u8)] enum WALRingType { Null = 0x0, @@ -16,10 +18,27 @@ struct WALRingBlob { type WALFileId = u32; type WALPos = u64; -type WALWrite = (WALPos, Box<[u8]>); + +#[derive(Eq, PartialEq, Copy, Clone)] +pub struct WALRingId { + start: WALPos, + end: WALPos +} + +impl Ord for WALRingId { + fn cmp(&self, other: &WALRingId) -> std::cmp::Ordering { + other.start.cmp(&self.start).then_with(|| other.end.cmp(&self.end)) + } +} + +impl PartialOrd for WALRingId { + fn partial_cmp(&self, other: &WALRingId) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} pub struct WALState { - pub base: WALPos, + pub first_fid: u64, pub last: WALPos, pub block_nbit: u8, pub file_nbit: u8, @@ -53,6 +72,9 @@ impl<F: WALStore> WALFilePool<F> { } fn write(&mut self, offset: u64, data: Box<[u8]>) { } + fn remove_file(&self, fid: u64) -> bool { + true + } } pub struct WALWriter<F: WALStore> { @@ -60,6 +82,8 @@ pub struct WALWriter<F: WALStore> { file_pool: WALFilePool<F>, block_buffer: Box<[u8]>, block_size: u32, + next_complete: WALPos, + io_complete: BinaryHeap<WALRingId> } impl<F: WALStore> WALWriter<F> { @@ -73,10 +97,16 @@ impl<F: WALStore> WALWriter<F> { file_pool: WALFilePool::new(wal_store, file_size, cache_size), block_buffer: b.into_boxed_slice(), block_size, + next_complete: 0, + io_complete: BinaryHeap::new(), } } - pub fn grow(&mut self, records: &[Box<[u8]>]) { + /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the + /// function returns. The caller then has the knowledge of WAL writes so it should defer + /// actual data writes after WAL writes. + pub fn grow(&mut self, records: &[Box<[u8]>]) -> Box<[WALRingId]> { + let mut res = Vec::new(); let mut writes = Vec::new(); let msize = std::mem::size_of::<WALRingBlob>() as u32; // the global offest of the begining of the block @@ -95,6 +125,7 @@ impl<F: WALStore> WALWriter<F> { let d = remain - msize; let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>( &mut self.block_buffer[bbuff_cur as usize] as *mut u8)}; + let ring_start = self.state.last + (bbuff_cur - bbuff_start) as u64; if d >= rsize { // the remaining rec fits in the block let payload = rec; @@ -118,6 +149,8 @@ impl<F: WALStore> WALWriter<F> { bbuff_cur += d; rec = &rec[d as usize..]; } + let ring_end = self.state.last + (bbuff_cur - bbuff_start) as u64; + res.push(WALRingId{start: ring_start, end: ring_end}); } else { // add padding space by moving the point to the end of the block bbuff_cur = self.block_size; @@ -139,7 +172,28 @@ impl<F: WALStore> WALWriter<F> { self.state.last += (bbuff_cur - bbuff_start) as u64; } for (off, w) in writes.into_iter() { - self.file_pool.write(off, w) + self.file_pool.write(off, w); + } + res.into_boxed_slice() + } + + /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are + /// complete so that it could automatically remove obsolete WAL files. + pub fn peel(&mut self, records: &[WALRingId]) { + for rec in records { + self.io_complete.push(*rec) + } + let orig_fid = self.state.first_fid; + while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) { + if s != self.next_complete { + break + } + let m = self.io_complete.pop().unwrap(); + self.next_complete = m.end + } + let next_fid = self.next_complete >> self.state.file_nbit; + for fid in orig_fid..next_fid { + self.file_pool.remove_file(fid); } } } |