path: root/src/wal.rs
blob: 0ba35fbec45b8f11db42fe6556d8f64591f8c31f (plain) (tree)












use std::collections::BinaryHeap;

/// Type tag carried in the `rtype` field of a `WALRingBlob` header.
// NOTE(review): only the first variant is visible in this view; the remaining variants and the
// closing brace are not shown here.
enum WALRingType {
    /// Explicit discriminant `0x0`.
    Null = 0x0,

/// Fixed-size header of a ring record; the record payload is laid out right after it.
/// `WALWriter::grow` uses `size_of::<WALRingBlob>()` as the header size.
struct WALRingBlob {
    /// presumably a CRC-32 checksum of the record — TODO confirm what exactly it covers
    crc32: u32,
    /// presumably the size in bytes of the payload that follows — TODO confirm
    rsize: u32,
    /// type tag of this ring record
    rtype: WALRingType,
    // payload follows

/// Owned, fixed-length byte buffer used for record payloads and file read/write data.
pub type WALBytes = Box<[u8]>;
/// Numeric id of a WAL file; `WALFilePool::get_fname` renders it as a hex `.log` filename.
pub type WALFileId = u64;
/// A byte position: either addressed in the entire WAL space (as in `WALState::next`) or an
/// offset inside a single file (as in the `WALFile` callbacks).
pub type WALPos = u64;

/// Identifies a record by a pair of WAL positions. `WALWriter::grow` returns a boxed slice of
/// these, and `WALWriter` keeps them in its `io_complete` heap.
// NOTE(review): whether `end` is inclusive or exclusive cannot be told from this view.
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub struct WALRingId {
    /// position where the record begins
    start: WALPos,
    /// position where the record ends
    end: WALPos

/// Total order on `WALRingId`, deliberately *reversed*: the id with the smaller `start` (ties
/// broken by the smaller `end`) compares as the greater one.
impl Ord for WALRingId {
    fn cmp(&self, other: &WALRingId) -> std::cmp::Ordering {
        // `other` is compared against `self` (not the other way round), which flips the natural
        // order; since `std::collections::BinaryHeap` is a max-heap, a heap of `WALRingId`
        // therefore yields the smallest `start` first.
        other.start.cmp(&self.start).then_with(|| other.end.cmp(&self.end))

/// Partial order on `WALRingId`.
// NOTE(review): the method body is not visible in this view; it should agree with `Ord::cmp`
// (the usual form is `Some(self.cmp(other))`) — confirm against the full file.
impl PartialOrd for WALRingId {
    fn partial_cmp(&self, other: &WALRingId) -> Option<std::cmp::Ordering> {

/// The state for a WAL writer. `WALWriter::new` takes it by value and stores it in
/// `WALWriter::state`.
struct WALState {
    /// the first file id of WAL
    first_fid: WALFileId,
    /// the next position for a record, addressed in the entire WAL space
    next: WALPos,
    /// number of bits for a file
    // presumably a file spans `1 << file_nbit` bytes, mirroring `WALFilePool::file_size` —
    // TODO confirm
    file_nbit: u64,

/// Operations the WAL needs from a single opened file. Implementations are supplied by the
/// application through `WALStore::open_file`; all offsets are relative to the start of the file.
pub trait WALFile {
    /// Initialize the file space in [offset, offset + length) to zero.
    fn allocate(&self, offset: WALPos, length: usize) -> Result<(), ()>;
    /// Truncate a file to a specified length.
    fn truncate(&self, length: usize) -> Result<(), ()>;
    /// Write data at the given offset. We assume the actual writes on the storage medium are
    /// _strictly ordered_ the same way as this callback is invoked. We also assume that the
    /// effects of all previous `allocate/truncate` invocations are visible if they were ordered
    /// earlier (this should be guaranteed by most OSes). Additionally, the final write caused by
    /// each invocation of this function should be _atomic_ (the entire single write should be
    /// all or nothing).
    fn write(&self, offset: WALPos, data: WALBytes);
    /// Read `length` bytes of data starting at the given offset.
    fn read(&self, offset: WALPos, length: usize) -> WALBytes;

/// Application-provided backend for the WAL: it opens, removes and enumerates WAL files by
/// name, and receives the payloads to be re-applied during recovery.
pub trait WALStore {
    /// Iterator type returned by `enumerate_files`; it yields filenames.
    type FileNameIter: Iterator<Item = String>;

    /// Open a file given the filename; when `touch` is `true`, create the file if it does not
    /// exist. Returns `None` on failure.
    fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;
    /// Unlink a file given the filename.
    fn remove_file(&self, filename: &str) -> Result<(), ()>;
    /// Enumerate all WAL filenames. It should include all WAL files that were previously opened
    /// (created) but not removed. The list could be unordered.
    fn enumerate_files(&self) -> Self::FileNameIter;
    /// Apply the payload during recovery. This notifies the application that it should redo the
    /// given operation to ensure its state is consistent (the major goal of having a WAL). We
    /// assume the application applies the payloads in the _order_ of this callback's invocation.
    fn apply_payload(&self, payload: WALBytes);

/// The middle layer that manages WAL file handles and invokes public trait functions to actually
/// manipulate files and their contents.
struct WALFilePool<F: WALStore> {
    /// backend used to open files that are not in the handle cache
    store: F,
    /// LRU cache of opened file handles, keyed by file id
    handles: lru::LruCache<WALFileId, Box<dyn WALFile>>,
    /// number of bits for a file; a global position is split into file id (`pos >> file_nbit`)
    /// and in-file offset (`pos & (file_size - 1)`)
    file_nbit: u64,
    /// size of one file in bytes, computed as `1 << file_nbit`
    file_size: u64,
    /// number of bits for a block; `WALWriter::new` derives its block size as `1 << block_nbit`
    block_nbit: u64,

impl<F: WALStore> WALFilePool<F> {
    /// Builds a pool on top of `store`.
    ///
    /// `file_nbit` and `block_nbit` are widened to `u64`; the file size is derived as
    /// `1 << file_nbit`, and `cache_size` is handed to `lru::LruCache::new` for the handle cache.
    fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
        let file_nbit = file_nbit as u64;
        let block_nbit = block_nbit as u64;
        WALFilePool {
            handles: lru::LruCache::new(cache_size),
            file_size: 1 << (file_nbit as u64),

    /// Formats a file id as the filename passed to the store: lowercase hexadecimal, zero-padded
    /// to at least 8 digits, followed by `.log`.
    fn get_fname(fid: WALFileId) -> String {
        format!("{:08x}.log", fid)

    /// Returns the handle for file `fid`, taking it from the cache when present and otherwise
    /// opening it through `store.open_file` (forwarding `touch`) and caching the result.
    ///
    /// Panics if the store fails to open the file (`unwrap` on `None`).
    // NOTE(review): the returned lifetime is forced to `'static` via a raw-pointer cast. The
    // reference is really only valid while the boxed handle stays in `handles`; an LRU eviction
    // or a call to `reset()` would leave it dangling — confirm callers never hold it that long.
    fn get_file(&mut self, fid: u64, touch: bool) -> &'static dyn WALFile {
        let h = match self.handles.get(&fid) {
            // cache hit: borrow the trait object inside the box
            Some(h) => &**h,
            None => {
                // cache miss: open (and possibly create) the file, then remember the handle
                self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch).unwrap());
        // detach the borrow from `self` so the caller can keep using the pool
        unsafe {&*(h as *const dyn WALFile)}

    /// Parses the hexadecimal file id out of a `<hex>.log` filename (the inverse of
    /// `get_fname`). Panics if `fname` does not match that pattern.
    fn get_fid(&mut self, fname: &str) -> WALFileId {
        scan_fmt!(fname, "{x}.log", [hex WALFileId]).unwrap()

    /// Pre-allocates file space for a batch of writes and then performs them in order.
    ///
    /// Each entry is `(global position, data)`; the position is split into a file id and an
    /// in-file offset using `file_nbit` / `file_size`. Panics if `writes` is empty (it indexes
    /// `writes[0]`) or if an `allocate` call fails.
    // TODO: evict stale handles
    fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) {
        // pre-allocate the file space
        // start with the range covered by the first write, inside its file
        let mut fid = writes[0].0 >> self.file_nbit;
        let mut alloc_start = writes[0].0 & (self.file_size - 1);
        let mut alloc_end = alloc_start + writes[0].1.len() as u64;
        let mut h = self.get_file(fid, true);
        for (off, w) in &writes[1..] {
            let next_fid = off >> self.file_nbit;
            if next_fid != fid {
                // crossed into another file: allocate what was accumulated for the previous
                // file, then restart the range at the beginning of the new file
                h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
                h = self.get_file(next_fid, true);
                // NOTE(review): assumes the first write in the new file begins at offset 0
                alloc_start = 0;
                alloc_end = alloc_start + w.len() as u64;
                fid = next_fid;
            } else {
                // same file: extend the range by this write's length
                // NOTE(review): assumes the writes are contiguous — only lengths are summed
                alloc_end += w.len() as u64;
        // allocate the range accumulated for the last file
        h.allocate(alloc_start, (alloc_end - alloc_start) as usize).unwrap();
        // issue the writes in the order given, each at its in-file offset
        for (off, w) in writes.into_iter() {
            self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w);

    /// Removes the file with id `fid`.
    // NOTE(review): the body is not visible in this view.
    fn remove_file(&self, fid: u64) -> Result<(), ()> {

    /// Drops every cached file handle.
    fn reset(&mut self) { self.handles.clear() }

/// Writer side of the WAL: accepts records through `grow` and pushes them to files through a
/// `WALFilePool`.
pub struct WALWriter<F: WALStore> {
    /// writer state (first file id, next record position, file bits)
    state: WALState,
    /// pool used to reach the underlying files
    file_pool: WALFilePool<F>,
    /// buffer of `block_size` bytes, zero-filled on construction
    block_buffer: WALBytes,
    /// block size in bytes, `1 << file_pool.block_nbit`
    block_size: u32,
    /// initialized to 0 by `new`
    // presumably the position up to which all writes are known complete — TODO confirm
    next_complete: WALPos,
    /// heap of ring ids; with the reversed `Ord` on `WALRingId` it yields the smallest `start`
    /// first
    io_complete: BinaryHeap<WALRingId>

impl<F: WALStore> WALWriter<F> {
    /// Creates a writer from an existing `state` and `file_pool`.
    ///
    /// Allocates a zero-filled block buffer of `1 << file_pool.block_nbit` bytes, starts
    /// `next_complete` at 0 and `io_complete` as an empty heap.
    fn new(state: WALState, file_pool: WALFilePool<F>) -> Self {
        let mut b = Vec::new();
        // `as` binds tighter than `<<`, so this is `1 << (block_nbit as u32)`
        let block_size = 1 << file_pool.block_nbit as u32;
        //let block_nbit = state.block_nbit;
        //let block_size = 1 << (block_nbit as u32);
        //let file_nbit = state.file_nbit;
        //let file_size = 1 << (file_nbit as u64);
        // grow the empty vector to exactly one block of zero bytes
        b.resize(block_size as usize, 0);
            block_buffer: b.into_boxed_slice(),
            next_complete: 0,
            io_complete: BinaryHeap::new(),

    /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
    /// function returns.  The caller then has the knowledge of WAL writes so it should defer
    /// actual data writes after WAL writes.
    pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> Box<[WALRingId]> {
        let mut res = Vec::new();
        let mut writes = Vec::new();
        let msize = std::mem::size_of::<WALRingBlob>() as u32;
        // the global offset of the beginning of the block
        // the start of the unwritten data
        let mut bbuff_start = self.state.next as u32 & (self.block_size - 1);
        // the end of the unwritten data
        let mut bbuff_cur = bbuff_start;

        for _rec in records.as_ref() {