From 769ffcbe2bd1d268f5214c393a7817faf8944162 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 16 Jun 2020 22:01:33 -0400 Subject: allow using customized aiomgr --- src/lib.rs | 39 ++++++++++++++++++++++++--------------- 1 file changed, 24 insertions(+), 15 deletions(-) (limited to 'src/lib.rs') diff --git a/src/lib.rs b/src/lib.rs index 447da08..f7b3f73 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ //! //! //! // Start with empty WAL (truncate = true). -//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}); +//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}, None).unwrap(); //! let mut wal = loader.load(store).unwrap(); //! // Write a vector of records to WAL. //! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() { @@ -26,7 +26,7 @@ //! std::str::from_utf8(&payload).unwrap(), //! ringid); //! Ok(()) -//! }); +//! }, None).unwrap(); //! let mut wal = loader.load(store).unwrap(); //! // We saw some log playback, even there is no failure. //! // Let's try to grow the WAL to create many files. @@ -40,7 +40,7 @@ //! let store = WALStoreAIO::new("./walfiles", false, |payload, _| { //! println!("payload.len() = {}", payload.len()); //! Ok(()) -//! }); +//! }, None).unwrap(); //! let wal = loader.load(store).unwrap(); //! // After each recovery, the ./walfiles is empty. //! ``` @@ -62,14 +62,14 @@ use wal::{WALBytes, WALFile, WALPos, WALRingId, WALStore}; pub struct WALFileAIO { fd: RawFd, - aiomgr: Rc>>, + aiomgr: Rc>, } impl WALFileAIO { pub fn new( rootfd: RawFd, filename: &str, - aiomgr: Rc>>, + aiomgr: Rc>, ) -> Result { openat( rootfd, @@ -108,7 +108,6 @@ impl WALFile for WALFileAIO { async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { self.aiomgr - .borrow_mut() .write(self.fd, offset, data, None) .await .or_else(|_| Err(())) @@ -126,7 +125,7 @@ impl WALFile for WALFileAIO { offset: WALPos, length: usize, ) -> Result, ()> { - block_on(self.aiomgr.borrow_mut().read(self.fd, offset, length, None)) + block_on(self.aiomgr.read(self.fd, offset, length, None)) .or_else(|_| Err(())) .and_then(|(nread, data)| { Ok(if nread == length { Some(data) } else { None }) @@ -138,12 +137,25 @@ pub struct WALStoreAIO Result<(), ()>> { rootfd: RawFd, rootpath: String, recover_func: RefCell, + aiomgr: Rc>, } impl Result<(), ()>> WALStoreAIO { - pub fn new(wal_dir: &str, truncate: bool, recover_func: F) -> Self { + pub fn new( + wal_dir: &str, + truncate: bool, + recover_func: F, + aiomgr: Option>, + ) -> Result { let recover_func = RefCell::new(recover_func); let rootpath = wal_dir.to_string(); + let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else( + |_: Result, ()>| { + AIOManager::new(new_batch_scheduler(None), 128, None, None) + .or(Err(())) + }, + )?); + if truncate { let _ = std::fs::remove_dir_all(wal_dir); } @@ -163,11 +175,12 @@ impl Result<(), ()>> WALStoreAIO { Ok(fd) => fd, Err(_) => panic!("error while opening the WAL directory"), }; - WALStoreAIO { + Ok(WALStoreAIO { rootfd, rootpath, recover_func, - } + aiomgr, + }) } } @@ -183,11 +196,7 @@ impl Result<(), ()>> WALStore _touch: bool, ) -> Result, ()> { let filename = filename.to_string(); - let aiomgr = Rc::new(RefCell::new( - AIOManager::new(new_batch_scheduler(None), 10, None, None) - .or(Err(()))?, - )); - WALFileAIO::new(self.rootfd, &filename, aiomgr.clone()) + WALFileAIO::new(self.rootfd, &filename, self.aiomgr.clone()) .and_then(|f| Ok(Box::new(f) as Box)) } -- cgit v1.2.3