From 61efb65b72a1d8d02a2c1615ae4a498eef7540c6 Mon Sep 17 00:00:00 2001 From: Determinant Date: Tue, 9 Jun 2020 17:07:02 -0400 Subject: ... --- src/lib.rs | 103 +++++++++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 77 insertions(+), 26 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2716cb5..29f7835 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ enum WALRingType { Last } +#[repr(C)] struct WALRingBlob { crc32: u32, rsize: u32, @@ -16,7 +17,7 @@ struct WALRingBlob { // payload follows } -type WALFileId = u32; +type WALFileId = u64; type WALPos = u64; #[derive(Eq, PartialEq, Copy, Clone)] @@ -37,17 +38,22 @@ impl PartialOrd for WALRingId { } } -pub struct WALState { - pub first_fid: u64, - pub last: WALPos, +/// the state for a WAL writer +struct WALState { + /// the first file id of WAL + pub first_fid: WALFileId, + /// the next position for a record, addressed in the entire WAL space + pub next: WALPos, + /// number of bits for a block pub block_nbit: u8, + /// number of bits for a file pub file_nbit: u8, } pub trait WALFile { - fn allocate(&self, offset: u64, length: usize); - fn write(&self, offset: u64, data: Box<[u8]>); - fn read(&self, offset: u64, length: usize) -> Box<[u8]>; + fn allocate(&self, offset: WALPos, length: usize); + fn write(&self, offset: WALPos, data: Box<[u8]>); + fn read(&self, offset: WALPos, length: usize) -> Box<[u8]>; } pub trait WALStore { @@ -56,22 +62,66 @@ pub trait WALStore { fn scan_files(&self) -> Box<[&str]>; } +/// The middle layer that manages WAL file handles and invokes public trait functions to actually +/// manipulate files and their contents. struct WALFilePool { store: F, - handles: lru::LruCache>, - file_size: u64 + handles: lru::LruCache>, + file_nbit: u64, + file_size: u64, + block_size: u64, } impl WALFilePool { - fn new(store: F, file_size: u64, cache_size: usize) -> Self { + fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { + let file_nbit = file_nbit as u64; WALFilePool { store, handles: lru::LruCache::new(cache_size), - file_size, + file_nbit, + file_size: 1 << (file_nbit as u64), + block_size: 1 << (block_nbit as u64) } } - fn write(&mut self, offset: u64, data: Box<[u8]>) { + + fn get_fname(fid: WALFileId) -> String { + format!("{:08x}.log", fid) + } + + fn get_file(&mut self, fid: u64) -> &'static dyn WALFile { + let h = match self.handles.get(&fid) { + Some(h) => &**h, + None => { + self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), true).unwrap()); + &**self.handles.get(&fid).unwrap() + } + }; + unsafe {&*(h as *const dyn WALFile)} } + + fn write(&mut self, writes: Vec<(WALPos, Box<[u8]>)>) { + // pre-allocate the blocks + let fid = writes[0].0 >> self.file_nbit; + let mut alloc_start = writes[0].0 & (self.file_size - 1); + let mut alloc_end = alloc_start + self.block_size; + let mut h = self.get_file(fid); + for (off, _) in &writes[1..] { + let next_fid = off >> self.file_nbit; + if next_fid != fid { + h.allocate(alloc_start, (alloc_end - alloc_start) as usize); + h = self.get_file(next_fid); + alloc_start = 0; + alloc_end = alloc_start + self.block_size; + } else { + alloc_end += self.block_size; + } + } + h.allocate(alloc_start, (alloc_end - alloc_start) as usize); + for (off, w) in writes.into_iter() { + self.get_file(off >> self.file_nbit).write(off, w); + } + } + fn remove_file(&self, fid: u64) -> bool { true } @@ -87,14 +137,16 @@ pub struct WALWriter { } impl WALWriter { - pub fn new(state: WALState, wal_store: F, cache_size: usize) -> Self { + fn new(state: WALState, wal_store: F, cache_size: usize) -> Self { let mut b = Vec::new(); - let block_size = 1 << (state.block_nbit as u32); - let file_size = 1 << (state.file_nbit as u64); + let block_nbit = state.block_nbit; + let block_size = 1 << (block_nbit as u32); + let file_nbit = state.file_nbit; + let file_size = 1 << (file_nbit as u64); b.resize(block_size as usize, 0); WALWriter{ state, - file_pool: WALFilePool::new(wal_store, file_size, cache_size), + file_pool: WALFilePool::new(wal_store, file_nbit, block_nbit, cache_size), block_buffer: b.into_boxed_slice(), block_size, next_complete: 0, @@ -111,7 +163,7 @@ impl WALWriter { let msize = std::mem::size_of::() as u32; // the global offest of the begining of the block // the start of the unwritten data - let mut bbuff_start = self.state.last as u32 & (self.block_size - 1); + let mut bbuff_start = self.state.next as u32 & (self.block_size - 1); // the end of the unwritten data let mut bbuff_cur = bbuff_start; @@ -125,7 +177,7 @@ impl WALWriter { let d = remain - msize; let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>( &mut self.block_buffer[bbuff_cur as usize] as *mut u8)}; - let ring_start = self.state.last + (bbuff_cur - bbuff_start) as u64; + let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64; if d >= rsize { // the remaining rec fits in the block let payload = rec; @@ -149,31 +201,29 @@ impl WALWriter { bbuff_cur += d; rec = &rec[d as usize..]; } - let ring_end = self.state.last + (bbuff_cur - bbuff_start) as u64; + let ring_end = self.state.next + (bbuff_cur - bbuff_start) as u64; res.push(WALRingId{start: ring_start, end: ring_end}); } else { // add padding space by moving the point to the end of the block bbuff_cur = self.block_size; } if bbuff_cur == self.block_size { - writes.push((self.state.last, + writes.push((self.state.next, self.block_buffer[bbuff_start as usize..] .to_vec().into_boxed_slice())); - self.state.last += (self.block_size - bbuff_start) as u64; + self.state.next += (self.block_size - bbuff_start) as u64; bbuff_start = 0; bbuff_cur = 0; } } } if bbuff_cur > bbuff_start { - writes.push((self.state.last, + writes.push((self.state.next, self.block_buffer[bbuff_start as usize..bbuff_cur as usize] .to_vec().into_boxed_slice())); - self.state.last += (bbuff_cur - bbuff_start) as u64; - } - for (off, w) in writes.into_iter() { - self.file_pool.write(off, w); + self.state.next += (bbuff_cur - bbuff_start) as u64; } + self.file_pool.write(writes); res.into_boxed_slice() } @@ -195,6 +245,7 @@ impl WALWriter { for fid in orig_fid..next_fid { self.file_pool.remove_file(fid); } + self.state.first_fid = next_fid; } } -- cgit v1.2.3