diff options
Diffstat (limited to 'src/lib.rs')
-rw-r--r-- | src/lib.rs | 39 |
1 files changed, 24 insertions, 15 deletions
@@ -10,7 +10,7 @@ //! //! //! // Start with empty WAL (truncate = true). -//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}); +//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}, None).unwrap(); //! let mut wal = loader.load(store).unwrap(); //! // Write a vector of records to WAL. //! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() { @@ -26,7 +26,7 @@ //! std::str::from_utf8(&payload).unwrap(), //! ringid); //! Ok(()) -//! }); +//! }, None).unwrap(); //! let mut wal = loader.load(store).unwrap(); //! // We saw some log playback, even there is no failure. //! // Let's try to grow the WAL to create many files. @@ -40,7 +40,7 @@ //! let store = WALStoreAIO::new("./walfiles", false, |payload, _| { //! println!("payload.len() = {}", payload.len()); //! Ok(()) -//! }); +//! }, None).unwrap(); //! let wal = loader.load(store).unwrap(); //! // After each recovery, the ./walfiles is empty. //! ``` @@ -62,14 +62,14 @@ use wal::{WALBytes, WALFile, WALPos, WALRingId, WALStore}; pub struct WALFileAIO { fd: RawFd, - aiomgr: Rc<RefCell<AIOManager<AIOBatchSchedulerIn>>>, + aiomgr: Rc<AIOManager<AIOBatchSchedulerIn>>, } impl WALFileAIO { pub fn new( rootfd: RawFd, filename: &str, - aiomgr: Rc<RefCell<AIOManager<AIOBatchSchedulerIn>>>, + aiomgr: Rc<AIOManager<AIOBatchSchedulerIn>>, ) -> Result<Self, ()> { openat( rootfd, @@ -108,7 +108,6 @@ impl WALFile for WALFileAIO { async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { self.aiomgr - .borrow_mut() .write(self.fd, offset, data, None) .await .or_else(|_| Err(())) @@ -126,7 +125,7 @@ impl WALFile for WALFileAIO { offset: WALPos, length: usize, ) -> Result<Option<WALBytes>, ()> { - block_on(self.aiomgr.borrow_mut().read(self.fd, offset, length, None)) + block_on(self.aiomgr.read(self.fd, offset, length, None)) .or_else(|_| Err(())) .and_then(|(nread, data)| { Ok(if nread == length { Some(data) } else { None }) @@ -138,12 +137,25 @@ pub struct WALStoreAIO<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> { rootfd: RawFd, rootpath: String, recover_func: RefCell<F>, + aiomgr: Rc<AIOManager<AIOBatchSchedulerIn>>, } impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStoreAIO<F> { - pub fn new(wal_dir: &str, truncate: bool, recover_func: F) -> Self { + pub fn new( + wal_dir: &str, + truncate: bool, + recover_func: F, + aiomgr: Option<AIOManager<AIOBatchSchedulerIn>>, + ) -> Result<Self, ()> { let recover_func = RefCell::new(recover_func); let rootpath = wal_dir.to_string(); + let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else( + |_: Result<AIOManager<AIOBatchSchedulerIn>, ()>| { + AIOManager::new(new_batch_scheduler(None), 128, None, None) + .or(Err(())) + }, + )?); + if truncate { let _ = std::fs::remove_dir_all(wal_dir); } @@ -163,11 +175,12 @@ impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStoreAIO<F> { Ok(fd) => fd, Err(_) => panic!("error while opening the WAL directory"), }; - WALStoreAIO { + Ok(WALStoreAIO { rootfd, rootpath, recover_func, - } + aiomgr, + }) } } @@ -183,11 +196,7 @@ impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStore _touch: bool, ) -> Result<Box<dyn WALFile>, ()> { let filename = filename.to_string(); - let aiomgr = Rc::new(RefCell::new( - AIOManager::new(new_batch_scheduler(None), 10, None, None) - .or(Err(()))?, - )); - WALFileAIO::new(self.rootfd, &filename, aiomgr.clone()) + WALFileAIO::new(self.rootfd, &filename, self.aiomgr.clone()) .and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>)) } |