summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/demo1.rs179
-rw-r--r--src/lib.rs3
-rw-r--r--src/wal.rs41
3 files changed, 160 insertions, 63 deletions
diff --git a/examples/demo1.rs b/examples/demo1.rs
index 625f40d..47d1423 100644
--- a/examples/demo1.rs
+++ b/examples/demo1.rs
@@ -1,11 +1,17 @@
-use std::os::unix::io::RawFd;
-use nix::unistd::{close, mkdir, unlinkat, UnlinkatFlags, ftruncate};
-use nix::fcntl::{open, openat, OFlag, fallocate, FallocateFlags};
-use nix::sys::{stat::Mode, uio::{pwrite, pread}};
-use rand::{Rng, seq::SliceRandom};
+use async_trait::async_trait;
use libc::off_t;
+use nix::fcntl::{fallocate, open, openat, FallocateFlags, OFlag};
+use nix::sys::{
+ stat::Mode,
+ uio::{pread, pwrite},
+};
+use nix::unistd::{close, ftruncate, mkdir, unlinkat, UnlinkatFlags};
+use rand::{seq::SliceRandom, Rng};
+use std::os::unix::io::RawFd;
-use growthring::wal::{WALFile, WALStore, WALPos, WALBytes, WALLoader, WALWriter, WALRingId};
+use growthring::wal::{
+ WALBytes, WALFile, WALLoader, WALPos, WALRingId, WALStore, WALWriter,
+};
struct WALFileTest {
filename: String,
@@ -14,15 +20,17 @@ struct WALFileTest {
impl WALFileTest {
fn new(rootfd: RawFd, filename: &str) -> Result<Self, ()> {
- openat(rootfd, filename,
+ openat(
+ rootfd,
+ filename,
OFlag::O_CREAT | OFlag::O_RDWR,
- Mode::S_IRUSR | Mode::S_IWUSR).and_then(|fd| {
+ Mode::S_IRUSR | Mode::S_IWUSR,
+ )
+ .and_then(|fd| {
let filename = filename.to_string();
- Ok (WALFileTest {
- filename,
- fd,
- })
- }).or_else(|_| Err(()))
+ Ok(WALFileTest { filename, fd })
+ })
+ .or_else(|_| Err(()))
}
}
@@ -32,12 +40,23 @@ impl Drop for WALFileTest {
}
}
+#[async_trait(?Send)]
impl WALFile for WALFileTest {
- fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> {
- println!("{}.allocate(offset=0x{:x}, end=0x{:x})", self.filename, offset, offset + length as u64);
- fallocate(self.fd,
- FallocateFlags::FALLOC_FL_ZERO_RANGE,
- offset as off_t, length as off_t).and_then(|_| Ok(())).or_else(|_| Err(()))
+ async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> {
+ println!(
+ "{}.allocate(offset=0x{:x}, end=0x{:x})",
+ self.filename,
+ offset,
+ offset + length as u64
+ );
+ fallocate(
+ self.fd,
+ FallocateFlags::FALLOC_FL_ZERO_RANGE,
+ offset as off_t,
+ length as off_t,
+ )
+ .and_then(|_| Ok(()))
+ .or_else(|_| Err(()))
}
fn truncate(&self, length: usize) -> Result<(), ()> {
@@ -45,26 +64,47 @@ impl WALFile for WALFileTest {
ftruncate(self.fd, length as off_t).or_else(|_| Err(()))
}
- fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
- println!("{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})",
- self.filename, offset, offset + data.len() as u64, hex::encode(&data));
+ async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()> {
+ println!(
+ "{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})",
+ self.filename,
+ offset,
+ offset + data.len() as u64,
+ hex::encode(&data)
+ );
pwrite(self.fd, &*data, offset as off_t)
.or_else(|_| Err(()))
- .and_then(|nwrote| if nwrote == data.len() { Ok(()) } else { Err(()) })
+ .and_then(|nwrote| {
+ if nwrote == data.len() {
+ Ok(())
+ } else {
+ Err(())
+ }
+ })
}
- fn read(&self, offset: WALPos, length: usize) -> Result<Option<WALBytes>, ()> {
+ fn read(
+ &self,
+ offset: WALPos,
+ length: usize,
+ ) -> Result<Option<WALBytes>, ()> {
let mut buff = Vec::new();
buff.resize(length, 0);
pread(self.fd, &mut buff[..], offset as off_t)
.or_else(|_| Err(()))
- .and_then(|nread| Ok(if nread == length {Some(buff.into_boxed_slice())} else {None}))
+ .and_then(|nread| {
+ Ok(if nread == length {
+ Some(buff.into_boxed_slice())
+ } else {
+ None
+ })
+ })
}
}
struct WALStoreTest {
rootfd: RawFd,
- rootpath: String
+ rootpath: String,
}
impl WALStoreTest {
@@ -74,12 +114,20 @@ impl WALStoreTest {
let _ = std::fs::remove_dir_all(wal_dir);
}
match mkdir(wal_dir, Mode::S_IRUSR | Mode::S_IWUSR | Mode::S_IXUSR) {
- Err(e) => if truncate { panic!("error while creating directory: {}", e) },
- Ok(_) => ()
+ Err(e) => {
+ if truncate {
+ panic!("error while creating directory: {}", e)
+ }
+ }
+ Ok(_) => (),
}
- let rootfd = match open(wal_dir, OFlag::O_DIRECTORY | OFlag::O_PATH, Mode::empty()) {
+ let rootfd = match open(
+ wal_dir,
+ OFlag::O_DIRECTORY | OFlag::O_PATH,
+ Mode::empty(),
+ ) {
Ok(fd) => fd,
- Err(_) => panic!("error while opening the DB")
+ Err(_) => panic!("error while opening the DB"),
};
WALStoreTest { rootfd, rootpath }
}
@@ -95,15 +143,25 @@ impl Drop for WALStoreTest {
impl WALStore for WALStoreTest {
type FileNameIter = std::vec::IntoIter<String>;
- async fn open_file(&self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()> {
+ async fn open_file(
+ &self,
+ filename: &str,
+ touch: bool,
+ ) -> Result<Box<dyn WALFile>, ()> {
println!("open_file(filename={}, touch={})", filename, touch);
let filename = filename.to_string();
- WALFileTest::new(self.rootfd, &filename).and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>))
+ WALFileTest::new(self.rootfd, &filename)
+ .and_then(|f| Ok(Box::new(f) as Box<dyn WALFile>))
}
async fn remove_file(&self, filename: String) -> Result<(), ()> {
println!("remove_file(filename={})", filename);
- unlinkat(Some(self.rootfd), &filename, UnlinkatFlags::NoRemoveDir).or_else(|_| Err(()))
+ unlinkat(
+ Some(self.rootfd),
+ filename.as_str(),
+ UnlinkatFlags::NoRemoveDir,
+ )
+ .or_else(|_| Err(()))
}
fn enumerate_files(&self) -> Result<Self::FileNameIter, ()> {
@@ -115,21 +173,35 @@ impl WALStore for WALStoreTest {
Ok(logfiles.into_iter())
}
- fn apply_payload(&self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()> {
- println!("apply_payload(payload={}, ringid={:?})",
- std::str::from_utf8(&payload).unwrap(),
- ringid);
+ fn apply_payload(
+ &self,
+ payload: WALBytes,
+ ringid: WALRingId,
+ ) -> Result<(), ()> {
+ println!(
+ "apply_payload(payload={}, ringid={:?})",
+ std::str::from_utf8(&payload).unwrap(),
+ ringid
+ );
Ok(())
}
}
-fn test(records: Vec<String>, wal: &mut WALWriter<WALStoreTest>) -> Box<[WALRingId]> {
- let records: Vec<WALBytes> = records.into_iter().map(|s| s.into_bytes().into_boxed_slice()).collect();
- let ret = wal.grow(&records).unwrap();
- for ring_id in ret.iter() {
+fn test(
+ records: Vec<String>,
+ wal: &mut WALWriter<WALStoreTest>,
+) -> Vec<WALRingId> {
+ let records: Vec<WALBytes> = records
+ .into_iter()
+ .map(|s| s.into_bytes().into_boxed_slice())
+ .collect();
+ let mut res = Vec::new();
+ for r in wal.grow(&records).into_iter() {
+ let ring_id = futures::executor::block_on(r).unwrap();
println!("got ring id: {:?}", ring_id);
+ res.push(ring_id);
}
- ret
+ res
}
fn main() {
@@ -137,16 +209,33 @@ fn main() {
let store = WALStoreTest::new("./wal_demo1", true);
let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
for _ in 0..3 {
- test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::<Vec<String>>(), &mut wal);
+ test(
+ ["hi", "hello", "lol"]
+ .iter()
+ .map(|s| s.to_string())
+ .collect::<Vec<String>>(),
+ &mut wal,
+ );
}
for _ in 0..3 {
- test(vec!["a".repeat(10), "b".repeat(100), "c".repeat(1000)], &mut wal);
+ test(
+ vec!["a".repeat(10), "b".repeat(100), "c".repeat(1000)],
+ &mut wal,
+ );
}
let store = WALStoreTest::new("./wal_demo1", false);
let mut wal = WALLoader::new(9, 8, 1000).recover(store).unwrap();
for _ in 0..3 {
- test(vec!["a".repeat(10), "b".repeat(100), "c".repeat(300), "d".repeat(400)], &mut wal);
+ test(
+ vec![
+ "a".repeat(10),
+ "b".repeat(100),
+ "c".repeat(300),
+ "d".repeat(400),
+ ],
+ &mut wal,
+ );
}
let store = WALStoreTest::new("./wal_demo1", false);
@@ -165,7 +254,7 @@ fn main() {
ids.shuffle(&mut rng);
for e in ids.chunks(20) {
println!("peel(20)");
- wal.peel(e);
+ futures::executor::block_on(wal.peel(e)).unwrap();
}
}
}
diff --git a/src/lib.rs b/src/lib.rs
index 1db80e8..7635dab 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,2 @@
-#[macro_use]
-extern crate scan_fmt;
+#[macro_use] extern crate scan_fmt;
pub mod wal;
diff --git a/src/wal.rs b/src/wal.rs
index 8d92d01..ff76bdb 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -24,8 +24,8 @@ struct WALRingBlob {
// payload follows
}
+type WALFileId = u64;
pub type WALBytes = Box<[u8]>;
-pub type WALFileId = u64;
pub type WALPos = u64;
#[derive(Eq, PartialEq, Copy, Clone, Debug, Hash)]
@@ -77,12 +77,12 @@ struct WALState {
pub trait WALFile {
/// Initialize the file space in [offset, offset + length) to zero.
async fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
- /// Write data with offset. We assume all previous `allocate/truncate` invocations are visible
- /// if ordered earlier (should be guaranteed by most OS). Additionally, the final write caused
+ /// Write data with offset. We assume all previous `allocate`/`truncate` invocations are visible
+ /// if ordered earlier (should be guaranteed by most OS). Additionally, the write caused
/// by each invocation of this function should be _atomic_ (the entire single write should be
/// all or nothing).
async fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
- /// Read data with offset. Return Ok(None) when it reaches EOF.
+ /// Read data with offset. Return `Ok(None)` when it reaches EOF.
fn read(
&self,
offset: WALPos,
@@ -259,7 +259,11 @@ impl<F: WALStore> WALFilePool<F> {
res
}
- fn remove_files<'a>(&'a mut self, fid_s: u64, fid_e: u64) -> impl Future<Output = Result<(), ()>> + 'a {
+ fn remove_files<'a>(
+ &'a mut self,
+ fid_s: u64,
+ fid_e: u64,
+ ) -> impl Future<Output = Result<(), ()>> + 'a {
let last_peel = unsafe {
std::mem::replace(
&mut *self.last_peel.get(),
@@ -279,11 +283,12 @@ impl<F: WALStore> WALFilePool<F> {
r.await?
}
Ok(())
- }.shared();
+ }
+ .shared();
unsafe {
- (*self.last_peel.get()) = MaybeUninit::new(
- std::mem::transmute(
- Box::pin(p.clone()) as Pin<Box<dyn Future<Output = _> + 'a>>))
+ (*self.last_peel.get()) =
+ MaybeUninit::new(std::mem::transmute(Box::pin(p.clone())
+ as Pin<Box<dyn Future<Output = _> + 'a>>))
}
p
}
@@ -316,9 +321,11 @@ impl<F: WALStore> WALWriter<F> {
}
}
- /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
- /// function returns. The caller then has the knowledge of WAL writes so it should defer
- /// actual data writes after WAL writes.
+ /// Submit a sequence of records to WAL. It returns a vector of futures, each of which
+ /// corresponds to one record. When a future resolves to `WALRingId`, it is guaranteed the
+ /// record is already logged. Then, after finalizing the changes encoded by that record to
+ /// the persistent storage, the caller can recycle the WAL files by invoking the given
+ /// `peel` with the given `WALRingId`s.
pub fn grow<'a, T: AsRef<[WALBytes]>>(
&'a mut self,
records: T,
@@ -455,8 +462,9 @@ impl<F: WALStore> WALWriter<F> {
res
}
- /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
- /// complete so that it could automatically remove obsolete WAL files.
+ /// Inform the `WALWriter` that some data writes are complete so that it could automatically
+ /// remove obsolete WAL files. The given list of `WALRingId` does not need to be ordered and
+ /// could be of arbitrary length.
pub fn peel<'a, T: AsRef<[WALRingId]>>(
&'a mut self,
records: T,
@@ -468,7 +476,8 @@ impl<F: WALStore> WALWriter<F> {
state.io_complete.push(*rec);
}
let orig_fid = state.first_fid;
- while let Some(s) = state.io_complete.peek().and_then(|&e| Some(e.start))
+ while let Some(s) =
+ state.io_complete.peek().and_then(|&e| Some(e.start))
{
if s != state.next_complete {
break;
@@ -509,7 +518,7 @@ impl WALLoader {
}
}
- /// Recover by reading the WAL log files.
+ /// Recover by reading the WAL files.
pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> {
let mut file_pool = WALFilePool::new(
store,