aboutsummaryrefslogblamecommitdiff
path: root/src/lib.rs
blob: 29f78351e771ce622e676511cd18f6dc05d02850 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11

                                 

                  
               





           
          






                       
                     
                  

















                                                                            
 






                                                                         
                       
                                 


                      
                   


                                                               







                                                                                 

                                                                                                 

                                 



                                                        


                                  

                                                                                


                                                    


                                                

         













                                                                                                  
     























                                                                            


                                             


                                   
                    
                              

                            

                                      

 
                                
                                                                      
                               



                                                  


                                         
                                                                                      

                                               

                                           


         




                                                                                              
                                    


                                                              
                                                                             












                                                                                        
                                                                                        






















                                                                                              
                                                                                      
                                                                          




                                                                                    
                                                 

                                                                      
                                                                              





                                    
                                         

                                                                                
                                                                
         
                                     



















                                                                                                   
         
                                        




                  
use std::collections::BinaryHeap;

/// Kind tag of a WAL ring blob, represented as a single byte (`repr(u8)`).
///
/// The discriminants are consecutive and start at zero; they are spelled out
/// explicitly so that reordering the variants cannot silently change the
/// numeric values.
///
/// NOTE(review): the variant names suggest that `Full` marks a record stored in
/// one blob while `First`/`Middle`/`Last` mark the pieces of a record split
/// across several blobs, and that `Null` is a "no record" marker — confirm
/// against the code that reads and writes blobs.
#[repr(u8)]
enum WALRingType {
    Null = 0x0,
    Full = 0x1,
    First = 0x2,
    Middle = 0x3,
    Last = 0x4,
}

/// Fixed-size header of a ring blob, laid out with C field ordering (`repr(C)`).
///
/// Only the header is described by this struct; per the trailing comment the
/// payload bytes come right after it and are not part of the type.
///
/// NOTE(review): the field meanings below are inferred from their names —
/// confirm against the code that encodes and decodes blobs.
#[repr(C)]
struct WALRingBlob {
    // presumably a CRC-32 checksum covering the payload — TODO confirm the exact range
    crc32: u32,
    // presumably the size of the payload in bytes — TODO confirm
    rsize: u32,
    // kind tag of this blob (one byte, see `WALRingType`)
    rtype: WALRingType,
    // payload follows
}

/// Identifier of a single WAL file; `WALFilePool` derives it from a position by
/// shifting the position right by `file_nbit` bits.
type WALFileId = u64;
/// A position (byte offset) addressed in the entire WAL space, i.e. across all
/// WAL files rather than within one file.
type WALPos = u64;

/// Identifies a ring by the pair of WAL positions `start` and `end`.
///
/// NOTE(review): whether `end` is inclusive or exclusive is not visible here —
/// confirm against the code that produces these ids.
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct WALRingId {
    start: WALPos,
    end: WALPos,
}

/// Total order that is the *reverse* of the natural `(start, end)` order: the
/// id with the smaller `start` (ties broken by the smaller `end`) compares as
/// the greater one.
///
/// Presumably this exists so that a max-heap such as
/// `std::collections::BinaryHeap` (imported by this file) pops the earliest
/// ring first — verify against the heap's users.
impl Ord for WALRingId {
    fn cmp(&self, other: &WALRingId) -> std::cmp::Ordering {
        // Operands are deliberately swapped (`other` on the left) to invert the
        // lexicographic tuple comparison.
        let theirs = (other.start, other.end);
        let ours = (self.start, self.end);
        theirs.cmp(&ours)
    }
}

/// Partial order that always agrees with the total order defined by `Ord`.
impl PartialOrd for WALRingId {
    fn partial_cmp(&self, other: &WALRingId) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

/// The state for a WAL writer.
///
/// NOTE(review): `block_nbit` and `file_nbit` share their names with the
/// parameters of `WALFilePool::new`, where the corresponding sizes are computed
/// as `1 << nbit`; presumably they carry the same meaning here — confirm.
struct WALState {
    /// The id of the first file of the WAL.
    pub first_fid: WALFileId,
    /// The next position for a record, addressed in the entire WAL space.
    pub next: WALPos,
    /// Number of bits for a block (presumably log2 of the block size in bytes).
    pub block_nbit: u8,
    /// Number of bits for a file (presumably log2 of the file size in bytes).
    pub file_nbit: u8,
}

/// Operations on one opened WAL file, implemented by the user of this crate.
///
/// `WALFilePool` calls these methods with offsets that are relative to the
/// start of the file (the position masked with `file_size - 1`).
///
/// None of the methods return an error value, so an implementation has no way
/// to report failure through the signature.
pub trait WALFile {
    /// Reserves `length` bytes of space starting at `offset`.
    /// The pool calls this to pre-allocate blocks ahead of a batch of writes.
    fn allocate(&self, offset: WALPos, length: usize);
    /// Writes `data` at `offset`; the buffer is passed by value (ownership moves
    /// to the implementation).
    fn write(&self, offset: WALPos, data: Box<[u8]>);
    /// Reads from `offset` and returns the bytes in a freshly owned buffer.
    /// Presumably the result holds exactly `length` bytes — TODO confirm how
    /// short reads are expected to be handled.
    fn read(&self, offset: WALPos, length: usize) -> Box<[u8]>;
}

/// Storage backend that holds the WAL files, implemented by the user of this
/// crate. `WALFilePool` addresses files by names of the form `{:08x}.log`.
pub trait WALStore {
    /// Opens the file called `filename`, returning `None` when it cannot be
    /// provided. `WALFilePool::get_file` always passes `touch = true` and
    /// unwraps the result.
    /// NOTE(review): `touch` presumably means "create the file if it does not
    /// exist" — confirm with the implementations.
    fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>>;
    /// Removes the file called `filename`.
    /// NOTE(review): the returned `bool` presumably reports success — confirm.
    fn remove_file(&self, filename: &str) -> bool;
    /// Returns a boxed slice of string slices borrowed from the store.
    /// NOTE(review): presumably the names of the files currently present —
    /// confirm.
    fn scan_files(&self) -> Box<[&str]>;
}

/// The middle layer that manages WAL file handles and invokes public trait functions to actually
/// manipulate files and their contents.
///
/// Opened handles are kept in an LRU cache keyed by file id, so a file is only
/// re-opened through the store when its handle is not currently cached.
struct WALFilePool<F: WALStore> {
    /// Backend used to open WAL files.
    store: F,
    /// Cache of opened file handles, keyed by file id.
    handles: lru::LruCache<WALFileId, Box<dyn WALFile>>,
    /// Number of bits of a position that address a byte inside one file; a
    /// position shifted right by this amount yields the file id.
    file_nbit: u64,
    /// Size of one file, `1 << file_nbit`; `file_size - 1` is used as the mask
    /// that extracts the in-file offset from a position.
    file_size: u64,
    /// Size of one block, `1 << block_nbit`.
    block_size: u64,
}

impl<F: WALStore> WALFilePool<F> {
    fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
        let file_nbit = file_nbit as u64;
        WALFilePool {
            store,
            handles: lru::LruCache::new(cache_size),
            file_nbit,
            file_size: 1 << (file_nbit as u64),
            block_size: 1 << (block_nbit as u64)
        }
    }

    /// Builds the name of the WAL file with id `fid`: the id in lowercase
    /// hexadecimal, zero-padded to at least eight digits, followed by `.log`.
    /// Ids that need more than eight hex digits are not truncated.
    fn get_fname(fid: WALFileId) -> String {
        format!("{id:08x}.log", id = fid)
    }

    /// Returns the handle of the WAL file with id `fid`.
    ///
    /// The handle is taken from the `handles` cache when present. On a miss the
    /// file is opened through `store.open_file` (with `touch = true`), inserted
    /// into the cache, and then fetched from the cache again.
    ///
    /// # Panics
    ///
    /// Panics if the store returns `None` for the file.
    ///
    /// # Safety note
    ///
    /// NOTE(review): the returned reference is given a `'static` lifetime by a
    /// raw-pointer cast, so the borrow checker no longer ties it to the cache.
    /// It points into the `Box` owned by `handles` and is therefore only valid
    /// while that entry stays in the cache; if a later insertion evicts or
    /// replaces the entry, the reference dangles. Callers should not keep the
    /// result across further `get_file` calls unless the cache capacity is
    /// known to prevent eviction — confirm at every call site.
    fn get_file(&mut self, fid: u64) -> &'static dyn WALFile {
        let h = match self.handles.get(&fid) {
            // cache hit: borrow the trait object out of its Box
            Some(h) => &**h,
            None => {
                // cache miss: open the file, cache the handle, then look it up again
                self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), true).unwrap());
                &**self.handles.get(&fid).unwrap()
            }
        };
        // Detach the reference from the `&mut self` borrow by round-tripping
        // through a raw pointer; see the safety note above for the caveat.
        unsafe {&*(h as *const dyn WALFile)}
    }

    fn write(&mut self, writes: Vec<(WALPos, Box<[u8]>)>) {
        // pre-allocate the blocks
        let fid = writes[0].0 >> self.file_nbit;
        let mut alloc_start = writes[0].0 & (self.file_size - 1);
        let mut alloc_end = alloc_start + self.block_size;
        let mut h = self.get_file(fid);
        for (off, _) in &writes[1..] {
            let next_fid = off >> self.file_nbit;
            if next_fid != fid {
                h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
                h