aboutsummaryrefslogtreecommitdiff
path: root/src/wal.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wal.rs')
-rw-r--r--src/wal.rs30
1 files changed, 18 insertions, 12 deletions
diff --git a/src/wal.rs b/src/wal.rs
index deae98d..7975f38 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -1,5 +1,4 @@
use async_trait::async_trait;
-use futures::executor::block_on;
use futures::future::{self, FutureExt, TryFutureExt};
use std::cell::{RefCell, UnsafeCell};
use std::collections::{hash_map, BinaryHeap, HashMap};
@@ -106,7 +105,7 @@ pub trait WALFile {
/// all or nothing).
async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
/// Read data with offset. Return `Ok(None)` when it reaches EOF.
- fn read(
+ async fn read(
&self,
offset: WALPos,
length: usize,
@@ -408,7 +407,8 @@ impl<F: WALStore> WALWriter<F> {
/// corresponds to one record. When a future resolves to `WALRingId`, it is guaranteed the
/// record is already logged. Then, after finalizing the changes encoded by that record to
/// the persistent storage, the caller can recycle the WAL files by invoking the given
- /// `peel` with the given `WALRingId`s.
+ /// `peel` with the given `WALRingId`s. Note: each serialized record should contain at least 1
+ /// byte (empty record payload will result in assertion failure).
pub fn grow<'a, R: Record + 'a>(
&'a mut self,
records: Vec<R>,
@@ -427,6 +427,7 @@ impl<F: WALStore> WALWriter<F> {
let mut rec = &bytes[..];
let mut rsize = rec.len() as u32;
let mut ring_start = None;
+ assert!(rsize > 0);
while rsize > 0 {
let remain = self.block_size - bbuff_cur;
if remain > msize {
@@ -647,7 +648,10 @@ impl WALLoader {
}
/// Recover by reading the WAL files.
- pub fn load<S: WALStore, F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>(
+ pub async fn load<
+ S: WALStore,
+ F: FnMut(WALBytes, WALRingId) -> Result<(), ()>,
+ >(
&self,
store: S,
mut recover_func: F,
@@ -674,14 +678,14 @@ impl WALLoader {
let mut skip = false;
for fname in logfiles.into_iter() {
let fid = file_pool.get_fid(&fname);
- let f = block_on(file_pool.get_file(fid, false))?;
+ let f = file_pool.get_file(fid, false).await?;
let mut off = 0;
if skip {
f.truncate(0)?;
- block_on(file_pool.store.remove_file(fname))?;
+ file_pool.store.remove_file(fname).await?;
continue;
}
- while let Some(header_raw) = f.read(off, msize as usize)? {
+ while let Some(header_raw) = f.read(off, msize as usize).await? {
let ringid_start = (fid << file_pool.file_nbit) + off;
off += msize as u64;
let header = unsafe {
@@ -693,7 +697,8 @@ impl WALLoader {
match header.rtype {
WALRingType::Full => {
assert!(chunks.is_none());
- let payload = f.read(off, rsize as usize)?.ok_or(())?;
+ let payload =
+ f.read(off, rsize as usize).await?.ok_or(())?;
// TODO: improve the behavior when CRC32 fails
if !self.verify_checksum(&payload, header.crc32)? {
skip = true;
@@ -710,7 +715,8 @@ impl WALLoader {
}
WALRingType::First => {
assert!(chunks.is_none());
- let chunk = f.read(off, rsize as usize)?.ok_or(())?;
+ let chunk =
+ f.read(off, rsize as usize).await?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
break;
@@ -721,7 +727,7 @@ impl WALLoader {
WALRingType::Middle => {
if let Some((chunks, _)) = &mut chunks {
let chunk =
- f.read(off, rsize as usize)?.ok_or(())?;
+ f.read(off, rsize as usize).await?.ok_or(())?;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
break;
@@ -734,7 +740,7 @@ impl WALLoader {
if let Some((mut chunks, ringid_start)) = chunks.take()
{
let chunk =
- f.read(off, rsize as usize)?.ok_or(())?;
+ f.read(off, rsize as usize).await?.ok_or(())?;
off += rsize as u64;
if !self.verify_checksum(&chunk, header.crc32)? {
skip = true;
@@ -772,7 +778,7 @@ impl WALLoader {
}
}
f.truncate(0)?;
- block_on(file_pool.store.remove_file(fname))?;
+ file_pool.store.remove_file(fname).await?;
}
file_pool.reset();
Ok(WALWriter::new(