diff options
author | Determinant <[email protected]> | 2020-06-09 23:19:11 -0400 |
---|---|---|
committer | Determinant <[email protected]> | 2020-06-09 23:19:11 -0400 |
commit | 4831ae815f26170174545ae87e9fe960bfce5b8c (patch) | |
tree | 0161c2ff585d8176f33306ee0f2cf01b628d5a9a /src | |
parent | 56bc442c4da0e6e75401d50f206fe56e1bf8adbb (diff) |
...
Diffstat (limited to 'src')
-rw-r--r-- | src/lib.rs | 45 |
1 files changed, 26 insertions, 19 deletions
@@ -11,7 +11,7 @@ enum WALRingType { Last } -#[repr(C)] +#[repr(packed)] struct WALRingBlob { crc32: u32, rsize: u32, @@ -19,11 +19,11 @@ struct WALRingBlob { // payload follows } -type WALBytes = Box<[u8]>; -type WALFileId = u64; -type WALPos = u64; +pub type WALBytes = Box<[u8]>; +pub type WALFileId = u64; +pub type WALPos = u64; -#[derive(Eq, PartialEq, Copy, Clone)] +#[derive(Eq, PartialEq, Copy, Clone, Debug)] pub struct WALRingId { start: WALPos, end: WALPos @@ -118,7 +118,7 @@ impl<F: WALStore> WALFilePool<F> { // TODO: evict stale handles fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) { // pre-allocate the file space - let fid = writes[0].0 >> self.file_nbit; + let mut fid = writes[0].0 >> self.file_nbit; let mut alloc_start = writes[0].0 & (self.file_size - 1); let mut alloc_end = alloc_start + writes[0].1.len() as u64; let mut h = self.get_file(fid, true); @@ -129,13 +129,14 @@ impl<F: WALStore> WALFilePool<F> { h = self.get_file(next_fid, true); alloc_start = 0; alloc_end = alloc_start + w.len() as u64; + fid = next_fid; } else { alloc_end += w.len() as u64; } } h.allocate(alloc_start, (alloc_end - alloc_start) as usize); for (off, w) in writes.into_iter() { - self.get_file(off >> self.file_nbit, true).write(off, w); + self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w); } } @@ -175,7 +176,7 @@ impl<F: WALStore> WALWriter<F> { /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the /// function returns. The caller then has the knowledge of WAL writes so it should defer /// actual data writes after WAL writes. - pub fn grow(&mut self, records: &[WALBytes]) -> Box<[WALRingId]> { + pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> Box<[WALRingId]> { let mut res = Vec::new(); let mut writes = Vec::new(); let msize = std::mem::size_of::<WALRingBlob>() as u32; @@ -185,7 +186,7 @@ impl<F: WALStore> WALWriter<F> { // the end of the unwritten data let mut bbuff_cur = bbuff_start; - for _rec in records { + for _rec in records.as_ref() { let mut rec = &_rec[..]; let mut rsize = rec.len() as u32; let mut started = false; @@ -193,18 +194,22 @@ impl<F: WALStore> WALWriter<F> { let remain = self.block_size - bbuff_cur; if remain > msize { let d = remain - msize; + let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64; let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>( (&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())}; - let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64; + bbuff_cur += msize; if d >= rsize { // the remaining rec fits in the block let payload = rec; + println!("rsize {} {}", rsize, d); blob.crc32 = crc::crc32::checksum_ieee(payload); blob.rsize = rsize; blob.rtype = if started {WALRingType::Last} else {WALRingType::Full}; - rsize = 0; - &mut self.block_buffer[bbuff_cur as usize..].copy_from_slice(payload); + &mut self.block_buffer[ + bbuff_cur as usize.. + bbuff_cur as usize + payload.len()].copy_from_slice(payload); bbuff_cur += rsize; + rsize = 0; } else { // the remaining block can only accommodate partial rec let payload = &rec[..d as usize]; @@ -214,9 +219,11 @@ impl<F: WALStore> WALWriter<F> { started = true; WALRingType::First }; - rsize -= d; - &mut self.block_buffer[bbuff_cur as usize..].copy_from_slice(payload); + &mut self.block_buffer[ + bbuff_cur as usize.. + bbuff_cur as usize + payload.len()].copy_from_slice(payload); bbuff_cur += d; + rsize -= d; rec = &rec[d as usize..]; } let ring_end = self.state.next + (bbuff_cur - bbuff_start) as u64; @@ -247,8 +254,8 @@ impl<F: WALStore> WALWriter<F> { /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are /// complete so that it could automatically remove obsolete WAL files. - pub fn peel(&mut self, records: &[WALRingId]) { - for rec in records { + pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) { + for rec in records.as_ref() { self.io_complete.push(*rec) } let orig_fid = self.state.first_fid; @@ -267,14 +274,14 @@ impl<F: WALStore> WALWriter<F> { } } -pub struct WALReader<F: WALStore> { +pub struct WALLoader<F: WALStore> { file_pool: WALFilePool<F>, } -impl<F: WALStore> WALReader<F> { +impl<F: WALStore> WALLoader<F> { pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self { let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size); - WALReader{ file_pool } + WALLoader{ file_pool } } pub fn recover(mut self) -> WALWriter<F> { |