From c3c606a9f6754b115054569304ae612225da3ed3 Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 12 Jun 2020 15:13:20 -0400 Subject: order writes from different invocations of grow properly --- src/wal.rs | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/src/wal.rs b/src/wal.rs index 6f3b424..454fbd0 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -1,8 +1,9 @@ use async_trait::async_trait; use futures::future::{self, FutureExt, TryFutureExt}; -use std::cell::RefCell; +use std::cell::{RefCell, UnsafeCell}; use std::collections::BinaryHeap; use std::future::Future; +use std::mem::MaybeUninit; use std::pin::Pin; #[repr(u8)] @@ -74,13 +75,10 @@ struct WALState { pub trait WALFile { /// Initialize the file space in [offset, offset + length) to zero. async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>; - /// Truncate a file to a specified length. - fn truncate(&self, length: usize) -> Result<(), ()>; - /// Write data with offset. We assume the actual writes on the storage medium are _strictly - /// ordered_ the same way as this callback is invoked. We also assume all previous - /// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed - /// by most OS). Additionally, the final write caused by each invocation of this function - /// should be _atomic_ (the entire single write should be all or nothing). + /// Write data with offset. We assume all previous `allocate/truncate` invocations are visible + /// if ordered earlier (should be guaranteed by most OS). Additionally, the final write caused + /// by each invocation of this function should be _atomic_ (the entire single write should be + /// all or nothing). async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>; /// Read data with offset. Return Ok(None) when it reaches EOF. fn read( @@ -88,6 +86,8 @@ pub trait WALFile { offset: WALPos, length: usize, ) -> Result, ()>; + /// Truncate a file to a specified length. + fn truncate(&self, length: usize) -> Result<(), ()>; } #[async_trait(?Send)] @@ -120,6 +120,8 @@ pub trait WALStore { struct WALFilePool { store: F, handles: RefCell>>, + last_write: + UnsafeCell>>>>>, file_nbit: u64, file_size: u64, block_nbit: u64, @@ -132,6 +134,9 @@ impl WALFilePool { WALFilePool { store, handles: RefCell::new(lru::LruCache::new(cache_size)), + last_write: UnsafeCell::new(MaybeUninit::new(Box::pin( + future::ready(Ok(())), + ))), file_nbit, file_size: 1 << (file_nbit as u64), block_nbit, @@ -142,11 +147,11 @@ impl WALFilePool { format!("{:08x}.log", fid) } - fn get_file<'a>( - &'a self, + fn get_file( + &self, fid: u64, touch: bool, - ) -> impl Future> { + ) -> impl Future> { async move { if let Some(h) = self.handles.borrow_mut().get(&fid) { return Ok(unsafe { &*(&**h as *const dyn WALFile) }); @@ -186,8 +191,16 @@ impl WALFilePool { let mut fid = writes[0].0 >> file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); let mut alloc_end = alloc_start + writes[0].1.len() as u64; + let last_write = unsafe { + std::mem::replace( + &mut *self.last_write.get(), + std::mem::MaybeUninit::uninit(), + ) + .assume_init() + }; // pre-allocate the file space let alloc = async move { + last_write.await?; let mut last_h: Option< Pin> + 'a>>, > = None; @@ -231,6 +244,12 @@ impl WALFilePool { prev = Box::pin(w.clone()); res.push(Box::pin(w) as Pin + 'a>>) } + unsafe { + (*self.last_write.get()) = MaybeUninit::new(std::mem::transmute::< + Pin + 'a>>, + Pin + 'static>>, + >(prev)) + } res } -- cgit v1.2.3-70-g09d2