author    Determinant <[email protected]>  2020-06-09 23:19:11 -0400
committer Determinant <[email protected]>  2020-06-09 23:19:11 -0400
commit    4831ae815f26170174545ae87e9fe960bfce5b8c (patch)
tree      0161c2ff585d8176f33306ee0f2cf01b628d5a9a /src
parent    56bc442c4da0e6e75401d50f206fe56e1bf8adbb (diff)
...
Diffstat (limited to 'src')
-rw-r--r--  src/lib.rs | 45
1 file changed, 26 insertions(+), 19 deletions(-)
diff --git a/src/lib.rs b/src/lib.rs
index 53f4e0f..1e1b653 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,7 +11,7 @@ enum WALRingType {
Last
}
-#[repr(C)]
+#[repr(packed)]
struct WALRingBlob {
crc32: u32,
rsize: u32,
@@ -19,11 +19,11 @@ struct WALRingBlob {
// payload follows
}
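The #[repr(C)] to #[repr(packed)] switch above matters because this header is blitted into the block buffer and written to disk verbatim: repr(C) keeps C-compatible field alignment (and thus trailing padding), while repr(packed) drops all padding so the on-disk blob is exactly as wide as its fields. A minimal sketch of the difference, assuming the ring type occupies one byte (modeled here as a hypothetical #[repr(u8)] enum):

use std::mem::size_of;

#[repr(u8)] // assumed one-byte discriminant, for illustration only
#[derive(Clone, Copy)]
enum RingType { Null, Full, First, Middle, Last }

#[repr(C)]
struct BlobC { crc32: u32, rsize: u32, rtype: RingType }

#[repr(packed)]
struct BlobPacked { crc32: u32, rsize: u32, rtype: RingType }

fn main() {
    // repr(C) aligns the struct to 4 bytes, padding the 1-byte rtype to 12 total.
    assert_eq!(size_of::<BlobC>(), 12);
    // repr(packed) removes the padding: 4 + 4 + 1 = 9 bytes, matching the bytes on disk.
    assert_eq!(size_of::<BlobPacked>(), 9);
}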
-type WALBytes = Box<[u8]>;
-type WALFileId = u64;
-type WALPos = u64;
+pub type WALBytes = Box<[u8]>;
+pub type WALFileId = u64;
+pub type WALPos = u64;
-#[derive(Eq, PartialEq, Copy, Clone)]
+#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub struct WALRingId {
start: WALPos,
end: WALPos
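Exporting the type aliases and deriving Debug are small ergonomic changes: callers can now name WALBytes/WALPos in their own signatures and print the ring IDs that grow() returns. A standalone sketch of what the derives buy (struct copied from the hunk above, with u64 standing in for WALPos):

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub struct WALRingId { start: u64, end: u64 }

fn main() {
    let id = WALRingId { start: 0, end: 128 };
    // Debug makes ring IDs loggable and usable in assert_eq! failure messages.
    println!("{:?}", id); // prints: WALRingId { start: 0, end: 128 }
    assert_eq!(id, id);   // Eq/PartialEq enable equality checks in tests.
}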
@@ -118,7 +118,7 @@ impl<F: WALStore> WALFilePool<F> {
// TODO: evict stale handles
fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) {
// pre-allocate the file space
- let fid = writes[0].0 >> self.file_nbit;
+ let mut fid = writes[0].0 >> self.file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
let mut alloc_end = alloc_start + writes[0].1.len() as u64;
let mut h = self.get_file(fid, true);
@@ -129,13 +129,14 @@ impl<F: WALStore> WALFilePool<F> {
h = self.get_file(next_fid, true);
alloc_start = 0;
alloc_end = alloc_start + w.len() as u64;
+ fid = next_fid;
} else {
alloc_end += w.len() as u64;
}
}
h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
for (off, w) in writes.into_iter() {
- self.get_file(off >> self.file_nbit, true).write(off, w);
+ self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w);
}
}
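Two bugs are fixed in write(): fid is now advanced when a batch crosses a file boundary, and each chunk is written at its offset within its file rather than at the raw WAL position. The masking works because the file size is a power of two (file_size == 1 << file_nbit), so the high bits of a WAL position select the file and the low bits locate the write inside it. A self-contained sketch of that arithmetic (the file_nbit value is assumed):

fn main() {
    let file_nbit = 16u64;              // assume 64 KiB WAL files
    let file_size = 1u64 << file_nbit;
    let wal_pos: u64 = 0x2_1234;        // a global WAL position in the third file

    let fid = wal_pos >> file_nbit;          // which file holds this position
    let in_file = wal_pos & (file_size - 1); // offset within that file

    assert_eq!(fid, 2);
    assert_eq!(in_file, 0x1234);
    // Before the fix, wal_pos itself was passed to WALFile::write,
    // which is only correct for positions inside the very first file.
}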
@@ -175,7 +176,7 @@ impl<F: WALStore> WALWriter<F> {
/// Submit a sequence of records to the WAL; the WALStore/WALFile callbacks are invoked
/// before this function returns. The caller thus knows when the WAL writes have been
/// issued and should defer the actual data writes until after them.
- pub fn grow(&mut self, records: &[WALBytes]) -> Box<[WALRingId]> {
+ pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> Box<[WALRingId]> {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = std::mem::size_of::<WALRingBlob>() as u32;
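Relaxing grow() from &[WALBytes] to any T: AsRef<[WALBytes]> (peel() gets the same treatment below) lets callers pass a Vec, a boxed slice, or a borrowed slice without building an intermediate slice reference. A sketch of the pattern with the same bound (a free function stands in for the method):

type WALBytes = Box<[u8]>;

fn grow<T: AsRef<[WALBytes]>>(records: T) {
    for rec in records.as_ref() {
        println!("record of {} bytes", rec.len());
    }
}

fn main() {
    let recs: Vec<WALBytes> = vec![vec![1u8, 2, 3].into_boxed_slice()];
    grow(&recs); // a borrowed Vec works...
    grow(recs);  // ...and so does the Vec by value.
}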
@@ -185,7 +186,7 @@ impl<F: WALStore> WALWriter<F> {
// the end of the unwritten data
let mut bbuff_cur = bbuff_start;
- for _rec in records {
+ for _rec in records.as_ref() {
let mut rec = &_rec[..];
let mut rsize = rec.len() as u32;
let mut started = false;
@@ -193,18 +194,22 @@ impl<F: WALStore> WALWriter<F> {
let remain = self.block_size - bbuff_cur;
if remain > msize {
let d = remain - msize;
+ let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64;
let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>(
(&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())};
- let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64;
+ bbuff_cur += msize;
if d >= rsize {
// the remaining rec fits in the block
let payload = rec;
+ println!("rsize {} {}", rsize, d);
blob.crc32 = crc::crc32::checksum_ieee(payload);
blob.rsize = rsize;
blob.rtype = if started {WALRingType::Last} else {WALRingType::Full};
- rsize = 0;
- &mut self.block_buffer[bbuff_cur as usize..].copy_from_slice(payload);
+ &mut self.block_buffer[
+ bbuff_cur as usize..
+ bbuff_cur as usize + payload.len()].copy_from_slice(payload);
bbuff_cur += rsize;
+ rsize = 0;
} else {
// the remaining block can only hold part of the rec
let payload = &rec[..d as usize];
@@ -214,9 +219,11 @@ impl<F: WALStore> WALWriter<F> {
started = true;
WALRingType::First
};
- rsize -= d;
- &mut self.block_buffer[bbuff_cur as usize..].copy_from_slice(payload);
+ &mut self.block_buffer[
+ bbuff_cur as usize..
+ bbuff_cur as usize + payload.len()].copy_from_slice(payload);
bbuff_cur += d;
+ rsize -= d;
rec = &rec[d as usize..];
}
let ring_end = self.state.next + (bbuff_cur - bbuff_start) as u64;
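The reshuffled copies above fix two subtle issues: copy_from_slice panics unless source and destination have exactly the same length, so the destination range must be bounded by payload.len() instead of running to the end of the block buffer; and bbuff_cur must be advanced before rsize is zeroed (the old order added 0 to the cursor). A minimal demonstration of the panic the explicit bounds avoid:

fn main() {
    let mut block = [0u8; 8];
    let payload = [1u8, 2, 3];
    let cur = 2usize;

    // block[cur..].copy_from_slice(&payload); // would panic: lengths 6 vs 3
    block[cur..cur + payload.len()].copy_from_slice(&payload); // exact range
    assert_eq!(block, [0, 0, 1, 2, 3, 0, 0, 0]);
}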
@@ -247,8 +254,8 @@ impl<F: WALStore> WALWriter<F> {
/// Inform the WALWriter that the data writes (specified by a slice of (offset, length)
/// tuples) are complete, so that it can automatically remove obsolete WAL files.
- pub fn peel(&mut self, records: &[WALRingId]) {
- for rec in records {
+ pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) {
+ for rec in records.as_ref() {
self.io_complete.push(*rec)
}
let orig_fid = self.state.first_fid;
@@ -267,14 +274,14 @@ impl<F: WALStore> WALWriter<F> {
}
}
-pub struct WALReader<F: WALStore> {
+pub struct WALLoader<F: WALStore> {
file_pool: WALFilePool<F>,
}
-impl<F: WALStore> WALReader<F> {
+impl<F: WALStore> WALLoader<F> {
pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size);
- WALReader{ file_pool }
+ WALLoader{ file_pool }
}
pub fn recover(mut self) -> WALWriter<F> {