aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDeterminant <tederminant@gmail.com>2020-06-16 12:40:07 -0400
committerDeterminant <tederminant@gmail.com>2020-06-16 12:40:07 -0400
commit6b8000d8fd8b88afbc7fb45094da29fb02627ed2 (patch)
tree96d2659772e1c68776cd20893e3609b8a0a22a89
parentd0e8ceeb250ce362d7d9bf2c6e5c297c716259cc (diff)
finish the AIO File/Store impl
-rw-r--r--Cargo.lock133
-rw-r--r--Cargo.toml3
-rw-r--r--examples/demo1.rs209
-rw-r--r--src/lib.rs169
4 files changed, 317 insertions, 197 deletions
diff --git a/Cargo.lock b/Cargo.lock
index a8226bb..67f4716 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -66,6 +66,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
+name = "cloudabi"
+version = "0.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
+dependencies = [
+ "bitflags",
+]
+
+[[package]]
name = "const-random"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -95,6 +104,27 @@ dependencies = [
]
[[package]]
+name = "crossbeam-channel"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061"
+dependencies = [
+ "crossbeam-utils",
+ "maybe-uninit",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
+dependencies = [
+ "autocfg 1.0.0",
+ "cfg-if",
+ "lazy_static",
+]
+
+[[package]]
name = "futures"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -209,6 +239,7 @@ dependencies = [
"futures",
"hex",
"indexmap",
+ "libaio-futures",
"libc",
"lru",
"nix",
@@ -249,12 +280,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
+name = "libaio-futures"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "049dd6ea8958e5c868151b08c68fa771daa979304ab3de8f5bab58126ba53e3b"
+dependencies = [
+ "crossbeam-channel",
+ "libc",
+ "parking_lot",
+]
+
+[[package]]
name = "libc"
version = "0.2.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49"
[[package]]
+name = "lock_api"
+version = "0.3.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75"
+dependencies = [
+ "scopeguard",
+]
+
+[[package]]
name = "lru"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -264,6 +315,12 @@ dependencies = [
]
[[package]]
+name = "maybe-uninit"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
+
+[[package]]
name = "memchr"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -289,19 +346,43 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
[[package]]
+name = "parking_lot"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d3a704eb390aafdc107b0e392f56a82b668e3a71366993b5340f5833fd62505e"
+dependencies = [
+ "lock_api",
+ "parking_lot_core",
+]
+
+[[package]]
+name = "parking_lot_core"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d58c7c768d4ba344e3e8d72518ac13e259d7c7ade24167003b8488e10b6740a3"
+dependencies = [
+ "cfg-if",
+ "cloudabi",
+ "libc",
+ "redox_syscall",
+ "smallvec",
+ "winapi",
+]
+
+[[package]]
name = "pin-project"
-version = "0.4.20"
+version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e75373ff9037d112bb19bc61333a06a159eaeb217660dcfbea7d88e1db823919"
+checksum = "12e3a6cdbfe94a5e4572812a0201f8c0ed98c1c452c7b8563ce2276988ef9c17"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
-version = "0.4.20"
+version = "0.4.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "10b4b44893d3c370407a1d6a5cfde7c41ae0478e31c516c85f67eb3adc51be6d"
+checksum = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7"
dependencies = [
"proc-macro2",
"quote",
@@ -328,9 +409,9 @@ checksum = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4"
[[package]]
name = "proc-macro-nested"
-version = "0.1.5"
+version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0afe1bd463b9e9ed51d0e0f0b50b6b146aec855c56fd182bb242388710a9b6de"
+checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
[[package]]
name = "proc-macro2"
@@ -392,6 +473,12 @@ dependencies = [
]
[[package]]
+name = "redox_syscall"
+version = "0.1.56"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84"
+
+[[package]]
name = "regex"
version = "1.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -419,12 +506,24 @@ dependencies = [
]
[[package]]
+name = "scopeguard"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
+
+[[package]]
name = "slab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
+name = "smallvec"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c7cb5678e1615754284ec264d9bb5b4c27d2018577fd90ac0ceb578591ed5ee4"
+
+[[package]]
name = "syn"
version = "1.0.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -461,3 +560,25 @@ name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
+
+[[package]]
+name = "winapi"
+version = "0.3.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6"
+dependencies = [
+ "winapi-i686-pc-windows-gnu",
+ "winapi-x86_64-pc-windows-gnu",
+]
+
+[[package]]
+name = "winapi-i686-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
+
+[[package]]
+name = "winapi-x86_64-pc-windows-gnu"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
diff --git a/Cargo.toml b/Cargo.toml
index 4e17630..f84b995 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,9 @@ scan_fmt = "0.2.5"
regex = "1"
async-trait = "0.1"
futures = "0.3"
+libaio-futures = "0.1.1"
+nix = "0.17.0"
+libc = "0.2.71"
[dev-dependencies]
hex = "0.4.2"
diff --git a/examples/demo1.rs b/examples/demo1.rs
index 0c735f8..4a923c6 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -1,195 +1,12 @@
-use async_trait::async_trait;
-use libc::off_t;
-use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag};
-use nix::sys::{
- stat::Mode,
- uio::{pread, pwrite},
+use growthring::{
+ wal::{WALBytes, WALLoader, WALRingId, WALWriter},
+ WALStoreAIO,
};
-use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags};
use rand::{seq::SliceRandom, Rng};
-use std::os::unix::io::RawFd;
-use growthring::wal::{
- WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, WALWriter,
-};
-
-struct WALFileTest {
- filename: String,
- fd: RawFd,
-}
-
-impl WALFileTest {
- fn new(rootfd: RawFd, filename: &str) -> Result<Self, ()> {
- openat(
- rootfd,
- filename,
- OFlag::O_CREAT | OFlag::O_RDWR,
- Mode::S_IRUSR | Mode::S_IWUSR,
- )
- .and_then(|fd| {
- let filename = filename.to_string();
- Ok(WALFileTest { filename, fd })
- })
- .or_else(|_| Err(()))
- }
-}
-
-impl Drop for WALFileTest {
- fn drop(&mut self) {
- close(self.fd).unwrap();
- }
-}
-
-#[async_trait(?Send)]
-impl WALFile for WALFileTest {
- async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> {
- println!(
- "{}.allocate(offset=0x{:x}, end=0x{:x})",
- self.filename,
- offset,
- offset + length as u64
- );
- fallocate(
- self.fd,
- FallocateFlags::FALLOC_FL_ZERO_RANGE,
- offset as off_t,
- length as off_t,
- )
- .and_then(|_| Ok(()))
- .or_else(|_| Err(()))
- }
-
- fn truncate(&self, length: usize) -> Result<(), ()> {
- println!("{}.truncate(length={})", self.filename, length);
- ftruncate(self.fd, length as off_t).or_else(|_| Err(()))
- }
-
- async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
- println!(
- "{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})",
- self.filename,
- offset,
- offset + data.len() as u64,
- hex::encode(&data)
- );
- pwrite(self.fd, &*data, offset as off_t)
- .or_else(|_| Err(()))
- .and_then(|nwrote| {
- if nwrote == data.len() {
- Ok(())
- } else {
- Err(())
- }
- })
- }
-
- fn read(
- &self,
- offset: WALPos,
- length: usize,
- ) -> Result<Option<WALBytes>, ()> {
- let mut buff = Vec::new();
- buff.resize(length, 0);
- pread(self.fd, &mut buff[..], offset as off_t)
- .or_else(|_| Err(()))
- .and_then(|nread| {
- Ok(if nread == length {
- Some(buff.into_boxed_slice())
- } else {
- None
- })
- })
- }
-}
-
-struct WALStoreTest {
- rootfd: RawFd,
- rootpath: String,
-}
-
-impl WALStoreTest {
- fn new(wal_dir: &str, truncate: bool) -> Self {
- let rootpath = wal_dir.to_string();
- if truncate {
- let _ = std::fs::remove_dir_all(wal_dir);
- }
- match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) {
- Err(e) => {
- if truncate {
- panic!("error while creating directory: {}", e)
- }
- }
- Ok(_) => (),
- }
- let rootfd = match open(
- wal_dir,
- OFlag::O_DIRECTORY | OFlag::O_PATH,
- Mode::empty(),
- ) {
- Ok(fd) => fd,
- Err(_) => panic!("error while opening the DB"),
- };
- WALStoreTest { rootfd, rootpath }
- }
-}
-
-impl Drop for WALStoreTest {
- fn drop(&mut self) {
- close(self.rootfd).unwrap();
- }
-}
-
-#[async_trait(?Send)]
-impl WALStore for WALStoreTest {
- type FileNameIter = std::vec::IntoIter<String>;
-
- async fn open_file(
- &self,
- filename: &str,
- touch: bool,
- ) -> Result<Box<dyn WALFile>, ()> {
- println!("open_file(filename={}, touch={})", filename, touch);
- let filename = filename.to_string();
- WALFileTest::new(self.rootfd, &filename)
- .and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>))
- }
-
- async fn remove_file(&self, filename: String) -> Result<(), ()> {
- println!("remove_file(filename={})", filename);
- unlinkat(
- Some(self.rootfd),
- filename.as_str(),
- UnlinkatFlags::NoRemoveDir,
- )
- .or_else(|_| Err(()))
- }
-
- fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
- println!("enumerate_files()");
- let mut logfiles = Vec::new();
- for fname in std::fs::read_dir(&self.rootpath).unwrap() {
- logfiles.push(fname.unwrap().file_name().into_string().unwrap())
- }
- Ok(logfiles.into_iter())
- }
-
- fn apply_payload(
- &self,
- payload: WALBytes,
- ringid: WALRingId,
- ) -> Result<(), ()> {
- println!(
- "apply_payload(payload={}, ringid={:?})",
- std::str::from_utf8(&payload).unwrap(),
- ringid
- );
- Ok(())
- }
-}
-
-fn test(
+fn test<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>(
records: Vec<String>,
- wal: &mut WALWriter<WALStoreTest>,
+ wal: &mut WALWriter<WALStoreAIO<F>>,
) -> Vec<WALRingId> {
let mut res = Vec::new();
for r in wal.grow(records).into_iter() {
@@ -200,9 +17,19 @@ fn test(
res
}
+fn recover(payload: WALBytes, ringid: WALRingId) -> Result<(), ()> {
+ println!(
+ "recover(payload={}, ringid={:?}",
+ std::str::from_utf8(&payload).unwrap(),
+ ringid
+ );
+ Ok(())
+}
+
fn main() {
+ let wal_dir = "./wal_demo1";
let mut rng = rand::thread_rng();
- let store = WALStoreTest::new("./wal_demo1", true);
+ let store = WALStoreAIO::new(&wal_dir, true, recover);
let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
for _ in 0..3 {
test(
@@ -220,7 +47,7 @@ fn main() {
);
}
- let store = WALStoreTest::new("./wal_demo1", false);
+ let store = WALStoreAIO::new(&wal_dir, false, recover);
let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
for _ in 0..3 {
test(
@@ -234,7 +61,7 @@ fn main() {
);
}
- let store = WALStoreTest::new("./wal_demo1", false);
+ let store = WALStoreAIO::new(&wal_dir, false, recover);
let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
for _ in 0..3 {
let mut ids = Vec::new();
diff --git a/src/lib.rs b/src/lib.rs
index 7635dab..906ec41 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,2 +1,171 @@
#[macro_use] extern crate scan_fmt;
pub mod wal;
+
+use async_trait::async_trait;
+use futures::executor::block_on;
+use libaiofut::{new_batch_scheduler, AIOBatchSchedulerIn, AIOManager};
+use libc::off_t;
+use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag};
+use nix::sys::stat::Mode;
+use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags};
+use std::cell::RefCell;
+use std::os::unix::io::RawFd;
+use std::rc::Rc;
+use wal::{WALBytes, WALFile, WALPos, WALRingId, WALStore};
+
+pub struct WALFileAIO {
+ fd: RawFd,
+ aiomgr: Rc<RefCell<AIOManager<AIOBatchSchedulerIn>>>,
+}
+
+impl WALFileAIO {
+ pub fn new(
+ rootfd: RawFd,
+ filename: &str,
+ aiomgr: Rc<RefCell<AIOManager<AIOBatchSchedulerIn>>>,
+ ) -> Result<Self, ()> {
+ openat(
+ rootfd,
+ filename,
+ OFlag::O_CREAT | OFlag::O_RDWR,
+ Mode::S_IRUSR | Mode::S_IWUSR,
+ )
+ .and_then(|fd| Ok(WALFileAIO { fd, aiomgr }))
+ .or_else(|_| Err(()))
+ }
+}
+
+impl Drop for WALFileAIO {
+ fn drop(&mut self) {
+ close(self.fd).unwrap();
+ }
+}
+
+#[async_trait(?Send)]
+impl WALFile for WALFileAIO {
+ async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> {
+ // TODO: is there any async version of fallocate?
+ fallocate(
+ self.fd,
+ FallocateFlags::FALLOC_FL_ZERO_RANGE,
+ offset as off_t,
+ length as off_t,
+ )
+ .and_then(|_| Ok(()))
+ .or_else(|_| Err(()))
+ }
+
+ fn truncate(&self, length: usize) -> Result<(), ()> {
+ ftruncate(self.fd, length as off_t).or_else(|_| Err(()))
+ }
+
+ async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
+ self.aiomgr
+ .borrow_mut()
+ .write(self.fd, offset, data, None)
+ .await
+ .or_else(|_| Err(()))
+ .and_then(|(nwrote, data)| {
+ if nwrote == data.len() {
+ Ok(())
+ } else {
+ Err(())
+ }
+ })
+ }
+
+ fn read(
+ &self,
+ offset: WALPos,
+ length: usize,
+ ) -> Result<Option<WALBytes>, ()> {
+ block_on(self.aiomgr.borrow_mut().read(self.fd, offset, length, None))
+ .or_else(|_| Err(()))
+ .and_then(|(nread, data)| {
+ Ok(if nread == length { Some(data) } else { None })
+ })
+ }
+}
+
+pub struct WALStoreAIO<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> {
+ rootfd: RawFd,
+ rootpath: String,
+ recover_func: RefCell<F>,
+}
+
+impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStoreAIO<F> {
+ pub fn new(wal_dir: &str, truncate: bool, recover_func: F) -> Self {
+ let recover_func = RefCell::new(recover_func);
+ let rootpath = wal_dir.to_string();
+ if truncate {
+ let _ = std::fs::remove_dir_all(wal_dir);
+ }
+ match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) {
+ Err(e) => {
+ if truncate {
+ panic!("error while creating directory: {}", e)
+ }
+ }
+ Ok(_) => (),
+ }
+ let rootfd = match open(
+ wal_dir,
+ OFlag::O_DIRECTORY | OFlag::O_PATH,
+ Mode::empty(),
+ ) {
+ Ok(fd) => fd,
+ Err(_) => panic!("error while opening the WAL directory"),
+ };
+ WALStoreAIO {
+ rootfd,
+ rootpath,
+ recover_func,
+ }
+ }
+}
+
+#[async_trait(?Send)]
+impl<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>> WALStore
+ for WALStoreAIO<F>
+{
+ type FileNameIter = std::vec::IntoIter<String>;
+
+ async fn open_file(
+ &self,
+ filename: &str,
+ _touch: bool,
+ ) -> Result<Box<dyn WALFile>, ()> {
+ let filename = filename.to_string();
+ let aiomgr = Rc::new(RefCell::new(
+ AIOManager::new(new_batch_scheduler(None), 10, None, None)
+ .or(Err(()))?,
+ ));
+ WALFileAIO::new(self.rootfd, &filename, aiomgr.clone())
+ .and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>))
+ }
+
+ async fn remove_file(&self, filename: String) -> Result<(), ()> {
+ unlinkat(
+ Some(self.rootfd),
+ filename.as_str(),
+ UnlinkatFlags::NoRemoveDir,
+ )
+ .or_else(|_| Err(()))
+ }
+
+ fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
+ let mut logfiles = Vec::new();
+ for fname in std::fs::read_dir(&self.rootpath).unwrap() {
+ logfiles.push(fname.unwrap().file_name().into_string().unwrap())
+ }
+ Ok(logfiles.into_iter())
+ }
+
+ fn apply_payload(
+ &self,
+ payload: WALBytes,
+ ringid: WALRingId,
+ ) -> Result<(), ()> {
+ (&mut *self.recover_func.borrow_mut())(payload, ringid)
+ }
+}