aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock6
-rw-r--r--Cargo.toml10
-rw-r--r--examples/demo1.rs6
-rw-r--r--src/lib.rs39
-rw-r--r--tests/common/mod.rs2
5 files changed, 36 insertions, 27 deletions
diff --git a/Cargo.lock b/Cargo.lock
index dbe9b6b..96962a3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -232,7 +232,7 @@ dependencies = [
[[package]]
name = "growth-ring"
-version = "0.1.1"
+version = "0.1.2"
dependencies = [
"async-trait",
"crc",
@@ -281,9 +281,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libaio-futures"
-version = "0.1.1"
+version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "049dd6ea8958e5c868151b08c68fa771daa979304ab3de8f5bab58126ba53e3b"
+checksum = "e831955e07aa19f495c993327d647d6a49bc9e5d6856238c00fd1833a1a06eee"
dependencies = [
"crossbeam-channel",
"libc",
diff --git a/Cargo.toml b/Cargo.toml
index a30512d..a8a82ef 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "growth-ring"
-version = "0.1.1"
+version = "0.1.2"
authors = ["Determinant <[email protected]>"]
edition = "2018"
homepage = "https://github.com/Determinant/growth-ring"
@@ -14,10 +14,10 @@ description = "Simple and modular write-ahead-logging implementation."
crc = "1.8.1"
lru = "0.5.1"
scan_fmt = "0.2.5"
-regex = "1"
-async-trait = "0.1"
-futures = "0.3"
-libaio-futures = "0.1.1"
+regex = "1.3.9"
+async-trait = "0.1.35"
+futures = "0.3.5"
+libaio-futures = "0.1.2"
nix = "0.17.0"
libc = "0.2.71"
diff --git a/examples/demo1.rs b/examples/demo1.rs
index e214177..d51801d 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -32,7 +32,7 @@ fn main() {
let mut loader = WALLoader::new();
loader.file_nbit(9).block_nbit(8);
- let store = WALStoreAIO::new(&wal_dir, true, recover);
+ let store = WALStoreAIO::new(&wal_dir, true, recover, None).unwrap();
let mut wal = loader.load(store).unwrap();
for _ in 0..3 {
test(
@@ -50,7 +50,7 @@ fn main() {
);
}
- let store = WALStoreAIO::new(&wal_dir, false, recover);
+ let store = WALStoreAIO::new(&wal_dir, false, recover, None).unwrap();
let mut wal = loader.load(store).unwrap();
for _ in 0..3 {
test(
@@ -64,7 +64,7 @@ fn main() {
);
}
- let store = WALStoreAIO::new(&wal_dir, false, recover);
+ let store = WALStoreAIO::new(&wal_dir, false, recover, None).unwrap();
let mut wal = loader.load(store).unwrap();
for _ in 0..3 {
let mut ids = Vec::new();
diff --git a/src/lib.rs b/src/lib.rs
index 447da08..f7b3f73 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,7 +10,7 @@
//!
//!
//! // Start with empty WAL (truncate = true).
-//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())});
+//! let store = WALStoreAIO::new("./walfiles", true, |_, _| {Ok(())}, None).unwrap();
//! let mut wal = loader.load(store).unwrap();
//! // Write a vector of records to WAL.
//! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() {
@@ -26,7 +26,7 @@
//! std::str::from_utf8(&payload).unwrap(),
//! ringid);
//! Ok(())
-//! });
+//! }, None).unwrap();
//! let mut wal = loader.load(store).unwrap();
//! // We saw some log playback, even there is no failure.
//! // Let's try to grow the WAL to create many files.
@@ -40,7 +40,7 @@
//! let store = WALStoreAIO::new("./walfiles", false, |payload, _| {
//! println!("payload.len() = {}", payload.len());
//! Ok(())
-//! });
+//! }, None).unwrap();
//! let wal = loader.load(store).unwrap();
//! // After each recovery, the ./walfiles is empty.
//! ```
@@ -62,14 +62,14 @@ use wal::{WALBytes, WALFile, WALPos, WALRingId, WALStore};
pub struct WALFileAIO {
fd: RawFd,
- aiomgr: Rc<RefCell<AIOManager<AIOBatchSchedulerIn>>>,
+ aiomgr: Rc<AIOManager<AIOBatchSchedulerIn>>,
}
impl WALFileAIO {
pub fn new(
rootfd: RawFd,
filename: &str,
- aiomgr: Rc<RefCell<AIOManager<AIOBatchSchedulerIn>>>,
+ aiomgr: Rc<AIOManager<AIOBatchSchedulerIn>>,
) -> Result<Self, ()> {
openat(
rootfd,
@@ -108,7 +108,6 @@ impl WALFile for WALFileAIO {
async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
self.aiomgr
- .borrow_mut()
.write(self.fd, offset, data, None)
.await
.or_else(|_| Err(()))
@@ -126,7 +125,7 @@ impl WALFile for WALFileAIO {
offset: WALPos,
length: usize,
) -> Result<Option<WALBytes>, ()> {
- block_on(self.aiomgr.borrow_mut().read(self.fd, offset, length, None))
+ block_on(self.aiomgr.read(self.fd, offset, length, None))
.or_else(|_| Err(()))
.and_then(|(nread, data)| {
Ok(if nread == length { Some(data) } else { None })
@@ -138,12 +137,25 @@ pub struct WALStoreAIO<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> {
rootfd: RawFd,
rootpath: String,
recover_func: RefCell<F>,
+ aiomgr: Rc<AIOManager<AIOBatchSchedulerIn>>,
}
impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStoreAIO<F> {
- pub fn new(wal_dir: &str, truncate: bool, recover_func: F) -> Self {
+ pub fn new(
+ wal_dir: &str,
+ truncate: bool,
+ recover_func: F,
+ aiomgr: Option<AIOManager<AIOBatchSchedulerIn>>,
+ ) -> Result<Self, ()> {
let recover_func = RefCell::new(recover_func);
let rootpath = wal_dir.to_string();
+ let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else(
+ |_: Result<AIOManager<AIOBatchSchedulerIn>, ()>| {
+ AIOManager::new(new_batch_scheduler(None), 128, None, None)
+ .or(Err(()))
+ },
+ )?);
+
if truncate {
let _ = std::fs::remove_dir_all(wal_dir);
}
@@ -163,11 +175,12 @@ impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStoreAIO<F> {
Ok(fd) => fd,
Err(_) => panic!("error while opening the WAL directory"),
};
- WALStoreAIO {
+ Ok(WALStoreAIO {
rootfd,
rootpath,
recover_func,
- }
+ aiomgr,
+ })
}
}
@@ -183,11 +196,7 @@ impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStore
_touch: bool,
) -> Result<Box<dyn WALFile>, ()> {
let filename = filename.to_string();
- let aiomgr = Rc::new(RefCell::new(
- AIOManager::new(new_batch_scheduler(None), 10, None, None)
- .or(Err(()))?,
- ));
- WALFileAIO::new(self.rootfd, &filename, aiomgr.clone())
+ WALFileAIO::new(self.rootfd, &filename, self.aiomgr.clone())
.and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>))
}
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 247b2ee..769f422 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -3,7 +3,7 @@
#[allow(dead_code)]
use async_trait::async_trait;
use growthring::wal::{
- WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, RecoverPolicy
+ WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore
};
use indexmap::{map::Entry, IndexMap};
use rand::Rng;