diff options
author | Determinant <[email protected]> | 2020-06-16 12:40:07 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-16 12:40:07 -0400 |
commit | 6b8000d8fd8b88afbc7fb45094da29fb02627ed2 (patch) | |
tree | 96d2659772e1c68776cd20893e3609b8a0a22a89 | |
parent | d0e8ceeb250ce362d7d9bf2c6e5c297c716259cc (diff) |
finish the AIO File/Store impl
-rw-r--r-- | Cargo.lock | 133 | ||||
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | examples/demo1.rs | 209 | ||||
-rw-r--r-- | src/lib.rs | 169 |
4 files changed, 317 insertions, 197 deletions
@@ -66,6 +66,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" [[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +dependencies = [ + "bitflags", +] + +[[package]] name = "const-random" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -95,6 +104,27 @@ dependencies = [ ] [[package]] +name = "crossbeam-channel" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061" +dependencies = [ + "crossbeam-utils", + "maybe-uninit", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg 1.0.0", + "cfg-if", + "lazy_static", +] + +[[package]] name = "futures" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -209,6 +239,7 @@ dependencies = [ "futures", "hex", "indexmap", + "libaio-futures", "libc", "lru", "nix", @@ -249,12 +280,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] +name = "libaio-futures" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "049dd6ea8958e5c868151b08c68fa771daa979304ab3de8f5bab58126ba53e3b" +dependencies = [ + "crossbeam-channel", + "libc", + "parking_lot", +] + +[[package]] name = "libc" version = "0.2.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49" [[package]] +name = "lock_api" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" +dependencies = [ + "scopeguard", +] + +[[package]] name = "lru" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -264,6 +315,12 @@ dependencies = [ ] [[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + +[[package]] name = "memchr" version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -289,19 +346,43 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" [[package]] +name = "parking_lot" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3" +dependencies = [ + "cfg-if", + "cloudabi", + "libc", + "redox_syscall", + "smallvec", + "winapi", +] + +[[package]] name = "pin-project" -version = "0.4.20" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e75373ff9037d112bb19bc61333a06a159eaeb217660dcfbea7d88e1db823919" +checksum = "12e3a6cdbfe94a5e4572812a0201f8c0ed98c1c452c7b8563ce2276988ef9c17" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "0.4.20" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10b4b44893d3c370407a1d6a5cfde7c41ae0478e31c516c85f67eb3adc51be6d" +checksum = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7" dependencies = [ "proc-macro2", "quote", @@ -328,9 +409,9 @@ checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4" [[package]] name = "proc-macro-nested" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0afe1bd463b9e9ed51d0e0f0b50b6b146aec855c56fd182bb242388710a9b6de" +checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" [[package]] name = "proc-macro2" @@ -392,6 +473,12 @@ dependencies = [ ] [[package]] +name = "redox_syscall" +version = "0.1.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" + +[[package]] name = "regex" version = "1.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -419,12 +506,24 @@ dependencies = [ ] [[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] name = "slab" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" [[package]] +name = "smallvec" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4" + +[[package]] name = "syn" version = "1.0.31" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -461,3 +560,25 @@ name = "wasi" version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + +[[package]] +name = "winapi" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" @@ -13,6 +13,9 @@ scan_fmt = "0.2.5" regex = "1" async-trait = "0.1" futures = "0.3" +libaio-futures = "0.1.1" +nix = "0.17.0" +libc = "0.2.71" [dev-dependencies] hex = "0.4.2" diff --git a/examples/demo1.rs b/examples/demo1.rs index 0c735f8..4a923c6 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -1,195 +1,12 @@ -use async_trait::async_trait; -use libc::off_t; -use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag}; -use nix::sys::{ - stat::Mode, - uio::{pread, pwrite}, +use growthring::{ + wal::{WALBytes, WALLoader, WALRingId, WALWriter}, + WALStoreAIO, }; -use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags}; use rand::{seq::SliceRandom, Rng}; -use std::os::unix::io::RawFd; -use growthring::wal::{ - WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, WALWriter, -}; - -struct WALFileTest { - filename: String, - fd: RawFd, -} - -impl WALFileTest { - fn new(rootfd: RawFd, filename: &str) -> Result<Self, ()> { - openat( - rootfd, - filename, - OFlag::O_CREAT | OFlag::O_RDWR, - Mode::S_IRUSR | Mode::S_IWUSR, - ) - .and_then(|fd| { - let filename = filename.to_string(); - Ok(WALFileTest { filename, fd }) - }) - .or_else(|_| Err(())) - } -} - -impl Drop for WALFileTest { - fn drop(&mut self) { - close(self.fd).unwrap(); - } -} - -#[async_trait(?Send)] -impl WALFile for WALFileTest { - async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> { - println!( - "{}.allocate(offset=0x{:x}, end=0x{:x})", - self.filename, - offset, - offset + length as u64 - ); - fallocate( - self.fd, - FallocateFlags::FALLOC_FL_ZERO_RANGE, - offset as off_t, - length as off_t, - ) - .and_then(|_| Ok(())) - .or_else(|_| Err(())) - } - - fn truncate(&self, length: usize) -> Result<(), ()> { - println!("{}.truncate(length={})", self.filename, length); - ftruncate(self.fd, length as off_t).or_else(|_| Err(())) - } - - async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { - println!( - "{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})", - self.filename, - offset, - offset + data.len() as u64, - hex::encode(&data) - ); - pwrite(self.fd, &*data, offset as off_t) - .or_else(|_| Err(())) - .and_then(|nwrote| { - if nwrote == data.len() { - Ok(()) - } else { - Err(()) - } - }) - } - - fn read( - &self, - offset: WALPos, - length: usize, - ) -> Result<Option<WALBytes>, ()> { - let mut buff = Vec::new(); - buff.resize(length, 0); - pread(self.fd, &mut buff[..], offset as off_t) - .or_else(|_| Err(())) - .and_then(|nread| { - Ok(if nread == length { - Some(buff.into_boxed_slice()) - } else { - None - }) - }) - } -} - -struct WALStoreTest { - rootfd: RawFd, - rootpath: String, -} - -impl WALStoreTest { - fn new(wal_dir: &str, truncate: bool) -> Self { - let rootpath = wal_dir.to_string(); - if truncate { - let _ = std::fs::remove_dir_all(wal_dir); - } - match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) { - Err(e) => { - if truncate { - panic!("error while creating directory: {}", e) - } - } - Ok(_) => (), - } - let rootfd = match open( - wal_dir, - OFlag::O_DIRECTORY | OFlag::O_PATH, - Mode::empty(), - ) { - Ok(fd) => fd, - Err(_) => panic!("error while opening the DB"), - }; - WALStoreTest { rootfd, rootpath } - } -} - -impl Drop for WALStoreTest { - fn drop(&mut self) { - close(self.rootfd).unwrap(); - } -} - -#[async_trait(?Send)] -impl WALStore for WALStoreTest { - type FileNameIter = std::vec::IntoIter<String>; - - async fn open_file( - &self, - filename: &str, - touch: bool, - ) -> Result<Box<dyn WALFile>, ()> { - println!("open_file(filename={}, touch={})", filename, touch); - let filename = filename.to_string(); - WALFileTest::new(self.rootfd, &filename) - .and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>)) - } - - async fn remove_file(&self, filename: String) -> Result<(), ()> { - println!("remove_file(filename={})", filename); - unlinkat( - Some(self.rootfd), - filename.as_str(), - UnlinkatFlags::NoRemoveDir, - ) - .or_else(|_| Err(())) - } - - fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> { - println!("enumerate_files()"); - let mut logfiles = Vec::new(); - for fname in std::fs::read_dir(&self.rootpath).unwrap() { - logfiles.push(fname.unwrap().file_name().into_string().unwrap()) - } - Ok(logfiles.into_iter()) - } - - fn apply_payload( - &self, - payload: WALBytes, - ringid: WALRingId, - ) -> Result<(), ()> { - println!( - "apply_payload(payload={}, ringid={:?})", - std::str::from_utf8(&payload).unwrap(), - ringid - ); - Ok(()) - } -} - -fn test( +fn test<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>( records: Vec<String>, - wal: &mut WALWriter<WALStoreTest>, + wal: &mut WALWriter<WALStoreAIO<F>>, ) -> Vec<WALRingId> { let mut res = Vec::new(); for r in wal.grow(records).into_iter() { @@ -200,9 +17,19 @@ fn test( res } +fn recover(payload: WALBytes, ringid: WALRingId) -> Result<(), ()> { + println!( + "recover(payload={}, ringid={:?}", + std::str::from_utf8(&payload).unwrap(), + ringid + ); + Ok(()) +} + fn main() { + let wal_dir = "./wal_demo1"; let mut rng = rand::thread_rng(); - let store = WALStoreTest::new("./wal_demo1", true); + let store = WALStoreAIO::new(&wal_dir, true, recover); let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { test( @@ -220,7 +47,7 @@ fn main() { ); } - let store = WALStoreTest::new("./wal_demo1", false); + let store = WALStoreAIO::new(&wal_dir, false, recover); let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { test( @@ -234,7 +61,7 @@ fn main() { ); } - let store = WALStoreTest::new("./wal_demo1", false); + let store = WALStoreAIO::new(&wal_dir, false, recover); let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { let mut ids = Vec::new(); @@ -1,2 +1,171 @@ #[macro_use] extern crate scan_fmt; pub mod wal; + +use async_trait::async_trait; +use futures::executor::block_on; +use libaiofut::{new_batch_scheduler, AIOBatchSchedulerIn, AIOManager}; +use libc::off_t; +use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag}; +use nix::sys::stat::Mode; +use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags}; +use std::cell::RefCell; +use std::os::unix::io::RawFd; +use std::rc::Rc; +use wal::{WALBytes, WALFile, WALPos, WALRingId, WALStore}; + +pub struct WALFileAIO { + fd: RawFd, + aiomgr: Rc<RefCell<AIOManager<AIOBatchSchedulerIn>>>, +} + +impl WALFileAIO { + pub fn new( + rootfd: RawFd, + filename: &str, + aiomgr: Rc<RefCell<AIOManager<AIOBatchSchedulerIn>>>, + ) -> Result<Self, ()> { + openat( + rootfd, + filename, + OFlag::O_CREAT | OFlag::O_RDWR, + Mode::S_IRUSR | Mode::S_IWUSR, + ) + .and_then(|fd| Ok(WALFileAIO { fd, aiomgr })) + .or_else(|_| Err(())) + } +} + +impl Drop for WALFileAIO { + fn drop(&mut self) { + close(self.fd).unwrap(); + } +} + +#[async_trait(?Send)] +impl WALFile for WALFileAIO { + async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> { + // TODO: is there any async version of fallocate? + fallocate( + self.fd, + FallocateFlags::FALLOC_FL_ZERO_RANGE, + offset as off_t, + length as off_t, + ) + .and_then(|_| Ok(())) + .or_else(|_| Err(())) + } + + fn truncate(&self, length: usize) -> Result<(), ()> { + ftruncate(self.fd, length as off_t).or_else(|_| Err(())) + } + + async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { + self.aiomgr + .borrow_mut() + .write(self.fd, offset, data, None) + .await + .or_else(|_| Err(())) + .and_then(|(nwrote, data)| { + if nwrote == data.len() { + Ok(()) + } else { + Err(()) + } + }) + } + + fn read( + &self, + offset: WALPos, + length: usize, + ) -> Result<Option<WALBytes>, ()> { + block_on(self.aiomgr.borrow_mut().read(self.fd, offset, length, None)) + .or_else(|_| Err(())) + .and_then(|(nread, data)| { + Ok(if nread == length { Some(data) } else { None }) + }) + } +} + +pub struct WALStoreAIO<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> { + rootfd: RawFd, + rootpath: String, + recover_func: RefCell<F>, +} + +impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStoreAIO<F> { + pub fn new(wal_dir: &str, truncate: bool, recover_func: F) -> Self { + let recover_func = RefCell::new(recover_func); + let rootpath = wal_dir.to_string(); + if truncate { + let _ = std::fs::remove_dir_all(wal_dir); + } + match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) { + Err(e) => { + if truncate { + panic!("error while creating directory: {}", e) + } + } + Ok(_) => (), + } + let rootfd = match open( + wal_dir, + OFlag::O_DIRECTORY | OFlag::O_PATH, + Mode::empty(), + ) { + Ok(fd) => fd, + Err(_) => panic!("error while opening the WAL directory"), + }; + WALStoreAIO { + rootfd, + rootpath, + recover_func, + } + } +} + +#[async_trait(?Send)] +impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStore + for WALStoreAIO<F> +{ + type FileNameIter = std::vec::IntoIter<String>; + + async fn open_file( + &self, + filename: &str, + _touch: bool, + ) -> Result<Box<dyn WALFile>, ()> { + let filename = filename.to_string(); + let aiomgr = Rc::new(RefCell::new( + AIOManager::new(new_batch_scheduler(None), 10, None, None) + .or(Err(()))?, + )); + WALFileAIO::new(self.rootfd, &filename, aiomgr.clone()) + .and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>)) + } + + async fn remove_file(&self, filename: String) -> Result<(), ()> { + unlinkat( + Some(self.rootfd), + filename.as_str(), + UnlinkatFlags::NoRemoveDir, + ) + .or_else(|_| Err(())) + } + + fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> { + let mut logfiles = Vec::new(); + for fname in std::fs::read_dir(&self.rootpath).unwrap() { + logfiles.push(fname.unwrap().file_name().into_string().unwrap()) + } + Ok(logfiles.into_iter()) + } + + fn apply_payload( + &self, + payload: WALBytes, + ringid: WALRingId, + ) -> Result<(), ()> { + (&mut *self.recover_func.borrow_mut())(payload, ringid) + } +} |