aboutsummaryrefslogtreecommitdiff
path: root/src/wal.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wal.rs')
-rw-r--r--src/wal.rs79
1 files changed, 42 insertions, 37 deletions
diff --git a/src/wal.rs b/src/wal.rs
index b5b3e91..bcc7a19 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -60,25 +60,25 @@ pub trait WALFile {
/// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed
/// by most OS). Additionally, the final write caused by each invocation of this function
/// should be _atomic_ (the entire single write should be all or nothing).
- fn write(&self, offset: WALPos, data: WALBytes);
+ fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
/// Read data with offset.
- fn read(&self, offset: WALPos, length: usize) -> Option<WALBytes>;
+ fn read(&self, offset: WALPos, length: usize) -> Result<Option<WALBytes>, ()>;
}
pub trait WALStore {
type FileNameIter: Iterator<Item = String>;
/// Open a file given the filename, create the file if not exists when `touch` is `true`.
- fn open_file(&mut self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;
+ fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()>;
/// Unlink a file given the filename.
fn remove_file(&mut self, filename: &str) -> Result<(), ()>;
/// Enumerate all WAL filenames. It should include all WAL files that are previously opened
/// (created) but not removed. The list could be unordered.
- fn enumerate_files(&self) -> Self::FileNameIter;
- /// Apply the payload during recovery. This notifies the application should redo the given
- /// operation to ensure its state is consistent (the major goal of having a WAL). We assume
- /// the application applies the payload by the _order_ of this callback invocation.
- fn apply_payload(&mut self, payload: WALBytes);
+ fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>;
+ /// Apply the payload during recovery. An invocation of the callback waits the application for
+ /// redoing the given operation to ensure its state is consistent. We assume the necessary
+ /// changes by the payload has already been persistent when the callback returns.
+ fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()>;
}
/// The middle layer that manages WAL file handles and invokes public trait functions to actually
@@ -108,15 +108,15 @@ impl<F: WALStore> WALFilePool<F> {
format!("{:08x}.log", fid)
}
- fn get_file(&mut self, fid: u64, touch: bool) -> &'static dyn WALFile {
+ fn get_file(&mut self, fid: u64, touch: bool) -> Result<&'static dyn WALFile, ()> {
let h = match self.handles.get(&fid) {
Some(h) => &**h,
None => {
- self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch).unwrap());
+ self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch)?);
&**self.handles.get(&fid).unwrap()
}
};
- unsafe {&*(h as *const dyn WALFile)}
+ Ok(unsafe {&*(h as *const dyn WALFile)})
}
fn get_fid(&mut self, fname: &str) -> WALFileId {
@@ -124,17 +124,20 @@ impl<F: WALStore> WALFilePool<F> {
}
// TODO: evict stale handles
- fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) {
+ fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> {
// pre-allocate the file space
let mut fid = writes[0].0 >> self.file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
let mut alloc_end = alloc_start + writes[0].1.len() as u64;
- let mut h = self.get_file(fid, true);
- for (off, w) in &writes[1..] {
+ let files = writes.iter().map(|(off, _)|
+ self.get_file((*off) >> self.file_nbit, true)).collect::<Result<Vec<&dyn WALFile>, ()>>()?;
+ // prepare file handles
+ let mut last_h = files[0];
+ for ((off, w), h) in writes[1..].iter().zip(files[1..].iter()) {
let next_fid = off >> self.file_nbit;
if next_fid != fid {
- h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
- h = self.get_file(next_fid, true);
+ last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?;
+ last_h = *h;
alloc_start = 0;
alloc_end = alloc_start + w.len() as u64;
fid = next_fid;
@@ -142,10 +145,11 @@ impl<F: WALStore> WALFilePool<F> {
alloc_end += w.len() as u64;
}
}
- h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
+ last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?;
for (off, w) in writes.into_iter() {
- self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w);
+ self.get_file(off >> self.file_nbit, true)?.write(off & (self.file_size - 1), w)?;
}
+ Ok(())
}
fn remove_file(&mut self, fid: u64) -> Result<(), ()> {
@@ -186,7 +190,7 @@ impl<F: WALStore> WALWriter<F> {
/// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
/// function returns. The caller then has the knowledge of WAL writes so it should defer
/// actual data writes after WAL writes.
- pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> Box<[WALRingId]> {
+ pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> Result<Box<[WALRingId]>, ()> {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = std::mem::size_of::<WALRingBlob>() as u32;
@@ -257,13 +261,13 @@ impl<F: WALStore> WALWriter<F> {
.to_vec().into_boxed_slice()));
self.state.next += (bbuff_cur - bbuff_start) as u64;
}
- self.file_pool.write(writes);
- res.into_boxed_slice()
+ self.file_pool.write(writes)?;
+ Ok(res.into_boxed_slice())
}
/// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
/// complete so that it could automatically remove obsolete WAL files.
- pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) {
+ pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) -> Result<(), ()> {
let msize = std::mem::size_of::<WALRingBlob>() as u64;
let block_size = self.block_size as u64;
for rec in records.as_ref() {
@@ -283,9 +287,10 @@ impl<F: WALStore> WALWriter<F> {
}
let next_fid = self.next_complete >> self.state.file_nbit;
for fid in orig_fid..next_fid {
- self.file_pool.remove_file(fid).unwrap();
+ self.file_pool.remove_file(fid)?;
}
self.state.first_fid = next_fid;
+ Ok(())
}
}
@@ -301,20 +306,20 @@ impl<F: WALStore> WALLoader<F> {
WALLoader{ file_pool, filename_fmt }
}
- pub fn recover(mut self) -> WALWriter<F> {
+ pub fn recover(mut self) -> Result<WALWriter<F>, ()> {
let block_size = 1 << self.file_pool.block_nbit;
let msize = std::mem::size_of::<WALRingBlob>() as u32;
let mut logfiles: Vec<String> = self.file_pool.store
- .enumerate_files()
+ .enumerate_files()?
.filter(|f| self.filename_fmt.is_match(f)).collect();
// TODO: check for missing logfiles
logfiles.sort();
let mut chunks = None;
for fname in logfiles.iter() {
let fid = self.file_pool.get_fid(fname);
- let f = self.file_pool.get_file(fid, false);
+ let f = self.file_pool.get_file(fid, false)?;
let mut off = 0;
- while let Some(header_raw) = f.read(off, msize as usize) {
+ while let Some(header_raw) = f.read(off, msize as usize)? {
if block_size - (off & (block_size - 1)) <= msize as u64 { break }
off += msize as u64;
let header = unsafe {
@@ -323,21 +328,21 @@ impl<F: WALStore> WALLoader<F> {
match header.rtype {
WALRingType::Full => {
assert!(chunks.is_none());
- let payload = f.read(off, rsize as usize).unwrap();
+ let payload = f.read(off, rsize as usize)?.ok_or(())?;
off += rsize as u64;
- self.file_pool.store.apply_payload(payload);
+ self.file_pool.store.apply_payload(payload)?;
},
WALRingType::First => {
assert!(chunks.is_none());
- chunks = Some(vec![f.read(off, rsize as usize).unwrap()]);
+ chunks = Some(vec![f.read(off, rsize as usize)?.ok_or(())?]);
off += rsize as u64;
},
WALRingType::Middle => {
- chunks.as_mut().unwrap().push(f.read(off, rsize as usize).unwrap());
+ chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?);
off += rsize as u64;
},
WALRingType::Last => {
- chunks.as_mut().unwrap().push(f.read(off, rsize as usize).unwrap());
+ chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?);
off += rsize as u64;
let _chunks = chunks.take().unwrap();
@@ -348,19 +353,19 @@ impl<F: WALStore> WALLoader<F> {
ps[..c.len()].copy_from_slice(&*c);
ps = &mut ps[c.len()..];
}
- self.file_pool.store.apply_payload(payload.into_boxed_slice());
+ self.file_pool.store.apply_payload(payload.into_boxed_slice())?;
},
WALRingType::Null => break,
}
}
- f.truncate(0).unwrap();
- self.file_pool.remove_file(fid).unwrap();
+ f.truncate(0)?;
+ self.file_pool.remove_file(fid)?;
}
self.file_pool.reset();
- WALWriter::new(WALState {
+ Ok(WALWriter::new(WALState {
first_fid: 0,
next: 0,
file_nbit: self.file_pool.file_nbit,
- }, self.file_pool)
+ }, self.file_pool))
}
}