aboutsummaryrefslogblamecommitdiff
path: root/src/lib.rs
blob: 2716cb5165db308b1d5a86142baba50e6eb2fc24 (plain) (tree)
1
2
3
4
5

                                 

                  
               














                       

















                                                                            

                     
                       




                       



























                                                                                 


                                             


                                   
                    
                              

                            

                                      

 

                                                                          





                                                        
                                                                          

                                               

                                           


         




                                                                                              
                                    
















                                                                                        
                                                                                        






















                                                                                              

                                                                                      




                                                                                    
                                                 








                                                                              
                                         



                                                                                
                                            





















                                                                                                   
         




                  
use std::collections::BinaryHeap;

/// Kind of a ring (a record, or one fragment of a record) stored in a WAL block.
///
/// The discriminant is a single byte (`repr(u8)`) and is stored in the ring header.
#[repr(u8)]
enum WALRingType {
    /// Zero discriminant; never assigned by the writer in this file.
    Null = 0,
    /// The ring holds a complete record.
    Full = 1,
    /// The ring holds the first fragment of a record that continues in a later ring.
    First = 2,
    /// The ring holds a fragment that is neither the first nor the last of its record.
    Middle = 3,
    /// The ring holds the final fragment of a record that was split.
    Last = 4,
}

/// Fixed-size header that precedes every ring payload inside a WAL block.
///
/// The header is overlaid on arbitrary byte offsets of the block buffer and its bytes end up
/// in the log, so it needs a well-defined layout without alignment requirements or padding:
/// `repr(C)` pins the field order and `packed` lowers the alignment to 1 and removes padding.
#[repr(C, packed)]
struct WALRingBlob {
    /// IEEE CRC-32 checksum of the payload bytes that follow the header.
    crc32: u32,
    /// Length in bytes of the payload that follows the header.
    rsize: u32,
    /// Whether the payload is a whole record or which fragment of a record it is.
    rtype: WALRingType,
    // payload follows
}

/// Identifier of a WAL file (not referenced by the other definitions visible in this file).
type WALFileId = u32;
/// Byte position in the log as a whole; the low bits select the offset inside a block and
/// shifting right by the file-size exponent yields a file id.
type WALPos = u64;

/// Location of one ring in the log: `start` is the position of its header and `end` is the
/// position right after its payload.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct WALRingId {
    start: WALPos,
    end: WALPos,
}

impl Ord for WALRingId {
    fn cmp(&self, other: &WALRingId) -> std::cmp::Ordering {
        other.start.cmp(&self.start).then_with(|| other.end.cmp(&self.end))
    }
}

impl PartialOrd for WALRingId {
    fn partial_cmp(&self, other: &WALRingId) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Description of the log that a `WALWriter` is constructed from.
pub struct WALState {
    /// Id of the first WAL file considered for removal when completed writes are reported.
    pub first_fid: u64,
    /// Log position at which the next byte will be written; advanced as records are appended.
    pub last: WALPos,
    /// log2 of the block size in bytes (the block size is `1 << block_nbit`).
    pub block_nbit: u8,
    /// log2 of the file size in bytes (the file size is `1 << file_nbit`).
    pub file_nbit: u8,
}

/// Handle to a single WAL file, as returned by `WALStore::open_file`.
///
/// NOTE(review): no implementation or caller of these methods is visible in this file; the
/// per-method descriptions are inferred from the signatures and should be confirmed.
pub trait WALFile {
    /// Presumably reserves `length` bytes of space starting at `offset` in the file.
    fn allocate(&self, offset: u64, length: usize);
    /// Presumably writes `data` into the file starting at `offset`; takes ownership of the
    /// buffer.
    fn write(&self, offset: u64, data: Box<[u8]>);
    /// Presumably reads `length` bytes starting at `offset` and returns them in a new buffer.
    fn read(&self, offset: u64, length: usize) -> Box<[u8]>;
}

/// Storage backend supplied by the user of the WAL; it hands out `WALFile` handles by name.
///
/// NOTE(review): no implementation or caller of these methods is visible in this file; the
/// per-method descriptions are inferred from the signatures and should be confirmed.
pub trait WALStore {
    /// Opens the file called `filename`, returning `None` when no handle can be provided.
    /// Presumably `touch` asks for the file to be created when it does not exist yet.
    fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;
    /// Removes the file called `filename`; the returned flag presumably reports success.
    fn remove_file(&self, filename: &str) -> bool;
    /// Returns file names known to the store (presumably the existing WAL files).
    fn scan_files(&self) -> Box<[&str]>;
}

/// Bundles the user-supplied store with a bounded LRU cache of open file handles.
struct WALFilePool<F>
where
    F: WALStore,
{
    /// Backend used to open and remove WAL files.
    store: F,
    /// Open file handles, keyed by a `u64` (presumably the file id).
    handles: lru::LruCache<u64, Box<dyn WALFile>>,
    /// Size of one WAL file in bytes, as passed to `new`.
    file_size: u64,
}

impl<F: WALStore> WALFilePool<F> {
    /// Builds a pool over `store` whose handle cache holds at most `cache_size` entries and
    /// whose files are `file_size` bytes each.
    fn new(store: F, file_size: u64, cache_size: usize) -> Self {
        let handles = lru::LruCache::new(cache_size);
        Self {
            store,
            handles,
            file_size,
        }
    }

    /// Placeholder: accepts a write at log position `_offset` and currently discards it.
    fn write(&mut self, _offset: u64, _data: Box<[u8]>) {}

    /// Placeholder: currently removes nothing and always reports success.
    fn remove_file(&self, _fid: u64) -> bool {
        true
    }
}

/// Appends records to the write-ahead log and tracks which of them may be discarded.
pub struct WALWriter<F>
where
    F: WALStore,
{
    /// Log description; its write position moves forward as records are appended.
    state: WALState,
    /// Destination of the block writes and of file removals.
    file_pool: WALFilePool<F>,
    /// Staging area holding the contents of the block that is currently being filled.
    block_buffer: Box<[u8]>,
    /// Size of one block in bytes.
    block_size: u32,
    /// Log position up to which all rings have been reported complete.
    next_complete: WALPos,
    /// Rings reported complete that are not yet contiguous with `next_complete`.
    io_complete: BinaryHeap<WALRingId>,
}

impl<F: WALStore> WALWriter<F> {
    pub fn new(state: WALState, wal_store: F, cache_size: usize) -> Self {
        let mut b = Vec::new();
        let block_size = 1 << (state.block_nbit as u32);
        let file_size = 1 << (state.file_nbit as u64);
        b.resize(block_size as usize, 0);
        WALWriter{
            state,
            file_pool: WALFilePool::new(wal_store, file_size, cache_size),
            block_buffer: b.into_boxed_slice(),
            block_size,
            next_complete: 0,
            io_complete: BinaryHeap::new(),
        }
    }

    /// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
    /// function returns.  The caller then has the knowledge of WAL writes so it should defer
    /// actual data writes after WAL writes.
    pub fn grow(&mut self, records: &[Box<[u8]>]) -> Box<[WALRingId]> {
        let mut res = Vec::new();
        let mut writes = Vec::new();
        let msize = std::mem::size_of::<WALRingBlob>() as u32;
        // the global offest of the begining of the block
        // the start of the unwritten data
        let mut bbuff_start = self.state.last as u32 & (self.block_size - 1);
        // the end of the unwritten data
        let mut bbuff_cur = bbuff_start;

        for _rec in records {
            let mut rec = &_rec[..];
            let mut rsize = rec.len() as u32;
            let mut started = false;
            while rsize > 0 {
                let remain = self.block_size - bbuff_cur;
                if remain > msize {
                    let d = remain - msize;
                    let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>(
                        &mut self.block_buffer[bbuff_cur as usize] as *mut u8)};
                    let ring_start = self.state.last + (bbuff_cur - bbuff_start) as u64;
                    if d >= rsize {
                        // the remaining rec fits in the block
                        let payload = rec;
                        blob.crc32 = crc::crc32::checksum_ieee(payload);
                        blob.rsize = rsize;
                        blob.rtype = if started {WALRingType::Last} else {WALRingType::Full};
                        rsize = 0;
                        &mut self.block_buffer[bbuff_cur as usize..].copy_from_slice(payload);
                        bbuff_cur += rsize;
                    } else {
                        // the remaining block can only accommodate partial rec
                        let payload = &rec[..d as usize];
                        blob.crc32 = crc::crc32::checksum_ieee(payload);
                        blob.rsize = d;
                        blob.rtype = if started {WALRingType::Middle} else {
                            started = true;
                            WALRingType::First
                        };
                        rsize -= d;
                        &mut self.block_buffer[bbuff_cur as usize..].copy_from_slice(payload);
                        bbuff_cur += d;
                        rec = &rec[d as usize..];
                    }
                    let ring_end = self.state.last + (bbuff_cur - bbuff_start) as u64;
                    res.push(WALRingId{start: ring_start, end: ring_end});
                } else {
                    // add padding space by moving the point to the end of the block
                    bbuff_cur = self.block_size;
                }
                if bbuff_cur == self.block_size {
                    writes.push((self.state.last,
                             self.block_buffer[bbuff_start as usize..]
                                .to_vec().into_boxed_slice()));
                    self.state.last += (self.block_size - bbuff_start) as u64;
                    bbuff_start = 0;
                    bbuff_cur = 0;
                }
            }
        }
        if bbuff_cur > bbuff_start {
            writes.push((self.state.last,
                     self.block_buffer[bbuff_start as usize..bbuff_cur as usize]
                        .to_vec().into_boxed_slice()));
            self.state.last += (bbuff_cur - bbuff_start) as u64;
        }
        for (off, w) in writes.into_iter() {
            self.file_pool.write(off, w);
        }
        res.into_boxed_slice()
    }

    /// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
    /// complete so that it could automatically remove obsolete WAL files.
    pub fn peel(&mut self, records: &[WALRingId]) {
        for rec in records {
            self.io_complete.push(*rec)
        }
        let orig_fid = self.state.first_fid;
        while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) {
            if s != self.next_complete {
                break
            }
            let m = self.io_complete.pop().unwrap();
            self.next_complete = m.end
        }
        let next_fid = self.next_complete >> self.state.file_nbit;
        for fid in orig_fid..next_fid {
            self.file_pool.remove_file(fid);
        }
    }
}

/// Placeholder for the reading side of the WAL; it has no fields or behavior yet.
struct WALReader {}