summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/demo1.rs57
-rw-r--r--src/lib.rs70
2 files changed, 77 insertions, 50 deletions
diff --git a/examples/demo1.rs b/examples/demo1.rs
index 536aba2..d1d488c 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -1,13 +1,11 @@
-use growthring::{WALFile, WALStore, WALPos, WALBytes, WALLoader, WALWriter};
-
use std::os::unix::io::RawFd;
-use nix::Error::Sys;
-use nix::errno::Errno;
-use nix::unistd::{close, mkdir, sysconf, SysconfVar};
+use nix::unistd::{close, mkdir, unlinkat, UnlinkatFlags, ftruncate};
use nix::fcntl::{open, openat, OFlag, fallocate, FallocateFlags};
-use nix::sys::{stat::Mode, uio::pwrite};
+use nix::sys::{stat::Mode, uio::{pwrite, pread}};
use libc::off_t;
+use growthring::{WALFile, WALStore, WALPos, WALBytes, WALLoader, WALWriter};
+
struct WALFileTest {
filename: String,
fd: RawFd,
@@ -33,26 +31,39 @@ impl Drop for WALFileTest {
}
impl WALFile for WALFileTest {
- fn allocate(&self, offset: WALPos, length: usize) {
+ fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> {
println!("{}.allocate(offset=0x{:x}, end=0x{:x})", self.filename, offset, offset + length as u64);
- fallocate(self.fd, FallocateFlags::FALLOC_FL_ZERO_RANGE, offset as off_t, length as off_t);
+ fallocate(self.fd,
+ FallocateFlags::FALLOC_FL_ZERO_RANGE,
+ offset as off_t, length as off_t).and_then(|_| Ok(())).or_else(|_| Err(()))
}
+
+ fn truncate(&self, length: usize) -> Result<(), ()> {
+ println!("{}.truncate(length={})", self.filename, length);
+ ftruncate(self.fd, length as off_t).or_else(|_| Err(()))
+ }
+
fn write(&self, offset: WALPos, data: WALBytes) {
println!("{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})",
self.filename, offset, offset + data.len() as u64, hex::encode(&data));
- pwrite(self.fd, &*data, offset as off_t);
+ pwrite(self.fd, &*data, offset as off_t).unwrap();
}
fn read(&self, offset: WALPos, length: usize) -> WALBytes {
- unreachable!()
+ let mut buff = Vec::new();
+ buff.resize(length, 0);
+ pread(self.fd, &mut buff[..], offset as off_t).unwrap();
+ buff.into_boxed_slice()
}
}
struct WALStoreTest {
- rootfd: RawFd
+ rootfd: RawFd,
+ rootpath: String
}
impl WALStoreTest {
fn new(wal_dir: &str, truncate: bool) -> Self {
+ let rootpath = wal_dir.to_string();
if truncate {
let _ = std::fs::remove_dir_all(wal_dir);
}
@@ -64,7 +75,7 @@ impl WALStoreTest {
Ok(fd) => fd,
Err(_) => panic!("error while opening the DB")
};
- WALStoreTest { rootfd }
+ WALStoreTest { rootfd, rootpath }
}
}
@@ -76,20 +87,27 @@ impl Drop for WALStoreTest {
impl WALStore for WALStoreTest {
fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>> {
- println!("open_file(filename={}, touch={}", filename, touch);
+ println!("open_file(filename={}, touch={})", filename, touch);
let filename = filename.to_string();
Some(Box::new(WALFileTest::new(self.rootfd, &filename)))
}
+
fn remove_file(&self, filename: &str) -> bool {
println!("remove_file(filename={})", filename);
- true
+ unlinkat(Some(self.rootfd), filename, UnlinkatFlags::NoRemoveDir).is_ok()
}
+
fn enumerate_files(&self) -> Box<[String]> {
println!("enumerate_files()");
- Vec::new().into_boxed_slice()
+ let mut logfiles = Vec::new();
+ for fname in std::fs::read_dir(&self.rootpath).unwrap() {
+ logfiles.push(fname.unwrap().file_name().into_string().unwrap())
+ }
+ logfiles.into_boxed_slice()
}
+
fn apply_payload(&self, payload: WALBytes) {
- println!("apply_payload(payload=0x{})", hex::encode(payload))
+ println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap())
}
}
@@ -110,4 +128,11 @@ fn main() {
for _ in 0..3 {
test(["a".repeat(10), "b".repeat(100), "c".repeat(1000)].iter().map(|s| s.to_string()).collect::<Vec<String>>(), &mut wal)
}
+ let store = WALStoreTest::new("./wal_demo1", false);
+ let mut wal = WALLoader::new(store, 9, 8, 1000).recover();
+ for _ in 0..3 {
+ test(["a".repeat(10), "b".repeat(100), "c".repeat(300), "d".repeat(400)].iter().map(|s| s.to_string()).collect::<Vec<String>>(), &mut wal)
+ }
+ let store = WALStoreTest::new("./wal_demo1", false);
+ let wal = WALLoader::new(store, 9, 8, 1000).recover();
}
diff --git a/src/lib.rs b/src/lib.rs
index 1e1b653..5dcb78a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -55,7 +55,9 @@ struct WALState {
pub trait WALFile {
/// Initialize the file space in [offset, offset + length) to zero.
- fn allocate(&self, offset: WALPos, length: usize);
+ fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
+ /// Truncate a file to a specified length.
+ fn truncate(&self, length: usize) -> Result<(), ()>;
/// Write data with offset.
fn write(&self, offset: WALPos, data: WALBytes);
/// Read data with offset.
@@ -67,7 +69,7 @@ pub trait WALStore {
fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;
/// Unlink a file given the filename.
fn remove_file(&self, filename: &str) -> bool;
- /// Enumerate all WAL files, ordered by their filenames.
+ /// Enumerate all WAL files.
fn enumerate_files(&self) -> Box<[String]>;
/// Apply (redo) the payload during recovery.
fn apply_payload(&self, payload: WALBytes);
@@ -112,7 +114,7 @@ impl<F: WALStore> WALFilePool<F> {
}
fn get_fid(&mut self, fname: &str) -> WALFileId {
- scan_fmt!(fname, "{:x}.log", [hex WALFileId]).unwrap()
+ scan_fmt!(fname, "{x}.log", [hex WALFileId]).unwrap()
}
// TODO: evict stale handles
@@ -125,7 +127,7 @@ impl<F: WALStore> WALFilePool<F> {
for (off, w) in &writes[1..] {
let next_fid = off >> self.file_nbit;
if next_fid != fid {
- h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
+ h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
h = self.get_file(next_fid, true);
alloc_start = 0;
alloc_end = alloc_start + w.len() as u64;
@@ -134,7 +136,7 @@ impl<F: WALStore> WALFilePool<F> {
alloc_end += w.len() as u64;
}
}
- h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
+ h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
for (off, w) in writes.into_iter() {
self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w);
}
@@ -143,6 +145,8 @@ impl<F: WALStore> WALFilePool<F> {
fn remove_file(&self, fid: u64) -> bool {
self.store.remove_file(&Self::get_fname(fid))
}
+
+ fn reset(&mut self) { self.handles.clear() }
}
pub struct WALWriter<F: WALStore> {
@@ -201,7 +205,6 @@ impl<F: WALStore> WALWriter<F> {
if d >= rsize {
// the remaining rec fits in the block
let payload = rec;
- println!("rsize {} {}", rsize, d);
blob.crc32 = crc::crc32::checksum_ieee(payload);
blob.rsize = rsize;
blob.rtype = if started {WALRingType::Last} else {WALRingType::Full};
@@ -285,61 +288,60 @@ impl<F: WALStore> WALLoader<F> {
}
pub fn recover(mut self) -> WALWriter<F> {
+ let block_size = 1 << self.file_pool.block_nbit;
let msize = std::mem::size_of::<WALRingBlob>() as u32;
- let logfiles = self.file_pool.store.enumerate_files();
+ let mut logfiles = self.file_pool.store.enumerate_files();
+ // TODO: use regex to filter out invalid files
+ // TODO: check for missing logfiles
+ logfiles.sort();
+ let mut chunks = None;
for fname in logfiles.iter() {
let fid = self.file_pool.get_fid(fname);
let f = self.file_pool.get_file(fid, false);
let mut off = 0;
- let mut end = false;
- while !end {
+ while block_size - (off & (block_size - 1)) > msize as u64 {
let header_raw = f.read(off, msize as usize);
+ off += msize as u64;
let header = unsafe {
std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
let rsize = header.rsize;
- off += msize as u64;
match header.rtype {
WALRingType::Full => {
+ assert!(chunks.is_none());
let payload = f.read(off, rsize as usize);
- self.file_pool.store.apply_payload(payload);
off += rsize as u64;
+ self.file_pool.store.apply_payload(payload);
},
WALRingType::First => {
- let mut chunks = vec![f.read(off, rsize as usize)];
+ assert!(chunks.is_none());
+ chunks = Some(vec![f.read(off, rsize as usize)]);
off += rsize as u64;
- loop {
- let header_raw = f.read(off, msize as usize);
- let header = unsafe {
- std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
- if let WALRingType::Null = header.rtype {
- end = true;
- break
- }
- let rsize = header.rsize;
- let payload = f.read(off, rsize as usize);
- off += msize as u64;
- chunks.push(payload);
- match header.rtype {
- WALRingType::Middle => (),
- WALRingType::Last => break,
- _ => unreachable!()
- }
- }
+ },
+ WALRingType::Middle => {
+ chunks.as_mut().unwrap().push(f.read(off, rsize as usize));
+ off += rsize as u64;
+ },
+ WALRingType::Last => {
+ chunks.as_mut().unwrap().push(f.read(off, rsize as usize));
+ off += rsize as u64;
+
+ let _chunks = chunks.take().unwrap();
let mut payload = Vec::new();
- payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
+ payload.resize(_chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
let mut ps = &mut payload[..];
- for c in chunks {
+ for c in _chunks {
ps[..c.len()].copy_from_slice(&*c);
ps = &mut ps[c.len()..];
}
self.file_pool.store.apply_payload(payload.into_boxed_slice());
},
- WALRingType::Null => end = true,
- _ => unreachable!()
+ WALRingType::Null => break,
}
}
+ f.truncate(0).unwrap();
self.file_pool.remove_file(fid);
}
+ self.file_pool.reset();
WALWriter::new(WALState {
first_fid: 0,
next: 0,