diff options
author | Determinant <[email protected]> | 2020-06-12 15:13:20 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-12 15:13:20 -0400 |
commit | c3c606a9f6754b115054569304ae612225da3ed3 (patch) | |
tree | 64cca78a341379e6b7ee2af26fe8d516d4582beb /src/wal.rs | |
parent | 5b6bb306bb1d20bdab0e1d7752f2b7b7f16a2b7e (diff) |
order writes from different invocations of grow properlyasync-writes
Diffstat (limited to 'src/wal.rs')
-rw-r--r-- | src/wal.rs | 41 |
1 files changed, 30 insertions, 11 deletions
@@ -1,8 +1,9 @@ use async_trait::async_trait; use futures::future::{self, FutureExt, TryFutureExt}; -use std::cell::RefCell; +use std::cell::{RefCell, UnsafeCell}; use std::collections::BinaryHeap; use std::future::Future; +use std::mem::MaybeUninit; use std::pin::Pin; #[repr(u8)] @@ -74,13 +75,10 @@ struct WALState { pub trait WALFile { /// Initialize the file space in [offset, offset + length) to zero. async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>; - /// Truncate a file to a specified length. - fn truncate(&self, length: usize) -> Result<(), ()>; - /// Write data with offset. We assume the actual writes on the storage medium are _strictly - /// ordered_ the same way as this callback is invoked. We also assume all previous - /// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed - /// by most OS). Additionally, the final write caused by each invocation of this function - /// should be _atomic_ (the entire single write should be all or nothing). + /// Write data with offset. We assume all previous `allocate/truncate` invocations are visible + /// if ordered earlier (should be guaranteed by most OS). Additionally, the final write caused + /// by each invocation of this function should be _atomic_ (the entire single write should be + /// all or nothing). async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>; /// Read data with offset. Return Ok(None) when it reaches EOF. fn read( @@ -88,6 +86,8 @@ pub trait WALFile { offset: WALPos, length: usize, ) -> Result<Option<WALBytes>, ()>; + /// Truncate a file to a specified length. + fn truncate(&self, length: usize) -> Result<(), ()>; } #[async_trait(?Send)] @@ -120,6 +120,8 @@ pub trait WALStore { struct WALFilePool<F: WALStore> { store: F, handles: RefCell<lru::LruCache<WALFileId, Box<dyn WALFile>>>, + last_write: + UnsafeCell<MaybeUninit<Pin<Box<dyn Future<Output = Result<(), ()>>>>>>, file_nbit: u64, file_size: u64, block_nbit: u64, @@ -132,6 +134,9 @@ impl<F: WALStore> WALFilePool<F> { WALFilePool { store, handles: RefCell::new(lru::LruCache::new(cache_size)), + last_write: UnsafeCell::new(MaybeUninit::new(Box::pin( + future::ready(Ok(())), + ))), file_nbit, file_size: 1 << (file_nbit as u64), block_nbit, @@ -142,11 +147,11 @@ impl<F: WALStore> WALFilePool<F> { format!("{:08x}.log", fid) } - fn get_file<'a>( - &'a self, + fn get_file( + &self, fid: u64, touch: bool, - ) -> impl Future<Output = Result<&'a dyn WALFile, ()>> { + ) -> impl Future<Output = Result<&dyn WALFile, ()>> { async move { if let Some(h) = self.handles.borrow_mut().get(&fid) { return Ok(unsafe { &*(&**h as *const dyn WALFile) }); @@ -186,8 +191,16 @@ impl<F: WALStore> WALFilePool<F> { let mut fid = writes[0].0 >> file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); let mut alloc_end = alloc_start + writes[0].1.len() as u64; + let last_write = unsafe { + std::mem::replace( + &mut *self.last_write.get(), + std::mem::MaybeUninit::uninit(), + ) + .assume_init() + }; // pre-allocate the file space let alloc = async move { + last_write.await?; let mut last_h: Option< Pin<Box<dyn Future<Output = Result<&'a dyn WALFile, ()>> + 'a>>, > = None; @@ -231,6 +244,12 @@ impl<F: WALStore> WALFilePool<F> { prev = Box::pin(w.clone()); res.push(Box::pin(w) as Pin<Box<dyn Future<Output = _> + 'a>>) } + unsafe { + (*self.last_write.get()) = MaybeUninit::new(std::mem::transmute::< + Pin<Box<dyn Future<Output = _> + 'a>>, + Pin<Box<dyn Future<Output = _> + 'static>>, + >(prev)) + } res } |