From 54ce471c15fda9812d3cd7ef6222701b19892d30 Mon Sep 17 00:00:00 2001 From: Determinant Date: Mon, 12 Oct 2020 20:13:26 -0400 Subject: add assertion for empty record payload; make `load` async func --- src/lib.rs | 100 +++++++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 68 insertions(+), 32 deletions(-) (limited to 'src/lib.rs') diff --git a/src/lib.rs b/src/lib.rs index 2a67cd3..7674adc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,8 @@ //! //! //! // Start with empty WAL (truncate = true). -//! let store = WALStoreAIO::new("./walfiles", true, None).unwrap(); -//! let mut wal = loader.load(store, |_, _| {Ok(())}).unwrap(); +//! let store = WALStoreAIO::new("./walfiles", true, None, None).unwrap(); +//! let mut wal = block_on(loader.load(store, |_, _| {Ok(())})).unwrap(); //! // Write a vector of records to WAL. //! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() { //! let ring_id = block_on(f).unwrap().1; @@ -20,28 +20,28 @@ //! //! //! // Load from WAL (truncate = false). -//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap(); -//! let mut wal = loader.load(store, |payload, ringid| { +//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap(); +//! let mut wal = block_on(loader.load(store, |payload, ringid| { //! // redo the operations in your application //! println!("recover(payload={}, ringid={:?})", //! std::str::from_utf8(&payload).unwrap(), //! ringid); //! Ok(()) -//! }).unwrap(); +//! })).unwrap(); //! // We saw some log playback, even there is no failure. //! // Let's try to grow the WAL to create many files. -//! let ring_ids = wal.grow((0..100).into_iter().map(|i| "a".repeat(i)).collect::>()) +//! let ring_ids = wal.grow((1..100).into_iter().map(|i| "a".repeat(i)).collect::>()) //! .into_iter().map(|f| block_on(f).unwrap().1).collect::>(); //! // Then assume all these records are not longer needed. We can tell WALWriter by the `peel` //! // method. //! block_on(wal.peel(ring_ids)).unwrap(); //! // There will only be one remaining file in ./walfiles. //! -//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap(); -//! let wal = loader.load(store, |payload, _| { +//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap(); +//! let wal = block_on(loader.load(store, |payload, _| { //! println!("payload.len() = {}", payload.len()); //! Ok(()) -//! }).unwrap(); +//! })).unwrap(); //! // After each recovery, the ./walfiles is empty. //! ``` @@ -49,7 +49,6 @@ pub mod wal; use async_trait::async_trait; -use futures::executor::block_on; use libaiofut::{AIOBuilder, AIOManager}; use libc::off_t; use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag}; @@ -116,13 +115,12 @@ impl WALFile for WALFileAIO { }) } - fn read( + async fn read( &self, offset: WALPos, length: usize, ) -> Result, ()> { - let (res, data) = - block_on(self.aiomgr.read(self.fd, offset, length, None)); + let (res, data) = self.aiomgr.read(self.fd, offset, length, None).await; res.or_else(|_| Err(())).and_then(|nread| { Ok(if nread == length { Some(data) } else { None }) }) @@ -131,7 +129,6 @@ impl WALFile for WALFileAIO { pub struct WALStoreAIO { rootfd: RawFd, - rootpath: String, aiomgr: Rc, } @@ -139,9 +136,9 @@ impl WALStoreAIO { pub fn new( wal_dir: &str, truncate: bool, + rootfd: Option, aiomgr: Option, ) -> Result { - let rootpath = wal_dir.to_string(); let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else( |_: Result| { AIOBuilder::default().build().or(Err(())) @@ -151,25 +148,55 @@ impl WALStoreAIO { if truncate { let _ = std::fs::remove_dir_all(wal_dir); } - match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) { - Err(e) => { - if truncate { - panic!("error while creating directory: {}", e) + let walfd; + match rootfd { + None => { + match mkdir( + wal_dir, + Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR, + ) { + Err(e) => { + if truncate { + panic!("error while creating directory: {}", e) + } + } + Ok(_) => (), + } + walfd = match open( + wal_dir, + OFlag::O_DIRECTORY | OFlag::O_PATH, + Mode::empty(), + ) { + Ok(fd) => fd, + Err(_) => panic!("error while opening the WAL directory"), + } + } + Some(fd) => { + let ret = unsafe { + libc::mkdirat( + fd, + std::ffi::CString::new(wal_dir).unwrap().as_ptr(), + libc::S_IRUSR | libc::S_IWUSR | libc::S_IXUSR, + ) + }; + if ret != 0 { + if truncate { + panic!("error while creating directory") + } + } + walfd = match nix::fcntl::openat( + fd, + wal_dir, + OFlag::O_DIRECTORY | OFlag::O_PATH, + Mode::empty(), + ) { + Ok(fd) => fd, + Err(_) => panic!("error while opening the WAL directory"), } } - Ok(_) => (), } - let rootfd = match open( - wal_dir, - OFlag::O_DIRECTORY | OFlag::O_PATH, - Mode::empty(), - ) { - Ok(fd) => fd, - Err(_) => panic!("error while opening the WAL directory"), - }; Ok(WALStoreAIO { - rootfd, - rootpath, + rootfd: walfd, aiomgr, }) } @@ -200,8 +227,17 @@ impl WALStore for WALStoreAIO { fn enumerate_files(&self) -> Result { let mut logfiles = Vec::new(); - for fname in std::fs::read_dir(&self.rootpath).unwrap() { - logfiles.push(fname.unwrap().file_name().into_string().unwrap()) + for ent in nix::dir::Dir::openat( + self.rootfd, + "./", + OFlag::empty(), + Mode::empty(), + ) + .unwrap() + .iter() + { + logfiles + .push(ent.unwrap().file_name().to_str().unwrap().to_string()) } Ok(logfiles.into_iter()) } -- cgit v1.2.3