aboutsummaryrefslogtreecommitdiff
path: root/examples/demo1.rs
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-06-16 12:40:07 -0400
committerDeterminant <[email protected]>2020-06-16 12:40:07 -0400
commit6b8000d8fd8b88afbc7fb45094da29fb02627ed2 (patch)
tree96d2659772e1c68776cd20893e3609b8a0a22a89 /examples/demo1.rs
parentd0e8ceeb250ce362d7d9bf2c6e5c297c716259cc (diff)
finish the AIO File/Store impl
Diffstat (limited to 'examples/demo1.rs')
-rw-r--r--examples/demo1.rs209
1 files changed, 18 insertions, 191 deletions
diff --git a/examples/demo1.rs b/examples/demo1.rs
index 0c735f8..4a923c6 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -1,195 +1,12 @@
-use async_trait::async_trait;
-use libc::off_t;
-use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag};
-use nix::sys::{
- stat::Mode,
- uio::{pread, pwrite},
+use growthring::{
+ wal::{WALBytes, WALLoader, WALRingId, WALWriter},
+ WALStoreAIO,
};
-use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags};
use rand::{seq::SliceRandom, Rng};
-use std::os::unix::io::RawFd;
-use growthring::wal::{
- WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, WALWriter,
-};
-
-struct WALFileTest {
- filename: String,
- fd: RawFd,
-}
-
-impl WALFileTest {
- fn new(rootfd: RawFd, filename: &str) -> Result<Self, ()> {
- openat(
- rootfd,
- filename,
- OFlag::O_CREAT | OFlag::O_RDWR,
- Mode::S_IRUSR | Mode::S_IWUSR,
- )
- .and_then(|fd| {
- let filename = filename.to_string();
- Ok(WALFileTest { filename, fd })
- })
- .or_else(|_| Err(()))
- }
-}
-
-impl Drop for WALFileTest {
- fn drop(&mut self) {
- close(self.fd).unwrap();
- }
-}
-
-#[async_trait(?Send)]
-impl WALFile for WALFileTest {
- async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> {
- println!(
- "{}.allocate(offset=0x{:x}, end=0x{:x})",
- self.filename,
- offset,
- offset + length as u64
- );
- fallocate(
- self.fd,
- FallocateFlags::FALLOC_FL_ZERO_RANGE,
- offset as off_t,
- length as off_t,
- )
- .and_then(|_| Ok(()))
- .or_else(|_| Err(()))
- }
-
- fn truncate(&self, length: usize) -> Result<(), ()> {
- println!("{}.truncate(length={})", self.filename, length);
- ftruncate(self.fd, length as off_t).or_else(|_| Err(()))
- }
-
- async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
- println!(
- "{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})",
- self.filename,
- offset,
- offset + data.len() as u64,
- hex::encode(&data)
- );
- pwrite(self.fd, &*data, offset as off_t)
- .or_else(|_| Err(()))
- .and_then(|nwrote| {
- if nwrote == data.len() {
- Ok(())
- } else {
- Err(())
- }
- })
- }
-
- fn read(
- &self,
- offset: WALPos,
- length: usize,
- ) -> Result<Option<WALBytes>, ()> {
- let mut buff = Vec::new();
- buff.resize(length, 0);
- pread(self.fd, &mut buff[..], offset as off_t)
- .or_else(|_| Err(()))
- .and_then(|nread| {
- Ok(if nread == length {
- Some(buff.into_boxed_slice())
- } else {
- None
- })
- })
- }
-}
-
-struct WALStoreTest {
- rootfd: RawFd,
- rootpath: String,
-}
-
-impl WALStoreTest {
- fn new(wal_dir: &str, truncate: bool) -> Self {
- let rootpath = wal_dir.to_string();
- if truncate {
- let _ = std::fs::remove_dir_all(wal_dir);
- }
- match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) {
- Err(e) => {
- if truncate {
- panic!("error while creating directory: {}", e)
- }
- }
- Ok(_) => (),
- }
- let rootfd = match open(
- wal_dir,
- OFlag::O_DIRECTORY | OFlag::O_PATH,
- Mode::empty(),
- ) {
- Ok(fd) => fd,
- Err(_) => panic!("error while opening the DB"),
- };
- WALStoreTest { rootfd, rootpath }
- }
-}
-
-impl Drop for WALStoreTest {
- fn drop(&mut self) {
- close(self.rootfd).unwrap();
- }
-}
-
-#[async_trait(?Send)]
-impl WALStore for WALStoreTest {
- type FileNameIter = std::vec::IntoIter<String>;
-
- async fn open_file(
- &self,
- filename: &str,
- touch: bool,
- ) -> Result<Box<dyn WALFile>, ()> {
- println!("open_file(filename={}, touch={})", filename, touch);
- let filename = filename.to_string();
- WALFileTest::new(self.rootfd, &filename)
- .and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>))
- }
-
- async fn remove_file(&self, filename: String) -> Result<(), ()> {
- println!("remove_file(filename={})", filename);
- unlinkat(
- Some(self.rootfd),
- filename.as_str(),
- UnlinkatFlags::NoRemoveDir,
- )
- .or_else(|_| Err(()))
- }
-
- fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
- println!("enumerate_files()");
- let mut logfiles = Vec::new();
- for fname in std::fs::read_dir(&self.rootpath).unwrap() {
- logfiles.push(fname.unwrap().file_name().into_string().unwrap())
- }
- Ok(logfiles.into_iter())
- }
-
- fn apply_payload(
- &self,
- payload: WALBytes,
- ringid: WALRingId,
- ) -> Result<(), ()> {
- println!(
- "apply_payload(payload={}, ringid={:?})",
- std::str::from_utf8(&payload).unwrap(),
- ringid
- );
- Ok(())
- }
-}
-
-fn test(
+fn test<F: FnMut(WALBytes, WALRingId) -> Result<(), ()>>(
records: Vec<String>,
- wal: &mut WALWriter<WALStoreTest>,
+ wal: &mut WALWriter<WALStoreAIO<F>>,
) -> Vec<WALRingId> {
let mut res = Vec::new();
for r in wal.grow(records).into_iter() {
@@ -200,9 +17,19 @@ fn test(
res
}
+fn recover(payload: WALBytes, ringid: WALRingId) -> Result<(), ()> {
+ println!(
+ "recover(payload={}, ringid={:?}",
+ std::str::from_utf8(&payload).unwrap(),
+ ringid
+ );
+ Ok(())
+}
+
fn main() {
+ let wal_dir = "./wal_demo1";
let mut rng = rand::thread_rng();
- let store = WALStoreTest::new("./wal_demo1", true);
+ let store = WALStoreAIO::new(&wal_dir, true, recover);
let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
for _ in 0..3 {
test(
@@ -220,7 +47,7 @@ fn main() {
);
}
- let store = WALStoreTest::new("./wal_demo1", false);
+ let store = WALStoreAIO::new(&wal_dir, false, recover);
let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
for _ in 0..3 {
test(
@@ -234,7 +61,7 @@ fn main() {
);
}
- let store = WALStoreTest::new("./wal_demo1", false);
+ let store = WALStoreAIO::new(&wal_dir, false, recover);
let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
for _ in 0..3 {
let mut ids = Vec::new();