summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs25
1 files changed, 14 insertions, 11 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 5dcb78a..a2b88ad 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -44,13 +44,11 @@ impl PartialOrd for WALRingId {
/// the state for a WAL writer
struct WALState {
/// the first file id of WAL
- pub first_fid: WALFileId,
+ first_fid: WALFileId,
/// the next position for a record, addressed in the entire WAL space
- pub next: WALPos,
- /// number of bits for a block
- pub block_nbit: u64,
+ next: WALPos,
/// number of bits for a file
- pub file_nbit: u64,
+ file_nbit: u64,
}
pub trait WALFile {
@@ -68,7 +66,7 @@ pub trait WALStore {
/// Open a file given the filename, create the file if not exists when `touch` is `true`.
fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;
/// Unlink a file given the filename.
- fn remove_file(&self, filename: &str) -> bool;
+ fn remove_file(&self, filename: &str) -> Result<(), ()>;
/// Enumerate all WAL files.
fn enumerate_files(&self) -> Box<[String]>;
/// Apply (redo) the payload during recovery.
@@ -142,7 +140,7 @@ impl<F: WALStore> WALFilePool<F> {
}
}
- fn remove_file(&self, fid: u64) -> bool {
+ fn remove_file(&self, fid: u64) -> Result<(), ()> {
self.store.remove_file(&Self::get_fname(fid))
}
@@ -258,6 +256,8 @@ impl<F: WALStore> WALWriter<F> {
/// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
/// complete so that it could automatically remove obsolete WAL files.
pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) {
+ let msize = std::mem::size_of::<WALRingBlob>() as u64;
+ let block_size = self.block_size as u64;
for rec in records.as_ref() {
self.io_complete.push(*rec)
}
@@ -266,12 +266,16 @@ impl<F: WALStore> WALWriter<F> {
if s != self.next_complete {
break
}
- let m = self.io_complete.pop().unwrap();
+ let mut m = self.io_complete.pop().unwrap();
+ let block_remain = block_size - (m.end & (block_size - 1));
+ if block_remain <= msize as u64 {
+ m.end += block_remain
+ }
self.next_complete = m.end
}
let next_fid = self.next_complete >> self.state.file_nbit;
for fid in orig_fid..next_fid {
- self.file_pool.remove_file(fid);
+ self.file_pool.remove_file(fid).unwrap();
}
self.state.first_fid = next_fid;
}
@@ -339,13 +343,12 @@ impl<F: WALStore> WALLoader<F> {
}
}
f.truncate(0).unwrap();
- self.file_pool.remove_file(fid);
+ self.file_pool.remove_file(fid).unwrap();
}
self.file_pool.reset();
WALWriter::new(WALState {
first_fid: 0,
next: 0,
- block_nbit: self.file_pool.block_nbit,
file_nbit: self.file_pool.file_nbit,
}, self.file_pool)
}