diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/lib.rs | 100 | ||||
-rw-r--r-- | src/wal.rs | 30 |
2 files changed, 86 insertions, 44 deletions
@@ -10,8 +10,8 @@ //! //! //! // Start with empty WAL (truncate = true). -//! let store = WALStoreAIO::new("./walfiles", true, None).unwrap(); -//! let mut wal = loader.load(store, |_, _| {Ok(())}).unwrap(); +//! let store = WALStoreAIO::new("./walfiles", true, None, None).unwrap(); +//! let mut wal = block_on(loader.load(store, |_, _| {Ok(())})).unwrap(); //! // Write a vector of records to WAL. //! for f in wal.grow(vec!["record1(foo)", "record2(bar)", "record3(foobar)"]).into_iter() { //! let ring_id = block_on(f).unwrap().1; @@ -20,28 +20,28 @@ //! //! //! // Load from WAL (truncate = false). -//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap(); -//! let mut wal = loader.load(store, |payload, ringid| { +//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap(); +//! let mut wal = block_on(loader.load(store, |payload, ringid| { //! // redo the operations in your application //! println!("recover(payload={}, ringid={:?})", //! std::str::from_utf8(&payload).unwrap(), //! ringid); //! Ok(()) -//! }).unwrap(); +//! })).unwrap(); //! // We saw some log playback, even there is no failure. //! // Let's try to grow the WAL to create many files. -//! let ring_ids = wal.grow((0..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>()) +//! let ring_ids = wal.grow((1..100).into_iter().map(|i| "a".repeat(i)).collect::<Vec<_>>()) //! .into_iter().map(|f| block_on(f).unwrap().1).collect::<Vec<_>>(); //! // Then assume all these records are not longer needed. We can tell WALWriter by the `peel` //! // method. //! block_on(wal.peel(ring_ids)).unwrap(); //! // There will only be one remaining file in ./walfiles. //! -//! let store = WALStoreAIO::new("./walfiles", false, None).unwrap(); -//! let wal = loader.load(store, |payload, _| { +//! let store = WALStoreAIO::new("./walfiles", false, None, None).unwrap(); +//! let wal = block_on(loader.load(store, |payload, _| { //! println!("payload.len() = {}", payload.len()); //! Ok(()) -//! }).unwrap(); +//! })).unwrap(); //! // After each recovery, the ./walfiles is empty. //! ``` @@ -49,7 +49,6 @@ pub mod wal; use async_trait::async_trait; -use futures::executor::block_on; use libaiofut::{AIOBuilder, AIOManager}; use libc::off_t; use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag}; @@ -116,13 +115,12 @@ impl WALFile for WALFileAIO { }) } - fn read( + async fn read( &self, offset: WALPos, length: usize, ) -> Result<Option<WALBytes>, ()> { - let (res, data) = - block_on(self.aiomgr.read(self.fd, offset, length, None)); + let (res, data) = self.aiomgr.read(self.fd, offset, length, None).await; res.or_else(|_| Err(())).and_then(|nread| { Ok(if nread == length { Some(data) } else { None }) }) @@ -131,7 +129,6 @@ impl WALFile for WALFileAIO { pub struct WALStoreAIO { rootfd: RawFd, - rootpath: String, aiomgr: Rc<AIOManager>, } @@ -139,9 +136,9 @@ impl WALStoreAIO { pub fn new( wal_dir: &str, truncate: bool, + rootfd: Option<RawFd>, aiomgr: Option<AIOManager>, ) -> Result<Self, ()> { - let rootpath = wal_dir.to_string(); let aiomgr = Rc::new(aiomgr.ok_or(Err(())).or_else( |_: Result<AIOManager, ()>| { AIOBuilder::default().build().or(Err(())) @@ -151,25 +148,55 @@ impl WALStoreAIO { if truncate { let _ = std::fs::remove_dir_all(wal_dir); } - match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) { - Err(e) => { - if truncate { - panic!("error while creating directory: {}", e) + let walfd; + match rootfd { + None => { + match mkdir( + wal_dir, + Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR, + ) { + Err(e) => { + if truncate { + panic!("error while creating directory: {}", e) + } + } + Ok(_) => (), + } + walfd = match open( + wal_dir, + OFlag::O_DIRECTORY | OFlag::O_PATH, + Mode::empty(), + ) { + Ok(fd) => fd, + Err(_) => panic!("error while opening the WAL directory"), + } + } + Some(fd) => { + let ret = unsafe { + libc::mkdirat( + fd, + std::ffi::CString::new(wal_dir).unwrap().as_ptr(), + libc::S_IRUSR | libc::S_IWUSR | libc::S_IXUSR, + ) + }; + if ret != 0 { + if truncate { + panic!("error while creating directory") + } + } + walfd = match nix::fcntl::openat( + fd, + wal_dir, + OFlag::O_DIRECTORY | OFlag::O_PATH, + Mode::empty(), + ) { + Ok(fd) => fd, + Err(_) => panic!("error while opening the WAL directory"), } } - Ok(_) => (), } - let rootfd = match open( - wal_dir, - OFlag::O_DIRECTORY | OFlag::O_PATH, - Mode::empty(), - ) { - Ok(fd) => fd, - Err(_) => panic!("error while opening the WAL directory"), - }; Ok(WALStoreAIO { - rootfd, - rootpath, + rootfd: walfd, aiomgr, }) } @@ -200,8 +227,17 @@ impl WALStore for WALStoreAIO { fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> { let mut logfiles = Vec::new(); - for fname in std::fs::read_dir(&self.rootpath).unwrap() { - logfiles.push(fname.unwrap().file_name().into_string().unwrap()) + for ent in nix::dir::Dir::openat( + self.rootfd, + "./", + OFlag::empty(), + Mode::empty(), + ) + .unwrap() + .iter() + { + logfiles + .push(ent.unwrap().file_name().to_str().unwrap().to_string()) } Ok(logfiles.into_iter()) } @@ -1,5 +1,4 @@ use async_trait::async_trait; -use futures::executor::block_on; use futures::future::{self, FutureExt, TryFutureExt}; use std::cell::{RefCell, UnsafeCell}; use std::collections::{hash_map, BinaryHeap, HashMap}; @@ -106,7 +105,7 @@ pub trait WALFile { /// all or nothing). async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>; /// Read data with offset. Return `Ok(None)` when it reaches EOF. - fn read( + async fn read( &self, offset: WALPos, length: usize, @@ -408,7 +407,8 @@ impl<F: WALStore> WALWriter<F> { /// corresponds to one record. When a future resolves to `WALRingId`, it is guaranteed the /// record is already logged. Then, after finalizing the changes encoded by that record to /// the persistent storage, the caller can recycle the WAL files by invoking the given - /// `peel` with the given `WALRingId`s. + /// `peel` with the given `WALRingId`s. Note: each serialized record should contain at least 1 + /// byte (empty record payload will result in assertion failure). pub fn grow<'a, R: Record + 'a>( &'a mut self, records: Vec<R>, @@ -427,6 +427,7 @@ impl<F: WALStore> WALWriter<F> { let mut rec = &bytes[..]; let mut rsize = rec.len() as u32; let mut ring_start = None; + assert!(rsize > 0); while rsize > 0 { let remain = self.block_size - bbuff_cur; if remain > msize { @@ -647,7 +648,10 @@ impl WALLoader { } /// Recover by reading the WAL files. - pub fn load<S: WALStore, F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>( + pub async fn load< + S: WALStore, + F: FnMut(WALBytes, WALRingId) -> Result<(), ()>, + >( &self, store: S, mut recover_func: F, @@ -674,14 +678,14 @@ impl WALLoader { let mut skip = false; for fname in logfiles.into_iter() { let fid = file_pool.get_fid(&fname); - let f = block_on(file_pool.get_file(fid, false))?; + let f = file_pool.get_file(fid, false).await?; let mut off = 0; if skip { f.truncate(0)?; - block_on(file_pool.store.remove_file(fname))?; + file_pool.store.remove_file(fname).await?; continue; } - while let Some(header_raw) = f.read(off, msize as usize)? { + while let Some(header_raw) = f.read(off, msize as usize).await? { let ringid_start = (fid << file_pool.file_nbit) + off; off += msize as u64; let header = unsafe { @@ -693,7 +697,8 @@ impl WALLoader { match header.rtype { WALRingType::Full => { assert!(chunks.is_none()); - let payload = f.read(off, rsize as usize)?.ok_or(())?; + let payload = + f.read(off, rsize as usize).await?.ok_or(())?; // TODO: improve the behavior when CRC32 fails if !self.verify_checksum(&payload, header.crc32)? { skip = true; @@ -710,7 +715,8 @@ impl WALLoader { } WALRingType::First => { assert!(chunks.is_none()); - let chunk = f.read(off, rsize as usize)?.ok_or(())?; + let chunk = + f.read(off, rsize as usize).await?.ok_or(())?; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; break; @@ -721,7 +727,7 @@ impl WALLoader { WALRingType::Middle => { if let Some((chunks, _)) = &mut chunks { let chunk = - f.read(off, rsize as usize)?.ok_or(())?; + f.read(off, rsize as usize).await?.ok_or(())?; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; break; @@ -734,7 +740,7 @@ impl WALLoader { if let Some((mut chunks, ringid_start)) = chunks.take() { let chunk = - f.read(off, rsize as usize)?.ok_or(())?; + f.read(off, rsize as usize).await?.ok_or(())?; off += rsize as u64; if !self.verify_checksum(&chunk, header.crc32)? { skip = true; @@ -772,7 +778,7 @@ impl WALLoader { } } f.truncate(0)?; - block_on(file_pool.store.remove_file(fname))?; + file_pool.store.remove_file(fname).await?; } file_pool.reset(); Ok(WALWriter::new( |