aboutsummaryrefslogtreecommitdiff
path: root/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs100
1 files changed, 68 insertions, 32 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 2a67cd3..7674adc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,8 +10,8 @@
//!
//!
//! // Start with empty WAL (truncate = true).
-//! let store = WALStoreAIO::new("./walfiles", true, None).unwrap();
-//! let mut wal = loader.load(store, |_, _| {Ok(())}).unwrap();
+//! let store = WALStoreAIO::new("./walfiles", true, None, None).unwrap();
+//! let mut wal = block_on(loader.load(store, |_, _| {Ok(())})).unwrap();
//! // Write a vector of records to WAL.
//! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() {
//! let ring_id = block_on(f).unwrap().1;
@@ -20,28 +20,28 @@
//!
//!
//! // Load from WAL (truncate = false).
-//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap();
-//! let mut wal = loader.load(store, |payload, ringid| {
+//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap();
+//! let mut wal = block_on(loader.load(store, |payload, ringid| {
//! // redo the operations in your application
//! println!("recover(payload={}, ringid={:?})",
//! std::str::from_utf8(&payload).unwrap(),
//! ringid);
//! Ok(())
-//! }).unwrap();
+//! })).unwrap();
//! // We saw some log playback, even there is no failure.
//! // Let's try to grow the WAL to create many files.
-//! let ring_ids = wal.grow((0..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>())
+//! let ring_ids = wal.grow((1..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>())
//! .into_iter().map(|f| block_on(f).unwrap().1).collect::<Vec<_>>();
//! // Then assume all these records are not longer needed. We can tell WALWriter by the `peel`
//! // method.
//! block_on(wal.peel(ring_ids)).unwrap();
//! // There will only be one remaining file in ./walfiles.
//!
-//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap();
-//! let wal = loader.load(store, |payload, _| {
+//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap();
+//! let wal = block_on(loader.load(store, |payload, _| {
//! println!("payload.len() = {}", payload.len());
//! Ok(())
-//! }).unwrap();
+//! })).unwrap();
//! // After each recovery, the ./walfiles is empty.
//! ```
@@ -49,7 +49,6 @@
pub mod wal;
use async_trait::async_trait;
-use futures::executor::block_on;
use libaiofut::{AIOBuilder, AIOManager};
use libc::off_t;
use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag};
@@ -116,13 +115,12 @@ impl WALFile for WALFileAIO {
})
}
- fn read(
+ async fn read(
&self,
offset: WALPos,
length: usize,
) -> Result<Option<WALBytes>, ()> {
- let (res, data) =
- block_on(self.aiomgr.read(self.fd, offset, length, None));
+ let (res, data) = self.aiomgr.read(self.fd, offset, length, None).await;
res.or_else(|_| Err(())).and_then(|nread| {
Ok(if nread == length { Some(data) } else { None })
})
@@ -131,7 +129,6 @@ impl WALFile for WALFileAIO {
pub struct WALStoreAIO {
rootfd: RawFd,
- rootpath: String,
aiomgr: Rc<AIOManager>,
}
@@ -139,9 +136,9 @@ impl WALStoreAIO {
pub fn new(
wal_dir: &str,
truncate: bool,
+ rootfd: Option<RawFd>,
aiomgr: Option<AIOManager>,
) -> Result<Self, ()> {
- let rootpath = wal_dir.to_string();
let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else(
|_: Result<AIOManager, ()>| {
AIOBuilder::default().build().or(Err(()))
@@ -151,25 +148,55 @@ impl WALStoreAIO {
if truncate {
let _ = std::fs::remove_dir_all(wal_dir);
}
- match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) {
- Err(e) => {
- if truncate {
- panic!("error while creating directory: {}", e)
+ let walfd;
+ match rootfd {
+ None => {
+ match mkdir(
+ wal_dir,
+ Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR,
+ ) {
+ Err(e) => {
+ if truncate {
+ panic!("error while creating directory: {}", e)
+ }
+ }
+ Ok(_) => (),
+ }
+ walfd = match open(
+ wal_dir,
+ OFlag::O_DIRECTORY | OFlag::O_PATH,
+ Mode::empty(),
+ ) {
+ Ok(fd) => fd,
+ Err(_) => panic!("error while opening the WAL directory"),
+ }
+ }
+ Some(fd) => {
+ let ret = unsafe {
+ libc::mkdirat(
+ fd,
+ std::ffi::CString::new(wal_dir).unwrap().as_ptr(),
+ libc::S_IRUSR | libc::S_IWUSR | libc::S_IXUSR,
+ )
+ };
+ if ret != 0 {
+ if truncate {
+ panic!("error while creating directory")
+ }
+ }
+ walfd = match nix::fcntl::openat(
+ fd,
+ wal_dir,
+ OFlag::O_DIRECTORY | OFlag::O_PATH,
+ Mode::empty(),
+ ) {
+ Ok(fd) => fd,
+ Err(_) => panic!("error while opening the WAL directory"),
}
}
- Ok(_) => (),
}
- let rootfd = match open(
- wal_dir,
- OFlag::O_DIRECTORY | OFlag::O_PATH,
- Mode::empty(),
- ) {
- Ok(fd) => fd,
- Err(_) => panic!("error while opening the WAL directory"),
- };
Ok(WALStoreAIO {
- rootfd,
- rootpath,
+ rootfd: walfd,
aiomgr,
})
}
@@ -200,8 +227,17 @@ impl WALStore for WALStoreAIO {
fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
let mut logfiles = Vec::new();
- for fname in std::fs::read_dir(&self.rootpath).unwrap() {
- logfiles.push(fname.unwrap().file_name().into_string().unwrap())
+ for ent in nix::dir::Dir::openat(
+ self.rootfd,
+ "./",
+ OFlag::empty(),
+ Mode::empty(),
+ )
+ .unwrap()
+ .iter()
+ {
+ logfiles
+ .push(ent.unwrap().file_name().to_str().unwrap().to_string())
}
Ok(logfiles.into_iter())
}