aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock58
-rw-r--r--Cargo.toml1
-rw-r--r--src/lib.rs148
3 files changed, 178 insertions, 29 deletions
diff --git a/Cargo.lock b/Cargo.lock
index f9caa86..d6e8d74 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10,6 +10,15 @@ dependencies = [
]
[[package]]
+name = "aho-corasick"
+version = "0.7.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8716408b8bc624ed7f65d223ddb9ac2d044c0547b6fa4b0d554f3a9540496ada"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
name = "autocfg"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -73,6 +82,7 @@ version = "0.1.0"
dependencies = [
"crc",
"lru",
+ "scan_fmt",
]
[[package]]
@@ -86,6 +96,12 @@ dependencies = [
]
[[package]]
+name = "lazy_static"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
+
+[[package]]
name = "libc"
version = "0.2.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -101,12 +117,54 @@ dependencies = [
]
[[package]]
+name = "memchr"
+version = "2.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
+
+[[package]]
name = "proc-macro-hack"
version = "0.5.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4"
[[package]]
+name = "regex"
+version = "1.3.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6"
+dependencies = [
+ "aho-corasick",
+ "memchr",
+ "regex-syntax",
+ "thread_local",
+]
+
+[[package]]
+name = "regex-syntax"
+version = "0.6.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8"
+
+[[package]]
+name = "scan_fmt"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "248286eec0f55678879ef1caec3d76276643ebcb5460d8cb6e732ef40f50aabe"
+dependencies = [
+ "regex",
+]
+
+[[package]]
+name = "thread_local"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
+dependencies = [
+ "lazy_static",
+]
+
+[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index fb77a6a..ee8d867 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -9,6 +9,7 @@ edition = "2018"
[dependencies]
crc = "1.8.1"
lru = "0.5.1"
+scan_fmt = "0.2.5"
[lib]
name = "growthring"
diff --git a/src/lib.rs b/src/lib.rs
index 29f7835..53f4e0f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,7 +1,9 @@
+#[macro_use] extern crate scan_fmt;
use std::collections::BinaryHeap;
#[repr(u8)]
enum WALRingType {
+ #[allow(dead_code)]
Null = 0x0,
Full,
First,
@@ -17,6 +19,7 @@ struct WALRingBlob {
// payload follows
}
+type WALBytes = Box<[u8]>;
type WALFileId = u64;
type WALPos = u64;
@@ -45,21 +48,29 @@ struct WALState {
/// the next position for a record, addressed in the entire WAL space
pub next: WALPos,
/// number of bits for a block
- pub block_nbit: u8,
+ pub block_nbit: u64,
/// number of bits for a file
- pub file_nbit: u8,
+ pub file_nbit: u64,
}
pub trait WALFile {
+ /// Initialize the file space in [offset, offset + length) to zero.
fn allocate(&self, offset: WALPos, length: usize);
- fn write(&self, offset: WALPos, data: Box<[u8]>);
- fn read(&self, offset: WALPos, length: usize) -> Box<[u8]>;
+ /// Write data with offset.
+ fn write(&self, offset: WALPos, data: WALBytes);
+ /// Read data with offset.
+ fn read(&self, offset: WALPos, length: usize) -> WALBytes;
}
pub trait WALStore {
+ /// Open a file given the filename, create the file if not exists when `touch` is `true`.
fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;
+ /// Unlink a file given the filename.
fn remove_file(&self, filename: &str) -> bool;
- fn scan_files(&self) -> Box<[&str]>;
+ /// Enumerate all WAL files, ordered by their filenames.
+ fn enumerate_files(&self) -> Box<[String]>;
+ /// Apply (redo) the payload during recovery.
+ fn apply_payload(&self, payload: WALBytes);
}
/// The middle layer that manages WAL file handles and invokes public trait functions to actually
@@ -69,18 +80,19 @@ struct WALFilePool<F: WALStore> {
handles: lru::LruCache<WALFileId, Box<dyn WALFile>>,
file_nbit: u64,
file_size: u64,
- block_size: u64,
+ block_nbit: u64,
}
impl<F: WALStore> WALFilePool<F> {
fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
let file_nbit = file_nbit as u64;
+ let block_nbit = block_nbit as u64;
WALFilePool {
store,
handles: lru::LruCache::new(cache_size),
file_nbit,
file_size: 1 << (file_nbit as u64),
- block_size: 1 << (block_nbit as u64)
+ block_nbit,
}
}
@@ -88,65 +100,71 @@ impl<F: WALStore> WALFilePool<F> {
format!("{:08x}.log", fid)
}
- fn get_file(&mut self, fid: u64) -> &'static dyn WALFile {
+ fn get_file(&mut self, fid: u64, touch: bool) -> &'static dyn WALFile {
let h = match self.handles.get(&fid) {
Some(h) => &**h,
None => {
- self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), true).unwrap());
+ self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch).unwrap());
&**self.handles.get(&fid).unwrap()
}
};
unsafe {&*(h as *const dyn WALFile)}
}
- fn write(&mut self, writes: Vec<(WALPos, Box<[u8]>)>) {
- // pre-allocate the blocks
+ fn get_fid(&mut self, fname: &str) -> WALFileId {
+ scan_fmt!(fname, "{:x}.log", [hex WALFileId]).unwrap()
+ }
+
+ // TODO: evict stale handles
+ fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) {
+ // pre-allocate the file space
let fid = writes[0].0 >> self.file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
- let mut alloc_end = alloc_start + self.block_size;
- let mut h = self.get_file(fid);
- for (off, _) in &writes[1..] {
+ let mut alloc_end = alloc_start + writes[0].1.len() as u64;
+ let mut h = self.get_file(fid, true);
+ for (off, w) in &writes[1..] {
let next_fid = off >> self.file_nbit;
if next_fid != fid {
h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
- h = self.get_file(next_fid);
+ h = self.get_file(next_fid, true);
alloc_start = 0;
- alloc_end = alloc_start + self.block_size;
+ alloc_end = alloc_start + w.len() as u64;
} else {
- alloc_end += self.block_size;
+ alloc_end += w.len() as u64;
}
}
h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
for (off, w) in writes.into_iter() {
- self.get_file(off >> self.file_nbit).write(off, w);
+ self.get_file(off >> self.file_nbit, true).write(off, w);
}
}
fn remove_file(&self, fid: u64) -> bool {
- true
+ self.store.remove_file(&Self::get_fname(fid))
}
}
pub struct WALWriter<F: WALStore> {
state: WALState,
file_pool: WALFilePool<F>,
- block_buffer: Box<[u8]>,
+ block_buffer: WALBytes,
block_size: u32,
next_complete: WALPos,
io_complete: BinaryHeap<WALRingId>
}
impl<F: WALStore> WALWriter<F> {
- fn new(state: WALState, wal_store: F, cache_size: usize) -> Self {
+ fn new(state: WALState, file_pool: WALFilePool<F>) -> Self {
let mut b = Vec::new();
- let block_nbit = state.block_nbit;
- let block_size = 1 << (block_nbit as u32);
- let file_nbit = state.file_nbit;
- let file_size = 1 << (file_nbit as u64);
+ let block_size = 1 << file_pool.block_nbit as u32;
+ //let block_nbit = state.block_nbit;
+ //let block_size = 1 << (block_nbit as u32);
+ //let file_nbit = state.file_nbit;
+ //let file_size = 1 << (file_nbit as u64);
b.resize(block_size as usize, 0);
WALWriter{
state,
- file_pool: WALFilePool::new(wal_store, file_nbit, block_nbit, cache_size),
+ file_pool,
block_buffer: b.into_boxed_slice(),
block_size,
next_complete: 0,
@@ -157,7 +175,7 @@ impl<F: WALStore> WALWriter<F> {
/// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
/// function returns. The caller then has the knowledge of WAL writes so it should defer
/// actual data writes after WAL writes.
- pub fn grow(&mut self, records: &[Box<[u8]>]) -> Box<[WALRingId]> {
+ pub fn grow(&mut self, records: &[WALBytes]) -> Box<[WALRingId]> {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = std::mem::size_of::<WALRingBlob>() as u32;
@@ -176,7 +194,7 @@ impl<F: WALStore> WALWriter<F> {
if remain > msize {
let d = remain - msize;
let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>(
- &mut self.block_buffer[bbuff_cur as usize] as *mut u8)};
+ (&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())};
let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64;
if d >= rsize {
// the remaining rec fits in the block
@@ -249,5 +267,77 @@ impl<F: WALStore> WALWriter<F> {
}
}
-struct WALReader {
+pub struct WALReader<F: WALStore> {
+ file_pool: WALFilePool<F>,
+}
+
+impl<F: WALStore> WALReader<F> {
+ pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
+ let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size);
+ WALReader{ file_pool }
+ }
+
+ pub fn recover(mut self) -> WALWriter<F> {
+ let msize = std::mem::size_of::<WALRingBlob>() as u32;
+ let logfiles = self.file_pool.store.enumerate_files();
+ for fname in logfiles.iter() {
+ let fid = self.file_pool.get_fid(fname);
+ let f = self.file_pool.get_file(fid, false);
+ let mut off = 0;
+ let mut end = false;
+ while !end {
+ let header_raw = f.read(off, msize as usize);
+ let header = unsafe {
+ std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
+ let rsize = header.rsize;
+ off += msize as u64;
+ match header.rtype {
+ WALRingType::Full => {
+ let payload = f.read(off, rsize as usize);
+ self.file_pool.store.apply_payload(payload);
+ off += rsize as u64;
+ },
+ WALRingType::First => {
+ let mut chunks = vec![f.read(off, rsize as usize)];
+ off += rsize as u64;
+ loop {
+ let header_raw = f.read(off, msize as usize);
+ let header = unsafe {
+ std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
+ if let WALRingType::Null = header.rtype {
+ end = true;
+ break
+ }
+ let rsize = header.rsize;
+ let payload = f.read(off, rsize as usize);
+ off += msize as u64;
+ chunks.push(payload);
+ match header.rtype {
+ WALRingType::Middle => (),
+ WALRingType::Last => break,
+ _ => unreachable!()
+ }
+ }
+ let mut payload = Vec::new();
+ payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
+ let mut ps = &mut payload[..];
+ for c in chunks {
+ ps[..c.len()].copy_from_slice(&*c);
+ ps = &mut ps[c.len()..];
+ }
+ self.file_pool.store.apply_payload(payload.into_boxed_slice());
+ },
+ WALRingType::Null => end = true,
+ _ => unreachable!()
+ }
+ }
+ self.file_pool.remove_file(fid);
+ }
+ WALWriter::new(WALState {
+ first_fid: 0,
+ next: 0,
+ block_nbit: self.file_pool.block_nbit,
+ file_nbit: self.file_pool.file_nbit,
+ }, self.file_pool)
+ }
}