From 6b8000d8fd8b88afbc7fb45094da29fb02627ed2 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 16 Jun 2020 12:40:07 -0400 Subject: finish the AIO File/Store impl --- src/lib.rs | 169 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) (limited to 'src/lib.rs') diff --git a/src/lib.rs b/src/lib.rs index 7635dab..906ec41 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,2 +1,171 @@ #[macro_use] extern crate scan_fmt; pub mod wal; + +use async_trait::async_trait; +use futures::executor::block_on; +use libaiofut::{new_batch_scheduler, AIOBatchSchedulerIn, AIOManager}; +use libc::off_t; +use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag}; +use nix::sys::stat::Mode; +use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags}; +use std::cell::RefCell; +use std::os::unix::io::RawFd; +use std::rc::Rc; +use wal::{WALBytes, WALFile, WALPos, WALRingId, WALStore}; + +pub struct WALFileAIO { + fd: RawFd, + aiomgr: Rc>>, +} + +impl WALFileAIO { + pub fn new( + rootfd: RawFd, + filename: &str, + aiomgr: Rc>>, + ) -> Result { + openat( + rootfd, + filename, + OFlag::O_CREAT | OFlag::O_RDWR, + Mode::S_IRUSR | Mode::S_IWUSR, + ) + .and_then(|fd| Ok(WALFileAIO { fd, aiomgr })) + .or_else(|_| Err(())) + } +} + +impl Drop for WALFileAIO { + fn drop(&mut self) { + close(self.fd).unwrap(); + } +} + +#[async_trait(?Send)] +impl WALFile for WALFileAIO { + async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> { + // TODO: is there any async version of fallocate? + fallocate( + self.fd, + FallocateFlags::FALLOC_FL_ZERO_RANGE, + offset as off_t, + length as off_t, + ) + .and_then(|_| Ok(())) + .or_else(|_| Err(())) + } + + fn truncate(&self, length: usize) -> Result<(), ()> { + ftruncate(self.fd, length as off_t).or_else(|_| Err(())) + } + + async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { + self.aiomgr + .borrow_mut() + .write(self.fd, offset, data, None) + .await + .or_else(|_| Err(())) + .and_then(|(nwrote, data)| { + if nwrote == data.len() { + Ok(()) + } else { + Err(()) + } + }) + } + + fn read( + &self, + offset: WALPos, + length: usize, + ) -> Result, ()> { + block_on(self.aiomgr.borrow_mut().read(self.fd, offset, length, None)) + .or_else(|_| Err(())) + .and_then(|(nread, data)| { + Ok(if nread == length { Some(data) } else { None }) + }) + } +} + +pub struct WALStoreAIO Result<(), ()>> { + rootfd: RawFd, + rootpath: String, + recover_func: RefCell, +} + +impl Result<(), ()>> WALStoreAIO { + pub fn new(wal_dir: &str, truncate: bool, recover_func: F) -> Self { + let recover_func = RefCell::new(recover_func); + let rootpath = wal_dir.to_string(); + if truncate { + let _ = std::fs::remove_dir_all(wal_dir); + } + match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) { + Err(e) => { + if truncate { + panic!("error while creating directory: {}", e) + } + } + Ok(_) => (), + } + let rootfd = match open( + wal_dir, + OFlag::O_DIRECTORY | OFlag::O_PATH, + Mode::empty(), + ) { + Ok(fd) => fd, + Err(_) => panic!("error while opening the WAL directory"), + }; + WALStoreAIO { + rootfd, + rootpath, + recover_func, + } + } +} + +#[async_trait(?Send)] +impl Result<(), ()>> WALStore + for WALStoreAIO +{ + type FileNameIter = std::vec::IntoIter; + + async fn open_file( + &self, + filename: &str, + _touch: bool, + ) -> Result, ()> { + let filename = filename.to_string(); + let aiomgr = Rc::new(RefCell::new( + AIOManager::new(new_batch_scheduler(None), 10, None, None) + .or(Err(()))?, + )); + WALFileAIO::new(self.rootfd, &filename, aiomgr.clone()) + .and_then(|f| Ok(Box::new(f) as Box)) + } + + async fn remove_file(&self, filename: String) -> Result<(), ()> { + unlinkat( + Some(self.rootfd), + filename.as_str(), + UnlinkatFlags::NoRemoveDir, + ) + .or_else(|_| Err(())) + } + + fn enumerate_files(&self) -> Result { + let mut logfiles = Vec::new(); + for fname in std::fs::read_dir(&self.rootpath).unwrap() { + logfiles.push(fname.unwrap().file_name().into_string().unwrap()) + } + Ok(logfiles.into_iter()) + } + + fn apply_payload( + &self, + payload: WALBytes, + ringid: WALRingId, + ) -> Result<(), ()> { + (&mut *self.recover_func.borrow_mut())(payload, ringid) + } +} -- cgit v1.2.3-70-g09d2