diff options
author | Determinant <[email protected]> | 2020-06-12 16:41:19 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-12 16:41:19 -0400 |
commit | e39324e62ab4e09fb0dfc7784519e77fedca65cb (patch) | |
tree | 4d8c863b1bf125bce62ecf9f2dd433c144a586eb /src/wal.rs | |
parent | 37ac8fdadb79e041dfdd3038f30f48e828b280f8 (diff) |
...
Diffstat (limited to 'src/wal.rs')
-rw-r--r-- | src/wal.rs | 41 |
1 files changed, 25 insertions, 16 deletions
@@ -24,8 +24,8 @@ struct WALRingBlob { // payload follows } +type WALFileId = u64; pub type WALBytes = Box<[u8]>; -pub type WALFileId = u64; pub type WALPos = u64; #[derive(Eq, PartialEq, Copy, Clone, Debug, Hash)] @@ -77,12 +77,12 @@ struct WALState { pub trait WALFile { /// Initialize the file space in [offset, offset + length) to zero. async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>; - /// Write data with offset. We assume all previous `allocate/truncate` invocations are visible - /// if ordered earlier (should be guaranteed by most OS). Additionally, the final write caused + /// Write data with offset. We assume all previous `allocate`/`truncate` invocations are visible + /// if ordered earlier (should be guaranteed by most OS). Additionally, the write caused /// by each invocation of this function should be _atomic_ (the entire single write should be /// all or nothing). async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>; - /// Read data with offset. Return Ok(None) when it reaches EOF. + /// Read data with offset. Return `Ok(None)` when it reaches EOF. fn read( &self, offset: WALPos, @@ -259,7 +259,11 @@ impl<F: WALStore> WALFilePool<F> { res } - fn remove_files<'a>(&'a mut self, fid_s: u64, fid_e: u64) -> impl Future<Output = Result<(), ()>> + 'a { + fn remove_files<'a>( + &'a mut self, + fid_s: u64, + fid_e: u64, + ) -> impl Future<Output = Result<(), ()>> + 'a { let last_peel = unsafe { std::mem::replace( &mut *self.last_peel.get(), @@ -279,11 +283,12 @@ impl<F: WALStore> WALFilePool<F> { r.await? } Ok(()) - }.shared(); + } + .shared(); unsafe { - (*self.last_peel.get()) = MaybeUninit::new( - std::mem::transmute( - Box::pin(p.clone()) as Pin<Box<dyn Future<Output = _> + 'a>>)) + (*self.last_peel.get()) = + MaybeUninit::new(std::mem::transmute(Box::pin(p.clone()) + as Pin<Box<dyn Future<Output = _> + 'a>>)) } p } @@ -316,9 +321,11 @@ impl<F: WALStore> WALWriter<F> { } } - /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the - /// function returns. The caller then has the knowledge of WAL writes so it should defer - /// actual data writes after WAL writes. + /// Submit a sequence of records to WAL. It returns a vector of futures, each of which + /// corresponds to one record. When a future resolves to `WALRingId`, it is guaranteed the + /// record is already logged. Then, after finalizing the changes encoded by that record to + /// the persistent storage, the caller can recycle the WAL files by invoking the given + /// `peel` with the given `WALRingId`s. pub fn grow<'a, T: AsRef<[WALBytes]>>( &'a mut self, records: T, @@ -455,8 +462,9 @@ impl<F: WALStore> WALWriter<F> { res } - /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are - /// complete so that it could automatically remove obsolete WAL files. + /// Inform the `WALWriter` that some data writes are complete so that it could automatically + /// remove obsolete WAL files. The given list of `WALRingId` does not need to be ordered and + /// could be of arbitrary length. pub fn peel<'a, T: AsRef<[WALRingId]>>( &'a mut self, records: T, @@ -468,7 +476,8 @@ impl<F: WALStore> WALWriter<F> { state.io_complete.push(*rec); } let orig_fid = state.first_fid; - while let Some(s) = state.io_complete.peek().and_then(|&e| Some(e.start)) + while let Some(s) = + state.io_complete.peek().and_then(|&e| Some(e.start)) { if s != state.next_complete { break; @@ -509,7 +518,7 @@ impl WALLoader { } } - /// Recover by reading the WAL log files. + /// Recover by reading the WAL files. pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> { let mut file_pool = WALFilePool::new( store, |