From e39324e62ab4e09fb0dfc7784519e77fedca65cb Mon Sep 17 00:00:00 2001 From: Determinant Date: Fri, 12 Jun 2020 16:41:19 -0400 Subject: ... --- examples/demo1.rs | 179 ++++++++++++++++++++++++++++++++++++++++-------------- src/lib.rs | 3 +- src/wal.rs | 41 ++++++++----- 3 files changed, 160 insertions(+), 63 deletions(-) diff --git a/examples/demo1.rs b/examples/demo1.rs index 625f40d..47d1423 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -1,11 +1,17 @@ -use std::os::unix::io::RawFd; -use nix::unistd::{close, mkdir, unlinkat, UnlinkatFlags, ftruncate}; -use nix::fcntl::{open, openat, OFlag, fallocate, FallocateFlags}; -use nix::sys::{stat::Mode, uio::{pwrite, pread}}; -use rand::{Rng, seq::SliceRandom}; +use async_trait::async_trait; use libc::off_t; +use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag}; +use nix::sys::{ + stat::Mode, + uio::{pread, pwrite}, +}; +use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags}; +use rand::{seq::SliceRandom, Rng}; +use std::os::unix::io::RawFd; -use growthring::wal::{WALFile, WALStore, WALPos, WALBytes, WALLoader, WALWriter, WALRingId}; +use growthring::wal::{ + WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, WALWriter, +}; struct WALFileTest { filename: String, @@ -14,15 +20,17 @@ struct WALFileTest { impl WALFileTest { fn new(rootfd: RawFd, filename: &str) -> Result { - openat(rootfd, filename, + openat( + rootfd, + filename, OFlag::O_CREAT | OFlag::O_RDWR, - Mode::S_IRUSR | Mode::S_IWUSR).and_then(|fd| { + Mode::S_IRUSR | Mode::S_IWUSR, + ) + .and_then(|fd| { let filename = filename.to_string(); - Ok (WALFileTest { - filename, - fd, - }) - }).or_else(|_| Err(())) + Ok(WALFileTest { filename, fd }) + }) + .or_else(|_| Err(())) } } @@ -32,12 +40,23 @@ impl Drop for WALFileTest { } } +#[async_trait(?Send)] impl WALFile for WALFileTest { - fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> { - println!("{}.allocate(offset=0x{:x}, end=0x{:x})", self.filename, offset, offset + length as u64); - fallocate(self.fd, - FallocateFlags::FALLOC_FL_ZERO_RANGE, - offset as off_t, length as off_t).and_then(|_| Ok(())).or_else(|_| Err(())) + async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> { + println!( + "{}.allocate(offset=0x{:x}, end=0x{:x})", + self.filename, + offset, + offset + length as u64 + ); + fallocate( + self.fd, + FallocateFlags::FALLOC_FL_ZERO_RANGE, + offset as off_t, + length as off_t, + ) + .and_then(|_| Ok(())) + .or_else(|_| Err(())) } fn truncate(&self, length: usize) -> Result<(), ()> { @@ -45,26 +64,47 @@ impl WALFile for WALFileTest { ftruncate(self.fd, length as off_t).or_else(|_| Err(())) } - fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { - println!("{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})", - self.filename, offset, offset + data.len() as u64, hex::encode(&data)); + async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> { + println!( + "{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})", + self.filename, + offset, + offset + data.len() as u64, + hex::encode(&data) + ); pwrite(self.fd, &*data, offset as off_t) .or_else(|_| Err(())) - .and_then(|nwrote| if nwrote == data.len() { Ok(()) } else { Err(()) }) + .and_then(|nwrote| { + if nwrote == data.len() { + Ok(()) + } else { + Err(()) + } + }) } - fn read(&self, offset: WALPos, length: usize) -> Result, ()> { + fn read( + &self, + offset: WALPos, + length: usize, + ) -> Result, ()> { let mut buff = Vec::new(); buff.resize(length, 0); pread(self.fd, &mut buff[..], offset as off_t) .or_else(|_| Err(())) - .and_then(|nread| Ok(if nread == length {Some(buff.into_boxed_slice())} else {None})) + .and_then(|nread| { + Ok(if nread == length { + Some(buff.into_boxed_slice()) + } else { + None + }) + }) } } struct WALStoreTest { rootfd: RawFd, - rootpath: String + rootpath: String, } impl WALStoreTest { @@ -74,12 +114,20 @@ impl WALStoreTest { let _ = std::fs::remove_dir_all(wal_dir); } match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) { - Err(e) => if truncate { panic!("error while creating directory: {}", e) }, - Ok(_) => () + Err(e) => { + if truncate { + panic!("error while creating directory: {}", e) + } + } + Ok(_) => (), } - let rootfd = match open(wal_dir, OFlag::O_DIRECTORY | OFlag::O_PATH, Mode::empty()) { + let rootfd = match open( + wal_dir, + OFlag::O_DIRECTORY | OFlag::O_PATH, + Mode::empty(), + ) { Ok(fd) => fd, - Err(_) => panic!("error while opening the DB") + Err(_) => panic!("error while opening the DB"), }; WALStoreTest { rootfd, rootpath } } @@ -95,15 +143,25 @@ impl Drop for WALStoreTest { impl WALStore for WALStoreTest { type FileNameIter = std::vec::IntoIter; - async fn open_file(&self, filename: &str, touch: bool) -> Result, ()> { + async fn open_file( + &self, + filename: &str, + touch: bool, + ) -> Result, ()> { println!("open_file(filename={}, touch={})", filename, touch); let filename = filename.to_string(); - WALFileTest::new(self.rootfd, &filename).and_then(|f| Ok(Box::new(f) as Box)) + WALFileTest::new(self.rootfd, &filename) + .and_then(|f| Ok(Box::new(f) as Box)) } async fn remove_file(&self, filename: String) -> Result<(), ()> { println!("remove_file(filename={})", filename); - unlinkat(Some(self.rootfd), &filename, UnlinkatFlags::NoRemoveDir).or_else(|_| Err(())) + unlinkat( + Some(self.rootfd), + filename.as_str(), + UnlinkatFlags::NoRemoveDir, + ) + .or_else(|_| Err(())) } fn enumerate_files(&self) -> Result { @@ -115,21 +173,35 @@ impl WALStore for WALStoreTest { Ok(logfiles.into_iter()) } - fn apply_payload(&self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()> { - println!("apply_payload(payload={}, ringid={:?})", - std::str::from_utf8(&payload).unwrap(), - ringid); + fn apply_payload( + &self, + payload: WALBytes, + ringid: WALRingId, + ) -> Result<(), ()> { + println!( + "apply_payload(payload={}, ringid={:?})", + std::str::from_utf8(&payload).unwrap(), + ringid + ); Ok(()) } } -fn test(records: Vec, wal: &mut WALWriter) -> Box<[WALRingId]> { - let records: Vec = records.into_iter().map(|s| s.into_bytes().into_boxed_slice()).collect(); - let ret = wal.grow(&records).unwrap(); - for ring_id in ret.iter() { +fn test( + records: Vec, + wal: &mut WALWriter, +) -> Vec { + let records: Vec = records + .into_iter() + .map(|s| s.into_bytes().into_boxed_slice()) + .collect(); + let mut res = Vec::new(); + for r in wal.grow(&records).into_iter() { + let ring_id = futures::executor::block_on(r).unwrap(); println!("got ring id: {:?}", ring_id); + res.push(ring_id); } - ret + res } fn main() { @@ -137,16 +209,33 @@ fn main() { let store = WALStoreTest::new("./wal_demo1", true); let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { - test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::>(), &mut wal); + test( + ["hi", "hello", "lol"] + .iter() + .map(|s| s.to_string()) + .collect::>(), + &mut wal, + ); } for _ in 0..3 { - test(vec!["a".repeat(10), "b".repeat(100), "c".repeat(1000)], &mut wal); + test( + vec!["a".repeat(10), "b".repeat(100), "c".repeat(1000)], + &mut wal, + ); } let store = WALStoreTest::new("./wal_demo1", false); let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap(); for _ in 0..3 { - test(vec!["a".repeat(10), "b".repeat(100), "c".repeat(300), "d".repeat(400)], &mut wal); + test( + vec![ + "a".repeat(10), + "b".repeat(100), + "c".repeat(300), + "d".repeat(400), + ], + &mut wal, + ); } let store = WALStoreTest::new("./wal_demo1", false); @@ -165,7 +254,7 @@ fn main() { ids.shuffle(&mut rng); for e in ids.chunks(20) { println!("peel(20)"); - wal.peel(e); + futures::executor::block_on(wal.peel(e)).unwrap(); } } } diff --git a/src/lib.rs b/src/lib.rs index 1db80e8..7635dab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,2 @@ -#[macro_use] -extern crate scan_fmt; +#[macro_use] extern crate scan_fmt; pub mod wal; diff --git a/src/wal.rs b/src/wal.rs index 8d92d01..ff76bdb 100644 --- a/src/wal.rs +++ b/src/wal.rs @@ -24,8 +24,8 @@ struct WALRingBlob { // payload follows } +type WALFileId = u64; pub type WALBytes = Box<[u8]>; -pub type WALFileId = u64; pub type WALPos = u64; #[derive(Eq, PartialEq, Copy, Clone, Debug, Hash)] @@ -77,12 +77,12 @@ struct WALState { pub trait WALFile { /// Initialize the file space in [offset, offset + length) to zero. async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>; - /// Write data with offset. We assume all previous `allocate/truncate` invocations are visible - /// if ordered earlier (should be guaranteed by most OS). Additionally, the final write caused + /// Write data with offset. We assume all previous `allocate`/`truncate` invocations are visible + /// if ordered earlier (should be guaranteed by most OS). Additionally, the write caused /// by each invocation of this function should be _atomic_ (the entire single write should be /// all or nothing). async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>; - /// Read data with offset. Return Ok(None) when it reaches EOF. + /// Read data with offset. Return `Ok(None)` when it reaches EOF. fn read( &self, offset: WALPos, @@ -259,7 +259,11 @@ impl WALFilePool { res } - fn remove_files<'a>(&'a mut self, fid_s: u64, fid_e: u64) -> impl Future> + 'a { + fn remove_files<'a>( + &'a mut self, + fid_s: u64, + fid_e: u64, + ) -> impl Future> + 'a { let last_peel = unsafe { std::mem::replace( &mut *self.last_peel.get(), @@ -279,11 +283,12 @@ impl WALFilePool { r.await? } Ok(()) - }.shared(); + } + .shared(); unsafe { - (*self.last_peel.get()) = MaybeUninit::new( - std::mem::transmute( - Box::pin(p.clone()) as Pin + 'a>>)) + (*self.last_peel.get()) = + MaybeUninit::new(std::mem::transmute(Box::pin(p.clone()) + as Pin + 'a>>)) } p } @@ -316,9 +321,11 @@ impl WALWriter { } } - /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the - /// function returns. The caller then has the knowledge of WAL writes so it should defer - /// actual data writes after WAL writes. + /// Submit a sequence of records to WAL. It returns a vector of futures, each of which + /// corresponds to one record. When a future resolves to `WALRingId`, it is guaranteed the + /// record is already logged. Then, after finalizing the changes encoded by that record to + /// the persistent storage, the caller can recycle the WAL files by invoking the given + /// `peel` with the given `WALRingId`s. pub fn grow<'a, T: AsRef<[WALBytes]>>( &'a mut self, records: T, @@ -455,8 +462,9 @@ impl WALWriter { res } - /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are - /// complete so that it could automatically remove obsolete WAL files. + /// Inform the `WALWriter` that some data writes are complete so that it could automatically + /// remove obsolete WAL files. The given list of `WALRingId` does not need to be ordered and + /// could be of arbitrary length. pub fn peel<'a, T: AsRef<[WALRingId]>>( &'a mut self, records: T, @@ -468,7 +476,8 @@ impl WALWriter { state.io_complete.push(*rec); } let orig_fid = state.first_fid; - while let Some(s) = state.io_complete.peek().and_then(|&e| Some(e.start)) + while let Some(s) = + state.io_complete.peek().and_then(|&e| Some(e.start)) { if s != state.next_complete { break; @@ -509,7 +518,7 @@ impl WALLoader { } } - /// Recover by reading the WAL log files. + /// Recover by reading the WAL files. pub fn recover(self, store: F) -> Result, ()> { let mut file_pool = WALFilePool::new( store, -- cgit v1.2.3