summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/wal.rs66
1 files changed, 42 insertions, 24 deletions
diff --git a/src/wal.rs b/src/wal.rs
index dd41045..38e4fa2 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -29,6 +29,7 @@ pub struct WALRingId {
}
impl WALRingId {
+ pub fn empty_id() -> Self { WALRingId { start: 0, end: 0 } }
pub fn get_start(&self) -> WALPos { self.start }
pub fn get_end(&self) -> WALPos { self.end }
}
@@ -83,7 +84,7 @@ pub trait WALStore {
/// Apply the payload during recovery. An invocation of the callback waits the application for
/// redoing the given operation to ensure its state is consistent. We assume the necessary
/// changes by the payload has already been persistent when the callback returns.
- fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()>;
+ fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()>;
}
/// The middle layer that manages WAL file handles and invokes public trait functions to actually
@@ -204,12 +205,12 @@ impl<F: WALStore> WALWriter<F> {
for _rec in records.as_ref() {
let mut rec = &_rec[..];
let mut rsize = rec.len() as u32;
- let mut started = false;
+ let mut ring_start = None;
while rsize > 0 {
let remain = self.block_size - bbuff_cur;
if remain > msize {
let d = remain - msize;
- let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64;
+ let rs0 = self.state.next + (bbuff_cur - bbuff_start) as u64;
let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>(
(&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())};
bbuff_cur += msize;
@@ -218,19 +219,26 @@ impl<F: WALStore> WALWriter<F> {
let payload = rec;
blob.crc32 = crc::crc32::checksum_ieee(payload);
blob.rsize = rsize;
- blob.rtype = if started {WALRingType::Last} else {WALRingType::Full};
+ let (rs, rt) = if let Some(rs) = ring_start.take() {
+ (rs, WALRingType::Last)
+ } else {
+ (rs0, WALRingType::Full)
+ };
+ blob.rtype = rt;
&mut self.block_buffer[
bbuff_cur as usize..
bbuff_cur as usize + payload.len()].copy_from_slice(payload);
bbuff_cur += rsize;
rsize = 0;
+ let end = self.state.next + (bbuff_cur - bbuff_start) as u64;
+ res.push(WALRingId{start: rs, end });
} else {
// the remaining block can only accommodate partial rec
let payload = &rec[..d as usize];
blob.crc32 = crc::crc32::checksum_ieee(payload);
blob.rsize = d;
- blob.rtype = if started {WALRingType::Middle} else {
- started = true;
+ blob.rtype = if ring_start.is_some() {WALRingType::Middle} else {
+ ring_start = Some(rs0);
WALRingType::First
};
&mut self.block_buffer[
@@ -240,8 +248,6 @@ impl<F: WALStore> WALWriter<F> {
rsize -= d;
rec = &rec[d as usize..];
}
- let ring_end = self.state.next + (bbuff_cur - bbuff_start) as u64;
- res.push(WALRingId{start: ring_start, end: ring_end});
} else {
// add padding space by moving the point to the end of the block
bbuff_cur = self.block_size;
@@ -272,7 +278,7 @@ impl<F: WALStore> WALWriter<F> {
let msize = std::mem::size_of::<WALRingBlob>() as u64;
let block_size = self.block_size as u64;
for rec in records.as_ref() {
- self.io_complete.push(*rec)
+ self.io_complete.push(*rec);
}
let orig_fid = self.state.first_fid;
while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) {
@@ -307,6 +313,7 @@ impl<F: WALStore> WALLoader<F> {
WALLoader{ file_pool, filename_fmt }
}
+ /// Recover by reading the WAL log files.
pub fn recover(mut self) -> Result<WALWriter<F>, ()> {
let block_size = 1 << self.file_pool.block_nbit;
let msize = std::mem::size_of::<WALRingBlob>() as u32;
@@ -321,7 +328,12 @@ impl<F: WALStore> WALLoader<F> {
let f = self.file_pool.get_file(fid, false)?;
let mut off = 0;
while let Some(header_raw) = f.read(off, msize as usize)? {
- if block_size - (off & (block_size - 1)) <= msize as u64 { break }
+ let block_remain = block_size - (off & (block_size - 1));
+ if block_remain <= msize as u64 {
+ off += block_remain;
+ continue
+ }
+ let ringid_start = (fid << self.file_pool.file_nbit) | off;
off += msize as u64;
let header = unsafe {
std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
@@ -331,30 +343,36 @@ impl<F: WALStore> WALLoader<F> {
assert!(chunks.is_none());
let payload = f.read(off, rsize as usize)?.ok_or(())?;
off += rsize as u64;
- self.file_pool.store.apply_payload(payload)?;
+ self.file_pool.store.apply_payload(
+ payload,
+ ringid_start)?;
},
WALRingType::First => {
assert!(chunks.is_none());
- chunks = Some(vec![f.read(off, rsize as usize)?.ok_or(())?]);
+ chunks = Some((vec![f.read(off, rsize as usize)?.ok_or(())?], ringid_start));
off += rsize as u64;
},
WALRingType::Middle => {
- chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?);
+ if let Some((chunks, _)) = &mut chunks {
+ chunks.push(f.read(off, rsize as usize)?.ok_or(())?);
+ } // otherwise ignore the leftover
off += rsize as u64;
},
WALRingType::Last => {
- chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?);
+ if let Some((mut chunks, ringid_start)) = chunks.take() {
+ chunks.push(f.read(off, rsize as usize)?.ok_or(())?);
+ let mut payload = Vec::new();
+ payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
+ let mut ps = &mut payload[..];
+ for c in chunks {
+ ps[..c.len()].copy_from_slice(&*c);
+ ps = &mut ps[c.len()..];
+ }
+ self.file_pool.store.apply_payload(
+ payload.into_boxed_slice(),
+ ringid_start)?;
+ } // otherwise ignore the leftover
off += rsize as u64;
-
- let _chunks = chunks.take().unwrap();
- let mut payload = Vec::new();
- payload.resize(_chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
- let mut ps = &mut payload[..];
- for c in _chunks {
- ps[..c.len()].copy_from_slice(&*c);
- ps = &mut ps[c.len()..];
- }
- self.file_pool.store.apply_payload(payload.into_boxed_slice())?;
},
WALRingType::Null => break,
}