diff options
Diffstat (limited to 'src/wal.rs')
-rw-r--r-- | src/wal.rs | 79 |
1 files changed, 42 insertions, 37 deletions
@@ -60,25 +60,25 @@ pub trait WALFile { /// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed /// by most OS). Additionally, the final write caused by each invocation of this function /// should be _atomic_ (the entire single write should be all or nothing). - fn write(&self, offset: WALPos, data: WALBytes); + fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>; /// Read data with offset. - fn read(&self, offset: WALPos, length: usize) -> Option<WALBytes>; + fn read(&self, offset: WALPos, length: usize) -> Result<Option<WALBytes>, ()>; } pub trait WALStore { type FileNameIter: Iterator<Item = String>; /// Open a file given the filename, create the file if not exists when `touch` is `true`. - fn open_file(&mut self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>; + fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()>; /// Unlink a file given the filename. fn remove_file(&mut self, filename: &str) -> Result<(), ()>; /// Enumerate all WAL filenames. It should include all WAL files that are previously opened /// (created) but not removed. The list could be unordered. - fn enumerate_files(&self) -> Self::FileNameIter; - /// Apply the payload during recovery. This notifies the application should redo the given - /// operation to ensure its state is consistent (the major goal of having a WAL). We assume - /// the application applies the payload by the _order_ of this callback invocation. - fn apply_payload(&mut self, payload: WALBytes); + fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>; + /// Apply the payload during recovery. An invocation of the callback waits the application for + /// redoing the given operation to ensure its state is consistent. We assume the necessary + /// changes by the payload has already been persistent when the callback returns. + fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()>; } /// The middle layer that manages WAL file handles and invokes public trait functions to actually @@ -108,15 +108,15 @@ impl<F: WALStore> WALFilePool<F> { format!("{:08x}.log", fid) } - fn get_file(&mut self, fid: u64, touch: bool) -> &'static dyn WALFile { + fn get_file(&mut self, fid: u64, touch: bool) -> Result<&'static dyn WALFile, ()> { let h = match self.handles.get(&fid) { Some(h) => &**h, None => { - self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch).unwrap()); + self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch)?); &**self.handles.get(&fid).unwrap() } }; - unsafe {&*(h as *const dyn WALFile)} + Ok(unsafe {&*(h as *const dyn WALFile)}) } fn get_fid(&mut self, fname: &str) -> WALFileId { @@ -124,17 +124,20 @@ impl<F: WALStore> WALFilePool<F> { } // TODO: evict stale handles - fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) { + fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> { // pre-allocate the file space let mut fid = writes[0].0 >> self.file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); let mut alloc_end = alloc_start + writes[0].1.len() as u64; - let mut h = self.get_file(fid, true); - for (off, w) in &writes[1..] { + let files = writes.iter().map(|(off, _)| + self.get_file((*off) >> self.file_nbit, true)).collect::<Result<Vec<&dyn WALFile>, ()>>()?; + // prepare file handles + let mut last_h = files[0]; + for ((off, w), h) in writes[1..].iter().zip(files[1..].iter()) { let next_fid = off >> self.file_nbit; if next_fid != fid { - h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap(); - h = self.get_file(next_fid, true); + last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?; + last_h = *h; alloc_start = 0; alloc_end = alloc_start + w.len() as u64; fid = next_fid; @@ -142,10 +145,11 @@ impl<F: WALStore> WALFilePool<F> { alloc_end += w.len() as u64; } } - h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap(); + last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?; for (off, w) in writes.into_iter() { - self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w); + self.get_file(off >> self.file_nbit, true)?.write(off & (self.file_size - 1), w)?; } + Ok(()) } fn remove_file(&mut self, fid: u64) -> Result<(), ()> { @@ -186,7 +190,7 @@ impl<F: WALStore> WALWriter<F> { /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the /// function returns. The caller then has the knowledge of WAL writes so it should defer /// actual data writes after WAL writes. - pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> Box<[WALRingId]> { + pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> Result<Box<[WALRingId]>, ()> { let mut res = Vec::new(); let mut writes = Vec::new(); let msize = std::mem::size_of::<WALRingBlob>() as u32; @@ -257,13 +261,13 @@ impl<F: WALStore> WALWriter<F> { .to_vec().into_boxed_slice())); self.state.next += (bbuff_cur - bbuff_start) as u64; } - self.file_pool.write(writes); - res.into_boxed_slice() + self.file_pool.write(writes)?; + Ok(res.into_boxed_slice()) } /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are /// complete so that it could automatically remove obsolete WAL files. - pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) { + pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) -> Result<(), ()> { let msize = std::mem::size_of::<WALRingBlob>() as u64; let block_size = self.block_size as u64; for rec in records.as_ref() { @@ -283,9 +287,10 @@ impl<F: WALStore> WALWriter<F> { } let next_fid = self.next_complete >> self.state.file_nbit; for fid in orig_fid..next_fid { - self.file_pool.remove_file(fid).unwrap(); + self.file_pool.remove_file(fid)?; } self.state.first_fid = next_fid; + Ok(()) } } @@ -301,20 +306,20 @@ impl<F: WALStore> WALLoader<F> { WALLoader{ file_pool, filename_fmt } } - pub fn recover(mut self) -> WALWriter<F> { + pub fn recover(mut self) -> Result<WALWriter<F>, ()> { let block_size = 1 << self.file_pool.block_nbit; let msize = std::mem::size_of::<WALRingBlob>() as u32; let mut logfiles: Vec<String> = self.file_pool.store - .enumerate_files() + .enumerate_files()? .filter(|f| self.filename_fmt.is_match(f)).collect(); // TODO: check for missing logfiles logfiles.sort(); let mut chunks = None; for fname in logfiles.iter() { let fid = self.file_pool.get_fid(fname); - let f = self.file_pool.get_file(fid, false); + let f = self.file_pool.get_file(fid, false)?; let mut off = 0; - while let Some(header_raw) = f.read(off, msize as usize) { + while let Some(header_raw) = f.read(off, msize as usize)? { if block_size - (off & (block_size - 1)) <= msize as u64 { break } off += msize as u64; let header = unsafe { @@ -323,21 +328,21 @@ impl<F: WALStore> WALLoader<F> { match header.rtype { WALRingType::Full => { assert!(chunks.is_none()); - let payload = f.read(off, rsize as usize).unwrap(); + let payload = f.read(off, rsize as usize)?.ok_or(())?; off += rsize as u64; - self.file_pool.store.apply_payload(payload); + self.file_pool.store.apply_payload(payload)?; }, WALRingType::First => { assert!(chunks.is_none()); - chunks = Some(vec![f.read(off, rsize as usize).unwrap()]); + chunks = Some(vec![f.read(off, rsize as usize)?.ok_or(())?]); off += rsize as u64; }, WALRingType::Middle => { - chunks.as_mut().unwrap().push(f.read(off, rsize as usize).unwrap()); + chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?); off += rsize as u64; }, WALRingType::Last => { - chunks.as_mut().unwrap().push(f.read(off, rsize as usize).unwrap()); + chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?); off += rsize as u64; let _chunks = chunks.take().unwrap(); @@ -348,19 +353,19 @@ impl<F: WALStore> WALLoader<F> { ps[..c.len()].copy_from_slice(&*c); ps = &mut ps[c.len()..]; } - self.file_pool.store.apply_payload(payload.into_boxed_slice()); + self.file_pool.store.apply_payload(payload.into_boxed_slice())?; }, WALRingType::Null => break, } } - f.truncate(0).unwrap(); - self.file_pool.remove_file(fid).unwrap(); + f.truncate(0)?; + self.file_pool.remove_file(fid)?; } self.file_pool.reset(); - WALWriter::new(WALState { + Ok(WALWriter::new(WALState { first_fid: 0, next: 0, file_nbit: self.file_pool.file_nbit, - }, self.file_pool) + }, self.file_pool)) } } |