From 4831ae815f26170174545ae87e9fe960bfce5b8c Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 9 Jun 2020 23:19:11 -0400 Subject: ... --- Cargo.lock | 7 +++++++ Cargo.toml | 3 +++ examples/.gitignore | 2 ++ examples/demo1.rs | 56 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 45 ++++++++++++++++++++++++------------------ 5 files changed, 94 insertions(+), 19 deletions(-) create mode 100644 examples/.gitignore create mode 100644 examples/demo1.rs diff --git a/Cargo.lock b/Cargo.lock index d6e8d74..f20c53d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,6 +81,7 @@ name = "growth-ring" version = "0.1.0" dependencies = [ "crc", + "hex", "lru", "scan_fmt", ] @@ -95,6 +96,12 @@ dependencies = [ "autocfg", ] +[[package]] +name = "hex" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" + [[package]] name = "lazy_static" version = "1.4.0" diff --git a/Cargo.toml b/Cargo.toml index ee8d867..2915a84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,9 @@ crc = "1.8.1" lru = "0.5.1" scan_fmt = "0.2.5" +[dev-dependencies] +hex = "0.4.2" + [lib] name = "growthring" path = "src/lib.rs" diff --git a/examples/.gitignore b/examples/.gitignore new file mode 100644 index 0000000..7d977f4 --- /dev/null +++ b/examples/.gitignore @@ -0,0 +1,2 @@ +demo +testdb diff --git a/examples/demo1.rs b/examples/demo1.rs new file mode 100644 index 0000000..0f50ec3 --- /dev/null +++ b/examples/demo1.rs @@ -0,0 +1,56 @@ +use growthring::{WALFile, WALStore, WALPos, WALBytes, WALLoader, WALWriter}; + +struct WALFileFake { + filename: String +} + +impl WALFile for WALFileFake { + fn allocate(&self, offset: WALPos, length: usize) { + println!("{}.allocate(offset=0x{:x}, end=0x{:x})", self.filename, offset, offset + length as u64); + } + fn write(&self, offset: WALPos, data: WALBytes) { + println!("{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})", self.filename, offset, offset + data.len() as u64, hex::encode(data)); + } + fn read(&self, offset: WALPos, length: usize) -> WALBytes { + Vec::new().into_boxed_slice() + } +} + +struct WALStoreFake; +impl WALStore for WALStoreFake { + fn open_file(&self, filename: &str, touch: bool) -> Option> { + println!("open_file(filename={}, touch={}", filename, touch); + let filename = filename.to_string(); + Some(Box::new(WALFileFake{ filename })) + } + fn remove_file(&self, filename: &str) -> bool { + println!("remove_file(filename={})", filename); + true + } + fn enumerate_files(&self) -> Box<[String]> { + println!("enumerate_files()"); + Vec::new().into_boxed_slice() + } + fn apply_payload(&self, payload: WALBytes) { + println!("apply_payload(payload=0x{})", hex::encode(payload)) + } +} + +fn test(records: Vec, wal: &mut WALWriter) { + let records: Vec = records.into_iter().map(|s| s.into_bytes().into_boxed_slice()).collect(); + let ret = wal.grow(&records); + for ring_id in ret.iter() { + println!("got ring id: {:?}", ring_id); + } +} + +fn main() { + let store = WALStoreFake; + let mut wal = WALLoader::new(store, 9, 8, 1000).recover(); + for _ in 0..3 { + test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::>(), &mut wal) + } + for _ in 0..3 { + test(["a".repeat(10), "b".repeat(100), "c".repeat(1000)].iter().map(|s| s.to_string()).collect::>(), &mut wal) + } +} diff --git a/src/lib.rs b/src/lib.rs index 53f4e0f..1e1b653 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ enum WALRingType { Last } -#[repr(C)] +#[repr(packed)] struct WALRingBlob { crc32: u32, rsize: u32, @@ -19,11 +19,11 @@ struct WALRingBlob { // payload follows } -type WALBytes = Box<[u8]>; -type WALFileId = u64; -type WALPos = u64; +pub type WALBytes = Box<[u8]>; +pub type WALFileId = u64; +pub type WALPos = u64; -#[derive(Eq, PartialEq, Copy, Clone)] +#[derive(Eq, PartialEq, Copy, Clone, Debug)] pub struct WALRingId { start: WALPos, end: WALPos @@ -118,7 +118,7 @@ impl WALFilePool { // TODO: evict stale handles fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) { // pre-allocate the file space - let fid = writes[0].0 >> self.file_nbit; + let mut fid = writes[0].0 >> self.file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); let mut alloc_end = alloc_start + writes[0].1.len() as u64; let mut h = self.get_file(fid, true); @@ -129,13 +129,14 @@ impl WALFilePool { h = self.get_file(next_fid, true); alloc_start = 0; alloc_end = alloc_start + w.len() as u64; + fid = next_fid; } else { alloc_end += w.len() as u64; } } h.allocate(alloc_start, (alloc_end - alloc_start) as usize); for (off, w) in writes.into_iter() { - self.get_file(off >> self.file_nbit, true).write(off, w); + self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w); } } @@ -175,7 +176,7 @@ impl WALWriter { /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the /// function returns. The caller then has the knowledge of WAL writes so it should defer /// actual data writes after WAL writes. - pub fn grow(&mut self, records: &[WALBytes]) -> Box<[WALRingId]> { + pub fn grow>(&mut self, records: T) -> Box<[WALRingId]> { let mut res = Vec::new(); let mut writes = Vec::new(); let msize = std::mem::size_of::() as u32; @@ -185,7 +186,7 @@ impl WALWriter { // the end of the unwritten data let mut bbuff_cur = bbuff_start; - for _rec in records { + for _rec in records.as_ref() { let mut rec = &_rec[..]; let mut rsize = rec.len() as u32; let mut started = false; @@ -193,18 +194,22 @@ impl WALWriter { let remain = self.block_size - bbuff_cur; if remain > msize { let d = remain - msize; + let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64; let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>( (&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())}; - let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64; + bbuff_cur += msize; if d >= rsize { // the remaining rec fits in the block let payload = rec; + println!("rsize {} {}", rsize, d); blob.crc32 = crc::crc32::checksum_ieee(payload); blob.rsize = rsize; blob.rtype = if started {WALRingType::Last} else {WALRingType::Full}; - rsize = 0; - &mut self.block_buffer[bbuff_cur as usize..].copy_from_slice(payload); + &mut self.block_buffer[ + bbuff_cur as usize.. + bbuff_cur as usize + payload.len()].copy_from_slice(payload); bbuff_cur += rsize; + rsize = 0; } else { // the remaining block can only accommodate partial rec let payload = &rec[..d as usize]; @@ -214,9 +219,11 @@ impl WALWriter { started = true; WALRingType::First }; - rsize -= d; - &mut self.block_buffer[bbuff_cur as usize..].copy_from_slice(payload); + &mut self.block_buffer[ + bbuff_cur as usize.. + bbuff_cur as usize + payload.len()].copy_from_slice(payload); bbuff_cur += d; + rsize -= d; rec = &rec[d as usize..]; } let ring_end = self.state.next + (bbuff_cur - bbuff_start) as u64; @@ -247,8 +254,8 @@ impl WALWriter { /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are /// complete so that it could automatically remove obsolete WAL files. - pub fn peel(&mut self, records: &[WALRingId]) { - for rec in records { + pub fn peel>(&mut self, records: T) { + for rec in records.as_ref() { self.io_complete.push(*rec) } let orig_fid = self.state.first_fid; @@ -267,14 +274,14 @@ impl WALWriter { } } -pub struct WALReader { +pub struct WALLoader { file_pool: WALFilePool, } -impl WALReader { +impl WALLoader { pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size); - WALReader{ file_pool } + WALLoader{ file_pool } } pub fn recover(mut self) -> WALWriter { -- cgit v1.2.3-70-g09d2