summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-06-10 22:35:21 -0400
committerDeterminant <[email protected]>2020-06-10 22:35:21 -0400
commit557392fe0672bc6b83ef2e0fec2c8ff7a6f22a54 (patch)
tree63ff6751fb2ef4b44a9e1949cca9df329d7fb9b8
parent5b3cadb74fa4de64cf1006808167d36dfbc45a8d (diff)
fix bugs in dealing with long WAL rings
-rw-r--r--examples/demo1.rs6
-rw-r--r--src/wal.rs66
-rw-r--r--tests/common/mod.rs64
-rw-r--r--tests/rand_fail.rs65
4 files changed, 128 insertions, 73 deletions
diff --git a/examples/demo1.rs b/examples/demo1.rs
index 40f9562..48d7c2c 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -114,8 +114,10 @@ impl WALStore for WALStoreTest {
Ok(logfiles.into_iter())
}
- fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()> {
- println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap());
+ fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()> {
+ println!("apply_payload(payload={}, wal_off={})",
+ std::str::from_utf8(&payload).unwrap(),
+ wal_off);
Ok(())
}
}
diff --git a/src/wal.rs b/src/wal.rs
index dd41045..38e4fa2 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -29,6 +29,7 @@ pub struct WALRingId {
}
impl WALRingId {
+ pub fn empty_id() -> Self { WALRingId { start: 0, end: 0 } }
pub fn get_start(&self) -> WALPos { self.start }
pub fn get_end(&self) -> WALPos { self.end }
}
@@ -83,7 +84,7 @@ pub trait WALStore {
/// Apply the payload during recovery. An invocation of the callback waits the application for
/// redoing the given operation to ensure its state is consistent. We assume the necessary
/// changes by the payload has already been persistent when the callback returns.
- fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()>;
+ fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()>;
}
/// The middle layer that manages WAL file handles and invokes public trait functions to actually
@@ -204,12 +205,12 @@ impl<F: WALStore> WALWriter<F> {
for _rec in records.as_ref() {
let mut rec = &_rec[..];
let mut rsize = rec.len() as u32;
- let mut started = false;
+ let mut ring_start = None;
while rsize > 0 {
let remain = self.block_size - bbuff_cur;
if remain > msize {
let d = remain - msize;
- let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64;
+ let rs0 = self.state.next + (bbuff_cur - bbuff_start) as u64;
let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>(
(&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())};
bbuff_cur += msize;
@@ -218,19 +219,26 @@ impl<F: WALStore> WALWriter<F> {
let payload = rec;
blob.crc32 = crc::crc32::checksum_ieee(payload);
blob.rsize = rsize;
- blob.rtype = if started {WALRingType::Last} else {WALRingType::Full};
+ let (rs, rt) = if let Some(rs) = ring_start.take() {
+ (rs, WALRingType::Last)
+ } else {
+ (rs0, WALRingType::Full)
+ };
+ blob.rtype = rt;
&mut self.block_buffer[
bbuff_cur as usize..
bbuff_cur as usize + payload.len()].copy_from_slice(payload);
bbuff_cur += rsize;
rsize = 0;
+ let end = self.state.next + (bbuff_cur - bbuff_start) as u64;
+ res.push(WALRingId{start: rs, end });
} else {
// the remaining block can only accommodate partial rec
let payload = &rec[..d as usize];
blob.crc32 = crc::crc32::checksum_ieee(payload);
blob.rsize = d;
- blob.rtype = if started {WALRingType::Middle} else {
- started = true;
+ blob.rtype = if ring_start.is_some() {WALRingType::Middle} else {
+ ring_start = Some(rs0);
WALRingType::First
};
&mut self.block_buffer[
@@ -240,8 +248,6 @@ impl<F: WALStore> WALWriter<F> {
rsize -= d;
rec = &rec[d as usize..];
}
- let ring_end = self.state.next + (bbuff_cur - bbuff_start) as u64;
- res.push(WALRingId{start: ring_start, end: ring_end});
} else {
// add padding space by moving the point to the end of the block
bbuff_cur = self.block_size;
@@ -272,7 +278,7 @@ impl<F: WALStore> WALWriter<F> {
let msize = std::mem::size_of::<WALRingBlob>() as u64;
let block_size = self.block_size as u64;
for rec in records.as_ref() {
- self.io_complete.push(*rec)
+ self.io_complete.push(*rec);
}
let orig_fid = self.state.first_fid;
while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) {
@@ -307,6 +313,7 @@ impl<F: WALStore> WALLoader<F> {
WALLoader{ file_pool, filename_fmt }
}
+ /// Recover by reading the WAL log files.
pub fn recover(mut self) -> Result<WALWriter<F>, ()> {
let block_size = 1 << self.file_pool.block_nbit;
let msize = std::mem::size_of::<WALRingBlob>() as u32;
@@ -321,7 +328,12 @@ impl<F: WALStore> WALLoader<F> {
let f = self.file_pool.get_file(fid, false)?;
let mut off = 0;
while let Some(header_raw) = f.read(off, msize as usize)? {
- if block_size - (off & (block_size - 1)) <= msize as u64 { break }
+ let block_remain = block_size - (off & (block_size - 1));
+ if block_remain <= msize as u64 {
+ off += block_remain;
+ continue
+ }
+ let ringid_start = (fid << self.file_pool.file_nbit) | off;
off += msize as u64;
let header = unsafe {
std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
@@ -331,30 +343,36 @@ impl<F: WALStore> WALLoader<F> {
assert!(chunks.is_none());
let payload = f.read(off, rsize as usize)?.ok_or(())?;
off += rsize as u64;
- self.file_pool.store.apply_payload(payload)?;
+ self.file_pool.store.apply_payload(
+ payload,
+ ringid_start)?;
},
WALRingType::First => {
assert!(chunks.is_none());
- chunks = Some(vec![f.read(off, rsize as usize)?.ok_or(())?]);
+ chunks = Some((vec![f.read(off, rsize as usize)?.ok_or(())?], ringid_start));
off += rsize as u64;
},
WALRingType::Middle => {
- chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?);
+ if let Some((chunks, _)) = &mut chunks {
+ chunks.push(f.read(off, rsize as usize)?.ok_or(())?);
+ } // otherwise ignore the leftover
off += rsize as u64;
},
WALRingType::Last => {
- chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?);
+ if let Some((mut chunks, ringid_start)) = chunks.take() {
+ chunks.push(f.read(off, rsize as usize)?.ok_or(())?);
+ let mut payload = Vec::new();
+ payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
+ let mut ps = &mut payload[..];
+ for c in chunks {
+ ps[..c.len()].copy_from_slice(&*c);
+ ps = &mut ps[c.len()..];
+ }
+ self.file_pool.store.apply_payload(
+ payload.into_boxed_slice(),
+ ringid_start)?;
+ } // otherwise ignore the leftover
off += rsize as u64;
-
- let _chunks = chunks.take().unwrap();
- let mut payload = Vec::new();
- payload.resize(_chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
- let mut ps = &mut payload[..];
- for c in _chunks {
- ps[..c.len()].copy_from_slice(&*c);
- ps = &mut ps[c.len()..];
- }
- self.file_pool.store.apply_payload(payload.into_boxed_slice())?;
},
WALRingType::Null => break,
}
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 9572c5c..ce27e16 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -2,7 +2,7 @@
#[allow(dead_code)]
extern crate growthring;
-use growthring::wal::{WALFile, WALStore, WALPos, WALBytes};
+use growthring::wal::{WALFile, WALStore, WALPos, WALBytes, WALRingId};
use indexmap::{IndexMap, map::Entry};
use rand::Rng;
use std::collections::{HashMap, hash_map};
@@ -122,6 +122,7 @@ impl<'a, G: 'static + FailGen> WALStore for WALStoreEmul<'a, G> {
}
fn remove_file(&mut self, filename: &str) -> Result<(), ()> {
+ println!("remove {}", filename);
if self.fgen.next_fail() { return Err(()) }
self.state.files.remove(filename).ok_or(()).and_then(|_| Ok(()))
}
@@ -135,9 +136,11 @@ impl<'a, G: 'static + FailGen> WALStore for WALStoreEmul<'a, G> {
Ok(logfiles.into_iter())
}
- fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()> {
+ fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()> {
if self.fgen.next_fail() { return Err(()) }
- println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap());
+ println!("apply_payload(payload=0x{}, wal_off={})",
+ hex::encode(&payload),
+ wal_off);
Ok(())
}
}
@@ -211,17 +214,13 @@ impl PaintStrokes {
PaintStrokes(res)
}
- pub fn gen_rand<R: rand::Rng>(min_pos: u32, max_pos: u32, max_col: u32, n: usize, rng: &mut R) -> PaintStrokes {
+ pub fn gen_rand<R: rand::Rng>(max_pos: u32, max_len: u32, max_col: u32, n: usize, rng: &mut R) -> PaintStrokes {
+ assert!(max_pos > 0);
let mut strokes = Self::new();
for _ in 0..n {
- let mut s = 0;
- let mut e = 0;
- while s == e {
- s = rng.gen_range(min_pos, max_pos);
- e = rng.gen_range(min_pos, max_pos);
- }
- if s > e { std::mem::swap(&mut s, &mut e) }
- strokes.stroke(s, e, rng.gen_range(0, max_col))
+ let pos = rng.gen_range(0, max_pos);
+ let len = rng.gen_range(1, max_pos - pos + 1);
+ strokes.stroke(pos, pos + len, rng.gen_range(0, max_col))
}
strokes
}
@@ -249,8 +248,8 @@ fn test_paint_strokes() {
}
pub struct Canvas {
- waiting: HashMap<WALPos, usize>,
- queue: IndexMap<u32, VecDeque<(u32, WALPos)>>,
+ waiting: HashMap<WALRingId, usize>,
+ queue: IndexMap<u32, VecDeque<(u32, WALRingId)>>,
canvas: Box<[u32]>
}
@@ -267,36 +266,36 @@ impl Canvas {
}
}
- fn get_waiting(&mut self, sid: WALPos) -> &mut usize {
- match self.waiting.entry(sid) {
+ fn get_waiting(&mut self, rid: WALRingId) -> &mut usize {
+ match self.waiting.entry(rid) {
hash_map::Entry::Occupied(e) => e.into_mut(),
hash_map::Entry::Vacant(e) => e.insert(0)
}
}
- fn get_queued(&mut self, pos: u32) -> &mut VecDeque<(u32, WALPos)> {
+ fn get_queued(&mut self, pos: u32) -> &mut VecDeque<(u32, WALRingId)> {
match self.queue.entry(pos) {
Entry::Occupied(e) => e.into_mut(),
Entry::Vacant(e) => e.insert(VecDeque::new())
}
}
- pub fn prepaint(&mut self, strokes: &PaintStrokes, sid: &WALPos) {
- let sid = *sid;
+ pub fn prepaint(&mut self, strokes: &PaintStrokes, rid: &WALRingId) {
+ let rid = *rid;
let mut nwait = 0;
for (s, e, c) in strokes.0.iter() {
for i in *s..*e {
nwait += 1;
- self.get_queued(i).push_back((*c, sid))
+ self.get_queued(i).push_back((*c, rid))
}
}
- *self.get_waiting(sid) += nwait
+ *self.get_waiting(rid) = nwait
}
// TODO: allow customized scheduler
/// Schedule to paint one position, randomly. It optionally returns a finished batch write
/// identified by its start position of WALRingId.
- pub fn rand_paint<R: rand::Rng>(&mut self, rng: &mut R) -> Option<(Option<WALPos>, u32)> {
+ pub fn rand_paint<R: rand::Rng>(&mut self, rng: &mut R) -> Option<(Option<WALRingId>, u32)> {
if self.queue.is_empty() { return None }
let idx = rng.gen_range(0, self.queue.len());
let (pos, _) = self.queue.get_index_mut(idx).unwrap();
@@ -304,15 +303,15 @@ impl Canvas {
Some((self.paint(pos), pos))
}
- pub fn paint(&mut self, pos: u32) -> Option<WALPos> {
+ pub fn paint(&mut self, pos: u32) -> Option<WALRingId> {
let q = self.queue.get_mut(&pos).unwrap();
- let (c, sid) = q.pop_front().unwrap();
+ let (c, rid) = q.pop_front().unwrap();
if q.is_empty() { self.queue.remove(&pos); }
self.canvas[pos as usize] = c;
- let cnt = self.waiting.get_mut(&sid).unwrap();
+ let cnt = self.waiting.get_mut(&rid).unwrap();
*cnt -= 1;
if *cnt == 0 {
- Some(sid)
+ Some(rid)
} else { None }
}
@@ -335,17 +334,18 @@ fn test_canvas() {
let mut canvas1 = Canvas::new(100);
let mut canvas2 = Canvas::new(100);
let canvas3 = Canvas::new(101);
+ let dummy = WALRingId::empty_id();
let (s1, s2) = RNG.with(|rng| {
let rng = &mut *rng.borrow_mut();
- (PaintStrokes::gen_rand(0, 100, 256, 2, rng),
- PaintStrokes::gen_rand(0, 100, 256, 2, rng))
+ (PaintStrokes::gen_rand(100, 10, 256, 2, rng),
+ PaintStrokes::gen_rand(100, 10, 256, 2, rng))
});
assert!(canvas1.is_same(&canvas2));
assert!(!canvas2.is_same(&canvas3));
- canvas1.prepaint(&s1, &0);
- canvas1.prepaint(&s2, &0);
- canvas2.prepaint(&s1, &0);
- canvas2.prepaint(&s2, &0);
+ canvas1.prepaint(&s1, &dummy);
+ canvas1.prepaint(&s2, &dummy);
+ canvas2.prepaint(&s1, &dummy);
+ canvas2.prepaint(&s2, &dummy);
assert!(canvas1.is_same(&canvas2));
RNG.with(|rng| canvas1.rand_paint(&mut *rng.borrow_mut()));
assert!(!canvas1.is_same(&canvas2));
diff --git a/tests/rand_fail.rs b/tests/rand_fail.rs
index b4e30ac..0cd519c 100644
--- a/tests/rand_fail.rs
+++ b/tests/rand_fail.rs
@@ -1,26 +1,61 @@
#[cfg(test)]
-extern crate growthring;
-use growthring::wal::{WALLoader, WALWriter, WALStore, WALRingId, WALBytes};
-
mod common;
+use growthring::wal::{WALLoader, WALWriter, WALStore, WALRingId, WALBytes, WALPos};
+use common::{FailGen, SingleFailGen, Canvas, WALStoreEmulState, WALStoreEmul, PaintStrokes};
-fn test<S: WALStore>(records: Vec<String>, wal: &mut WALWriter<S>) -> Box<[WALRingId]> {
- let records: Vec<WALBytes> = records.into_iter().map(|s| s.into_bytes().into_boxed_slice()).collect();
- let ret = wal.grow(&records).unwrap();
- for ring_id in ret.iter() {
- println!("got ring id: {:?}", ring_id);
+fn run<F: WALStore, R: rand::Rng>(n: usize, m: usize, k: usize,
+ canvas: &mut Canvas, wal: &mut WALWriter<F>, trace: &mut Vec<u32>,
+ rng: &mut R) -> Result<(), ()> {
+ for i in 0..n {
+ let s = (0..m).map(|_|
+ PaintStrokes::gen_rand(1000, 10, 256, 5, rng)).collect::<Vec<PaintStrokes>>();
+ let recs = s.iter().map(|e| e.to_bytes()).collect::<Vec<WALBytes>>();
+ // write ahead
+ let rids = wal.grow(recs)?;
+ for rid in rids.iter() {
+ println!("got ring id: {:?}", rid);
+ }
+ // WAL append done
+ // prepare data writes
+ for (e, rid) in s.iter().zip(rids.iter()) {
+ canvas.prepaint(e, &*rid)
+ }
+ // run the scheduler for a bit
+ for _ in 0..k {
+ if let Some((fin_rid, t)) = canvas.rand_paint(rng) {
+ if let Some(rid) = fin_rid {
+ wal.peel(&[rid])?
+ }
+ trace.push(t);
+ } else { break }
+ }
+ }
+ while let Some((fin_rid, t)) = canvas.rand_paint(rng) {
+ if let Some(rid) = fin_rid {
+ wal.peel(&[rid])?
+ }
+ trace.push(t);
}
- ret
+ canvas.print(40);
+ Ok(())
+}
+
+fn check<F: WALStore>(canvas: &mut Canvas, wal: &mut WALLoader<F>, trace: &Vec<u8>) -> bool {
+ true
}
#[test]
fn test_rand_fail() {
- let fgen = common::SingleFailGen::new(100);
- let mut state = common::WALStoreEmulState::new();
- let mut wal = WALLoader::new(common::WALStoreEmul::new(&mut state, fgen), 9, 8, 1000).recover().unwrap();
- for _ in 0..3 {
- test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::<Vec<String>>(), &mut wal);
- }
+ let fgen = SingleFailGen::new(100000);
+ let n = 100;
+ let m = 10;
+ let k = 100;
+ let mut rng = rand::thread_rng();
+ let mut state = WALStoreEmulState::new();
+ let mut wal = WALLoader::new(WALStoreEmul::new(&mut state, fgen), 9, 8, 1000).recover().unwrap();
+ let mut trace: Vec<u32> = Vec::new();
+ let mut canvas = Canvas::new(1000);
+ run(n, m, k, &mut canvas, &mut wal, &mut trace, &mut rng).unwrap();
WALLoader::new(common::WALStoreEmul::new(&mut state, common::ZeroFailGen), 9, 8, 1000).recover().unwrap();
}