From 6b8000d8fd8b88afbc7fb45094da29fb02627ed2 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 16 Jun 2020 12:40:07 -0400 Subject: finish the AIO File/Store impl --- examples/demo1.rs | 209 +++++------------------------------------------------- 1 file changed, 18 insertions(+), 191 deletions(-) (limited to 'examples') diff --git a/examples/demo1.rs b/examples/demo1.rs index 0c735f8..4a923c6 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -1,195 +1,12 @@ -use async_trait::async_trait; -use libc::off_t; -use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag}; -use nix::sys::{ - stat::Mode, - uio::{pread, pwrite}, +use growthring::{ + wal::{WALBytes, WALLoader, WALRingId, WALWriter}, + WALStoreAIO, }; -use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags}; use rand::{seq::SliceRandom, Rng}; -use std::os::unix::io::RawFd; -use growthring::wal::{ - WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, WALWriter, -}; - -struct WALFileTest { - filename: String, - fd: RawFd, -} - -impl WALFileTest { - fn new(rootfd: RawFd, filename: &str) -> Result { - openat( - rootfd, - filename, - OFlag::O_CREAT | OFlag::O_RDWR, - Mode::S_IRUSR | Mode::S_IWUSR, - ) - .and_then(|fd| { - let filename = filename.to_string(); - Ok(WALFileTest { filename, fd }) - }) - .or_else(|_| Err(())) - } -} - -impl Drop for WALFileTest { - fn drop(&mut self) { - close(self.fd).unwrap(); - } -} - -#[async_trait(?Send)] -impl WALFile for WALFileTest { - async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> { - println!( - "{}.allocate(offset=0x{:x}, end=0x{:x})", - self.filename, - offset, - offset + length as u64 - ); - fallocate( - self.fd, - FallocateFlags::FALLOC_FL_ZERO_RANGE, - offset as off_t, - length as off_t, - ) - .and_then(|_| Ok(())) - .or_else(|_| Err(())) - } - - fn truncate(&self, length: usize) -> Result<(), ()> { - println!("{}.truncate(length={})", self.filename, length); - ftruncate(self.fd, length as off_t).or_else(|_| Err(())) - } - - async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { - println!( - "{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})", - self.filename, - offset, - offset + data.len() as u64, - hex::encode(&data) - ); - pwrite(self.fd, &*data, offset as off_t) - .or_else(|_| Err(())) - .and_then(|nwrote| { - if nwrote == data.len() { - Ok(()) - } else { - Err(()) - } - }) - } - - fn read( - &self, - offset: WALPos, - length: usize, - ) -> Result, ()> { - let mut buff = Vec::new(); - buff.resize(length, 0); - pread(self.fd, &mut buff[..], offset as off_t) - .or_else(|_| Err(())) - .and_then(|nread| { - Ok(if nread == length { - Some(buff.into_boxed_slice()) - } else { - None - }) - }) - } -} - -struct WALStoreTest { - rootfd: RawFd, - rootpath: String, -} - -impl WALStoreTest { - fn new(wal_dir: &str, truncate: bool) -> Self { - let rootpath = wal_dir.to_string(); - if truncate { - let _ = std::fs::remove_dir_all(wal_dir); - } - match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) { - Err(e) => { - if truncate { - panic!("error while creating directory: {}", e) - } - } - Ok(_) => (), - } - let rootfd = match open( - wal_dir, - OFlag::O_DIRECTORY | OFlag::O_PATH, - Mode::empty(), - ) { - Ok(fd) => fd, - Err(_) => panic!("error while opening the DB"), - }; - WALStoreTest { rootfd, rootpath } - } -} - -impl Drop for WALStoreTest { - fn drop(&mut self) { - close(self.rootfd).unwrap(); - } -} - -#[async_trait(?Send)] -impl WALStore for WALStoreTest { - type FileNameIter = std::vec::IntoIter; - - async fn open_file( - &self, - filename: &str, - touch: bool, - ) -> Result, ()> { - println!("open_file(filename={}, touch={})", filename, touch); - let filename = filename.to_string(); - WALFileTest::new(self.rootfd, &filename) - .and_then(|f| Ok(Box::new(f) as Box)) - } - - async fn remove_file(&self, filename: String) -> Result<(), ()> { - println!("remove_file(filename={})", filename); - unlinkat( - Some(self.rootfd), - filename.as_str(), - UnlinkatFlags::NoRemoveDir, - ) - .or_else(|_| Err(())) - } - - fn enumerate_files(&self) -> Result { - println!("enumerate_files()"); - let mut logfiles = Vec::new(); - for fname in std::fs::read_dir(&self.rootpath).unwrap() { - logfiles.push(fname.unwrap().file_name().into_string().unwrap()) - } - Ok(logfiles.into_iter()) - } - - fn apply_payload( - &self, - payload: WALBytes, - ringid: WALRingId, - ) -> Result<(), ()> { - println!( - "apply_payload(payload={}, ringid={:?})", - std::str::from_utf8(&payload).unwrap(), - ringid - ); - Ok(()) - } -} - -fn test( +fn test Result<(), ()>>( records: Vec, - wal: &mut WALWriter, + wal: &mut WALWriter>, ) -> Vec { let mut res = Vec::new(); for r in wal.grow(records).into_iter() { @@ -200,9 +17,19 @@ fn test( res } +fn recover(payload: WALBytes, ringid: WALRingId) -> Result<(), ()> { + println!( + "recover(payload={}, ringid={:?}", + std::str::from_utf8(&payload).unwrap(), + ringid + ); + Ok(()) +} + fn main() { + let wal_dir = "./wal_demo1"; let mut rng = rand::thread_rng(); - let store = WALStoreTest::new("./wal_demo1", true); + let store = WALStoreAIO::new(&wal_dir, true, recover); let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { test( @@ -220,7 +47,7 @@ fn main() { ); } - let store = WALStoreTest::new("./wal_demo1", false); + let store = WALStoreAIO::new(&wal_dir, false, recover); let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { test( @@ -234,7 +61,7 @@ fn main() { ); } - let store = WALStoreTest::new("./wal_demo1", false); + let store = WALStoreAIO::new(&wal_dir, false, recover); let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { let mut ids = Vec::new(); -- cgit v1.2.3-70-g09d2