summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lib.rs62
1 files changed, 58 insertions, 4 deletions
diff --git a/src/lib.rs b/src/lib.rs
index e4ae0ea..2716cb5 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,5 @@
+use std::collections::BinaryHeap;
+
#[repr(u8)]
enum WALRingType {
Null = 0x0,
@@ -16,10 +18,27 @@ struct WALRingBlob {
type WALFileId = u32;
type WALPos = u64;
-type WALWrite = (WALPos, Box<[u8]>);
+
+#[derive(Eq, PartialEq, Copy, Clone)]
+pub struct WALRingId {
+ start: WALPos,
+ end: WALPos
+}
+
+impl Ord for WALRingId {
+ fn cmp(&self, other: &WALRingId) -> std::cmp::Ordering {
+ other.start.cmp(&self.start).then_with(|| other.end.cmp(&self.end))
+ }
+}
+
+impl PartialOrd for WALRingId {
+ fn partial_cmp(&self, other: &WALRingId) -> Option<std::cmp::Ordering> {
+ Some(self.cmp(other))
+ }
+}
pub struct WALState {
- pub base: WALPos,
+ pub first_fid: u64,
pub last: WALPos,
pub block_nbit: u8,
pub file_nbit: u8,
@@ -53,6 +72,9 @@ impl<F: WALStore> WALFilePool<F> {
}
fn write(&mut self, offset: u64, data: Box<[u8]>) {
}
+ fn remove_file(&self, fid: u64) -> bool {
+ true
+ }
}
pub struct WALWriter<F: WALStore> {
@@ -60,6 +82,8 @@ pub struct WALWriter<F: WALStore> {
file_pool: WALFilePool<F>,
block_buffer: Box<[u8]>,
block_size: u32,
+ next_complete: WALPos,
+ io_complete: BinaryHeap<WALRingId>
}
impl<F: WALStore> WALWriter<F> {
@@ -73,10 +97,16 @@ impl<F: WALStore> WALWriter<F> {
file_pool: WALFilePool::new(wal_store, file_size, cache_size),
block_buffer: b.into_boxed_slice(),
block_size,
+ next_complete: 0,
+ io_complete: BinaryHeap::new(),
}
}
- pub fn grow(&mut self, records: &[Box<[u8]>]) {
+ /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
+ /// function returns. The caller then has the knowledge of WAL writes so it should defer
+ /// actual data writes after WAL writes.
+ pub fn grow(&mut self, records: &[Box<[u8]>]) -> Box<[WALRingId]> {
+ let mut res = Vec::new();
let mut writes = Vec::new();
let msize = std::mem::size_of::<WALRingBlob>() as u32;
// the global offest of the begining of the block
@@ -95,6 +125,7 @@ impl<F: WALStore> WALWriter<F> {
let d = remain - msize;
let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>(
&mut self.block_buffer[bbuff_cur as usize] as *mut u8)};
+ let ring_start = self.state.last + (bbuff_cur - bbuff_start) as u64;
if d >= rsize {
// the remaining rec fits in the block
let payload = rec;
@@ -118,6 +149,8 @@ impl<F: WALStore> WALWriter<F> {
bbuff_cur += d;
rec = &rec[d as usize..];
}
+ let ring_end = self.state.last + (bbuff_cur - bbuff_start) as u64;
+ res.push(WALRingId{start: ring_start, end: ring_end});
} else {
// add padding space by moving the point to the end of the block
bbuff_cur = self.block_size;
@@ -139,7 +172,28 @@ impl<F: WALStore> WALWriter<F> {
self.state.last += (bbuff_cur - bbuff_start) as u64;
}
for (off, w) in writes.into_iter() {
- self.file_pool.write(off, w)
+ self.file_pool.write(off, w);
+ }
+ res.into_boxed_slice()
+ }
+
+ /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
+ /// complete so that it could automatically remove obsolete WAL files.
+ pub fn peel(&mut self, records: &[WALRingId]) {
+ for rec in records {
+ self.io_complete.push(*rec)
+ }
+ let orig_fid = self.state.first_fid;
+ while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) {
+ if s != self.next_complete {
+ break
+ }
+ let m = self.io_complete.pop().unwrap();
+ self.next_complete = m.end
+ }
+ let next_fid = self.next_complete >> self.state.file_nbit;
+ for fid in orig_fid..next_fid {
+ self.file_pool.remove_file(fid);
}
}
}