aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/demo1.rs53
-rw-r--r--src/wal.rs79
-rw-r--r--tests/common/mod.rs30
-rw-r--r--tests/rand_fail.rs6
4 files changed, 91 insertions, 77 deletions
diff --git a/examples/demo1.rs b/examples/demo1.rs
index e395adf..40f9562 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -13,15 +13,16 @@ struct WALFileTest {
}
impl WALFileTest {
- fn new(rootfd: RawFd, filename: &str) -> Self {
- let fd = openat(rootfd, filename,
+ fn new(rootfd: RawFd, filename: &str) -> Result<Self, ()> {
+ openat(rootfd, filename,
OFlag::O_CREAT | OFlag::O_RDWR,
- Mode::S_IRUSR | Mode::S_IWUSR).unwrap();
- let filename = filename.to_string();
- WALFileTest {
- filename,
- fd,
- }
+ Mode::S_IRUSR | Mode::S_IWUSR).and_then(|fd| {
+ let filename = filename.to_string();
+ Ok (WALFileTest {
+ filename,
+ fd,
+ })
+ }).or_else(|_| Err(()))
}
}
@@ -44,17 +45,20 @@ impl WALFile for WALFileTest {
ftruncate(self.fd, length as off_t).or_else(|_| Err(()))
}
- fn write(&self, offset: WALPos, data: WALBytes) {
+ fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
println!("{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})",
self.filename, offset, offset + data.len() as u64, hex::encode(&data));
- pwrite(self.fd, &*data, offset as off_t).unwrap();
+ pwrite(self.fd, &*data, offset as off_t)
+ .or_else(|_| Err(()))
+ .and_then(|nwrote| if nwrote == data.len() { Ok(()) } else { Err(()) })
}
- fn read(&self, offset: WALPos, length: usize) -> Option<WALBytes> {
+
+ fn read(&self, offset: WALPos, length: usize) -> Result<Option<WALBytes>, ()> {
let mut buff = Vec::new();
buff.resize(length, 0);
- if pread(self.fd, &mut buff[..], offset as off_t).unwrap() == length {
- Some(buff.into_boxed_slice())
- } else { None }
+ pread(self.fd, &mut buff[..], offset as off_t)
+ .or_else(|_| Err(()))
+ .and_then(|nread| Ok(if nread == length {Some(buff.into_boxed_slice())} else {None}))
}
}
@@ -90,10 +94,10 @@ impl Drop for WALStoreTest {
impl WALStore for WALStoreTest {
type FileNameIter = std::vec::IntoIter<String>;
- fn open_file(&mut self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>> {
+ fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()> {
println!("open_file(filename={}, touch={})", filename, touch);
let filename = filename.to_string();
- Some(Box::new(WALFileTest::new(self.rootfd, &filename)))
+ WALFileTest::new(self.rootfd, &filename).and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>))
}
fn remove_file(&mut self, filename: &str) -> Result<(), ()> {
@@ -101,23 +105,24 @@ impl WALStore for WALStoreTest {
unlinkat(Some(self.rootfd), filename, UnlinkatFlags::NoRemoveDir).or_else(|_| Err(()))
}
- fn enumerate_files(&self) -> Self::FileNameIter {
+ fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
println!("enumerate_files()");
let mut logfiles = Vec::new();
for fname in std::fs::read_dir(&self.rootpath).unwrap() {
logfiles.push(fname.unwrap().file_name().into_string().unwrap())
}
- logfiles.into_iter()
+ Ok(logfiles.into_iter())
}
- fn apply_payload(&mut self, payload: WALBytes) {
- println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap())
+ fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()> {
+ println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap());
+ Ok(())
}
}
fn test(records: Vec<String>, wal: &mut WALWriter<WALStoreTest>) -> Box<[WALRingId]> {
let records: Vec<WALBytes> = records.into_iter().map(|s| s.into_bytes().into_boxed_slice()).collect();
- let ret = wal.grow(&records);
+ let ret = wal.grow(&records).unwrap();
for ring_id in ret.iter() {
println!("got ring id: {:?}", ring_id);
}
@@ -127,7 +132,7 @@ fn test(records: Vec<String>, wal: &mut WALWriter<WALStoreTest>) -> Box<[WALRing
fn main() {
let mut rng = rand::thread_rng();
let store = WALStoreTest::new("./wal_demo1", true);
- let mut wal = WALLoader::new(store, 9, 8, 1000).recover();
+ let mut wal = WALLoader::new(store, 9, 8, 1000).recover().unwrap();
for _ in 0..3 {
test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::<Vec<String>>(), &mut wal);
}
@@ -136,13 +141,13 @@ fn main() {
}
let store = WALStoreTest::new("./wal_demo1", false);
- let mut wal = WALLoader::new(store, 9, 8, 1000).recover();
+ let mut wal = WALLoader::new(store, 9, 8, 1000).recover().unwrap();
for _ in 0..3 {
test(vec!["a".repeat(10), "b".repeat(100), "c".repeat(300), "d".repeat(400)], &mut wal);
}
let store = WALStoreTest::new("./wal_demo1", false);
- let mut wal = WALLoader::new(store, 9, 8, 1000).recover();
+ let mut wal = WALLoader::new(store, 9, 8, 1000).recover().unwrap();
for _ in 0..3 {
let mut ids = Vec::new();
for _ in 0..3 {
diff --git a/src/wal.rs b/src/wal.rs
index b5b3e91..bcc7a19 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -60,25 +60,25 @@ pub trait WALFile {
/// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed
/// by most OS). Additionally, the final write caused by each invocation of this function
/// should be _atomic_ (the entire single write should be all or nothing).
- fn write(&self, offset: WALPos, data: WALBytes);
+ fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
/// Read data with offset.
- fn read(&self, offset: WALPos, length: usize) -> Option<WALBytes>;
+ fn read(&self, offset: WALPos, length: usize) -> Result<Option<WALBytes>, ()>;
}
pub trait WALStore {
type FileNameIter: Iterator<Item = String>;
/// Open a file given the filename, create the file if not exists when `touch` is `true`.
- fn open_file(&mut self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;
+ fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()>;
/// Unlink a file given the filename.
fn remove_file(&mut self, filename: &str) -> Result<(), ()>;
/// Enumerate all WAL filenames. It should include all WAL files that are previously opened
/// (created) but not removed. The list could be unordered.
- fn enumerate_files(&self) -> Self::FileNameIter;
- /// Apply the payload during recovery. This notifies the application should redo the given
- /// operation to ensure its state is consistent (the major goal of having a WAL). We assume
- /// the application applies the payload by the _order_ of this callback invocation.
- fn apply_payload(&mut self, payload: WALBytes);
+ fn enumerate_files(&self) -> Result<Self::FileNameIter, ()>;
+ /// Apply the payload during recovery. An invocation of the callback waits the application for
+ /// redoing the given operation to ensure its state is consistent. We assume the necessary
+ /// changes by the payload has already been persistent when the callback returns.
+ fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()>;
}
/// The middle layer that manages WAL file handles and invokes public trait functions to actually
@@ -108,15 +108,15 @@ impl<F: WALStore> WALFilePool<F> {
format!("{:08x}.log", fid)
}
- fn get_file(&mut self, fid: u64, touch: bool) -> &'static dyn WALFile {
+ fn get_file(&mut self, fid: u64, touch: bool) -> Result<&'static dyn WALFile, ()> {
let h = match self.handles.get(&fid) {
Some(h) => &**h,
None => {
- self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch).unwrap());
+ self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch)?);
&**self.handles.get(&fid).unwrap()
}
};
- unsafe {&*(h as *const dyn WALFile)}
+ Ok(unsafe {&*(h as *const dyn WALFile)})
}
fn get_fid(&mut self, fname: &str) -> WALFileId {
@@ -124,17 +124,20 @@ impl<F: WALStore> WALFilePool<F> {
}
// TODO: evict stale handles
- fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) {
+ fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> {
// pre-allocate the file space
let mut fid = writes[0].0 >> self.file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
let mut alloc_end = alloc_start + writes[0].1.len() as u64;
- let mut h = self.get_file(fid, true);
- for (off, w) in &writes[1..] {
+ let files = writes.iter().map(|(off, _)|
+ self.get_file((*off) >> self.file_nbit, true)).collect::<Result<Vec<&dyn WALFile>, ()>>()?;
+ // prepare file handles
+ let mut last_h = files[0];
+ for ((off, w), h) in writes[1..].iter().zip(files[1..].iter()) {
let next_fid = off >> self.file_nbit;
if next_fid != fid {
- h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
- h = self.get_file(next_fid, true);
+ last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?;
+ last_h = *h;
alloc_start = 0;
alloc_end = alloc_start + w.len() as u64;
fid = next_fid;
@@ -142,10 +145,11 @@ impl<F: WALStore> WALFilePool<F> {
alloc_end += w.len() as u64;
}
}
- h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
+ last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?;
for (off, w) in writes.into_iter() {
- self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w);
+ self.get_file(off >> self.file_nbit, true)?.write(off & (self.file_size - 1), w)?;
}
+ Ok(())
}
fn remove_file(&mut self, fid: u64) -> Result<(), ()> {
@@ -186,7 +190,7 @@ impl<F: WALStore> WALWriter<F> {
/// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
/// function returns. The caller then has the knowledge of WAL writes so it should defer
/// actual data writes after WAL writes.
- pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> Box<[WALRingId]> {
+ pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> Result<Box<[WALRingId]>, ()> {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = std::mem::size_of::<WALRingBlob>() as u32;
@@ -257,13 +261,13 @@ impl<F: WALStore> WALWriter<F> {
.to_vec().into_boxed_slice()));
self.state.next += (bbuff_cur - bbuff_start) as u64;
}
- self.file_pool.write(writes);
- res.into_boxed_slice()
+ self.file_pool.write(writes)?;
+ Ok(res.into_boxed_slice())
}
/// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
/// complete so that it could automatically remove obsolete WAL files.
- pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) {
+ pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) -> Result<(), ()> {
let msize = std::mem::size_of::<WALRingBlob>() as u64;
let block_size = self.block_size as u64;
for rec in records.as_ref() {
@@ -283,9 +287,10 @@ impl<F: WALStore> WALWriter<F> {
}
let next_fid = self.next_complete >> self.state.file_nbit;
for fid in orig_fid..next_fid {
- self.file_pool.remove_file(fid).unwrap();
+ self.file_pool.remove_file(fid)?;
}
self.state.first_fid = next_fid;
+ Ok(())
}
}
@@ -301,20 +306,20 @@ impl<F: WALStore> WALLoader<F> {
WALLoader{ file_pool, filename_fmt }
}
- pub fn recover(mut self) -> WALWriter<F> {
+ pub fn recover(mut self) -> Result<WALWriter<F>, ()> {
let block_size = 1 << self.file_pool.block_nbit;
let msize = std::mem::size_of::<WALRingBlob>() as u32;
let mut logfiles: Vec<String> = self.file_pool.store
- .enumerate_files()
+ .enumerate_files()?
.filter(|f| self.filename_fmt.is_match(f)).collect();
// TODO: check for missing logfiles
logfiles.sort();
let mut chunks = None;
for fname in logfiles.iter() {
let fid = self.file_pool.get_fid(fname);
- let f = self.file_pool.get_file(fid, false);
+ let f = self.file_pool.get_file(fid, false)?;
let mut off = 0;
- while let Some(header_raw) = f.read(off, msize as usize) {
+ while let Some(header_raw) = f.read(off, msize as usize)? {
if block_size - (off & (block_size - 1)) <= msize as u64 { break }
off += msize as u64;
let header = unsafe {
@@ -323,21 +328,21 @@ impl<F: WALStore> WALLoader<F> {
match header.rtype {
WALRingType::Full => {
assert!(chunks.is_none());
- let payload = f.read(off, rsize as usize).unwrap();
+ let payload = f.read(off, rsize as usize)?.ok_or(())?;
off += rsize as u64;
- self.file_pool.store.apply_payload(payload);
+ self.file_pool.store.apply_payload(payload)?;
},
WALRingType::First => {
assert!(chunks.is_none());
- chunks = Some(vec![f.read(off, rsize as usize).unwrap()]);
+ chunks = Some(vec![f.read(off, rsize as usize)?.ok_or(())?]);
off += rsize as u64;
},
WALRingType::Middle => {
- chunks.as_mut().unwrap().push(f.read(off, rsize as usize).unwrap());
+ chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?);
off += rsize as u64;
},
WALRingType::Last => {
- chunks.as_mut().unwrap().push(f.read(off, rsize as usize).unwrap());
+ chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?);
off += rsize as u64;
let _chunks = chunks.take().unwrap();
@@ -348,19 +353,19 @@ impl<F: WALStore> WALLoader<F> {
ps[..c.len()].copy_from_slice(&*c);
ps = &mut ps[c.len()..];
}
- self.file_pool.store.apply_payload(payload.into_boxed_slice());
+ self.file_pool.store.apply_payload(payload.into_boxed_slice())?;
},
WALRingType::Null => break,
}
}
- f.truncate(0).unwrap();
- self.file_pool.remove_file(fid).unwrap();
+ f.truncate(0)?;
+ self.file_pool.remove_file(fid)?;
}
self.file_pool.reset();
- WALWriter::new(WALState {
+ Ok(WALWriter::new(WALState {
first_fid: 0,
next: 0,
file_nbit: self.file_pool.file_nbit,
- }, self.file_pool)
+ }, self.file_pool))
}
}
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index fd98539..7a7e42a 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -32,9 +32,11 @@ impl std::ops::Deref for FileContentEmul {
fn deref(&self) -> &Self::Target {&self.0}
}
+type FailGen = std::iter::Iterator<Item = bool>;
+
/// Emulate the a virtual file handle.
pub struct WALFileEmul {
- file: Rc<FileContentEmul>
+ file: Rc<FileContentEmul>,
}
impl WALFile for WALFileEmul {
@@ -52,17 +54,18 @@ impl WALFile for WALFileEmul {
Ok(())
}
- fn write(&self, offset: WALPos, data: WALBytes) {
+ fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
let offset = offset as usize;
&self.file.borrow_mut()[offset..offset + data.len()].copy_from_slice(&data);
+ Ok(())
}
- fn read(&self, offset: WALPos, length: usize) -> Option<WALBytes> {
+ fn read(&self, offset: WALPos, length: usize) -> Result<Option<WALBytes>, ()> {
let offset = offset as usize;
let file = self.file.borrow();
- if offset + length > file.len() { None }
+ if offset + length > file.len() { Ok(None) }
else {
- Some((&file[offset..offset + length]).to_vec().into_boxed_slice())
+ Ok(Some((&file[offset..offset + length]).to_vec().into_boxed_slice()))
}
}
}
@@ -87,14 +90,14 @@ impl<'a> WALStoreEmul<'a> {
impl<'a> WALStore for WALStoreEmul<'a> {
type FileNameIter = std::vec::IntoIter<String>;
- fn open_file(&mut self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>> {
+ fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()> {
match self.0.files.entry(filename.to_string()) {
- Entry::Occupied(e) => Some(Box::new(WALFileEmul { file: e.get().clone() })),
+ Entry::Occupied(e) => Ok(Box::new(WALFileEmul { file: e.get().clone() })),
Entry::Vacant(e) => if touch {
- Some(Box::new(
+ Ok(Box::new(
WALFileEmul { file: e.insert(Rc::new(FileContentEmul::new())).clone() }))
} else {
- None
+ Err(())
}
}
}
@@ -103,15 +106,16 @@ impl<'a> WALStore for WALStoreEmul<'a> {
self.0.files.remove(filename).ok_or(()).and_then(|_| Ok(()))
}
- fn enumerate_files(&self) -> Self::FileNameIter {
+ fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
let mut logfiles = Vec::new();
for (fname, _) in self.0.files.iter() {
logfiles.push(fname.clone())
}
- logfiles.into_iter()
+ Ok(logfiles.into_iter())
}
- fn apply_payload(&mut self, payload: WALBytes) {
- println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap())
+ fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()> {
+ println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap());
+ Ok(())
}
}
diff --git a/tests/rand_fail.rs b/tests/rand_fail.rs
index 988da05..7499694 100644
--- a/tests/rand_fail.rs
+++ b/tests/rand_fail.rs
@@ -7,7 +7,7 @@ mod common;
fn test(records: Vec<String>, wal: &mut WALWriter<common::WALStoreEmul>) -> Box<[WALRingId]> {
let records: Vec<WALBytes> = records.into_iter().map(|s| s.into_bytes().into_boxed_slice()).collect();
- let ret = wal.grow(&records);
+ let ret = wal.grow(&records).unwrap();
for ring_id in ret.iter() {
println!("got ring id: {:?}", ring_id);
}
@@ -17,9 +17,9 @@ fn test(records: Vec<String>, wal: &mut WALWriter<common::WALStoreEmul>) -> Box<
#[test]
fn test_rand_fail() {
let mut state = common::WALStoreEmulState::new();
- let mut wal = WALLoader::new(common::WALStoreEmul::new(&mut state), 9, 8, 1000).recover();
+ let mut wal = WALLoader::new(common::WALStoreEmul::new(&mut state), 9, 8, 1000).recover().unwrap();
for _ in 0..3 {
test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::<Vec<String>>(), &mut wal);
}
- let mut wal = WALLoader::new(common::WALStoreEmul::new(&mut state), 9, 8, 1000).recover();
+ let mut wal = WALLoader::new(common::WALStoreEmul::new(&mut state), 9, 8, 1000).recover().unwrap();
}