summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs103
1 files changed, 77 insertions, 26 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 2716cb5..29f7835 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,6 +9,7 @@ enum WALRingType {
Last
}
+#[repr(C)]
struct WALRingBlob {
crc32: u32,
rsize: u32,
@@ -16,7 +17,7 @@ struct WALRingBlob {
// payload follows
}
-type WALFileId = u32;
+type WALFileId = u64;
type WALPos = u64;
#[derive(Eq, PartialEq, Copy, Clone)]
@@ -37,17 +38,22 @@ impl PartialOrd for WALRingId {
}
}
-pub struct WALState {
- pub first_fid: u64,
- pub last: WALPos,
+/// the state for a WAL writer
+struct WALState {
+ /// the first file id of WAL
+ pub first_fid: WALFileId,
+ /// the next position for a record, addressed in the entire WAL space
+ pub next: WALPos,
+ /// number of bits for a block
pub block_nbit: u8,
+ /// number of bits for a file
pub file_nbit: u8,
}
pub trait WALFile {
- fn allocate(&self, offset: u64, length: usize);
- fn write(&self, offset: u64, data: Box<[u8]>);
- fn read(&self, offset: u64, length: usize) -> Box<[u8]>;
+ fn allocate(&self, offset: WALPos, length: usize);
+ fn write(&self, offset: WALPos, data: Box<[u8]>);
+ fn read(&self, offset: WALPos, length: usize) -> Box<[u8]>;
}
pub trait WALStore {
@@ -56,22 +62,66 @@ pub trait WALStore {
fn scan_files(&self) -> Box<[&str]>;
}
+/// The middle layer that manages WAL file handles and invokes public trait functions to actually
+/// manipulate files and their contents.
struct WALFilePool<F: WALStore> {
store: F,
- handles: lru::LruCache<u64, Box<dyn WALFile>>,
- file_size: u64
+ handles: lru::LruCache<WALFileId, Box<dyn WALFile>>,
+ file_nbit: u64,
+ file_size: u64,
+ block_size: u64,
}
impl<F: WALStore> WALFilePool<F> {
- fn new(store: F, file_size: u64, cache_size: usize) -> Self {
+ fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
+ let file_nbit = file_nbit as u64;
WALFilePool {
store,
handles: lru::LruCache::new(cache_size),
- file_size,
+ file_nbit,
+ file_size: 1 << (file_nbit as u64),
+ block_size: 1 << (block_nbit as u64)
}
}
- fn write(&mut self, offset: u64, data: Box<[u8]>) {
+
+ fn get_fname(fid: WALFileId) -> String {
+ format!("{:08x}.log", fid)
+ }
+
+ fn get_file(&mut self, fid: u64) -> &'static dyn WALFile {
+ let h = match self.handles.get(&fid) {
+ Some(h) => &**h,
+ None => {
+ self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), true).unwrap());
+ &**self.handles.get(&fid).unwrap()
+ }
+ };
+ unsafe {&*(h as *const dyn WALFile)}
}
+
+ fn write(&mut self, writes: Vec<(WALPos, Box<[u8]>)>) {
+ // pre-allocate the blocks
+ let fid = writes[0].0 >> self.file_nbit;
+ let mut alloc_start = writes[0].0 & (self.file_size - 1);
+ let mut alloc_end = alloc_start + self.block_size;
+ let mut h = self.get_file(fid);
+ for (off, _) in &writes[1..] {
+ let next_fid = off >> self.file_nbit;
+ if next_fid != fid {
+ h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
+ h = self.get_file(next_fid);
+ alloc_start = 0;
+ alloc_end = alloc_start + self.block_size;
+ } else {
+ alloc_end += self.block_size;
+ }
+ }
+ h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
+ for (off, w) in writes.into_iter() {
+ self.get_file(off >> self.file_nbit).write(off, w);
+ }
+ }
+
fn remove_file(&self, fid: u64) -> bool {
true
}
@@ -87,14 +137,16 @@ pub struct WALWriter<F: WALStore> {
}
impl<F: WALStore> WALWriter<F> {
- pub fn new(state: WALState, wal_store: F, cache_size: usize) -> Self {
+ fn new(state: WALState, wal_store: F, cache_size: usize) -> Self {
let mut b = Vec::new();
- let block_size = 1 << (state.block_nbit as u32);
- let file_size = 1 << (state.file_nbit as u64);
+ let block_nbit = state.block_nbit;
+ let block_size = 1 << (block_nbit as u32);
+ let file_nbit = state.file_nbit;
+ let file_size = 1 << (file_nbit as u64);
b.resize(block_size as usize, 0);
WALWriter{
state,
- file_pool: WALFilePool::new(wal_store, file_size, cache_size),
+ file_pool: WALFilePool::new(wal_store, file_nbit, block_nbit, cache_size),
block_buffer: b.into_boxed_slice(),
block_size,
next_complete: 0,
@@ -111,7 +163,7 @@ impl<F: WALStore> WALWriter<F> {
let msize = std::mem::size_of::<WALRingBlob>() as u32;
// the global offest of the begining of the block
// the start of the unwritten data
- let mut bbuff_start = self.state.last as u32 & (self.block_size - 1);
+ let mut bbuff_start = self.state.next as u32 & (self.block_size - 1);
// the end of the unwritten data
let mut bbuff_cur = bbuff_start;
@@ -125,7 +177,7 @@ impl<F: WALStore> WALWriter<F> {
let d = remain - msize;
let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>(
&mut self.block_buffer[bbuff_cur as usize] as *mut u8)};
- let ring_start = self.state.last + (bbuff_cur - bbuff_start) as u64;
+ let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64;
if d >= rsize {
// the remaining rec fits in the block
let payload = rec;
@@ -149,31 +201,29 @@ impl<F: WALStore> WALWriter<F> {
bbuff_cur += d;
rec = &rec[d as usize..];
}
- let ring_end = self.state.last + (bbuff_cur - bbuff_start) as u64;
+ let ring_end = self.state.next + (bbuff_cur - bbuff_start) as u64;
res.push(WALRingId{start: ring_start, end: ring_end});
} else {
// add padding space by moving the point to the end of the block
bbuff_cur = self.block_size;
}
if bbuff_cur == self.block_size {
- writes.push((self.state.last,
+ writes.push((self.state.next,
self.block_buffer[bbuff_start as usize..]
.to_vec().into_boxed_slice()));
- self.state.last += (self.block_size - bbuff_start) as u64;
+ self.state.next += (self.block_size - bbuff_start) as u64;
bbuff_start = 0;
bbuff_cur = 0;
}
}
}
if bbuff_cur > bbuff_start {
- writes.push((self.state.last,
+ writes.push((self.state.next,
self.block_buffer[bbuff_start as usize..bbuff_cur as usize]
.to_vec().into_boxed_slice()));
- self.state.last += (bbuff_cur - bbuff_start) as u64;
- }
- for (off, w) in writes.into_iter() {
- self.file_pool.write(off, w);
+ self.state.next += (bbuff_cur - bbuff_start) as u64;
}
+ self.file_pool.write(writes);
res.into_boxed_slice()
}
@@ -195,6 +245,7 @@ impl<F: WALStore> WALWriter<F> {
for fid in orig_fid..next_fid {
self.file_pool.remove_file(fid);
}
+ self.state.first_fid = next_fid;
}
}