use std::collections::BinaryHeap;
/// Type tag stored in the `rtype` field of a `WALRingBlob` header.
///
/// The enum is `#[repr(u8)]`, so every variant occupies exactly one byte; the
/// numeric value of each variant is spelled out explicitly below.
///
/// NOTE(review): the variant names suggest a record that is either stored
/// whole (`Full`) or split into `First`/`Middle`/`Last` fragments, with `Null`
/// as the "nothing here" marker — confirm against the reader/writer code,
/// which is not visible in this part of the file.
#[repr(u8)]
enum WALRingType {
    Null = 0,
    Full = 1,
    First = 2,
    Middle = 3,
    Last = 4,
}
/// Header of a ring blob, laid out with C field ordering (`#[repr(C)]`).
///
/// Only the fixed-size header is modelled here; as the trailing comment in
/// the body says, the payload follows the header.
#[repr(C)]
struct WALRingBlob {
    /// CRC-32 value for the blob.
    /// NOTE(review): presumably computed over the payload — confirm.
    crc32: u32,
    /// Size field of the blob.
    /// NOTE(review): presumably the payload length — confirm.
    rsize: u32,
    /// Type tag of the blob.
    rtype: WALRingType,
    // the payload follows this header
}
/// Identifier of a WAL file. It keys the handle cache in `WALFilePool` and is
/// turned into a file name by `WALFilePool::get_fname`.
type WALFileId = u64;
/// Position/offset inside the WAL. Used for `WALState::next`, for the bounds of
/// a `WALRingId`, and for the `offset` arguments of the `WALFile` methods.
type WALPos = u64;
/// A pair of WAL positions, `start` and `end`.
///
/// The type is a plain value: it is `Copy` and compares field by field for
/// equality.
///
/// NOTE(review): whether `end` is inclusive or exclusive cannot be told from
/// this definition — confirm where the ids are produced.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct WALRingId {
    /// First position of the pair.
    start: WALPos,
    /// Second position of the pair.
    end: WALPos,
}
impl Ord for WALRingId {
fn cmp(&self, other: &WALRingId) -> std::cmp::Ordering {
other.start.cmp(&self.start).then_with(|| other.end.cmp(&self.end))
}
}
impl PartialOrd for WALRingId {
fn partial_cmp(&self, other: &WALRingId) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
/// State kept for a WAL writer.
struct WALState {
    /// Id of the first file of the WAL.
    pub first_fid: WALFileId,
    /// Position of the next record, addressed within the entire WAL space.
    pub next: WALPos,
    /// Number of bits for a block.
    /// NOTE(review): presumably the block size is `1 << block_nbit` — confirm.
    pub block_nbit: u8,
    /// Number of bits for a file.
    /// NOTE(review): presumably the file size is `1 << file_nbit` — confirm.
    pub file_nbit: u8,
}
/// Operations the WAL layer needs from one log file.
///
/// Every method takes `&self` and none of them returns an error value, so an
/// implementation has no way to report a failure through the return type.
pub trait WALFile {
    /// Called with an `offset` and a `length`; `WALFilePool::write` uses it to
    /// pre-allocate blocks before writing.
    fn allocate(&self, offset: WALPos, length: usize);

    /// Hands the owned buffer `data` to the file together with an `offset`.
    /// NOTE(review): presumably stores `data` at `offset` — confirm with the
    /// implementations.
    fn write(&self, offset: WALPos, data: Box<[u8]>);

    /// Returns an owned buffer for the given `offset` and `length`.
    /// NOTE(review): presumably the `length` bytes found at `offset`; what
    /// happens on a short read is not visible here.
    fn read(&self, offset: WALPos, length: usize) -> Box<[u8]>;
}
/// Operations the WAL layer needs from whatever holds the log files.
pub trait WALStore {
    /// Looks up `filename` and returns a handle to it, or `None`.
    /// `WALFilePool::get_file` calls this with `touch = true`.
    /// NOTE(review): `touch` presumably asks for the file to be created when
    /// it is missing — confirm with the implementations.
    fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;

    /// Asks the store to remove `filename`; the meaning of the returned flag
    /// is up to the implementation (NOTE(review): presumably `true` on
    /// success).
    fn remove_file(&self, filename: &str) -> bool;

    /// Returns a boxed slice of names that borrow from the store itself.
    /// NOTE(review): presumably the names of all existing log files.
    fn scan_files(&self) -> Box<[&str]>;
}
/// Middle layer that owns the WAL file handles and calls the public
/// `WALStore` / `WALFile` trait functions to actually touch files and their
/// contents.
struct WALFilePool<F>
where
    F: WALStore,
{
    /// Store the file handles are opened from.
    store: F,
    /// LRU cache of opened file handles, keyed by file id.
    handles: lru::LruCache<WALFileId, Box<dyn WALFile>>,
    /// Number of bits for a file, widened to `u64` by `new`.
    file_nbit: u64,
    /// Set by `new` to `1 << file_nbit`.
    file_size: u64,
    /// Set by `new` to `1 << block_nbit`.
    block_size: u64,
}
impl<F: WALStore> WALFilePool<F> {
/// Builds a pool on top of `store`.
///
/// * `file_nbit` — number of bits for a file; the file size is `1 << file_nbit`.
/// * `block_nbit` — number of bits for a block; the block size is `1 << block_nbit`.
/// * `cache_size` — passed to `lru::LruCache::new` for the handle cache.
fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
    // Widen once so the shift amounts and the stored field share a type.
    let file_nbit = u64::from(file_nbit);
    let block_nbit = u64::from(block_nbit);
    let handles = lru::LruCache::new(cache_size);
    let file_size = 1u64 << file_nbit;
    let block_size = 1u64 << block_nbit;
    WALFilePool {
        store,
        handles,
        file_nbit,
        file_size,
        block_size,
    }
}
/// Returns the file name used for file id `fid`: the id in lowercase
/// hexadecimal followed by `.log`.
fn get_fname(fid: WALFileId) -> String {
    // `08` zero-pads to at least eight digits; wider ids are not truncated.
    format!("{:08x}.log", fid)
}
/// Returns the handle for file `fid`, opening and caching it first when it is
/// not in the handle cache yet.
///
/// On a miss the name comes from `get_fname` and the file is opened through
/// the store with `touch = true`.
///
/// # Panics
///
/// Panics if the store returns `None` for the file.
///
/// NOTE(review): the result is given a `'static` lifetime through a
/// raw-pointer cast, so the borrow checker no longer ties it to the cache
/// entry that owns the handle. If the cache drops that entry (presumably
/// possible when a later `put` evicts it) the reference dangles — confirm
/// that callers never keep it across a call that can modify `handles`.
fn get_file(&mut self, fid: u64) -> &'static dyn WALFile {
    // Miss: open the file and hand ownership of the handle to the cache.
    if self.handles.get(&fid).is_none() {
        let opened = self.store.open_file(&Self::get_fname(fid), true).unwrap();
        self.handles.put(fid, opened);
    }
    // The entry is present at this point, either from before or from the
    // `put` above.
    let cached = &**self.handles.get(&fid).unwrap();
    // SAFETY (NOTE(review)): the pointer is valid at this moment because it
    // was just derived from a live cache entry; nothing visible in this
    // function keeps it valid for `'static` — see the note in the doc comment.
    unsafe { &*(cached as *const dyn WALFile) }
}
fn write(&mut self, writes: Vec<(WALPos, Box<[u8]>)>) {
// pre-allocate the blocks
let fid = writes[0].0 >> self.file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
let mut alloc_end = alloc_start + self.block_size;
let mut h = self.get_file(fid);
for (off, _) in &writes[1..] {
let next_fid = off >> self.file_nbit;
if next_fid != fid {
h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
h