From 895c761cc46f48907dc8442297f84c8959692b49 Mon Sep 17 00:00:00 2001 From: Determinant Date: Wed, 10 Jun 2020 01:32:16 -0400 Subject: basic WAL functionality works --- examples/demo1.rs | 57 +++++++++++++++++++++++++++++++------------- src/lib.rs | 70 ++++++++++++++++++++++++++++--------------------------- 2 files changed, 77 insertions(+), 50 deletions(-) diff --git a/examples/demo1.rs b/examples/demo1.rs index 536aba2..d1d488c 100644 --- a/examples/demo1.rs +++ b/examples/demo1.rs @@ -1,13 +1,11 @@ -use growthring::{WALFile, WALStore, WALPos, WALBytes, WALLoader, WALWriter}; - use std::os::unix::io::RawFd; -use nix::Error::Sys; -use nix::errno::Errno; -use nix::unistd::{close, mkdir, sysconf, SysconfVar}; +use nix::unistd::{close, mkdir, unlinkat, UnlinkatFlags, ftruncate}; use nix::fcntl::{open, openat, OFlag, fallocate, FallocateFlags}; -use nix::sys::{stat::Mode, uio::pwrite}; +use nix::sys::{stat::Mode, uio::{pwrite, pread}}; use libc::off_t; +use growthring::{WALFile, WALStore, WALPos, WALBytes, WALLoader, WALWriter}; + struct WALFileTest { filename: String, fd: RawFd, @@ -33,26 +31,39 @@ impl Drop for WALFileTest { } impl WALFile for WALFileTest { - fn allocate(&self, offset: WALPos, length: usize) { + fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()> { println!("{}.allocate(offset=0x{:x}, end=0x{:x})", self.filename, offset, offset + length as u64); - fallocate(self.fd, FallocateFlags::FALLOC_FL_ZERO_RANGE, offset as off_t, length as off_t); + fallocate(self.fd, + FallocateFlags::FALLOC_FL_ZERO_RANGE, + offset as off_t, length as off_t).and_then(|_| Ok(())).or_else(|_| Err(())) } + + fn truncate(&self, length: usize) -> Result<(), ()> { + println!("{}.truncate(length={})", self.filename, length); + ftruncate(self.fd, length as off_t).or_else(|_| Err(())) + } + fn write(&self, offset: WALPos, data: WALBytes) { println!("{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})", self.filename, offset, offset + data.len() as u64, hex::encode(&data)); - pwrite(self.fd, &*data, offset as off_t); + pwrite(self.fd, &*data, offset as off_t).unwrap(); } fn read(&self, offset: WALPos, length: usize) -> WALBytes { - unreachable!() + let mut buff = Vec::new(); + buff.resize(length, 0); + pread(self.fd, &mut buff[..], offset as off_t).unwrap(); + buff.into_boxed_slice() } } struct WALStoreTest { - rootfd: RawFd + rootfd: RawFd, + rootpath: String } impl WALStoreTest { fn new(wal_dir: &str, truncate: bool) -> Self { + let rootpath = wal_dir.to_string(); if truncate { let _ = std::fs::remove_dir_all(wal_dir); } @@ -64,7 +75,7 @@ impl WALStoreTest { Ok(fd) => fd, Err(_) => panic!("error while opening the DB") }; - WALStoreTest { rootfd } + WALStoreTest { rootfd, rootpath } } } @@ -76,20 +87,27 @@ impl Drop for WALStoreTest { impl WALStore for WALStoreTest { fn open_file(&self, filename: &str, touch: bool) -> Option> { - println!("open_file(filename={}, touch={}", filename, touch); + println!("open_file(filename={}, touch={})", filename, touch); let filename = filename.to_string(); Some(Box::new(WALFileTest::new(self.rootfd, &filename))) } + fn remove_file(&self, filename: &str) -> bool { println!("remove_file(filename={})", filename); - true + unlinkat(Some(self.rootfd), filename, UnlinkatFlags::NoRemoveDir).is_ok() } + fn enumerate_files(&self) -> Box<[String]> { println!("enumerate_files()"); - Vec::new().into_boxed_slice() + let mut logfiles = Vec::new(); + for fname in std::fs::read_dir(&self.rootpath).unwrap() { + logfiles.push(fname.unwrap().file_name().into_string().unwrap()) + } + logfiles.into_boxed_slice() } + fn apply_payload(&self, payload: WALBytes) { - println!("apply_payload(payload=0x{})", hex::encode(payload)) + println!("apply_payload(payload={})", std::str::from_utf8(&payload).unwrap()) } } @@ -110,4 +128,11 @@ fn main() { for _ in 0..3 { test(["a".repeat(10), "b".repeat(100), "c".repeat(1000)].iter().map(|s| s.to_string()).collect::>(), &mut wal) } + let store = WALStoreTest::new("./wal_demo1", false); + let mut wal = WALLoader::new(store, 9, 8, 1000).recover(); + for _ in 0..3 { + test(["a".repeat(10), "b".repeat(100), "c".repeat(300), "d".repeat(400)].iter().map(|s| s.to_string()).collect::>(), &mut wal) + } + let store = WALStoreTest::new("./wal_demo1", false); + let wal = WALLoader::new(store, 9, 8, 1000).recover(); } diff --git a/src/lib.rs b/src/lib.rs index 1e1b653..5dcb78a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,7 +55,9 @@ struct WALState { pub trait WALFile { /// Initialize the file space in [offset, offset + length) to zero. - fn allocate(&self, offset: WALPos, length: usize); + fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>; + /// Truncate a file to a specified length. + fn truncate(&self, length: usize) -> Result<(), ()>; /// Write data with offset. fn write(&self, offset: WALPos, data: WALBytes); /// Read data with offset. @@ -67,7 +69,7 @@ pub trait WALStore { fn open_file(&self, filename: &str, touch: bool) -> Option>; /// Unlink a file given the filename. fn remove_file(&self, filename: &str) -> bool; - /// Enumerate all WAL files, ordered by their filenames. + /// Enumerate all WAL files. fn enumerate_files(&self) -> Box<[String]>; /// Apply (redo) the payload during recovery. fn apply_payload(&self, payload: WALBytes); @@ -112,7 +114,7 @@ impl WALFilePool { } fn get_fid(&mut self, fname: &str) -> WALFileId { - scan_fmt!(fname, "{:x}.log", [hex WALFileId]).unwrap() + scan_fmt!(fname, "{x}.log", [hex WALFileId]).unwrap() } // TODO: evict stale handles @@ -125,7 +127,7 @@ impl WALFilePool { for (off, w) in &writes[1..] { let next_fid = off >> self.file_nbit; if next_fid != fid { - h.allocate(alloc_start, (alloc_end - alloc_start) as usize); + h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap(); h = self.get_file(next_fid, true); alloc_start = 0; alloc_end = alloc_start + w.len() as u64; @@ -134,7 +136,7 @@ impl WALFilePool { alloc_end += w.len() as u64; } } - h.allocate(alloc_start, (alloc_end - alloc_start) as usize); + h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap(); for (off, w) in writes.into_iter() { self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w); } @@ -143,6 +145,8 @@ impl WALFilePool { fn remove_file(&self, fid: u64) -> bool { self.store.remove_file(&Self::get_fname(fid)) } + + fn reset(&mut self) { self.handles.clear() } } pub struct WALWriter { @@ -201,7 +205,6 @@ impl WALWriter { if d >= rsize { // the remaining rec fits in the block let payload = rec; - println!("rsize {} {}", rsize, d); blob.crc32 = crc::crc32::checksum_ieee(payload); blob.rsize = rsize; blob.rtype = if started {WALRingType::Last} else {WALRingType::Full}; @@ -285,61 +288,60 @@ impl WALLoader { } pub fn recover(mut self) -> WALWriter { + let block_size = 1 << self.file_pool.block_nbit; let msize = std::mem::size_of::() as u32; - let logfiles = self.file_pool.store.enumerate_files(); + let mut logfiles = self.file_pool.store.enumerate_files(); + // TODO: use regex to filter out invalid files + // TODO: check for missing logfiles + logfiles.sort(); + let mut chunks = None; for fname in logfiles.iter() { let fid = self.file_pool.get_fid(fname); let f = self.file_pool.get_file(fid, false); let mut off = 0; - let mut end = false; - while !end { + while block_size - (off & (block_size - 1)) > msize as u64 { let header_raw = f.read(off, msize as usize); + off += msize as u64; let header = unsafe { std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; let rsize = header.rsize; - off += msize as u64; match header.rtype { WALRingType::Full => { + assert!(chunks.is_none()); let payload = f.read(off, rsize as usize); - self.file_pool.store.apply_payload(payload); off += rsize as u64; + self.file_pool.store.apply_payload(payload); }, WALRingType::First => { - let mut chunks = vec![f.read(off, rsize as usize)]; + assert!(chunks.is_none()); + chunks = Some(vec![f.read(off, rsize as usize)]); off += rsize as u64; - loop { - let header_raw = f.read(off, msize as usize); - let header = unsafe { - std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())}; - if let WALRingType::Null = header.rtype { - end = true; - break - } - let rsize = header.rsize; - let payload = f.read(off, rsize as usize); - off += msize as u64; - chunks.push(payload); - match header.rtype { - WALRingType::Middle => (), - WALRingType::Last => break, - _ => unreachable!() - } - } + }, + WALRingType::Middle => { + chunks.as_mut().unwrap().push(f.read(off, rsize as usize)); + off += rsize as u64; + }, + WALRingType::Last => { + chunks.as_mut().unwrap().push(f.read(off, rsize as usize)); + off += rsize as u64; + + let _chunks = chunks.take().unwrap(); let mut payload = Vec::new(); - payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0); + payload.resize(_chunks.iter().fold(0, |acc, v| acc + v.len()), 0); let mut ps = &mut payload[..]; - for c in chunks { + for c in _chunks { ps[..c.len()].copy_from_slice(&*c); ps = &mut ps[c.len()..]; } self.file_pool.store.apply_payload(payload.into_boxed_slice()); }, - WALRingType::Null => end = true, - _ => unreachable!() + WALRingType::Null => break, } } + f.truncate(0).unwrap(); self.file_pool.remove_file(fid); } + self.file_pool.reset(); WALWriter::new(WALState { first_fid: 0, next: 0, -- cgit v1.2.3