diff options
author | Determinant <[email protected]> | 2020-06-10 12:45:25 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-10 12:45:25 -0400 |
commit | 81eb9ce9299b2e31471db3f00457ff3c1a41fb60 (patch) | |
tree | e98f6fcd416ac9ebf22f5fb47eecf2f836d7dc51 | |
parent | f8fd7a86c11bf15a41cd402b2ef8fe457f0c2026 (diff) |
filter out other non-WAL files
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | examples/demo1.rs | 6 | ||||
-rw-r--r-- | src/wal.rs | 26 |
4 files changed, 25 insertions, 9 deletions
@@ -98,6 +98,7 @@ dependencies = [ "lru", "nix", "rand", + "regex", "scan_fmt", ] @@ -10,6 +10,7 @@ edition = "2018" crc = "1.8.1" lru = "0.5.1" scan_fmt = "0.2.5" +regex = "1" [dev-dependencies] hex = "0.4.2" diff --git a/examples/demo1.rs b/examples/demo1.rs index 104c3b0..e0c7d58 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -87,6 +87,8 @@ impl Drop for WALStoreTest { } impl WALStore for WALStoreTest { + type FileNameIter = std::vec::IntoIter<String>; + fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>> { println!("open_file(filename={}, touch={})", filename, touch); let filename = filename.to_string(); @@ -98,13 +100,13 @@ impl WALStore for WALStoreTest { unlinkat(Some(self.rootfd), filename, UnlinkatFlags::NoRemoveDir).or_else(|_| Err(())) } - fn enumerate_files(&self) -> Box<[String]> { + fn enumerate_files(&self) -> Self::FileNameIter { println!("enumerate_files()"); let mut logfiles = Vec::new(); for fname in std::fs::read_dir(&self.rootpath).unwrap() { logfiles.push(fname.unwrap().file_name().into_string().unwrap()) } - logfiles.into_boxed_slice() + logfiles.into_iter() } fn apply_payload(&self, payload: WALBytes) { @@ -55,20 +55,29 @@ pub trait WALFile { fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>; /// Truncate a file to a specified length. fn truncate(&self, length: usize) -> Result<(), ()>; - /// Write data with offset. + /// Write data with offset. We assume the actual writes on the storage medium are _strictly + /// ordered_ the same way as this callback is invoked. We also assume all previous + /// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed + /// by most OS). Additionally, the final write caused by each invocation of this function + /// should be _atomic_ (the entire single write should be all or nothing). fn write(&self, offset: WALPos, data: WALBytes); /// Read data with offset. fn read(&self, offset: WALPos, length: usize) -> WALBytes; } pub trait WALStore { + type FileNameIter: Iterator<Item = String>; + /// Open a file given the filename, create the file if not exists when `touch` is `true`. fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>; /// Unlink a file given the filename. fn remove_file(&self, filename: &str) -> Result<(), ()>; - /// Enumerate all WAL files. - fn enumerate_files(&self) -> Box<[String]>; - /// Apply (redo) the payload during recovery. + /// Enumerate all WAL filenames. It should include all WAL files that are previously opened + /// (created) but not removed. The list could be unordered. + fn enumerate_files(&self) -> Self::FileNameIter; + /// Apply the payload during recovery. This notifies the application should redo the given + /// operation to ensure its state is consistent (the major goal of having a WAL). We assume + /// the application applies the payload by the _order_ of this callback invocation. fn apply_payload(&self, payload: WALBytes); } @@ -282,19 +291,22 @@ impl<F: WALStore> WALWriter<F> { pub struct WALLoader<F: WALStore> { file_pool: WALFilePool<F>, + filename_fmt: regex::Regex } impl<F: WALStore> WALLoader<F> { pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size); - WALLoader{ file_pool } + let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap(); + WALLoader{ file_pool, filename_fmt } } pub fn recover(mut self) -> WALWriter<F> { let block_size = 1 << self.file_pool.block_nbit; let msize = std::mem::size_of::<WALRingBlob>() as u32; - let mut logfiles = self.file_pool.store.enumerate_files(); - // TODO: use regex to filter out invalid files + let mut logfiles: Vec<String> = self.file_pool.store + .enumerate_files() + .filter(|f| self.filename_fmt.is_match(f)).collect(); // TODO: check for missing logfiles logfiles.sort(); let mut chunks = None; |