summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-06-10 12:45:25 -0400
committerDeterminant <[email protected]>2020-06-10 12:45:25 -0400
commit81eb9ce9299b2e31471db3f00457ff3c1a41fb60 (patch)
treee98f6fcd416ac9ebf22f5fb47eecf2f836d7dc51
parentf8fd7a86c11bf15a41cd402b2ef8fe457f0c2026 (diff)
filter out other non-WAL files
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml1
-rw-r--r--examples/demo1.rs6
-rw-r--r--src/wal.rs26
4 files changed, 25 insertions, 9 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 7b26bbc..34e5a1d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -98,6 +98,7 @@ dependencies = [
"lru",
"nix",
"rand",
+ "regex",
"scan_fmt",
]
diff --git a/Cargo.toml b/Cargo.toml
index 1593173..1a60577 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@ edition = "2018"
crc = "1.8.1"
lru = "0.5.1"
scan_fmt = "0.2.5"
+regex = "1"
[dev-dependencies]
hex = "0.4.2"
diff --git a/examples/demo1.rs b/examples/demo1.rs
index 104c3b0..e0c7d58 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -87,6 +87,8 @@ impl Drop for WALStoreTest {
}
impl WALStore for WALStoreTest {
+ type FileNameIter = std::vec::IntoIter<String>;
+
fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>> {
println!("open_file(filename={}, touch={})", filename, touch);
let filename = filename.to_string();
@@ -98,13 +100,13 @@ impl WALStore for WALStoreTest {
unlinkat(Some(self.rootfd), filename, UnlinkatFlags::NoRemoveDir).or_else(|_| Err(()))
}
- fn enumerate_files(&self) -> Box<[String]> {
+ fn enumerate_files(&self) -> Self::FileNameIter {
println!("enumerate_files()");
let mut logfiles = Vec::new();
for fname in std::fs::read_dir(&self.rootpath).unwrap() {
logfiles.push(fname.unwrap().file_name().into_string().unwrap())
}
- logfiles.into_boxed_slice()
+ logfiles.into_iter()
}
fn apply_payload(&self, payload: WALBytes) {
diff --git a/src/wal.rs b/src/wal.rs
index 05ffd01..0ba35fb 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -55,20 +55,29 @@ pub trait WALFile {
fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
/// Truncate a file to a specified length.
fn truncate(&self, length: usize) -> Result<(), ()>;
- /// Write data with offset.
+ /// Write data with offset. We assume the actual writes on the storage medium are _strictly
+ /// ordered_ the same way as this callback is invoked. We also assume all previous
+ /// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed
+ /// by most OS). Additionally, the final write caused by each invocation of this function
+ /// should be _atomic_ (the entire single write should be all or nothing).
fn write(&self, offset: WALPos, data: WALBytes);
/// Read data with offset.
fn read(&self, offset: WALPos, length: usize) -> WALBytes;
}
pub trait WALStore {
+ type FileNameIter: Iterator<Item = String>;
+
/// Open a file given the filename, create the file if not exists when `touch` is `true`.
fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;
/// Unlink a file given the filename.
fn remove_file(&self, filename: &str) -> Result<(), ()>;
- /// Enumerate all WAL files.
- fn enumerate_files(&self) -> Box<[String]>;
- /// Apply (redo) the payload during recovery.
+ /// Enumerate all WAL filenames. It should include all WAL files that are previously opened
+ /// (created) but not removed. The list could be unordered.
+ fn enumerate_files(&self) -> Self::FileNameIter;
+ /// Apply the payload during recovery. This notifies the application should redo the given
+ /// operation to ensure its state is consistent (the major goal of having a WAL). We assume
+ /// the application applies the payload by the _order_ of this callback invocation.
fn apply_payload(&self, payload: WALBytes);
}
@@ -282,19 +291,22 @@ impl<F: WALStore> WALWriter<F> {
pub struct WALLoader<F: WALStore> {
file_pool: WALFilePool<F>,
+ filename_fmt: regex::Regex
}
impl<F: WALStore> WALLoader<F> {
pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size);
- WALLoader{ file_pool }
+ let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap();
+ WALLoader{ file_pool, filename_fmt }
}
pub fn recover(mut self) -> WALWriter<F> {
let block_size = 1 << self.file_pool.block_nbit;
let msize = std::mem::size_of::<WALRingBlob>() as u32;
- let mut logfiles = self.file_pool.store.enumerate_files();
- // TODO: use regex to filter out invalid files
+ let mut logfiles: Vec<String> = self.file_pool.store
+ .enumerate_files()
+ .filter(|f| self.filename_fmt.is_match(f)).collect();
// TODO: check for missing logfiles
logfiles.sort();
let mut chunks = None;