diff options
author | Determinant <[email protected]> | 2020-06-11 00:48:39 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-11 00:48:39 -0400 |
commit | c7d621279743275a8766fa253268fb7a8e1c90ca (patch) | |
tree | 9460a7e4eb02eb4fcfc4771b81b2f80139bb565d /src/wal.rs | |
parent | 350f2638cb3e15df231381b8fd286595058b7a54 (diff) |
finish the first random test code
Diffstat (limited to 'src/wal.rs')
-rw-r--r-- | src/wal.rs | 53 |
1 files changed, 31 insertions, 22 deletions
@@ -84,7 +84,7 @@ pub trait WALStore { /// Apply the payload during recovery. An invocation of the callback waits the application for /// redoing the given operation to ensure its state is consistent. We assume the necessary /// changes by the payload has already been persistent when the callback returns. - fn apply_payload(&mut self, payload: WALBytes, wal_off: WALPos) -> Result<(), ()>; + fn apply_payload(&mut self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()>; } /// The middle layer that manages WAL file handles and invokes public trait functions to actually @@ -301,31 +301,33 @@ impl<F: WALStore> WALWriter<F> { } } -pub struct WALLoader<F: WALStore> { - file_pool: WALFilePool<F>, +pub struct WALLoader { + file_nbit: u8, + block_nbit: u8, + cache_size: usize, filename_fmt: regex::Regex } -impl<F: WALStore> WALLoader<F> { - pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { - let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size); +impl WALLoader { + pub fn new(file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap(); - WALLoader{ file_pool, filename_fmt } + WALLoader{ file_nbit, block_nbit, cache_size, filename_fmt } } /// Recover by reading the WAL log files. - pub fn recover(mut self) -> Result<WALWriter<F>, ()> { - let block_size = 1 << self.file_pool.block_nbit; + pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> { + let mut file_pool = WALFilePool::new(store, self.file_nbit, self.block_nbit, self.cache_size); + let block_size = 1 << file_pool.block_nbit; let msize = std::mem::size_of::<WALRingBlob>() as u32; - let mut logfiles: Vec<String> = self.file_pool.store + let mut logfiles: Vec<String> = file_pool.store .enumerate_files()? .filter(|f| self.filename_fmt.is_match(f)).collect(); // TODO: check for missing logfiles logfiles.sort(); let mut chunks = None; for fname in logfiles.iter() { - let fid = self.file_pool.get_fid(fname); - let f = self.file_pool.get_file(fid, false)?; + let fid = file_pool.get_fid(fname); + let f = file_pool.get_file(fid, false)?; let mut off = 0; while let Some(header_raw) = f.read(off, msize as usize)? { let block_remain = block_size - (off & (block_size - 1)); @@ -333,7 +335,7 @@ impl<F: WALStore> WALLoader<F> { off += block_remain; continue } - let ringid_start = (fid << self.file_pool.file_nbit) | off; + let ringid_start = (fid << file_pool.file_nbit) | off; off += msize as u64; let header = unsafe { std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; @@ -343,9 +345,12 @@ impl<F: WALStore> WALLoader<F> { assert!(chunks.is_none()); let payload = f.read(off, rsize as usize)?.ok_or(())?; off += rsize as u64; - self.file_pool.store.apply_payload( + file_pool.store.apply_payload( payload, - ringid_start)?; + WALRingId { + start: ringid_start, + end: (fid << file_pool.file_nbit) | off + })?; }, WALRingType::First => { assert!(chunks.is_none()); @@ -361,6 +366,7 @@ impl<F: WALStore> WALLoader<F> { WALRingType::Last => { if let Some((mut chunks, ringid_start)) = chunks.take() { chunks.push(f.read(off, rsize as usize)?.ok_or(())?); + off += rsize as u64; let mut payload = Vec::new(); payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0); let mut ps = &mut payload[..]; @@ -368,23 +374,26 @@ impl<F: WALStore> WALLoader<F> { ps[..c.len()].copy_from_slice(&*c); ps = &mut ps[c.len()..]; } - self.file_pool.store.apply_payload( + file_pool.store.apply_payload( payload.into_boxed_slice(), - ringid_start)?; + WALRingId { + start: ringid_start, + end: (fid << file_pool.file_nbit) | off + })?; } // otherwise ignore the leftover - off += rsize as u64; + else { off += rsize as u64; } }, WALRingType::Null => break, } } f.truncate(0)?; - self.file_pool.remove_file(fid)?; + file_pool.remove_file(fid)?; } - self.file_pool.reset(); + file_pool.reset(); Ok(WALWriter::new(WALState { first_fid: 0, next: 0, - file_nbit: self.file_pool.file_nbit, - }, self.file_pool)) + file_nbit: file_pool.file_nbit, + }, file_pool)) } } |