From e39324e62ab4e09fb0dfc7784519e77fedca65cb Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 12 Jun 2020 16:41:19 -0400 Subject: ... --- examples/demo1.rs | 179 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 134 insertions(+), 45 deletions(-) (limited to 'examples/demo1.rs') diff --git a/examples/demo1.rs b/examples/demo1.rs index 625f40d..47d1423 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -1,11 +1,17 @@ -use std::os::unix::io::RawFd; -use nix::unistd::{close, mkdir, unlinkat, UnlinkatFlags, ftruncate}; -use nix::fcntl::{open, openat, OFlag, fallocate, FallocateFlags}; -use nix::sys::{stat::Mode, uio::{pwrite, pread}}; -use rand::{Rng, seq::SliceRandom}; +use async_trait::async_trait; use libc::off_t; +use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag}; +use nix::sys::{ + stat::Mode, + uio::{pread, pwrite}, +}; +use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags}; +use rand::{seq::SliceRandom, Rng}; +use std::os::unix::io::RawFd; -use growthring::wal::{WALFile, WALStore, WALPos, WALBytes, WALLoader, WALWriter, WALRingId}; +use growthring::wal::{ + WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, WALWriter, +}; struct WALFileTest { filename: String, @@ -14,15 +20,17 @@ struct WALFileTest { impl WALFileTest { fn new(rootfd: RawFd, filename: &str) -> Result { - openat(rootfd, filename, + openat( + rootfd, + filename, OFlag::O_CREAT | OFlag::O_RDWR, - Mode::S_IRUSR | Mode::S_IWUSR).and_then(|fd| { + Mode::S_IRUSR | Mode::S_IWUSR, + ) + .and_then(|fd| { let filename = filename.to_string(); - Ok (WALFileTest { - filename, - fd, - }) - }).or_else(|_| Err(())) + Ok(WALFileTest { filename, fd }) + }) + .or_else(|_| Err(())) } } @@ -32,12 +40,23 @@ impl Drop for WALFileTest { } } +#[async_trait(?Send)] impl WALFile for WALFileTest { - fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> { - println!("{}.allocate(offset=0x{:x}, end=0x{:x})", self.filename, offset, offset + length as u64); - fallocate(self.fd, - FallocateFlags::FALLOC_FL_ZERO_RANGE, - offset as off_t, length as off_t).and_then(|_| Ok(())).or_else(|_| Err(())) + async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> { + println!( + "{}.allocate(offset=0x{:x}, end=0x{:x})", + self.filename, + offset, + offset + length as u64 + ); + fallocate( + self.fd, + FallocateFlags::FALLOC_FL_ZERO_RANGE, + offset as off_t, + length as off_t, + ) + .and_then(|_| Ok(())) + .or_else(|_| Err(())) } fn truncate(&self, length: usize) -> Result<(), ()> { @@ -45,26 +64,47 @@ impl WALFile for WALFileTest { ftruncate(self.fd, length as off_t).or_else(|_| Err(())) } - fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { - println!("{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})", - self.filename, offset, offset + data.len() as u64, hex::encode(&data)); + async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { + println!( + "{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})", + self.filename, + offset, + offset + data.len() as u64, + hex::encode(&data) + ); pwrite(self.fd, &*data, offset as off_t) .or_else(|_| Err(())) - .and_then(|nwrote| if nwrote == data.len() { Ok(()) } else { Err(()) }) + .and_then(|nwrote| { + if nwrote == data.len() { + Ok(()) + } else { + Err(()) + } + }) } - fn read(&self, offset: WALPos, length: usize) -> Result, ()> { + fn read( + &self, + offset: WALPos, + length: usize, + ) -> Result, ()> { let mut buff = Vec::new(); buff.resize(length, 0); pread(self.fd, &mut buff[..], offset as off_t) .or_else(|_| Err(())) - .and_then(|nread| Ok(if nread == length {Some(buff.into_boxed_slice())} else {None})) + .and_then(|nread| { + Ok(if nread == length { + Some(buff.into_boxed_slice()) + } else { + None + }) + }) } } struct WALStoreTest { rootfd: RawFd, - rootpath: String + rootpath: String, } impl WALStoreTest { @@ -74,12 +114,20 @@ impl WALStoreTest { let _ = std::fs::remove_dir_all(wal_dir); } match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) { - Err(e) => if truncate { panic!("error while creating directory: {}", e) }, - Ok(_) => () + Err(e) => { + if truncate { + panic!("error while creating directory: {}", e) + } + } + Ok(_) => (), } - let rootfd = match open(wal_dir, OFlag::O_DIRECTORY | OFlag::O_PATH, Mode::empty()) { + let rootfd = match open( + wal_dir, + OFlag::O_DIRECTORY | OFlag::O_PATH, + Mode::empty(), + ) { Ok(fd) => fd, - Err(_) => panic!("error while opening the DB") + Err(_) => panic!("error while opening the DB"), }; WALStoreTest { rootfd, rootpath } } @@ -95,15 +143,25 @@ impl Drop for WALStoreTest { impl WALStore for WALStoreTest { type FileNameIter = std::vec::IntoIter; - async fn open_file(&self, filename: &str, touch: bool) -> Result, ()> { + async fn open_file( + &self, + filename: &str, + touch: bool, + ) -> Result, ()> { println!("open_file(filename={}, touch={})", filename, touch); let filename = filename.to_string(); - WALFileTest::new(self.rootfd, &filename).and_then(|f| Ok(Box::new(f) as Box)) + WALFileTest::new(self.rootfd, &filename) + .and_then(|f| Ok(Box::new(f) as Box)) } async fn remove_file(&self, filename: String) -> Result<(), ()> { println!("remove_file(filename={})", filename); - unlinkat(Some(self.rootfd), &filename, UnlinkatFlags::NoRemoveDir).or_else(|_| Err(())) + unlinkat( + Some(self.rootfd), + filename.as_str(), + UnlinkatFlags::NoRemoveDir, + ) + .or_else(|_| Err(())) } fn enumerate_files(&self) -> Result { @@ -115,21 +173,35 @@ impl WALStore for WALStoreTest { Ok(logfiles.into_iter()) } - fn apply_payload(&self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()> { - println!("apply_payload(payload={}, ringid={:?})", - std::str::from_utf8(&payload).unwrap(), - ringid); + fn apply_payload( + &self, + payload: WALBytes, + ringid: WALRingId, + ) -> Result<(), ()> { + println!( + "apply_payload(payload={}, ringid={:?})", + std::str::from_utf8(&payload).unwrap(), + ringid + ); Ok(()) } } -fn test(records: Vec, wal: &mut WALWriter) -> Box<[WALRingId]> { - let records: Vec = records.into_iter().map(|s| s.into_bytes().into_boxed_slice()).collect(); - let ret = wal.grow(&records).unwrap(); - for ring_id in ret.iter() { +fn test( + records: Vec, + wal: &mut WALWriter, +) -> Vec { + let records: Vec = records + .into_iter() + .map(|s| s.into_bytes().into_boxed_slice()) + .collect(); + let mut res = Vec::new(); + for r in wal.grow(&records).into_iter() { + let ring_id = futures::executor::block_on(r).unwrap(); println!("got ring id: {:?}", ring_id); + res.push(ring_id); } - ret + res } fn main() { @@ -137,16 +209,33 @@ fn main() { let store = WALStoreTest::new("./wal_demo1", true); let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { - test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::>(), &mut wal); + test( + ["hi", "hello", "lol"] + .iter() + .map(|s| s.to_string()) + .collect::>(), + &mut wal, + ); } for _ in 0..3 { - test(vec!["a".repeat(10), "b".repeat(100), "c".repeat(1000)], &mut wal); + test( + vec!["a".repeat(10), "b".repeat(100), "c".repeat(1000)], + &mut wal, + ); } let store = WALStoreTest::new("./wal_demo1", false); let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { - test(vec!["a".repeat(10), "b".repeat(100), "c".repeat(300), "d".repeat(400)], &mut wal); + test( + vec![ + "a".repeat(10), + "b".repeat(100), + "c".repeat(300), + "d".repeat(400), + ], + &mut wal, + ); } let store = WALStoreTest::new("./wal_demo1", false); @@ -165,7 +254,7 @@ fn main() { ids.shuffle(&mut rng); for e in ids.chunks(20) { println!("peel(20)"); - wal.peel(e); + futures::executor::block_on(wal.peel(e)).unwrap(); } } } -- cgit v1.2.3-70-g09d2