summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-06-11 18:20:42 -0400
committerDeterminant <[email protected]>2020-06-11 18:20:42 -0400
commitba04108ee179b00c97708aa6ece6c6fff53101d0 (patch)
tree9e296a728f7825957f9495993d58462f0144a4e5
parentd3137d4c37d66ace3ea4583afa3edfed48f42988 (diff)
tentatively add async fn
-rw-r--r--Cargo.lock12
-rw-r--r--Cargo.toml1
-rw-r--r--src/wal.rs23
-rw-r--r--tests/common/mod.rs8
4 files changed, 31 insertions, 13 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 7e4e668..a8226bb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -19,6 +19,17 @@ dependencies = [
]
[[package]]
+name = "async-trait"
+version = "0.1.35"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "89cb5d814ab2a47fd66d3266e9efccb53ca4c740b7451043b8ffcf9a6208f3f8"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
name = "autocfg"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -193,6 +204,7 @@ dependencies = [
name = "growth-ring"
version = "0.1.0"
dependencies = [
+ "async-trait",
"crc",
"futures",
"hex",
diff --git a/Cargo.toml b/Cargo.toml
index 50f0376..7b302b5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,7 @@ crc = "1.8.1"
lru = "0.5.1"
scan_fmt = "0.2.5"
regex = "1"
+async-trait = "0.1"
[dev-dependencies]
hex = "0.4.2"
diff --git a/src/wal.rs b/src/wal.rs
index 39c4390..85b144b 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,4 +1,6 @@
+use async_trait::async_trait;
use std::collections::BinaryHeap;
+use std::future::Future;
#[repr(u8)]
enum WALRingType {
@@ -65,9 +67,10 @@ struct WALState {
file_nbit: u64,
}
+#[async_trait(?Send)]
pub trait WALFile {
/// Initialize the file space in [offset, offset + length) to zero.
- fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
+ async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
/// Truncate a file to a specified length.
fn truncate(&self, length: usize) -> Result<(), ()>;
/// Write data with offset. We assume the actual writes on the storage medium are _strictly
@@ -75,8 +78,8 @@ pub trait WALFile {
/// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed
/// by most OS). Additionally, the final write caused by each invocation of this function
/// should be _atomic_ (the entire single write should be all or nothing).
- fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
- /// Read data with offset.
+ async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
+ /// Read data with offset. Return Ok(None) when it reaches EOF.
fn read(
&self,
offset: WALPos,
@@ -158,7 +161,7 @@ impl<F: WALStore> WALFilePool<F> {
}
// TODO: evict stale handles
- fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> {
+ async fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> {
// pre-allocate the file space
let mut fid = writes[0].0 >> self.file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
@@ -175,7 +178,7 @@ impl<F: WALStore> WALFilePool<F> {
last_h.allocate(
alloc_start,
(alloc_end - alloc_start) as usize,
- )?;
+ ).await?;
last_h = *h;
alloc_start = 0;
alloc_end = alloc_start + w.len() as u64;
@@ -184,10 +187,10 @@ impl<F: WALStore> WALFilePool<F> {
alloc_end += w.len() as u64;
}
}
- last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?;
+ last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize).await?;
for (off, w) in writes.into_iter() {
self.get_file(off >> self.file_nbit, true)?
- .write(off & (self.file_size - 1), w)?;
+ .write(off & (self.file_size - 1), w).await?;
}
Ok(())
}
@@ -231,10 +234,10 @@ impl<F: WALStore> WALWriter<F> {
/// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
/// function returns. The caller then has the knowledge of WAL writes so it should defer
/// actual data writes after WAL writes.
- pub fn grow<T: AsRef<[WALBytes]>>(
- &mut self,
+ pub fn grow<'a, T: AsRef<[WALBytes]>>(
+ &'a mut self,
records: T,
- ) -> (Box<[WALRingId]>, Result<(), ()>) {
+ ) -> (Box<[WALRingId]>, impl Future<Output = Result<(), ()>> + 'a) {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = self.msize as u32;
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 6e35ea1..ba302f9 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -1,6 +1,7 @@
#[cfg(test)]
#[allow(dead_code)]
extern crate growthring;
+use async_trait::async_trait;
use growthring::wal::{
WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore,
};
@@ -37,8 +38,9 @@ pub struct WALFileEmul<G: FailGen> {
fgen: Rc<G>,
}
+#[async_trait(?Send)]
impl<G: FailGen> WALFile for WALFileEmul<G> {
- fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> {
+ async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> {
if self.fgen.next_fail() {
return Err(());
}
@@ -60,7 +62,7 @@ impl<G: FailGen> WALFile for WALFileEmul<G> {
Ok(())
}
- fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
+ async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
if self.fgen.next_fail() {
return Err(());
}
@@ -548,7 +550,7 @@ impl PaintingSim {
ringid_map.insert(*rid, ops.len() - 1);
}
// grow could fail
- ok?;
+ futures::executor::block_on(ok)?;
// finish appending to WAL
/*
for rid in rids.iter() {