aboutsummaryrefslogtreecommitdiff
path: root/src/wal.rs
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-06-12 16:41:19 -0400
committerDeterminant <[email protected]>2020-06-12 16:41:19 -0400
commite39324e62ab4e09fb0dfc7784519e77fedca65cb (patch)
tree4d8c863b1bf125bce62ecf9f2dd433c144a586eb /src/wal.rs
parent37ac8fdadb79e041dfdd3038f30f48e828b280f8 (diff)
...
Diffstat (limited to 'src/wal.rs')
-rw-r--r--src/wal.rs41
1 files changed, 25 insertions, 16 deletions
diff --git a/src/wal.rs b/src/wal.rs
index 8d92d01..ff76bdb 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -24,8 +24,8 @@ struct WALRingBlob {
// payload follows
}
+type WALFileId = u64;
pub type WALBytes = Box<[u8]>;
-pub type WALFileId = u64;
pub type WALPos = u64;
#[derive(Eq, PartialEq, Copy, Clone, Debug, Hash)]
@@ -77,12 +77,12 @@ struct WALState {
pub trait WALFile {
/// Initialize the file space in [offset, offset + length) to zero.
async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
- /// Write data with offset. We assume all previous `allocate/truncate` invocations are visible
- /// if ordered earlier (should be guaranteed by most OS). Additionally, the final write caused
+ /// Write data with offset. We assume all previous `allocate`/`truncate` invocations are visible
+ /// if ordered earlier (should be guaranteed by most OS). Additionally, the write caused
/// by each invocation of this function should be _atomic_ (the entire single write should be
/// all or nothing).
async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
- /// Read data with offset. Return Ok(None) when it reaches EOF.
+ /// Read data with offset. Return `Ok(None)` when it reaches EOF.
fn read(
&self,
offset: WALPos,
@@ -259,7 +259,11 @@ impl<F: WALStore> WALFilePool<F> {
res
}
- fn remove_files<'a>(&'a mut self, fid_s: u64, fid_e: u64) -> impl Future<Output = Result<(), ()>> + 'a {
+ fn remove_files<'a>(
+ &'a mut self,
+ fid_s: u64,
+ fid_e: u64,
+ ) -> impl Future<Output = Result<(), ()>> + 'a {
let last_peel = unsafe {
std::mem::replace(
&mut *self.last_peel.get(),
@@ -279,11 +283,12 @@ impl<F: WALStore> WALFilePool<F> {
r.await?
}
Ok(())
- }.shared();
+ }
+ .shared();
unsafe {
- (*self.last_peel.get()) = MaybeUninit::new(
- std::mem::transmute(
- Box::pin(p.clone()) as Pin<Box<dyn Future<Output = _> + 'a>>))
+ (*self.last_peel.get()) =
+ MaybeUninit::new(std::mem::transmute(Box::pin(p.clone())
+ as Pin<Box<dyn Future<Output = _> + 'a>>))
}
p
}
@@ -316,9 +321,11 @@ impl<F: WALStore> WALWriter<F> {
}
}
- /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
- /// function returns. The caller then has the knowledge of WAL writes so it should defer
- /// actual data writes after WAL writes.
+ /// Submit a sequence of records to WAL. It returns a vector of futures, each of which
+ /// corresponds to one record. When a future resolves to `WALRingId`, it is guaranteed the
+ /// record is already logged. Then, after finalizing the changes encoded by that record to
+ /// the persistent storage, the caller can recycle the WAL files by invoking the given
+ /// `peel` with the given `WALRingId`s.
pub fn grow<'a, T: AsRef<[WALBytes]>>(
&'a mut self,
records: T,
@@ -455,8 +462,9 @@ impl<F: WALStore> WALWriter<F> {
res
}
- /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
- /// complete so that it could automatically remove obsolete WAL files.
+ /// Inform the `WALWriter` that some data writes are complete so that it could automatically
+ /// remove obsolete WAL files. The given list of `WALRingId` does not need to be ordered and
+ /// could be of arbitrary length.
pub fn peel<'a, T: AsRef<[WALRingId]>>(
&'a mut self,
records: T,
@@ -468,7 +476,8 @@ impl<F: WALStore> WALWriter<F> {
state.io_complete.push(*rec);
}
let orig_fid = state.first_fid;
- while let Some(s) = state.io_complete.peek().and_then(|&e| Some(e.start))
+ while let Some(s) =
+ state.io_complete.peek().and_then(|&e| Some(e.start))
{
if s != state.next_complete {
break;
@@ -509,7 +518,7 @@ impl WALLoader {
}
}
- /// Recover by reading the WAL log files.
+ /// Recover by reading the WAL files.
pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> {
let mut file_pool = WALFilePool::new(
store,