From 769ffcbe2bd1d268f5214c393a7817faf8944162 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 16 Jun 2020 22:01:33 -0400 Subject: allow using customized aiomgr --- Cargo.lock | 6 +++--- Cargo.toml | 10 +++++----- examples/demo1.rs | 6 +++--- src/lib.rs | 39 ++++++++++++++++++++++++--------------- tests/common/mod.rs | 2 +- 5 files changed, 36 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dbe9b6b..96962a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,7 +232,7 @@ dependencies = [ [[package]] name = "growth-ring" -version = "0.1.1" +version = "0.1.2" dependencies = [ "async-trait", "crc", @@ -281,9 +281,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libaio-futures" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "049dd6ea8958e5c868151b08c68fa771daa979304ab3de8f5bab58126ba53e3b" +checksum = "e831955e07aa19f495c993327d647d6a49bc9e5d6856238c00fd1833a1a06eee" dependencies = [ "crossbeam-channel", "libc", diff --git a/Cargo.toml b/Cargo.toml index a30512d..a8a82ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "growth-ring" -version = "0.1.1" +version = "0.1.2" authors = ["Determinant "] edition = "2018" homepage = "https://github.com/Determinant/growth-ring" @@ -14,10 +14,10 @@ description = "Simple and modular write-ahead-logging implementation." crc = "1.8.1" lru = "0.5.1" scan_fmt = "0.2.5" -regex = "1" -async-trait = "0.1" -futures = "0.3" -libaio-futures = "0.1.1" +regex = "1.3.9" +async-trait = "0.1.35" +futures = "0.3.5" +libaio-futures = "0.1.2" nix = "0.17.0" libc = "0.2.71" diff --git a/examples/demo1.rs b/examples/demo1.rs index e214177..d51801d 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -32,7 +32,7 @@ fn main() { let mut loader = WALLoader::new(); loader.file_nbit(9).block_nbit(8); - let store = WALStoreAIO::new(&wal_dir, true, recover); + let store = WALStoreAIO::new(&wal_dir, true, recover, None).unwrap(); let mut wal = loader.load(store).unwrap(); for _ in 0..3 { test( @@ -50,7 +50,7 @@ fn main() { ); } - let store = WALStoreAIO::new(&wal_dir, false, recover); + let store = WALStoreAIO::new(&wal_dir, false, recover, None).unwrap(); let mut wal = loader.load(store).unwrap(); for _ in 0..3 { test( @@ -64,7 +64,7 @@ fn main() { ); } - let store = WALStoreAIO::new(&wal_dir, false, recover); + let store = WALStoreAIO::new(&wal_dir, false, recover, None).unwrap(); let mut wal = loader.load(store).unwrap(); for _ in 0..3 { let mut ids = Vec::new(); diff --git a/src/lib.rs b/src/lib.rs index 447da08..f7b3f73 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ //! //! //! // Start with empty WAL (truncate = true). -//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}); +//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}, None).unwrap(); //! let mut wal = loader.load(store).unwrap(); //! // Write a vector of records to WAL. //! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() { @@ -26,7 +26,7 @@ //! std::str::from_utf8(&payload).unwrap(), //! ringid); //! Ok(()) -//! }); +//! }, None).unwrap(); //! let mut wal = loader.load(store).unwrap(); //! // We saw some log playback, even there is no failure. //! // Let's try to grow the WAL to create many files. @@ -40,7 +40,7 @@ //! let store = WALStoreAIO::new("./walfiles", false, |payload, _| { //! println!("payload.len() = {}", payload.len()); //! Ok(()) -//! }); +//! }, None).unwrap(); //! let wal = loader.load(store).unwrap(); //! // After each recovery, the ./walfiles is empty. //! ``` @@ -62,14 +62,14 @@ use wal::{WALBytes, WALFile, WALPos, WALRingId, WALStore}; pub struct WALFileAIO { fd: RawFd, - aiomgr: Rc>>, + aiomgr: Rc>, } impl WALFileAIO { pub fn new( rootfd: RawFd, filename: &str, - aiomgr: Rc>>, + aiomgr: Rc>, ) -> Result { openat( rootfd, @@ -108,7 +108,6 @@ impl WALFile for WALFileAIO { async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { self.aiomgr - .borrow_mut() .write(self.fd, offset, data, None) .await .or_else(|_| Err(())) @@ -126,7 +125,7 @@ impl WALFile for WALFileAIO { offset: WALPos, length: usize, ) -> Result, ()> { - block_on(self.aiomgr.borrow_mut().read(self.fd, offset, length, None)) + block_on(self.aiomgr.read(self.fd, offset, length, None)) .or_else(|_| Err(())) .and_then(|(nread, data)| { Ok(if nread == length { Some(data) } else { None }) @@ -138,12 +137,25 @@ pub struct WALStoreAIO Result<(), ()>> { rootfd: RawFd, rootpath: String, recover_func: RefCell, + aiomgr: Rc>, } impl Result<(), ()>> WALStoreAIO { - pub fn new(wal_dir: &str, truncate: bool, recover_func: F) -> Self { + pub fn new( + wal_dir: &str, + truncate: bool, + recover_func: F, + aiomgr: Option>, + ) -> Result { let recover_func = RefCell::new(recover_func); let rootpath = wal_dir.to_string(); + let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else( + |_: Result, ()>| { + AIOManager::new(new_batch_scheduler(None), 128, None, None) + .or(Err(())) + }, + )?); + if truncate { let _ = std::fs::remove_dir_all(wal_dir); } @@ -163,11 +175,12 @@ impl Result<(), ()>> WALStoreAIO { Ok(fd) => fd, Err(_) => panic!("error while opening the WAL directory"), }; - WALStoreAIO { + Ok(WALStoreAIO { rootfd, rootpath, recover_func, - } + aiomgr, + }) } } @@ -183,11 +196,7 @@ impl Result<(), ()>> WALStore _touch: bool, ) -> Result, ()> { let filename = filename.to_string(); - let aiomgr = Rc::new(RefCell::new( - AIOManager::new(new_batch_scheduler(None), 10, None, None) - .or(Err(()))?, - )); - WALFileAIO::new(self.rootfd, &filename, aiomgr.clone()) + WALFileAIO::new(self.rootfd, &filename, self.aiomgr.clone()) .and_then(|f| Ok(Box::new(f) as Box)) } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 247b2ee..769f422 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -3,7 +3,7 @@ #[allow(dead_code)] use async_trait::async_trait; use growthring::wal::{ - WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, RecoverPolicy + WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore }; use indexmap::{map::Entry, IndexMap}; use rand::Rng; -- cgit v1.2.3