diff options
author | Determinant <[email protected]> | 2020-10-12 20:13:26 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-10-12 20:13:26 -0400 |
commit | 54ce471c15fda9812d3cd7ef6222701b19892d30 (patch) | |
tree | 05f071fbe947242495cba6df84cbb5c32d9b024d /src/wal.rs | |
parent | 395bd19d51c5f5e0bfd3b5897ddce5f6bef5ec79 (diff) |
add assertion for empty record payload; make `load` async funcv0.1.6
Diffstat (limited to 'src/wal.rs')
-rw-r--r-- | src/wal.rs | 30 |
1 files changed, 18 insertions, 12 deletions
@@ -1,5 +1,4 @@ use async_trait::async_trait; -use futures::executor::block_on; use futures::future::{self, FutureExt, TryFutureExt}; use std::cell::{RefCell, UnsafeCell}; use std::collections::{hash_map, BinaryHeap, HashMap}; @@ -106,7 +105,7 @@ pub trait WALFile { /// all or nothing). async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>; /// Read data with offset. Return `Ok(None)` when it reaches EOF. - fn read( + async fn read( &self, offset: WALPos, length: usize, @@ -408,7 +407,8 @@ impl<F: WALStore> WALWriter<F> { /// corresponds to one record. When a future resolves to `WALRingId`, it is guaranteed the /// record is already logged. Then, after finalizing the changes encoded by that record to /// the persistent storage, the caller can recycle the WAL files by invoking the given - /// `peel` with the given `WALRingId`s. + /// `peel` with the given `WALRingId`s. Note: each serialized record should contain at least 1 + /// byte (empty record payload will result in assertion failure). pub fn grow<'a, R: Record + 'a>( &'a mut self, records: Vec<R>, @@ -427,6 +427,7 @@ impl<F: WALStore> WALWriter<F> { let mut rec = &bytes[..]; let mut rsize = rec.len() as u32; let mut ring_start = None; + assert!(rsize > 0); while rsize > 0 { let remain = self.block_size - bbuff_cur; if remain > msize { @@ -647,7 +648,10 @@ impl WALLoader { } /// Recover by reading the WAL files. - pub fn load<S: WALStore, F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>( + pub async fn load< + S: WALStore, + F: FnMut(WALBytes, WALRingId) -> Result<(), ()>, + >( &self, store: S, mut recover_func: F, @@ -674,14 +678,14 @@ impl WALLoader { let mut skip = false; for fname in logfiles.into_iter() { let fid = file_pool.get_fid(&fname); - let f = block_on(file_pool.get_file(fid, false))?; + let f = file_pool.get_file(fid, false).await?; let mut off = 0; if skip { f.truncate(0)?; - block_on(file_pool.store.remove_file(fname))?; + file_pool.store.remove_file(fname).await?; continue; } - while let Some(header_raw) = f.read(off, msize as usize)? { + while let Some(header_raw) = f.read(off, msize as usize).await? { let ringid_start = (fid << file_pool.file_nbit) + off; off += msize as u64; let header = unsafe { @@ -693,7 +697,8 @@ impl WALLoader { match header.rtype { WALRingType::Full => { assert!(chunks.is_none()); - let payload = f.read(off, rsize as usize)?.ok_or(())?; + let payload = + f.read(off, rsize as usize).await?.ok_or(())?; // TODO: improve the behavior when CRC32 fails if !self.verify_checksum(&payload, header.crc32)? { skip = true; @@ -710,7 +715,8 @@ impl WALLoader { } WALRingType::First => { assert!(chunks.is_none()); - let chunk = f.read(off, rsize as usize)?.ok_or(())?; + let chunk = + f.read(off, rsize as usize).await?.ok_or(())?; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; break; @@ -721,7 +727,7 @@ impl WALLoader { WALRingType::Middle => { if let Some((chunks, _)) = &mut chunks { let chunk = - f.read(off, rsize as usize)?.ok_or(())?; + f.read(off, rsize as usize).await?.ok_or(())?; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; break; @@ -734,7 +740,7 @@ impl WALLoader { if let Some((mut chunks, ringid_start)) = chunks.take() { let chunk = - f.read(off, rsize as usize)?.ok_or(())?; + f.read(off, rsize as usize).await?.ok_or(())?; off += rsize as u64; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; @@ -772,7 +778,7 @@ impl WALLoader { } } f.truncate(0)?; - block_on(file_pool.store.remove_file(fname))?; + file_pool.store.remove_file(fname).await?; } file_pool.reset(); Ok(WALWriter::new( |