aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2020-06-12 15:13:20 -0400
committerDeterminant <tederminant@gmail.com>2020-06-12 15:13:20 -0400
commitc3c606a9f6754b115054569304ae612225da3ed3 (patch)
tree64cca78a341379e6b7ee2af26fe8d516d4582beb
parent5b6bb306bb1d20bdab0e1d7752f2b7b7f16a2b7e (diff)
order writes from different invocations of grow properlyasync-writes
-rw-r--r--src/wal.rs41
1 files changed, 30 insertions, 11 deletions
diff --git a/src/wal.rs b/src/wal.rs
index 6f3b424..454fbd0 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,8 +1,9 @@
use async_trait::async_trait;
use futures::future::{self, FutureExt, TryFutureExt};
-use std::cell::RefCell;
+use std::cell::{RefCell, UnsafeCell};
use std::collections::BinaryHeap;
use std::future::Future;
+use std::mem::MaybeUninit;
use std::pin::Pin;
#[repr(u8)]
@@ -74,13 +75,10 @@ struct WALState {
pub trait WALFile {
/// Initialize the file space in [offset, offset + length) to zero.
async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
- /// Truncate a file to a specified length.
- fn truncate(&self, length: usize) -> Result<(), ()>;
- /// Write data with offset. We assume the actual writes on the storage medium are _strictly
- /// ordered_ the same way as this callback is invoked. We also assume all previous
- /// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed
- /// by most OS). Additionally, the final write caused by each invocation of this function
- /// should be _atomic_ (the entire single write should be all or nothing).
+ /// Write data with offset. We assume all previous `allocate/truncate` invocations are visible
+ /// if ordered earlier (should be guaranteed by most OS). Additionally, the final write caused
+ /// by each invocation of this function should be _atomic_ (the entire single write should be
+ /// all or nothing).
async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
/// Read data with offset. Return Ok(None) when it reaches EOF.
fn read(
@@ -88,6 +86,8 @@ pub trait WALFile {
offset: WALPos,
length: usize,
) -> Result<Option<WALBytes>, ()>;
+ /// Truncate a file to a specified length.
+ fn truncate(&self, length: usize) -> Result<(), ()>;
}
#[async_trait(?Send)]
@@ -120,6 +120,8 @@ pub trait WALStore {
struct WALFilePool<F: WALStore> {
store: F,
handles: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>,
+ last_write:
+ UnsafeCell<MaybeUninit<Pin<Box<dyn Future<Output = Result<(), ()>>>>>>,
file_nbit: u64,
file_size: u64,
block_nbit: u64,
@@ -132,6 +134,9 @@ impl<F: WALStore> WALFilePool<F> {
WALFilePool {
store,
handles: RefCell::new(lru::LruCache::new(cache_size)),
+ last_write: UnsafeCell::new(MaybeUninit::new(Box::pin(
+ future::ready(Ok(())),
+ ))),
file_nbit,
file_size: 1 << (file_nbit as u64),
block_nbit,
@@ -142,11 +147,11 @@ impl<F: WALStore> WALFilePool<F> {
format!("{:08x}.log", fid)
}
- fn get_file<'a>(
- &'a self,
+ fn get_file(
+ &self,
fid: u64,
touch: bool,
- ) -> impl Future<Output = Result<&'a dyn WALFile, ()>> {
+ ) -> impl Future<Output = Result<&dyn WALFile, ()>> {
async move {
if let Some(h) = self.handles.borrow_mut().get(&fid) {
return Ok(unsafe { &*(&**h as *const dyn WALFile) });
@@ -186,8 +191,16 @@ impl<F: WALStore> WALFilePool<F> {
let mut fid = writes[0].0 >> file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
let mut alloc_end = alloc_start + writes[0].1.len() as u64;
+ let last_write = unsafe {
+ std::mem::replace(
+ &mut *self.last_write.get(),
+ std::mem::MaybeUninit::uninit(),
+ )
+ .assume_init()
+ };
// pre-allocate the file space
let alloc = async move {
+ last_write.await?;
let mut last_h: Option<
Pin<Box<dyn Future<Output = Result<&'a dyn WALFile, ()>> + 'a>>,
> = None;
@@ -231,6 +244,12 @@ impl<F: WALStore> WALFilePool<F> {
prev = Box::pin(w.clone());
res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _> + 'a>>)
}
+ unsafe {
+ (*self.last_write.get()) = MaybeUninit::new(std::mem::transmute::<
+ Pin<Box<dyn Future<Output = _> + 'a>>,
+ Pin<Box<dyn Future<Output = _> + 'static>>,
+ >(prev))
+ }
res
}