diff options
author | Determinant <[email protected]> | 2020-06-11 18:20:42 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-11 18:20:42 -0400 |
commit | ba04108ee179b00c97708aa6ece6c6fff53101d0 (patch) | |
tree | 9e296a728f7825957f9495993d58462f0144a4e5 /src/wal.rs | |
parent | d3137d4c37d66ace3ea4583afa3edfed48f42988 (diff) |
tentatively add async fn
Diffstat (limited to 'src/wal.rs')
-rw-r--r-- | src/wal.rs | 23 |
1 files changed, 13 insertions, 10 deletions
@@ -1,4 +1,6 @@ +use async_trait::async_trait; use std::collections::BinaryHeap; +use std::future::Future; #[repr(u8)] enum WALRingType { @@ -65,9 +67,10 @@ struct WALState { file_nbit: u64, } +#[async_trait(?Send)] pub trait WALFile { /// Initialize the file space in [offset, offset + length) to zero. - fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>; + async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>; /// Truncate a file to a specified length. fn truncate(&self, length: usize) -> Result<(), ()>; /// Write data with offset. We assume the actual writes on the storage medium are _strictly @@ -75,8 +78,8 @@ pub trait WALFile { /// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed /// by most OS). Additionally, the final write caused by each invocation of this function /// should be _atomic_ (the entire single write should be all or nothing). - fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>; - /// Read data with offset. + async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>; + /// Read data with offset. Return Ok(None) when it reaches EOF. fn read( &self, offset: WALPos, @@ -158,7 +161,7 @@ impl<F: WALStore> WALFilePool<F> { } // TODO: evict stale handles - fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> { + async fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> { // pre-allocate the file space let mut fid = writes[0].0 >> self.file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); @@ -175,7 +178,7 @@ impl<F: WALStore> WALFilePool<F> { last_h.allocate( alloc_start, (alloc_end - alloc_start) as usize, - )?; + ).await?; last_h = *h; alloc_start = 0; alloc_end = alloc_start + w.len() as u64; @@ -184,10 +187,10 @@ impl<F: WALStore> WALFilePool<F> { alloc_end += w.len() as u64; } } - last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?; + last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize).await?; for (off, w) in writes.into_iter() { self.get_file(off >> self.file_nbit, true)? - .write(off & (self.file_size - 1), w)?; + .write(off & (self.file_size - 1), w).await?; } Ok(()) } @@ -231,10 +234,10 @@ impl<F: WALStore> WALWriter<F> { /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the /// function returns. The caller then has the knowledge of WAL writes so it should defer /// actual data writes after WAL writes. - pub fn grow<T: AsRef<[WALBytes]>>( - &mut self, + pub fn grow<'a, T: AsRef<[WALBytes]>>( + &'a mut self, records: T, - ) -> (Box<[WALRingId]>, Result<(), ()>) { + ) -> (Box<[WALRingId]>, impl Future<Output = Result<(), ()>> + 'a) { let mut res = Vec::new(); let mut writes = Vec::new(); let msize = self.msize as u32; |