summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/wal.rs23
1 files changed, 13 insertions, 10 deletions
diff --git a/src/wal.rs b/src/wal.rs
index 39c4390..85b144b 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,4 +1,6 @@
+use async_trait::async_trait;
use std::collections::BinaryHeap;
+use std::future::Future;
#[repr(u8)]
enum WALRingType {
@@ -65,9 +67,10 @@ struct WALState {
file_nbit: u64,
}
+#[async_trait(?Send)]
pub trait WALFile {
/// Initialize the file space in [offset, offset + length) to zero.
- fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
+ async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
/// Truncate a file to a specified length.
fn truncate(&self, length: usize) -> Result<(), ()>;
/// Write data with offset. We assume the actual writes on the storage medium are _strictly
@@ -75,8 +78,8 @@ pub trait WALFile {
/// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed
/// by most OS). Additionally, the final write caused by each invocation of this function
/// should be _atomic_ (the entire single write should be all or nothing).
- fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
- /// Read data with offset.
+ async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
+ /// Read data with offset. Return Ok(None) when it reaches EOF.
fn read(
&self,
offset: WALPos,
@@ -158,7 +161,7 @@ impl<F: WALStore> WALFilePool<F> {
}
// TODO: evict stale handles
- fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> {
+ async fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> {
// pre-allocate the file space
let mut fid = writes[0].0 >> self.file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
@@ -175,7 +178,7 @@ impl<F: WALStore> WALFilePool<F> {
last_h.allocate(
alloc_start,
(alloc_end - alloc_start) as usize,
- )?;
+ ).await?;
last_h = *h;
alloc_start = 0;
alloc_end = alloc_start + w.len() as u64;
@@ -184,10 +187,10 @@ impl<F: WALStore> WALFilePool<F> {
alloc_end += w.len() as u64;
}
}
- last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?;
+ last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize).await?;
for (off, w) in writes.into_iter() {
self.get_file(off >> self.file_nbit, true)?
- .write(off & (self.file_size - 1), w)?;
+ .write(off & (self.file_size - 1), w).await?;
}
Ok(())
}
@@ -231,10 +234,10 @@ impl<F: WALStore> WALWriter<F> {
/// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
/// function returns. The caller then has the knowledge of WAL writes so it should defer
/// actual data writes after WAL writes.
- pub fn grow<T: AsRef<[WALBytes]>>(
- &mut self,
+ pub fn grow<'a, T: AsRef<[WALBytes]>>(
+ &'a mut self,
records: T,
- ) -> (Box<[WALRingId]>, Result<(), ()>) {
+ ) -> (Box<[WALRingId]>, impl Future<Output = Result<(), ()>> + 'a) {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = self.msize as u32;