diff options
author | Determinant <[email protected]> | 2020-06-10 22:35:21 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-10 22:35:21 -0400 |
commit | 557392fe0672bc6b83ef2e0fec2c8ff7a6f22a54 (patch) | |
tree | 63ff6751fb2ef4b44a9e1949cca9df329d7fb9b8 /src | |
parent | 5b3cadb74fa4de64cf1006808167d36dfbc45a8d (diff) |
fix bugs in dealing with long WAL rings
Diffstat (limited to 'src')
-rw-r--r-- | src/wal.rs | 66 |
1 files changed, 42 insertions, 24 deletions
@@ -29,6 +29,7 @@ pub struct WALRingId { } impl WALRingId { + pub fn empty_id() -> Self { WALRingId { start: 0, end: 0 } } pub fn get_start(&self) -> WALPos { self.start } pub fn get_end(&self) -> WALPos { self.end } } @@ -83,7 +84,7 @@ pub trait WALStore { /// Apply the payload during recovery. An invocation of the callback waits the application for /// redoing the given operation to ensure its state is consistent. We assume the necessary /// changes by the payload has already been persistent when the callback returns. - fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()>; + fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()>; } /// The middle layer that manages WAL file handles and invokes public trait functions to actually @@ -204,12 +205,12 @@ impl<F: WALStore> WALWriter<F> { for _rec in records.as_ref() { let mut rec = &_rec[..]; let mut rsize = rec.len() as u32; - let mut started = false; + let mut ring_start = None; while rsize > 0 { let remain = self.block_size - bbuff_cur; if remain > msize { let d = remain - msize; - let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64; + let rs0 = self.state.next + (bbuff_cur - bbuff_start) as u64; let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>( (&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())}; bbuff_cur += msize; @@ -218,19 +219,26 @@ impl<F: WALStore> WALWriter<F> { let payload = rec; blob.crc32 = crc::crc32::checksum_ieee(payload); blob.rsize = rsize; - blob.rtype = if started {WALRingType::Last} else {WALRingType::Full}; + let (rs, rt) = if let Some(rs) = ring_start.take() { + (rs, WALRingType::Last) + } else { + (rs0, WALRingType::Full) + }; + blob.rtype = rt; &mut self.block_buffer[ bbuff_cur as usize.. bbuff_cur as usize + payload.len()].copy_from_slice(payload); bbuff_cur += rsize; rsize = 0; + let end = self.state.next + (bbuff_cur - bbuff_start) as u64; + res.push(WALRingId{start: rs, end }); } else { // the remaining block can only accommodate partial rec let payload = &rec[..d as usize]; blob.crc32 = crc::crc32::checksum_ieee(payload); blob.rsize = d; - blob.rtype = if started {WALRingType::Middle} else { - started = true; + blob.rtype = if ring_start.is_some() {WALRingType::Middle} else { + ring_start = Some(rs0); WALRingType::First }; &mut self.block_buffer[ @@ -240,8 +248,6 @@ impl<F: WALStore> WALWriter<F> { rsize -= d; rec = &rec[d as usize..]; } - let ring_end = self.state.next + (bbuff_cur - bbuff_start) as u64; - res.push(WALRingId{start: ring_start, end: ring_end}); } else { // add padding space by moving the point to the end of the block bbuff_cur = self.block_size; @@ -272,7 +278,7 @@ impl<F: WALStore> WALWriter<F> { let msize = std::mem::size_of::<WALRingBlob>() as u64; let block_size = self.block_size as u64; for rec in records.as_ref() { - self.io_complete.push(*rec) + self.io_complete.push(*rec); } let orig_fid = self.state.first_fid; while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) { @@ -307,6 +313,7 @@ impl<F: WALStore> WALLoader<F> { WALLoader{ file_pool, filename_fmt } } + /// Recover by reading the WAL log files. pub fn recover(mut self) -> Result<WALWriter<F>, ()> { let block_size = 1 << self.file_pool.block_nbit; let msize = std::mem::size_of::<WALRingBlob>() as u32; @@ -321,7 +328,12 @@ impl<F: WALStore> WALLoader<F> { let f = self.file_pool.get_file(fid, false)?; let mut off = 0; while let Some(header_raw) = f.read(off, msize as usize)? { - if block_size - (off & (block_size - 1)) <= msize as u64 { break } + let block_remain = block_size - (off & (block_size - 1)); + if block_remain <= msize as u64 { + off += block_remain; + continue + } + let ringid_start = (fid << self.file_pool.file_nbit) | off; off += msize as u64; let header = unsafe { std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; @@ -331,30 +343,36 @@ impl<F: WALStore> WALLoader<F> { assert!(chunks.is_none()); let payload = f.read(off, rsize as usize)?.ok_or(())?; off += rsize as u64; - self.file_pool.store.apply_payload(payload)?; + self.file_pool.store.apply_payload( + payload, + ringid_start)?; }, WALRingType::First => { assert!(chunks.is_none()); - chunks = Some(vec![f.read(off, rsize as usize)?.ok_or(())?]); + chunks = Some((vec![f.read(off, rsize as usize)?.ok_or(())?], ringid_start)); off += rsize as u64; }, WALRingType::Middle => { - chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?); + if let Some((chunks, _)) = &mut chunks { + chunks.push(f.read(off, rsize as usize)?.ok_or(())?); + } // otherwise ignore the leftover off += rsize as u64; }, WALRingType::Last => { - chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?); + if let Some((mut chunks, ringid_start)) = chunks.take() { + chunks.push(f.read(off, rsize as usize)?.ok_or(())?); + let mut payload = Vec::new(); + payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0); + let mut ps = &mut payload[..]; + for c in chunks { + ps[..c.len()].copy_from_slice(&*c); + ps = &mut ps[c.len()..]; + } + self.file_pool.store.apply_payload( + payload.into_boxed_slice(), + ringid_start)?; + } // otherwise ignore the leftover off += rsize as u64; - - let _chunks = chunks.take().unwrap(); - let mut payload = Vec::new(); - payload.resize(_chunks.iter().fold(0, |acc, v| acc + v.len()), 0); - let mut ps = &mut payload[..]; - for c in _chunks { - ps[..c.len()].copy_from_slice(&*c); - ps = &mut ps[c.len()..]; - } - self.file_pool.store.apply_payload(payload.into_boxed_slice())?; }, WALRingType::Null => break, } |