summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-06-10 01:32:16 -0400
committerDeterminant <[email protected]>2020-06-10 01:32:16 -0400
commit895c761cc46f48907dc8442297f84c8959692b49 (patch)
treee95a53a53964d42544fac88ea2dd73e448872a4d /src
parent1e981e69a6cf55ebe095f1584da20bae6a259d0c (diff)
basic WAL functionality works
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs70
1 files changed, 36 insertions, 34 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 1e1b653..5dcb78a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -55,7 +55,9 @@ struct WALState {
pub trait WALFile {
/// Initialize the file space in [offset, offset + length) to zero.
- fn allocate(&self, offset: WALPos, length: usize);
+ fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
+ /// Truncate a file to a specified length.
+ fn truncate(&self, length: usize) -> Result<(), ()>;
/// Write data with offset.
fn write(&self, offset: WALPos, data: WALBytes);
/// Read data with offset.
@@ -67,7 +69,7 @@ pub trait WALStore {
fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;
/// Unlink a file given the filename.
fn remove_file(&self, filename: &str) -> bool;
- /// Enumerate all WAL files, ordered by their filenames.
+ /// Enumerate all WAL files.
fn enumerate_files(&self) -> Box<[String]>;
/// Apply (redo) the payload during recovery.
fn apply_payload(&self, payload: WALBytes);
@@ -112,7 +114,7 @@ impl<F: WALStore> WALFilePool<F> {
}
fn get_fid(&mut self, fname: &str) -> WALFileId {
- scan_fmt!(fname, "{:x}.log", [hex WALFileId]).unwrap()
+ scan_fmt!(fname, "{x}.log", [hex WALFileId]).unwrap()
}
// TODO: evict stale handles
@@ -125,7 +127,7 @@ impl<F: WALStore> WALFilePool<F> {
for (off, w) in &writes[1..] {
let next_fid = off >> self.file_nbit;
if next_fid != fid {
- h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
+ h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
h = self.get_file(next_fid, true);
alloc_start = 0;
alloc_end = alloc_start + w.len() as u64;
@@ -134,7 +136,7 @@ impl<F: WALStore> WALFilePool<F> {
alloc_end += w.len() as u64;
}
}
- h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
+ h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
for (off, w) in writes.into_iter() {
self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w);
}
@@ -143,6 +145,8 @@ impl<F: WALStore> WALFilePool<F> {
fn remove_file(&self, fid: u64) -> bool {
self.store.remove_file(&Self::get_fname(fid))
}
+
+ fn reset(&mut self) { self.handles.clear() }
}
pub struct WALWriter<F: WALStore> {
@@ -201,7 +205,6 @@ impl<F: WALStore> WALWriter<F> {
if d >= rsize {
// the remaining rec fits in the block
let payload = rec;
- println!("rsize {} {}", rsize, d);
blob.crc32 = crc::crc32::checksum_ieee(payload);
blob.rsize = rsize;
blob.rtype = if started {WALRingType::Last} else {WALRingType::Full};
@@ -285,61 +288,60 @@ impl<F: WALStore> WALLoader<F> {
}
pub fn recover(mut self) -> WALWriter<F> {
+ let block_size = 1 << self.file_pool.block_nbit;
let msize = std::mem::size_of::<WALRingBlob>() as u32;
- let logfiles = self.file_pool.store.enumerate_files();
+ let mut logfiles = self.file_pool.store.enumerate_files();
+ // TODO: use regex to filter out invalid files
+ // TODO: check for missing logfiles
+ logfiles.sort();
+ let mut chunks = None;
for fname in logfiles.iter() {
let fid = self.file_pool.get_fid(fname);
let f = self.file_pool.get_file(fid, false);
let mut off = 0;
- let mut end = false;
- while !end {
+ while block_size - (off & (block_size - 1)) > msize as u64 {
let header_raw = f.read(off, msize as usize);
+ off += msize as u64;
let header = unsafe {
std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
let rsize = header.rsize;
- off += msize as u64;
match header.rtype {
WALRingType::Full => {
+ assert!(chunks.is_none());
let payload = f.read(off, rsize as usize);
- self.file_pool.store.apply_payload(payload);
off += rsize as u64;
+ self.file_pool.store.apply_payload(payload);
},
WALRingType::First => {
- let mut chunks = vec![f.read(off, rsize as usize)];
+ assert!(chunks.is_none());
+ chunks = Some(vec![f.read(off, rsize as usize)]);
off += rsize as u64;
- loop {
- let header_raw = f.read(off, msize as usize);
- let header = unsafe {
- std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
- if let WALRingType::Null = header.rtype {
- end = true;
- break
- }
- let rsize = header.rsize;
- let payload = f.read(off, rsize as usize);
- off += msize as u64;
- chunks.push(payload);
- match header.rtype {
- WALRingType::Middle => (),
- WALRingType::Last => break,
- _ => unreachable!()
- }
- }
+ },
+ WALRingType::Middle => {
+ chunks.as_mut().unwrap().push(f.read(off, rsize as usize));
+ off += rsize as u64;
+ },
+ WALRingType::Last => {
+ chunks.as_mut().unwrap().push(f.read(off, rsize as usize));
+ off += rsize as u64;
+
+ let _chunks = chunks.take().unwrap();
let mut payload = Vec::new();
- payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
+ payload.resize(_chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
let mut ps = &mut payload[..];
- for c in chunks {
+ for c in _chunks {
ps[..c.len()].copy_from_slice(&*c);
ps = &mut ps[c.len()..];
}
self.file_pool.store.apply_payload(payload.into_boxed_slice());
},
- WALRingType::Null => end = true,
- _ => unreachable!()
+ WALRingType::Null => break,
}
}
+ f.truncate(0).unwrap();
self.file_pool.remove_file(fid);
}
+ self.file_pool.reset();
WALWriter::new(WALState {
first_fid: 0,
next: 0,