aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/wal.rs30
1 files changed, 18 insertions, 12 deletions
diff --git a/src/wal.rs b/src/wal.rs
index 46baeb8..fe85c41 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -171,13 +171,15 @@ pub struct WALWriter<F: WALStore> {
block_buffer: WALBytes,
block_size: u32,
next_complete: WALPos,
- io_complete: BinaryHeap<WALRingId>
+ io_complete: BinaryHeap<WALRingId>,
+ msize: usize
}
impl<F: WALStore> WALWriter<F> {
fn new(state: WALState, file_pool: WALFilePool<F>) -> Self {
let mut b = Vec::new();
let block_size = 1 << file_pool.block_nbit as u32;
+ let msize = std::mem::size_of::<WALRingBlob>();
b.resize(block_size as usize, 0);
WALWriter{
state,
@@ -186,6 +188,7 @@ impl<F: WALStore> WALWriter<F> {
block_size,
next_complete: 0,
io_complete: BinaryHeap::new(),
+ msize
}
}
@@ -195,7 +198,7 @@ impl<F: WALStore> WALWriter<F> {
pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> (Box<[WALRingId]>, Result<(), ()>) {
let mut res = Vec::new();
let mut writes = Vec::new();
- let msize = std::mem::size_of::<WALRingBlob>() as u32;
+ let msize = self.msize as u32;
// the global offest of the begining of the block
// the start of the unwritten data
let mut bbuff_start = self.state.next as u32 & (self.block_size - 1);
@@ -275,7 +278,7 @@ impl<F: WALStore> WALWriter<F> {
/// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
/// complete so that it could automatically remove obsolete WAL files.
pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) -> Result<(), ()> {
- let msize = std::mem::size_of::<WALRingBlob>() as u64;
+ let msize = self.msize as u64;
let block_size = self.block_size as u64;
for rec in records.as_ref() {
self.io_complete.push(*rec);
@@ -305,13 +308,17 @@ pub struct WALLoader {
file_nbit: u8,
block_nbit: u8,
cache_size: usize,
+ msize: usize,
filename_fmt: regex::Regex
}
impl WALLoader {
pub fn new(file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
+ let msize = std::mem::size_of::<WALRingBlob>();
+ assert!(file_nbit > block_nbit);
+ assert!(msize < 1 << block_nbit);
let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap();
- WALLoader{ file_nbit, block_nbit, cache_size, filename_fmt }
+ WALLoader{ file_nbit, block_nbit, cache_size, msize, filename_fmt }
}
/// Recover by reading the WAL log files.
@@ -330,12 +337,7 @@ impl WALLoader {
let f = file_pool.get_file(fid, false)?;
let mut off = 0;
while let Some(header_raw) = f.read(off, msize as usize)? {
- let block_remain = block_size - (off & (block_size - 1));
- if block_remain <= msize as u64 {
- off += block_remain;
- continue
- }
- let ringid_start = (fid << file_pool.file_nbit) | off;
+ let ringid_start = (fid << file_pool.file_nbit) + off;
off += msize as u64;
let header = unsafe {
std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
@@ -349,7 +351,7 @@ impl WALLoader {
payload,
WALRingId {
start: ringid_start,
- end: (fid << file_pool.file_nbit) | off
+ end: (fid << file_pool.file_nbit) + off
})?;
},
WALRingType::First => {
@@ -378,13 +380,17 @@ impl WALLoader {
payload.into_boxed_slice(),
WALRingId {
start: ringid_start,
- end: (fid << file_pool.file_nbit) | off
+ end: (fid << file_pool.file_nbit) + off
})?;
} // otherwise ignore the leftover
else { off += rsize as u64; }
},
WALRingType::Null => break,
}
+ let block_remain = block_size - (off & (block_size - 1));
+ if block_remain <= msize as u64 {
+ off += block_remain;
+ }
}
f.truncate(0)?;
file_pool.remove_file(fid)?;