From 617c515ec1834501d4835076de3adcfb495d8ef9 Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 10 Jun 2020 15:56:05 -0400 Subject: add basic error handling --- examples/demo1.rs | 53 +++++++++++++++++++---------------- src/wal.rs | 79 ++++++++++++++++++++++++++++------------------------- tests/common/mod.rs | 30 +++++++++++--------- tests/rand_fail.rs | 6 ++-- 4 files changed, 91 insertions(+), 77 deletions(-) diff --git a/examples/demo1.rs b/examples/demo1.rs index e395adf..40f9562 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -13,15 +13,16 @@ struct WALFileTest { } impl WALFileTest { - fn new(rootfd: RawFd, filename: &str) -> Self { - let fd = openat(rootfd, filename, + fn new(rootfd: RawFd, filename: &str) -> Result { + openat(rootfd, filename, OFlag::O_CREAT | OFlag::O_RDWR, - Mode::S_IRUSR | Mode::S_IWUSR).unwrap(); - let filename = filename.to_string(); - WALFileTest { - filename, - fd, - } + Mode::S_IRUSR | Mode::S_IWUSR).and_then(|fd| { + let filename = filename.to_string(); + Ok (WALFileTest { + filename, + fd, + }) + }).or_else(|_| Err(())) } } @@ -44,17 +45,20 @@ impl WALFile for WALFileTest { ftruncate(self.fd, length as off_t).or_else(|_| Err(())) } - fn write(&self, offset: WALPos, data: WALBytes) { + fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { println!("{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})", self.filename, offset, offset + data.len() as u64, hex::encode(&data)); - pwrite(self.fd, &*data, offset as off_t).unwrap(); + pwrite(self.fd, &*data, offset as off_t) + .or_else(|_| Err(())) + .and_then(|nwrote| if nwrote == data.len() { Ok(()) } else { Err(()) }) } - fn read(&self, offset: WALPos, length: usize) -> Option { + + fn read(&self, offset: WALPos, length: usize) -> Result, ()> { let mut buff = Vec::new(); buff.resize(length, 0); - if pread(self.fd, &mut buff[..], offset as off_t).unwrap() == length { - Some(buff.into_boxed_slice()) - } else { None } + pread(self.fd, &mut buff[..], offset as off_t) + .or_else(|_| Err(())) + .and_then(|nread| Ok(if nread == length {Some(buff.into_boxed_slice())} else {None})) } } @@ -90,10 +94,10 @@ impl Drop for WALStoreTest { impl WALStore for WALStoreTest { type FileNameIter = std::vec::IntoIter; - fn open_file(&mut self, filename: &str, touch: bool) -> Option> { + fn open_file(&mut self, filename: &str, touch: bool) -> Result, ()> { println!("open_file(filename={}, touch={})", filename, touch); let filename = filename.to_string(); - Some(Box::new(WALFileTest::new(self.rootfd, &filename))) + WALFileTest::new(self.rootfd, &filename).and_then(|f| Ok(Box::new(f) as Box)) } fn remove_file(&mut self, filename: &str) -> Result<(), ()> { @@ -101,23 +105,24 @@ impl WALStore for WALStoreTest { unlinkat(Some(self.rootfd), filename, UnlinkatFlags::NoRemoveDir).or_else(|_| Err(())) } - fn enumerate_files(&self) -> Self::FileNameIter { + fn enumerate_files(&self) -> Result { println!("enumerate_files()"); let mut logfiles = Vec::new(); for fname in std::fs::read_dir(&self.rootpath).unwrap() { logfiles.push(fname.unwrap().file_name().into_string().unwrap()) } - logfiles.into_iter() + Ok(logfiles.into_iter()) } - fn apply_payload(&mut self, payload: WALBytes) { - println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap()) + fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()> { + println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap()); + Ok(()) } } fn test(records: Vec, wal: &mut WALWriter) -> Box<[WALRingId]> { let records: Vec = records.into_iter().map(|s| s.into_bytes().into_boxed_slice()).collect(); - let ret = wal.grow(&records); + let ret = wal.grow(&records).unwrap(); for ring_id in ret.iter() { println!("got ring id: {:?}", ring_id); } @@ -127,7 +132,7 @@ fn test(records: Vec, wal: &mut WALWriter) -> Box<[WALRing fn main() { let mut rng = rand::thread_rng(); let store = WALStoreTest::new("./wal_demo1", true); - let mut wal = WALLoader::new(store, 9, 8, 1000).recover(); + let mut wal = WALLoader::new(store, 9, 8, 1000).recover().unwrap(); for _ in 0..3 { test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::>(), &mut wal); } @@ -136,13 +141,13 @@ fn main() { } let store = WALStoreTest::new("./wal_demo1", false); - let mut wal = WALLoader::new(store, 9, 8, 1000).recover(); + let mut wal = WALLoader::new(store, 9, 8, 1000).recover().unwrap(); for _ in 0..3 { test(vec!["a".repeat(10), "b".repeat(100), "c".repeat(300), "d".repeat(400)], &mut wal); } let store = WALStoreTest::new("./wal_demo1", false); - let mut wal = WALLoader::new(store, 9, 8, 1000).recover(); + let mut wal = WALLoader::new(store, 9, 8, 1000).recover().unwrap(); for _ in 0..3 { let mut ids = Vec::new(); for _ in 0..3 { diff --git a/src/wal.rs b/src/wal.rs index b5b3e91..bcc7a19 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -60,25 +60,25 @@ pub trait WALFile { /// `allocate/truncate` invocation should be visible if ordered earlier (should be guaranteed /// by most OS). Additionally, the final write caused by each invocation of this function /// should be _atomic_ (the entire single write should be all or nothing). - fn write(&self, offset: WALPos, data: WALBytes); + fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>; /// Read data with offset. - fn read(&self, offset: WALPos, length: usize) -> Option; + fn read(&self, offset: WALPos, length: usize) -> Result, ()>; } pub trait WALStore { type FileNameIter: Iterator; /// Open a file given the filename, create the file if not exists when `touch` is `true`. - fn open_file(&mut self, filename: &str, touch: bool) -> Option>; + fn open_file(&mut self, filename: &str, touch: bool) -> Result, ()>; /// Unlink a file given the filename. fn remove_file(&mut self, filename: &str) -> Result<(), ()>; /// Enumerate all WAL filenames. It should include all WAL files that are previously opened /// (created) but not removed. The list could be unordered. - fn enumerate_files(&self) -> Self::FileNameIter; - /// Apply the payload during recovery. This notifies the application should redo the given - /// operation to ensure its state is consistent (the major goal of having a WAL). We assume - /// the application applies the payload by the _order_ of this callback invocation. - fn apply_payload(&mut self, payload: WALBytes); + fn enumerate_files(&self) -> Result; + /// Apply the payload during recovery. An invocation of the callback waits the application for + /// redoing the given operation to ensure its state is consistent. We assume the necessary + /// changes by the payload has already been persistent when the callback returns. + fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()>; } /// The middle layer that manages WAL file handles and invokes public trait functions to actually @@ -108,15 +108,15 @@ impl WALFilePool { format!("{:08x}.log", fid) } - fn get_file(&mut self, fid: u64, touch: bool) -> &'static dyn WALFile { + fn get_file(&mut self, fid: u64, touch: bool) -> Result<&'static dyn WALFile, ()> { let h = match self.handles.get(&fid) { Some(h) => &**h, None => { - self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch).unwrap()); + self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch)?); &**self.handles.get(&fid).unwrap() } }; - unsafe {&*(h as *const dyn WALFile)} + Ok(unsafe {&*(h as *const dyn WALFile)}) } fn get_fid(&mut self, fname: &str) -> WALFileId { @@ -124,17 +124,20 @@ impl WALFilePool { } // TODO: evict stale handles - fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) { + fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) -> Result<(), ()> { // pre-allocate the file space let mut fid = writes[0].0 >> self.file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); let mut alloc_end = alloc_start + writes[0].1.len() as u64; - let mut h = self.get_file(fid, true); - for (off, w) in &writes[1..] { + let files = writes.iter().map(|(off, _)| + self.get_file((*off) >> self.file_nbit, true)).collect::, ()>>()?; + // prepare file handles + let mut last_h = files[0]; + for ((off, w), h) in writes[1..].iter().zip(files[1..].iter()) { let next_fid = off >> self.file_nbit; if next_fid != fid { - h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap(); - h = self.get_file(next_fid, true); + last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?; + last_h = *h; alloc_start = 0; alloc_end = alloc_start + w.len() as u64; fid = next_fid; @@ -142,10 +145,11 @@ impl WALFilePool { alloc_end += w.len() as u64; } } - h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap(); + last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?; for (off, w) in writes.into_iter() { - self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w); + self.get_file(off >> self.file_nbit, true)?.write(off & (self.file_size - 1), w)?; } + Ok(()) } fn remove_file(&mut self, fid: u64) -> Result<(), ()> { @@ -186,7 +190,7 @@ impl WALWriter { /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the /// function returns. The caller then has the knowledge of WAL writes so it should defer /// actual data writes after WAL writes. - pub fn grow>(&mut self, records: T) -> Box<[WALRingId]> { + pub fn grow>(&mut self, records: T) -> Result, ()> { let mut res = Vec::new(); let mut writes = Vec::new(); let msize = std::mem::size_of::() as u32; @@ -257,13 +261,13 @@ impl WALWriter { .to_vec().into_boxed_slice())); self.state.next += (bbuff_cur - bbuff_start) as u64; } - self.file_pool.write(writes); - res.into_boxed_slice() + self.file_pool.write(writes)?; + Ok(res.into_boxed_slice()) } /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are /// complete so that it could automatically remove obsolete WAL files. - pub fn peel>(&mut self, records: T) { + pub fn peel>(&mut self, records: T) -> Result<(), ()> { let msize = std::mem::size_of::() as u64; let block_size = self.block_size as u64; for rec in records.as_ref() { @@ -283,9 +287,10 @@ impl WALWriter { } let next_fid = self.next_complete >> self.state.file_nbit; for fid in orig_fid..next_fid { - self.file_pool.remove_file(fid).unwrap(); + self.file_pool.remove_file(fid)?; } self.state.first_fid = next_fid; + Ok(()) } } @@ -301,20 +306,20 @@ impl WALLoader { WALLoader{ file_pool, filename_fmt } } - pub fn recover(mut self) -> WALWriter { + pub fn recover(mut self) -> Result, ()> { let block_size = 1 << self.file_pool.block_nbit; let msize = std::mem::size_of::() as u32; let mut logfiles: Vec = self.file_pool.store - .enumerate_files() + .enumerate_files()? .filter(|f| self.filename_fmt.is_match(f)).collect(); // TODO: check for missing logfiles logfiles.sort(); let mut chunks = None; for fname in logfiles.iter() { let fid = self.file_pool.get_fid(fname); - let f = self.file_pool.get_file(fid, false); + let f = self.file_pool.get_file(fid, false)?; let mut off = 0; - while let Some(header_raw) = f.read(off, msize as usize) { + while let Some(header_raw) = f.read(off, msize as usize)? { if block_size - (off & (block_size - 1)) <= msize as u64 { break } off += msize as u64; let header = unsafe { @@ -323,21 +328,21 @@ impl WALLoader { match header.rtype { WALRingType::Full => { assert!(chunks.is_none()); - let payload = f.read(off, rsize as usize).unwrap(); + let payload = f.read(off, rsize as usize)?.ok_or(())?; off += rsize as u64; - self.file_pool.store.apply_payload(payload); + self.file_pool.store.apply_payload(payload)?; }, WALRingType::First => { assert!(chunks.is_none()); - chunks = Some(vec![f.read(off, rsize as usize).unwrap()]); + chunks = Some(vec![f.read(off, rsize as usize)?.ok_or(())?]); off += rsize as u64; }, WALRingType::Middle => { - chunks.as_mut().unwrap().push(f.read(off, rsize as usize).unwrap()); + chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?); off += rsize as u64; }, WALRingType::Last => { - chunks.as_mut().unwrap().push(f.read(off, rsize as usize).unwrap()); + chunks.as_mut().unwrap().push(f.read(off, rsize as usize)?.ok_or(())?); off += rsize as u64; let _chunks = chunks.take().unwrap(); @@ -348,19 +353,19 @@ impl WALLoader { ps[..c.len()].copy_from_slice(&*c); ps = &mut ps[c.len()..]; } - self.file_pool.store.apply_payload(payload.into_boxed_slice()); + self.file_pool.store.apply_payload(payload.into_boxed_slice())?; }, WALRingType::Null => break, } } - f.truncate(0).unwrap(); - self.file_pool.remove_file(fid).unwrap(); + f.truncate(0)?; + self.file_pool.remove_file(fid)?; } self.file_pool.reset(); - WALWriter::new(WALState { + Ok(WALWriter::new(WALState { first_fid: 0, next: 0, file_nbit: self.file_pool.file_nbit, - }, self.file_pool) + }, self.file_pool)) } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index fd98539..7a7e42a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -32,9 +32,11 @@ impl std::ops::Deref for FileContentEmul { fn deref(&self) -> &Self::Target {&self.0} } +type FailGen = std::iter::Iterator; + /// Emulate the a virtual file handle. pub struct WALFileEmul { - file: Rc + file: Rc, } impl WALFile for WALFileEmul { @@ -52,17 +54,18 @@ impl WALFile for WALFileEmul { Ok(()) } - fn write(&self, offset: WALPos, data: WALBytes) { + fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { let offset = offset as usize; &self.file.borrow_mut()[offset..offset + data.len()].copy_from_slice(&data); + Ok(()) } - fn read(&self, offset: WALPos, length: usize) -> Option { + fn read(&self, offset: WALPos, length: usize) -> Result, ()> { let offset = offset as usize; let file = self.file.borrow(); - if offset + length > file.len() { None } + if offset + length > file.len() { Ok(None) } else { - Some((&file[offset..offset + length]).to_vec().into_boxed_slice()) + Ok(Some((&file[offset..offset + length]).to_vec().into_boxed_slice())) } } } @@ -87,14 +90,14 @@ impl<'a> WALStoreEmul<'a> { impl<'a> WALStore for WALStoreEmul<'a> { type FileNameIter = std::vec::IntoIter; - fn open_file(&mut self, filename: &str, touch: bool) -> Option> { + fn open_file(&mut self, filename: &str, touch: bool) -> Result, ()> { match self.0.files.entry(filename.to_string()) { - Entry::Occupied(e) => Some(Box::new(WALFileEmul { file: e.get().clone() })), + Entry::Occupied(e) => Ok(Box::new(WALFileEmul { file: e.get().clone() })), Entry::Vacant(e) => if touch { - Some(Box::new( + Ok(Box::new( WALFileEmul { file: e.insert(Rc::new(FileContentEmul::new())).clone() })) } else { - None + Err(()) } } } @@ -103,15 +106,16 @@ impl<'a> WALStore for WALStoreEmul<'a> { self.0.files.remove(filename).ok_or(()).and_then(|_| Ok(())) } - fn enumerate_files(&self) -> Self::FileNameIter { + fn enumerate_files(&self) -> Result { let mut logfiles = Vec::new(); for (fname, _) in self.0.files.iter() { logfiles.push(fname.clone()) } - logfiles.into_iter() + Ok(logfiles.into_iter()) } - fn apply_payload(&mut self, payload: WALBytes) { - println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap()) + fn apply_payload(&mut self, payload: WALBytes) -> Result<(), ()> { + println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap()); + Ok(()) } } diff --git a/tests/rand_fail.rs b/tests/rand_fail.rs index 988da05..7499694 100644 --- a/tests/rand_fail.rs +++ b/tests/rand_fail.rs @@ -7,7 +7,7 @@ mod common; fn test(records: Vec, wal: &mut WALWriter) -> Box<[WALRingId]> { let records: Vec = records.into_iter().map(|s| s.into_bytes().into_boxed_slice()).collect(); - let ret = wal.grow(&records); + let ret = wal.grow(&records).unwrap(); for ring_id in ret.iter() { println!("got ring id: {:?}", ring_id); } @@ -17,9 +17,9 @@ fn test(records: Vec, wal: &mut WALWriter) -> Box< #[test] fn test_rand_fail() { let mut state = common::WALStoreEmulState::new(); - let mut wal = WALLoader::new(common::WALStoreEmul::new(&mut state), 9, 8, 1000).recover(); + let mut wal = WALLoader::new(common::WALStoreEmul::new(&mut state), 9, 8, 1000).recover().unwrap(); for _ in 0..3 { test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::>(), &mut wal); } - let mut wal = WALLoader::new(common::WALStoreEmul::new(&mut state), 9, 8, 1000).recover(); + let mut wal = WALLoader::new(common::WALStoreEmul::new(&mut state), 9, 8, 1000).recover().unwrap(); } -- cgit v1.2.3-70-g09d2