aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2020-10-12 20:13:26 -0400
committerDeterminant <tederminant@gmail.com>2020-10-12 20:13:26 -0400
commit54ce471c15fda9812d3cd7ef6222701b19892d30 (patch)
tree05f071fbe947242495cba6df84cbb5c32d9b024d
parent395bd19d51c5f5e0bfd3b5897ddce5f6bef5ec79 (diff)
add assertion for empty record payload; make `load` async funcv0.1.6
-rw-r--r--Cargo.lock125
-rw-r--r--Cargo.toml16
-rw-r--r--examples/demo1.rs13
-rw-r--r--src/lib.rs100
-rw-r--r--src/wal.rs30
-rw-r--r--tests/common/mod.rs11
6 files changed, 155 insertions, 140 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 4a278a9..88ae519 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2,12 +2,9 @@
# It is not intended for manual editing.
[[package]]
name = "ahash"
-version = "0.2.18"
+version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6f33b5018f120946c1dcf279194f238a9f146725593ead1c08fa47ff22b0b5d3"
-dependencies = [
- "const-random",
-]
+checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217"
[[package]]
name = "aho-corasick"
@@ -20,9 +17,9 @@ dependencies = [
[[package]]
name = "async-trait"
-version = "0.1.35"
+version = "0.1.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89cb5d814ab2a47fd66d3266e9efccb53ca4c740b7451043b8ffcf9a6208f3f8"
+checksum = "b246867b8b3b6ae56035f1eb1ed557c1d8eae97f0d53696138a50fa0e3a3b8c0"
dependencies = [
"proc-macro2",
"quote",
@@ -31,12 +28,6 @@ dependencies = [
[[package]]
name = "autocfg"
-version = "0.1.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2"
-
-[[package]]
-name = "autocfg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
@@ -75,26 +66,6 @@ dependencies = [
]
[[package]]
-name = "const-random"
-version = "0.1.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
-dependencies = [
- "const-random-macro",
- "proc-macro-hack",
-]
-
-[[package]]
-name = "const-random-macro"
-version = "0.1.8"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
-dependencies = [
- "getrandom",
- "proc-macro-hack",
-]
-
-[[package]]
name = "crc"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -119,16 +90,16 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
dependencies = [
- "autocfg 1.0.0",
+ "autocfg",
"cfg-if",
"lazy_static",
]
[[package]]
name = "futures"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613"
+checksum = "5d8e3078b7b2a8a671cb7a3d17b4760e4181ea243227776ba83fd043b4ca034e"
dependencies = [
"futures-channel",
"futures-core",
@@ -141,9 +112,9 @@ dependencies = [
[[package]]
name = "futures-channel"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
+checksum = "a7a4d35f7401e948629c9c3d6638fb9bf94e0b2121e96c3b428cc4e631f3eb74"
dependencies = [
"futures-core",
"futures-sink",
@@ -151,15 +122,15 @@ dependencies = [
[[package]]
name = "futures-core"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
+checksum = "d674eaa0056896d5ada519900dbf97ead2e46a7b6621e8160d79e2f2e1e2784b"
[[package]]
name = "futures-executor"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314"
+checksum = "cc709ca1da6f66143b8c9bec8e6260181869893714e9b5a490b169b0414144ab"
dependencies = [
"futures-core",
"futures-task",
@@ -168,15 +139,15 @@ dependencies = [
[[package]]
name = "futures-io"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789"
+checksum = "5fc94b64bb39543b4e432f1790b6bf18e3ee3b74653c5449f63310e9a74b123c"
[[package]]
name = "futures-macro"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39"
+checksum = "f57ed14da4603b2554682e9f2ff3c65d7567b53188db96cb71538217fc64581b"
dependencies = [
"proc-macro-hack",
"proc-macro2",
@@ -186,24 +157,24 @@ dependencies = [
[[package]]
name = "futures-sink"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc"
+checksum = "0d8764258ed64ebc5d9ed185cf86a95db5cac810269c5d20ececb32e0088abbd"
[[package]]
name = "futures-task"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626"
+checksum = "4dd26820a9f3637f1302da8bceba3ff33adbe53464b54ca24d4e2d4f1db30f94"
dependencies = [
"once_cell",
]
[[package]]
name = "futures-util"
-version = "0.3.5"
+version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
+checksum = "8a894a0acddba51a2d49a6f4263b1e64b8c579ece8af50fa86503d52cd1eea34"
dependencies = [
"futures-channel",
"futures-core",
@@ -232,7 +203,7 @@ dependencies = [
[[package]]
name = "growth-ring"
-version = "0.1.5"
+version = "0.1.6"
dependencies = [
"async-trait",
"crc",
@@ -250,15 +221,21 @@ dependencies = [
[[package]]
name = "hashbrown"
-version = "0.6.3"
+version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8e6073d0ca812575946eb5f35ff68dbe519907b25c42530389ff946dc84c6ead"
+checksum = "e91b62f79061a0bc2e046024cb7ba44b08419ed238ecbd9adbd787434b9e8c25"
dependencies = [
"ahash",
- "autocfg 0.1.7",
+ "autocfg",
]
[[package]]
+name = "hashbrown"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
+
+[[package]]
name = "hex"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -266,11 +243,12 @@ checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35"
[[package]]
name = "indexmap"
-version = "1.4.0"
+version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c398b2b113b55809ceb9ee3e753fcbac793f1956663f3c36549c1346015c2afe"
+checksum = "55e2e4c765aa53a0424761bf9f41aa7a6ac1efa87238f59560640e27fca028f2"
dependencies = [
- "autocfg 1.0.0",
+ "autocfg",
+ "hashbrown 0.9.1",
]
[[package]]
@@ -292,9 +270,9 @@ dependencies = [
[[package]]
name = "libc"
-version = "0.2.71"
+version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49"
+checksum = "2448f6066e80e3bfc792e9c98bf705b4b0fc6e8ef5b43e5889aff0eaa9c58743"
[[package]]
name = "lock_api"
@@ -307,11 +285,11 @@ dependencies = [
[[package]]
name = "lru"
-version = "0.5.2"
+version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "297efb9401445cf7b6986a583d7ac194023334b46b294ff7da0d36662c1251c2"
+checksum = "111b945ac72ec09eb7bc62a0fbdc3cc6e80555a7245f52a69d3921a75b53b153"
dependencies = [
- "hashbrown",
+ "hashbrown 0.8.2",
]
[[package]]
@@ -328,15 +306,14 @@ checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
[[package]]
name = "nix"
-version = "0.17.0"
+version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "50e4785f2c3b7589a0d0c1dd60285e1188adac4006e8abd6dd578e1567027363"
+checksum = "83450fe6a6142ddd95fb064b746083fc4ef1705fe81f64a64e1d4b39f54a1055"
dependencies = [
"bitflags",
"cc",
"cfg-if",
"libc",
- "void",
]
[[package]]
@@ -480,9 +457,9 @@ checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84"
[[package]]
name = "regex"
-version = "1.3.9"
+version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6"
+checksum = "36f45b719a674bf4b828ff318906d6c133264c793eff7a41e30074a45b5099e2"
dependencies = [
"aho-corasick",
"memchr",
@@ -492,9 +469,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
-version = "0.6.18"
+version = "0.6.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26412eb97c6b088a6997e05f69403a802a92d520de2f8e63c2b65f9e0f47c4e8"
+checksum = "c17be88d9eaa858870aa5e48cc406c206e4600e983fc4f06bbe5750d93d09761"
[[package]]
name = "scan_fmt"
@@ -550,12 +527,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
[[package]]
-name = "void"
-version = "1.0.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
-
-[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 22b3232..d5fa31c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "growth-ring"
-version = "0.1.5"
+version = "0.1.6"
authors = ["Determinant <tederminant@gmail.com>"]
edition = "2018"
homepage = "https://github.com/Determinant/growth-ring"
@@ -12,19 +12,19 @@ description = "Simple and modular write-ahead-logging implementation."
[dependencies]
crc = "1.8.1"
-lru = "0.5.2"
+lru = "0.6.0"
scan_fmt = "0.2.5"
-regex = "1.3.9"
-async-trait = "0.1.35"
-futures = "0.3.5"
+regex = "1.4.0"
+async-trait = "0.1.41"
+futures = "0.3.6"
libaio-futures = "0.1.7"
-nix = "0.17.0"
-libc = "0.2.71"
+nix = "0.18.0"
+libc = "0.2.79"
[dev-dependencies]
hex = "0.4.2"
rand = "0.7.3"
-indexmap = "1.4.0"
+indexmap = "1.6.0"
[lib]
name = "growthring"
diff --git a/examples/demo1.rs b/examples/demo1.rs
index 6f029ab..31a9b7d 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -1,3 +1,4 @@
+use futures::executor::block_on;
use growthring::{
wal::{WALBytes, WALRingId, WALLoader, WALWriter},
WALStoreAIO,
@@ -32,8 +33,8 @@ fn main() {
let mut loader = WALLoader::new();
loader.file_nbit(9).block_nbit(8);
- let store = WALStoreAIO::new(&wal_dir, true, None).unwrap();
- let mut wal = loader.load(store, recover).unwrap();
+ let store = WALStoreAIO::new(&wal_dir, true, None, None).unwrap();
+ let mut wal = block_on(loader.load(store, recover)).unwrap();
for _ in 0..3 {
test(
["hi", "hello", "lol"]
@@ -50,8 +51,8 @@ fn main() {
);
}
- let store = WALStoreAIO::new(&wal_dir, false, None).unwrap();
- let mut wal = loader.load(store, recover).unwrap();
+ let store = WALStoreAIO::new(&wal_dir, false, None, None).unwrap();
+ let mut wal = block_on(loader.load(store, recover)).unwrap();
for _ in 0..3 {
test(
vec![
@@ -64,8 +65,8 @@ fn main() {
);
}
- let store = WALStoreAIO::new(&wal_dir, false, None).unwrap();
- let mut wal = loader.load(store, recover).unwrap();
+ let store = WALStoreAIO::new(&wal_dir, false, None, None).unwrap();
+ let mut wal = block_on(loader.load(store, recover)).unwrap();
for _ in 0..3 {
let mut ids = Vec::new();
for _ in 0..3 {
diff --git a/src/lib.rs b/src/lib.rs
index 2a67cd3..7674adc 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -10,8 +10,8 @@
//!
//!
//! // Start with empty WAL (truncate = true).
-//! let store = WALStoreAIO::new("./walfiles", true, None).unwrap();
-//! let mut wal = loader.load(store, |_, _| {Ok(())}).unwrap();
+//! let store = WALStoreAIO::new("./walfiles", true, None, None).unwrap();
+//! let mut wal = block_on(loader.load(store, |_, _| {Ok(())})).unwrap();
//! // Write a vector of records to WAL.
//! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() {
//! let ring_id = block_on(f).unwrap().1;
@@ -20,28 +20,28 @@
//!
//!
//! // Load from WAL (truncate = false).
-//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap();
-//! let mut wal = loader.load(store, |payload, ringid| {
+//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap();
+//! let mut wal = block_on(loader.load(store, |payload, ringid| {
//! // redo the operations in your application
//! println!("recover(payload={}, ringid={:?})",
//! std::str::from_utf8(&payload).unwrap(),
//! ringid);
//! Ok(())
-//! }).unwrap();
+//! })).unwrap();
//! // We saw some log playback, even there is no failure.
//! // Let's try to grow the WAL to create many files.
-//! let ring_ids = wal.grow((0..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>())
+//! let ring_ids = wal.grow((1..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>())
//! .into_iter().map(|f| block_on(f).unwrap().1).collect::<Vec<_>>();
//! // Then assume all these records are not longer needed. We can tell WALWriter by the `peel`
//! // method.
//! block_on(wal.peel(ring_ids)).unwrap();
//! // There will only be one remaining file in ./walfiles.
//!
-//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap();
-//! let wal = loader.load(store, |payload, _| {
+//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap();
+//! let wal = block_on(loader.load(store, |payload, _| {
//! println!("payload.len() = {}", payload.len());
//! Ok(())
-//! }).unwrap();
+//! })).unwrap();
//! // After each recovery, the ./walfiles is empty.
//! ```
@@ -49,7 +49,6 @@
pub mod wal;
use async_trait::async_trait;
-use futures::executor::block_on;
use libaiofut::{AIOBuilder, AIOManager};
use libc::off_t;
use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag};
@@ -116,13 +115,12 @@ impl WALFile for WALFileAIO {
})
}
- fn read(
+ async fn read(
&self,
offset: WALPos,
length: usize,
) -> Result<Option<WALBytes>, ()> {
- let (res, data) =
- block_on(self.aiomgr.read(self.fd, offset, length, None));
+ let (res, data) = self.aiomgr.read(self.fd, offset, length, None).await;
res.or_else(|_| Err(())).and_then(|nread| {
Ok(if nread == length { Some(data) } else { None })
})
@@ -131,7 +129,6 @@ impl WALFile for WALFileAIO {
pub struct WALStoreAIO {
rootfd: RawFd,
- rootpath: String,
aiomgr: Rc<AIOManager>,
}
@@ -139,9 +136,9 @@ impl WALStoreAIO {
pub fn new(
wal_dir: &str,
truncate: bool,
+ rootfd: Option<RawFd>,
aiomgr: Option<AIOManager>,
) -> Result<Self, ()> {
- let rootpath = wal_dir.to_string();
let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else(
|_: Result<AIOManager, ()>| {
AIOBuilder::default().build().or(Err(()))
@@ -151,25 +148,55 @@ impl WALStoreAIO {
if truncate {
let _ = std::fs::remove_dir_all(wal_dir);
}
- match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) {
- Err(e) => {
- if truncate {
- panic!("error while creating directory: {}", e)
+ let walfd;
+ match rootfd {
+ None => {
+ match mkdir(
+ wal_dir,
+ Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR,
+ ) {
+ Err(e) => {
+ if truncate {
+ panic!("error while creating directory: {}", e)
+ }
+ }
+ Ok(_) => (),
+ }
+ walfd = match open(
+ wal_dir,
+ OFlag::O_DIRECTORY | OFlag::O_PATH,
+ Mode::empty(),
+ ) {
+ Ok(fd) => fd,
+ Err(_) => panic!("error while opening the WAL directory"),
+ }
+ }
+ Some(fd) => {
+ let ret = unsafe {
+ libc::mkdirat(
+ fd,
+ std::ffi::CString::new(wal_dir).unwrap().as_ptr(),
+ libc::S_IRUSR | libc::S_IWUSR | libc::S_IXUSR,
+ )
+ };
+ if ret != 0 {
+ if truncate {
+ panic!("error while creating directory")
+ }
+ }
+ walfd = match nix::fcntl::openat(
+ fd,
+ wal_dir,
+ OFlag::O_DIRECTORY | OFlag::O_PATH,
+ Mode::empty(),
+ ) {
+ Ok(fd) => fd,
+ Err(_) => panic!("error while opening the WAL directory"),
}
}
- Ok(_) => (),
}
- let rootfd = match open(
- wal_dir,
- OFlag::O_DIRECTORY | OFlag::O_PATH,
- Mode::empty(),
- ) {
- Ok(fd) => fd,
- Err(_) => panic!("error while opening the WAL directory"),
- };
Ok(WALStoreAIO {
- rootfd,
- rootpath,
+ rootfd: walfd,
aiomgr,
})
}
@@ -200,8 +227,17 @@ impl WALStore for WALStoreAIO {
fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
let mut logfiles = Vec::new();
- for fname in std::fs::read_dir(&self.rootpath).unwrap() {
- logfiles.push(fname.unwrap().file_name().into_string().unwrap())
+ for ent in nix::dir::Dir::openat(
+ self.rootfd,
+ "./",
+ OFlag::empty(),
+ Mode::empty(),
+ )
+ .unwrap()
+ .iter()
+ {
+ logfiles
+ .push(ent.unwrap().file_name().to_str().unwrap().to_string())
}
Ok(logfiles.into_iter())
}
diff --git a/src/wal.rs b/src/wal.rs
index deae98d..7975f38 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,5 +1,4 @@
use async_trait::async_trait;
-use futures::executor::block_on;
use futures::future::{self, FutureExt, TryFutureExt};
use std::cell::{RefCell, UnsafeCell};
use std::collections::{hash_map, BinaryHeap, HashMap};
@@ -106,7 +105,7 @@ pub trait WALFile {
/// all or nothing).
async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
/// Read data with offset. Return `Ok(None)` when it reaches EOF.
- fn read(
+ async fn read(
&self,
offset: WALPos,
length: usize,
@@ -408,7 +407,8 @@ impl<F: WALStore> WALWriter<F> {
/// corresponds to one record. When a future resolves to `WALRingId`, it is guaranteed the
/// record is already logged. Then, after finalizing the changes encoded by that record to
/// the persistent storage, the caller can recycle the WAL files by invoking the given
- /// `peel` with the given `WALRingId`s.
+ /// `peel` with the given `WALRingId`s. Note: each serialized record should contain at least 1
+ /// byte (empty record payload will result in assertion failure).
pub fn grow<'a, R: Record + 'a>(
&'a mut self,
records: Vec<R>,
@@ -427,6 +427,7 @@ impl<F: WALStore> WALWriter<F> {
let mut rec = &bytes[..];
let mut rsize = rec.len() as u32;
let mut ring_start = None;
+ assert!(rsize > 0);
while rsize > 0 {
let remain = self.block_size - bbuff_cur;
if remain > msize {
@@ -647,7 +648,10 @@ impl WALLoader {
}
/// Recover by reading the WAL files.
- pub fn load<S: WALStore, F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>(
+ pub async fn load<
+ S: WALStore,
+ F: FnMut(WALBytes, WALRingId) -> Result<(), ()>,
+ >(
&self,
store: S,
mut recover_func: F,
@@ -674,14 +678,14 @@ impl WALLoader {
let mut skip = false;
for fname in logfiles.into_iter() {
let fid = file_pool.get_fid(&fname);
- let f = block_on(file_pool.get_file(fid, false))?;
+ let f = file_pool.get_file(fid, false).await?;
let mut off = 0;
if skip {
f.truncate(0)?;
- block_on(file_pool.store.remove_file(fname))?;
+ file_pool.store.remove_file(fname).await?;
continue;
}
- while let Some(header_raw) = f.read(off, msize as usize)? {
+ while let Some(header_raw) = f.read(off, msize as usize).await? {
let ringid_start = (fid << file_pool.file_nbit) + off;
off += msize as u64;
let header = unsafe {
@@ -693,7 +697,8 @@ impl WALLoader {
match header.rtype {
WALRingType::Full => {
assert!(chunks.is_none());
- let payload = f.read(off, rsize as usize)?.ok_or(())?;
+ let payload =
+ f.read(off, rsize as usize).await?.ok_or(())?;
// TODO: improve the behavior when CRC32 fails
if !self.verify_checksum(&payload, header.crc32)? {
skip = true;
@@ -710,7 +715,8 @@ impl WALLoader {
}
WALRingType::First => {
assert!(chunks.is_none());
- let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ let chunk =
+ f.read(off, rsize as usize).await?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
break;
@@ -721,7 +727,7 @@ impl WALLoader {
WALRingType::Middle => {
if let Some((chunks, _)) = &mut chunks {
let chunk =
- f.read(off, rsize as usize)?.ok_or(())?;
+ f.read(off, rsize as usize).await?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
break;
@@ -734,7 +740,7 @@ impl WALLoader {
if let Some((mut chunks, ringid_start)) = chunks.take()
{
let chunk =
- f.read(off, rsize as usize)?.ok_or(())?;
+ f.read(off, rsize as usize).await?.ok_or(())?;
off += rsize as u64;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
@@ -772,7 +778,7 @@ impl WALLoader {
}
}
f.truncate(0)?;
- block_on(file_pool.store.remove_file(fname))?;
+ file_pool.store.remove_file(fname).await?;
}
file_pool.reset();
Ok(WALWriter::new(
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 9ec9e8e..fa6c343 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -1,6 +1,7 @@
#[cfg(test)]
#[allow(dead_code)]
use async_trait::async_trait;
+use futures::executor::block_on;
use growthring::wal::{
WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore,
};
@@ -71,7 +72,7 @@ impl<G: FailGen> WALFile for WALFileEmul<G> {
Ok(())
}
- fn read(
+ async fn read(
&self,
offset: WALPos,
length: usize,
@@ -513,13 +514,13 @@ impl PaintingSim {
let mut rng =
<rand::rngs::StdRng as rand::SeedableRng>::seed_from_u64(self.seed);
let mut wal =
- loader.load(WALStoreEmul::new(state, fgen.clone()), |_, _| {
+ block_on(loader.load(WALStoreEmul::new(state, fgen.clone()), |_, _| {
if fgen.next_fail() {
Err(())
} else {
Ok(())
}
- })?;
+ }))?;
for _ in 0..self.n {
let pss = (0..self.m)
.map(|_| {
@@ -616,7 +617,7 @@ impl PaintingSim {
let mut last_idx = 0;
let mut napplied = 0;
canvas.clear_queued();
- wal.load(
+ block_on(wal.load(
WALStoreEmul::new(state, Rc::new(ZeroFailGen)),
|payload, ringid| {
let s = PaintStrokes::from_bytes(&payload);
@@ -625,7 +626,7 @@ impl PaintingSim {
napplied += 1;
Ok(())
},
- )
+ ))
.unwrap();
println!("last = {}/{}, applied = {}", last_idx, ops.len(), napplied);
canvas.paint_all();