aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs100
-rw-r--r--src/wal.rs30
2 files changed, 86 insertions, 44 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 2a67cd3..7674adc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,8 +10,8 @@
//!
//!
//! // Start with empty WAL (truncate = true).
-//! let store = WALStoreAIO::new("./walfiles", true, None).unwrap();
-//! let mut wal = loader.load(store, |_, _| {Ok(())}).unwrap();
+//! let store = WALStoreAIO::new("./walfiles", true, None, None).unwrap();
+//! let mut wal = block_on(loader.load(store, |_, _| {Ok(())})).unwrap();
//! // Write a vector of records to WAL.
//! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() {
//! let ring_id = block_on(f).unwrap().1;
@@ -20,28 +20,28 @@
//!
//!
//! // Load from WAL (truncate = false).
-//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap();
-//! let mut wal = loader.load(store, |payload, ringid| {
+//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap();
+//! let mut wal = block_on(loader.load(store, |payload, ringid| {
//! // redo the operations in your application
//! println!("recover(payload={}, ringid={:?})",
//! std::str::from_utf8(&payload).unwrap(),
//! ringid);
//! Ok(())
-//! }).unwrap();
+//! })).unwrap();
//! // We saw some log playback, even though there is no failure.
//! // Let's try to grow the WAL to create many files.
-//! let ring_ids = wal.grow((0..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>())
+//! let ring_ids = wal.grow((1..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>())
//! .into_iter().map(|f| block_on(f).unwrap().1).collect::<Vec<_>>();
//! // Then assume all these records are no longer needed. We can tell WALWriter by the `peel`
//! // method.
//! block_on(wal.peel(ring_ids)).unwrap();
//! // There will only be one remaining file in ./walfiles.
//!
-//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap();
-//! let wal = loader.load(store, |payload, _| {
+//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap();
+//! let wal = block_on(loader.load(store, |payload, _| {
//! println!("payload.len() = {}", payload.len());
//! Ok(())
-//! }).unwrap();
+//! })).unwrap();
//! // After each recovery, the ./walfiles is empty.
//! ```
@@ -49,7 +49,6 @@
pub mod wal;
use async_trait::async_trait;
-use futures::executor::block_on;
use libaiofut::{AIOBuilder, AIOManager};
use libc::off_t;
use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag};
@@ -116,13 +115,12 @@ impl WALFile for WALFileAIO {
})
}
- fn read(
+ async fn read(
&self,
offset: WALPos,
length: usize,
) -> Result<Option<WALBytes>, ()> {
- let (res, data) =
- block_on(self.aiomgr.read(self.fd, offset, length, None));
+ let (res, data) = self.aiomgr.read(self.fd, offset, length, None).await;
res.or_else(|_| Err(())).and_then(|nread| {
Ok(if nread == length { Some(data) } else { None })
})
@@ -131,7 +129,6 @@ impl WALFile for WALFileAIO {
pub struct WALStoreAIO {
rootfd: RawFd,
- rootpath: String,
aiomgr: Rc<AIOManager>,
}
@@ -139,9 +136,9 @@ impl WALStoreAIO {
pub fn new(
wal_dir: &str,
truncate: bool,
+ rootfd: Option<RawFd>,
aiomgr: Option<AIOManager>,
) -> Result<Self, ()> {
- let rootpath = wal_dir.to_string();
let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else(
|_: Result<AIOManager, ()>| {
AIOBuilder::default().build().or(Err(()))
@@ -151,25 +148,55 @@ impl WALStoreAIO {
if truncate {
let _ = std::fs::remove_dir_all(wal_dir);
}
- match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) {
- Err(e) => {
- if truncate {
- panic!("error while creating directory: {}", e)
+ let walfd;
+ match rootfd {
+ None => {
+ match mkdir(
+ wal_dir,
+ Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR,
+ ) {
+ Err(e) => {
+ if truncate {
+ panic!("error while creating directory: {}", e)
+ }
+ }
+ Ok(_) => (),
+ }
+ walfd = match open(
+ wal_dir,
+ OFlag::O_DIRECTORY | OFlag::O_PATH,
+ Mode::empty(),
+ ) {
+ Ok(fd) => fd,
+ Err(_) => panic!("error while opening the WAL directory"),
+ }
+ }
+ Some(fd) => {
+ let ret = unsafe {
+ libc::mkdirat(
+ fd,
+ std::ffi::CString::new(wal_dir).unwrap().as_ptr(),
+ libc::S_IRUSR | libc::S_IWUSR | libc::S_IXUSR,
+ )
+ };
+ if ret != 0 {
+ if truncate {
+ panic!("error while creating directory")
+ }
+ }
+ walfd = match nix::fcntl::openat(
+ fd,
+ wal_dir,
+ OFlag::O_DIRECTORY | OFlag::O_PATH,
+ Mode::empty(),
+ ) {
+ Ok(fd) => fd,
+ Err(_) => panic!("error while opening the WAL directory"),
}
}
- Ok(_) => (),
}
- let rootfd = match open(
- wal_dir,
- OFlag::O_DIRECTORY | OFlag::O_PATH,
- Mode::empty(),
- ) {
- Ok(fd) => fd,
- Err(_) => panic!("error while opening the WAL directory"),
- };
Ok(WALStoreAIO {
- rootfd,
- rootpath,
+ rootfd: walfd,
aiomgr,
})
}
@@ -200,8 +227,17 @@ impl WALStore for WALStoreAIO {
fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
let mut logfiles = Vec::new();
- for fname in std::fs::read_dir(&self.rootpath).unwrap() {
- logfiles.push(fname.unwrap().file_name().into_string().unwrap())
+ for ent in nix::dir::Dir::openat(
+ self.rootfd,
+ "./",
+ OFlag::empty(),
+ Mode::empty(),
+ )
+ .unwrap()
+ .iter()
+ {
+ logfiles
+ .push(ent.unwrap().file_name().to_str().unwrap().to_string())
}
Ok(logfiles.into_iter())
}
diff --git a/src/wal.rs b/src/wal.rs
index deae98d..7975f38 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,5 +1,4 @@
use async_trait::async_trait;
-use futures::executor::block_on;
use futures::future::{self, FutureExt, TryFutureExt};
use std::cell::{RefCell, UnsafeCell};
use std::collections::{hash_map, BinaryHeap, HashMap};
@@ -106,7 +105,7 @@ pub trait WALFile {
/// all or nothing).
async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
/// Read data with offset. Return `Ok(None)` when it reaches EOF.
- fn read(
+ async fn read(
&self,
offset: WALPos,
length: usize,
@@ -408,7 +407,8 @@ impl<F: WALStore> WALWriter<F> {
/// corresponds to one record. When a future resolves to `WALRingId`, it is guaranteed the
/// record is already logged. Then, after finalizing the changes encoded by that record to
/// the persistent storage, the caller can recycle the WAL files by invoking the given
- /// `peel` with the given `WALRingId`s.
+ /// `peel` with the given `WALRingId`s. Note: each serialized record must contain at least 1
+ /// byte (an empty record payload will trigger an assertion failure).
pub fn grow<'a, R: Record + 'a>(
&'a mut self,
records: Vec<R>,
@@ -427,6 +427,7 @@ impl<F: WALStore> WALWriter<F> {
let mut rec = &bytes[..];
let mut rsize = rec.len() as u32;
let mut ring_start = None;
+ assert!(rsize > 0);
while rsize > 0 {
let remain = self.block_size - bbuff_cur;
if remain > msize {
@@ -647,7 +648,10 @@ impl WALLoader {
}
/// Recover by reading the WAL files.
- pub fn load<S: WALStore, F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>(
+ pub async fn load<
+ S: WALStore,
+ F: FnMut(WALBytes, WALRingId) -> Result<(), ()>,
+ >(
&self,
store: S,
mut recover_func: F,
@@ -674,14 +678,14 @@ impl WALLoader {
let mut skip = false;
for fname in logfiles.into_iter() {
let fid = file_pool.get_fid(&fname);
- let f = block_on(file_pool.get_file(fid, false))?;
+ let f = file_pool.get_file(fid, false).await?;
let mut off = 0;
if skip {
f.truncate(0)?;
- block_on(file_pool.store.remove_file(fname))?;
+ file_pool.store.remove_file(fname).await?;
continue;
}
- while let Some(header_raw) = f.read(off, msize as usize)? {
+ while let Some(header_raw) = f.read(off, msize as usize).await? {
let ringid_start = (fid << file_pool.file_nbit) + off;
off += msize as u64;
let header = unsafe {
@@ -693,7 +697,8 @@ impl WALLoader {
match header.rtype {
WALRingType::Full => {
assert!(chunks.is_none());
- let payload = f.read(off, rsize as usize)?.ok_or(())?;
+ let payload =
+ f.read(off, rsize as usize).await?.ok_or(())?;
// TODO: improve the behavior when CRC32 fails
if !self.verify_checksum(&payload, header.crc32)? {
skip = true;
@@ -710,7 +715,8 @@ impl WALLoader {
}
WALRingType::First => {
assert!(chunks.is_none());
- let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ let chunk =
+ f.read(off, rsize as usize).await?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
break;
@@ -721,7 +727,7 @@ impl WALLoader {
WALRingType::Middle => {
if let Some((chunks, _)) = &mut chunks {
let chunk =
- f.read(off, rsize as usize)?.ok_or(())?;
+ f.read(off, rsize as usize).await?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
break;
@@ -734,7 +740,7 @@ impl WALLoader {
if let Some((mut chunks, ringid_start)) = chunks.take()
{
let chunk =
- f.read(off, rsize as usize)?.ok_or(())?;
+ f.read(off, rsize as usize).await?.ok_or(())?;
off += rsize as u64;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
@@ -772,7 +778,7 @@ impl WALLoader {
}
}
f.truncate(0)?;
- block_on(file_pool.store.remove_file(fname))?;
+ file_pool.store.remove_file(fname).await?;
}
file_pool.reset();
Ok(WALWriter::new(