aboutsummaryrefslogtreecommitdiff
path: root/src/lib.rs
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-06-10 11:50:02 -0400
committerDeterminant <[email protected]>2020-06-10 11:50:02 -0400
commitf8fd7a86c11bf15a41cd402b2ef8fe457f0c2026 (patch)
tree0adf825d7e19e0b25bda8ac58fa246c7a30994f6 /src/lib.rs
parent811146c489ddd1494ff083609c10daed0c2c5023 (diff)
move pure WAL implementation to wal.rs
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs355
1 files changed, 1 insertions, 354 deletions
diff --git a/src/lib.rs b/src/lib.rs
index a2b88ad..7635dab 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,355 +1,2 @@
#[macro_use] extern crate scan_fmt;
-use std::collections::BinaryHeap;
-
-#[repr(u8)]
-enum WALRingType {
- #[allow(dead_code)]
- Null = 0x0,
- Full,
- First,
- Middle,
- Last
-}
-
-#[repr(packed)]
-struct WALRingBlob {
- crc32: u32,
- rsize: u32,
- rtype: WALRingType,
- // payload follows
-}
-
-pub type WALBytes = Box<[u8]>;
-pub type WALFileId = u64;
-pub type WALPos = u64;
-
-#[derive(Eq, PartialEq, Copy, Clone, Debug)]
-pub struct WALRingId {
- start: WALPos,
- end: WALPos
-}
-
-impl Ord for WALRingId {
- fn cmp(&self, other: &WALRingId) -> std::cmp::Ordering {
- other.start.cmp(&self.start).then_with(|| other.end.cmp(&self.end))
- }
-}
-
-impl PartialOrd for WALRingId {
- fn partial_cmp(&self, other: &WALRingId) -> Option<std::cmp::Ordering> {
- Some(self.cmp(other))
- }
-}
-
-/// the state for a WAL writer
-struct WALState {
- /// the first file id of WAL
- first_fid: WALFileId,
- /// the next position for a record, addressed in the entire WAL space
- next: WALPos,
- /// number of bits for a file
- file_nbit: u64,
-}
-
-pub trait WALFile {
- /// Initialize the file space in [offset, offset + length) to zero.
- fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
- /// Truncate a file to a specified length.
- fn truncate(&self, length: usize) -> Result<(), ()>;
- /// Write data with offset.
- fn write(&self, offset: WALPos, data: WALBytes);
- /// Read data with offset.
- fn read(&self, offset: WALPos, length: usize) -> WALBytes;
-}
-
-pub trait WALStore {
- /// Open a file given the filename, create the file if not exists when `touch` is `true`.
- fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;
- /// Unlink a file given the filename.
- fn remove_file(&self, filename: &str) -> Result<(), ()>;
- /// Enumerate all WAL files.
- fn enumerate_files(&self) -> Box<[String]>;
- /// Apply (redo) the payload during recovery.
- fn apply_payload(&self, payload: WALBytes);
-}
-
-/// The middle layer that manages WAL file handles and invokes public trait functions to actually
-/// manipulate files and their contents.
-struct WALFilePool<F: WALStore> {
- store: F,
- handles: lru::LruCache<WALFileId, Box<dyn WALFile>>,
- file_nbit: u64,
- file_size: u64,
- block_nbit: u64,
-}
-
-impl<F: WALStore> WALFilePool<F> {
- fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
- let file_nbit = file_nbit as u64;
- let block_nbit = block_nbit as u64;
- WALFilePool {
- store,
- handles: lru::LruCache::new(cache_size),
- file_nbit,
- file_size: 1 << (file_nbit as u64),
- block_nbit,
- }
- }
-
- fn get_fname(fid: WALFileId) -> String {
- format!("{:08x}.log", fid)
- }
-
- fn get_file(&mut self, fid: u64, touch: bool) -> &'static dyn WALFile {
- let h = match self.handles.get(&fid) {
- Some(h) => &**h,
- None => {
- self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch).unwrap());
- &**self.handles.get(&fid).unwrap()
- }
- };
- unsafe {&*(h as *const dyn WALFile)}
- }
-
- fn get_fid(&mut self, fname: &str) -> WALFileId {
- scan_fmt!(fname, "{x}.log", [hex WALFileId]).unwrap()
- }
-
- // TODO: evict stale handles
- fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) {
- // pre-allocate the file space
- let mut fid = writes[0].0 >> self.file_nbit;
- let mut alloc_start = writes[0].0 & (self.file_size - 1);
- let mut alloc_end = alloc_start + writes[0].1.len() as u64;
- let mut h = self.get_file(fid, true);
- for (off, w) in &writes[1..] {
- let next_fid = off >> self.file_nbit;
- if next_fid != fid {
- h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
- h = self.get_file(next_fid, true);
- alloc_start = 0;
- alloc_end = alloc_start + w.len() as u64;
- fid = next_fid;
- } else {
- alloc_end += w.len() as u64;
- }
- }
- h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
- for (off, w) in writes.into_iter() {
- self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w);
- }
- }
-
- fn remove_file(&self, fid: u64) -> Result<(), ()> {
- self.store.remove_file(&Self::get_fname(fid))
- }
-
- fn reset(&mut self) { self.handles.clear() }
-}
-
-pub struct WALWriter<F: WALStore> {
- state: WALState,
- file_pool: WALFilePool<F>,
- block_buffer: WALBytes,
- block_size: u32,
- next_complete: WALPos,
- io_complete: BinaryHeap<WALRingId>
-}
-
-impl<F: WALStore> WALWriter<F> {
- fn new(state: WALState, file_pool: WALFilePool<F>) -> Self {
- let mut b = Vec::new();
- let block_size = 1 << file_pool.block_nbit as u32;
- //let block_nbit = state.block_nbit;
- //let block_size = 1 << (block_nbit as u32);
- //let file_nbit = state.file_nbit;
- //let file_size = 1 << (file_nbit as u64);
- b.resize(block_size as usize, 0);
- WALWriter{
- state,
- file_pool,
- block_buffer: b.into_boxed_slice(),
- block_size,
- next_complete: 0,
- io_complete: BinaryHeap::new(),
- }
- }
-
- /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
- /// function returns. The caller then has the knowledge of WAL writes so it should defer
- /// actual data writes after WAL writes.
- pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> Box<[WALRingId]> {
- let mut res = Vec::new();
- let mut writes = Vec::new();
- let msize = std::mem::size_of::<WALRingBlob>() as u32;
- // the global offest of the begining of the block
- // the start of the unwritten data
- let mut bbuff_start = self.state.next as u32 & (self.block_size - 1);
- // the end of the unwritten data
- let mut bbuff_cur = bbuff_start;
-
- for _rec in records.as_ref() {
- let mut rec = &_rec[..];
- let mut rsize = rec.len() as u32;
- let mut started = false;
- while rsize > 0 {
- let remain = self.block_size - bbuff_cur;
- if remain > msize {
- let d = remain - msize;
- let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64;
- let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>(
- (&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())};
- bbuff_cur += msize;
- if d >= rsize {
- // the remaining rec fits in the block
- let payload = rec;
- blob.crc32 = crc::crc32::checksum_ieee(payload);
- blob.rsize = rsize;
- blob.rtype = if started {WALRingType::Last} else {WALRingType::Full};
- &mut self.block_buffer[
- bbuff_cur as usize..
- bbuff_cur as usize + payload.len()].copy_from_slice(payload);
- bbuff_cur += rsize;
- rsize = 0;
- } else {
- // the remaining block can only accommodate partial rec
- let payload = &rec[..d as usize];
- blob.crc32 = crc::crc32::checksum_ieee(payload);
- blob.rsize = d;
- blob.rtype = if started {WALRingType::Middle} else {
- started = true;
- WALRingType::First
- };
- &mut self.block_buffer[
- bbuff_cur as usize..
- bbuff_cur as usize + payload.len()].copy_from_slice(payload);
- bbuff_cur += d;
- rsize -= d;
- rec = &rec[d as usize..];
- }
- let ring_end = self.state.next + (bbuff_cur - bbuff_start) as u64;
- res.push(WALRingId{start: ring_start, end: ring_end});
- } else {
- // add padding space by moving the point to the end of the block
- bbuff_cur = self.block_size;
- }
- if bbuff_cur == self.block_size {
- writes.push((self.state.next,
- self.block_buffer[bbuff_start as usize..]
- .to_vec().into_boxed_slice()));
- self.state.next += (self.block_size - bbuff_start) as u64;
- bbuff_start = 0;
- bbuff_cur = 0;
- }
- }
- }
- if bbuff_cur > bbuff_start {
- writes.push((self.state.next,
- self.block_buffer[bbuff_start as usize..bbuff_cur as usize]
- .to_vec().into_boxed_slice()));
- self.state.next += (bbuff_cur - bbuff_start) as u64;
- }
- self.file_pool.write(writes);
- res.into_boxed_slice()
- }
-
- /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
- /// complete so that it could automatically remove obsolete WAL files.
- pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) {
- let msize = std::mem::size_of::<WALRingBlob>() as u64;
- let block_size = self.block_size as u64;
- for rec in records.as_ref() {
- self.io_complete.push(*rec)
- }
- let orig_fid = self.state.first_fid;
- while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) {
- if s != self.next_complete {
- break
- }
- let mut m = self.io_complete.pop().unwrap();
- let block_remain = block_size - (m.end & (block_size - 1));
- if block_remain <= msize as u64 {
- m.end += block_remain
- }
- self.next_complete = m.end
- }
- let next_fid = self.next_complete >> self.state.file_nbit;
- for fid in orig_fid..next_fid {
- self.file_pool.remove_file(fid).unwrap();
- }
- self.state.first_fid = next_fid;
- }
-}
-
-pub struct WALLoader<F: WALStore> {
- file_pool: WALFilePool<F>,
-}
-
-impl<F: WALStore> WALLoader<F> {
- pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
- let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size);
- WALLoader{ file_pool }
- }
-
- pub fn recover(mut self) -> WALWriter<F> {
- let block_size = 1 << self.file_pool.block_nbit;
- let msize = std::mem::size_of::<WALRingBlob>() as u32;
- let mut logfiles = self.file_pool.store.enumerate_files();
- // TODO: use regex to filter out invalid files
- // TODO: check for missing logfiles
- logfiles.sort();
- let mut chunks = None;
- for fname in logfiles.iter() {
- let fid = self.file_pool.get_fid(fname);
- let f = self.file_pool.get_file(fid, false);
- let mut off = 0;
- while block_size - (off & (block_size - 1)) > msize as u64 {
- let header_raw = f.read(off, msize as usize);
- off += msize as u64;
- let header = unsafe {
- std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
- let rsize = header.rsize;
- match header.rtype {
- WALRingType::Full => {
- assert!(chunks.is_none());
- let payload = f.read(off, rsize as usize);
- off += rsize as u64;
- self.file_pool.store.apply_payload(payload);
- },
- WALRingType::First => {
- assert!(chunks.is_none());
- chunks = Some(vec![f.read(off, rsize as usize)]);
- off += rsize as u64;
- },
- WALRingType::Middle => {
- chunks.as_mut().unwrap().push(f.read(off, rsize as usize));
- off += rsize as u64;
- },
- WALRingType::Last => {
- chunks.as_mut().unwrap().push(f.read(off, rsize as usize));
- off += rsize as u64;
-
- let _chunks = chunks.take().unwrap();
- let mut payload = Vec::new();
- payload.resize(_chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
- let mut ps = &mut payload[..];
- for c in _chunks {
- ps[..c.len()].copy_from_slice(&*c);
- ps = &mut ps[c.len()..];
- }
- self.file_pool.store.apply_payload(payload.into_boxed_slice());
- },
- WALRingType::Null => break,
- }
- }
- f.truncate(0).unwrap();
- self.file_pool.remove_file(fid).unwrap();
- }
- self.file_pool.reset();
- WALWriter::new(WALState {
- first_fid: 0,
- next: 0,
- file_nbit: self.file_pool.file_nbit,
- }, self.file_pool)
- }
-}
+pub mod wal;