about summary refs log tree commit diff
path: root/src/wal.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wal.rs')
-rw-r--r-- src/wal.rs 228
1 files changed, 159 insertions, 69 deletions
diff --git a/src/wal.rs b/src/wal.rs
index fe85c41..39c4390 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -7,7 +7,7 @@ enum WALRingType {
Full,
First,
Middle,
- Last
+ Last,
}
#[repr(packed)]
@@ -25,18 +25,27 @@ pub type WALPos = u64;
#[derive(Eq, PartialEq, Copy, Clone, Debug, Hash)]
pub struct WALRingId {
start: WALPos,
- end: WALPos
+ end: WALPos,
}
impl WALRingId {
- pub fn empty_id() -> Self { WALRingId { start: 0, end: 0 } }
- pub fn get_start(&self) -> WALPos { self.start }
- pub fn get_end(&self) -> WALPos { self.end }
+ pub fn empty_id() -> Self {
+ WALRingId { start: 0, end: 0 }
+ }
+ pub fn get_start(&self) -> WALPos {
+ self.start
+ }
+ pub fn get_end(&self) -> WALPos {
+ self.end
+ }
}
impl Ord for WALRingId {
fn cmp(&self, other: &WALRingId) -> std::cmp::Ordering {
- other.start.cmp(&self.start).then_with(|| other.end.cmp(&self.end))
+ other
+ .start
+ .cmp(&self.start)
+ .then_with(|| other.end.cmp(&self.end))
}
}
@@ -68,14 +77,22 @@ pub trait WALFile {
/// should be _atomic_ (the entire single write should be all or nothing).
fn write(&self, offset: WALPos, data: WALBytes) -> Result<(), ()>;
/// Read data with offset.
- fn read(&self, offset: WALPos, length: usize) -> Result<Option<WALBytes>, ()>;
+ fn read(
+ &self,
+ offset: WALPos,
+ length: usize,
+ ) -> Result<Option<WALBytes>, ()>;
}
pub trait WALStore {
type FileNameIter: Iterator<Item = String>;
/// Open a file given the filename; create the file if it does not exist when `touch` is `true`.
- fn open_file(&mut self, filename: &str, touch: bool) -> Result<Box<dyn WALFile>, ()>;
+ fn open_file(
+ &mut self,
+ filename: &str,
+ touch: bool,
+ ) -> Result<Box<dyn WALFile>, ()>;
/// Unlink a file given the filename.
fn remove_file(&mut self, filename: &str) -> Result<(), ()>;
/// Enumerate all WAL filenames. It should include all WAL files that are previously opened
@@ -84,7 +101,11 @@ pub trait WALStore {
/// Apply the payload during recovery. Each invocation of the callback waits for the application
/// to redo the given operation so that its state is consistent. We assume the necessary
/// changes made by the payload have already been persisted when the callback returns.
- fn apply_payload(&mut self, payload: WALBytes, ringid: WALRingId) -> Result<(), ()>;
+ fn apply_payload(
+ &mut self,
+ payload: WALBytes,
+ ringid: WALRingId,
+ ) -> Result<(), ()>;
}
/// The middle layer that manages WAL file handles and invokes public trait functions to actually
@@ -114,15 +135,22 @@ impl<F: WALStore> WALFilePool<F> {
format!("{:08x}.log", fid)
}
- fn get_file(&mut self, fid: u64, touch: bool) -> Result<&'static dyn WALFile, ()> {
+ fn get_file(
+ &mut self,
+ fid: u64,
+ touch: bool,
+ ) -> Result<&'static dyn WALFile, ()> {
let h = match self.handles.get(&fid) {
Some(h) => &**h,
None => {
- self.handles.put(fid, self.store.open_file(&Self::get_fname(fid), touch)?);
+ self.handles.put(
+ fid,
+ self.store.open_file(&Self::get_fname(fid), touch)?,
+ );
&**self.handles.get(&fid).unwrap()
}
};
- Ok(unsafe {&*(h as *const dyn WALFile)})
+ Ok(unsafe { &*(h as *const dyn WALFile) })
}
fn get_fid(&mut self, fname: &str) -> WALFileId {
@@ -135,14 +163,19 @@ impl<F: WALStore> WALFilePool<F> {
let mut fid = writes[0].0 >> self.file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
let mut alloc_end = alloc_start + writes[0].1.len() as u64;
- let files = writes.iter().map(|(off, _)|
- self.get_file((*off) >> self.file_nbit, true)).collect::<Result<Vec<&dyn WALFile>, ()>>()?;
+ let files = writes
+ .iter()
+ .map(|(off, _)| self.get_file((*off) >> self.file_nbit, true))
+ .collect::<Result<Vec<&dyn WALFile>, ()>>()?;
// prepare file handles
let mut last_h = files[0];
for ((off, w), h) in writes[1..].iter().zip(files[1..].iter()) {
let next_fid = off >> self.file_nbit;
if next_fid != fid {
- last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?;
+ last_h.allocate(
+ alloc_start,
+ (alloc_end - alloc_start) as usize,
+ )?;
last_h = *h;
alloc_start = 0;
alloc_end = alloc_start + w.len() as u64;
@@ -153,7 +186,8 @@ impl<F: WALStore> WALFilePool<F> {
}
last_h.allocate(alloc_start, (alloc_end - alloc_start) as usize)?;
for (off, w) in writes.into_iter() {
- self.get_file(off >> self.file_nbit, true)?.write(off & (self.file_size - 1), w)?;
+ self.get_file(off >> self.file_nbit, true)?
+ .write(off & (self.file_size - 1), w)?;
}
Ok(())
}
@@ -162,7 +196,9 @@ impl<F: WALStore> WALFilePool<F> {
self.store.remove_file(&Self::get_fname(fid))
}
- fn reset(&mut self) { self.handles.clear() }
+ fn reset(&mut self) {
+ self.handles.clear()
+ }
}
pub struct WALWriter<F: WALStore> {
@@ -172,7 +208,7 @@ pub struct WALWriter<F: WALStore> {
block_size: u32,
next_complete: WALPos,
io_complete: BinaryHeap<WALRingId>,
- msize: usize
+ msize: usize,
}
impl<F: WALStore> WALWriter<F> {
@@ -181,21 +217,24 @@ impl<F: WALStore> WALWriter<F> {
let block_size = 1 << file_pool.block_nbit as u32;
let msize = std::mem::size_of::<WALRingBlob>();
b.resize(block_size as usize, 0);
- WALWriter{
+ WALWriter {
state,
file_pool,
block_buffer: b.into_boxed_slice(),
block_size,
next_complete: 0,
io_complete: BinaryHeap::new(),
- msize
+ msize,
}
}
/// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
/// function returns. The caller then has knowledge of the WAL writes, so it should defer
/// the actual data writes until after the WAL writes.
- pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> (Box<[WALRingId]>, Result<(), ()>) {
+ pub fn grow<T: AsRef<[WALBytes]>>(
+ &mut self,
+ records: T,
+ ) -> (Box<[WALRingId]>, Result<(), ()>) {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = self.msize as u32;
@@ -213,9 +252,14 @@ impl<F: WALStore> WALWriter<F> {
let remain = self.block_size - bbuff_cur;
if remain > msize {
let d = remain - msize;
- let rs0 = self.state.next + (bbuff_cur - bbuff_start) as u64;
- let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>(
- (&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())};
+ let rs0 =
+ self.state.next + (bbuff_cur - bbuff_start) as u64;
+ let blob = unsafe {
+ std::mem::transmute::<*mut u8, &mut WALRingBlob>(
+ (&mut self.block_buffer[bbuff_cur as usize..])
+ .as_mut_ptr(),
+ )
+ };
bbuff_cur += msize;
if d >= rsize {
// the remaining rec fits in the block
@@ -228,25 +272,28 @@ impl<F: WALStore> WALWriter<F> {
(rs0, WALRingType::Full)
};
blob.rtype = rt;
- &mut self.block_buffer[
- bbuff_cur as usize..
- bbuff_cur as usize + payload.len()].copy_from_slice(payload);
+ &mut self.block_buffer[bbuff_cur as usize..
+ bbuff_cur as usize + payload.len()]
+ .copy_from_slice(payload);
bbuff_cur += rsize;
rsize = 0;
- let end = self.state.next + (bbuff_cur - bbuff_start) as u64;
- res.push(WALRingId{start: rs, end });
+ let end =
+ self.state.next + (bbuff_cur - bbuff_start) as u64;
+ res.push(WALRingId { start: rs, end });
} else {
// the remaining block can only accommodate partial rec
let payload = &rec[..d as usize];
blob.crc32 = crc::crc32::checksum_ieee(payload);
blob.rsize = d;
- blob.rtype = if ring_start.is_some() {WALRingType::Middle} else {
+ blob.rtype = if ring_start.is_some() {
+ WALRingType::Middle
+ } else {
ring_start = Some(rs0);
WALRingType::First
};
- &mut self.block_buffer[
- bbuff_cur as usize..
- bbuff_cur as usize + payload.len()].copy_from_slice(payload);
+ &mut self.block_buffer[bbuff_cur as usize..
+ bbuff_cur as usize + payload.len()]
+ .copy_from_slice(payload);
bbuff_cur += d;
rsize -= d;
rec = &rec[d as usize..];
@@ -256,9 +303,12 @@ impl<F: WALStore> WALWriter<F> {
bbuff_cur = self.block_size;
}
if bbuff_cur == self.block_size {
- writes.push((self.state.next,
- self.block_buffer[bbuff_start as usize..]
- .to_vec().into_boxed_slice()));
+ writes.push((
+ self.state.next,
+ self.block_buffer[bbuff_start as usize..]
+ .to_vec()
+ .into_boxed_slice(),
+ ));
self.state.next += (self.block_size - bbuff_start) as u64;
bbuff_start = 0;
bbuff_cur = 0;
@@ -266,9 +316,12 @@ impl<F: WALStore> WALWriter<F> {
}
}
if bbuff_cur > bbuff_start {
- writes.push((self.state.next,
- self.block_buffer[bbuff_start as usize..bbuff_cur as usize]
- .to_vec().into_boxed_slice()));
+ writes.push((
+ self.state.next,
+ self.block_buffer[bbuff_start as usize..bbuff_cur as usize]
+ .to_vec()
+ .into_boxed_slice(),
+ ));
self.state.next += (bbuff_cur - bbuff_start) as u64;
}
@@ -277,16 +330,20 @@ impl<F: WALStore> WALWriter<F> {
/// Inform the WALWriter that data writes (specified by a slice of `WALRingId`s, each a
/// (start, end) position pair) are complete so that it can automatically remove obsolete WAL files.
- pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) -> Result<(), ()> {
+ pub fn peel<T: AsRef<[WALRingId]>>(
+ &mut self,
+ records: T,
+ ) -> Result<(), ()> {
let msize = self.msize as u64;
let block_size = self.block_size as u64;
for rec in records.as_ref() {
self.io_complete.push(*rec);
}
let orig_fid = self.state.first_fid;
- while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start)) {
+ while let Some(s) = self.io_complete.peek().and_then(|&e| Some(e.start))
+ {
if s != self.next_complete {
- break
+ break;
}
let mut m = self.io_complete.pop().unwrap();
let block_remain = block_size - (m.end & (block_size - 1));
@@ -309,7 +366,7 @@ pub struct WALLoader {
block_nbit: u8,
cache_size: usize,
msize: usize,
- filename_fmt: regex::Regex
+ filename_fmt: regex::Regex,
}
impl WALLoader {
@@ -318,17 +375,30 @@ impl WALLoader {
assert!(file_nbit > block_nbit);
assert!(msize < 1 << block_nbit);
let filename_fmt = regex::Regex::new(r"[0-9a-f]+\.log").unwrap();
- WALLoader{ file_nbit, block_nbit, cache_size, msize, filename_fmt }
+ WALLoader {
+ file_nbit,
+ block_nbit,
+ cache_size,
+ msize,
+ filename_fmt,
+ }
}
/// Recover by reading the WAL log files.
pub fn recover<F: WALStore>(self, store: F) -> Result<WALWriter<F>, ()> {
- let mut file_pool = WALFilePool::new(store, self.file_nbit, self.block_nbit, self.cache_size);
+ let mut file_pool = WALFilePool::new(
+ store,
+ self.file_nbit,
+ self.block_nbit,
+ self.cache_size,
+ );
let block_size = 1 << file_pool.block_nbit;
- let msize = std::mem::size_of::<WALRingBlob>() as u32;
- let mut logfiles: Vec<String> = file_pool.store
+ let msize = self.msize as u32;
+ let mut logfiles: Vec<String> = file_pool
+ .store
.enumerate_files()?
- .filter(|f| self.filename_fmt.is_match(f)).collect();
+ .filter(|f| self.filename_fmt.is_match(f))
+ .collect();
// TODO: check for missing logfiles
logfiles.sort();
let mut chunks = None;
@@ -340,7 +410,10 @@ impl WALLoader {
let ringid_start = (fid << file_pool.file_nbit) + off;
off += msize as u64;
let header = unsafe {
- std::mem::transmute::<*const u8, &WALRingBlob>(header_raw.as_ptr())};
+ std::mem::transmute::<*const u8, &WALRingBlob>(
+ header_raw.as_ptr(),
+ )
+ };
let rsize = header.rsize;
match header.rtype {
WALRingType::Full => {
@@ -351,26 +424,36 @@ impl WALLoader {
payload,
WALRingId {
start: ringid_start,
- end: (fid << file_pool.file_nbit) + off
- })?;
- },
+ end: (fid << file_pool.file_nbit) + off,
+ },
+ )?;
+ }
WALRingType::First => {
assert!(chunks.is_none());
- chunks = Some((vec![f.read(off, rsize as usize)?.ok_or(())?], ringid_start));
+ chunks = Some((
+ vec![f.read(off, rsize as usize)?.ok_or(())?],
+ ringid_start,
+ ));
off += rsize as u64;
- },
+ }
WALRingType::Middle => {
if let Some((chunks, _)) = &mut chunks {
- chunks.push(f.read(off, rsize as usize)?.ok_or(())?);
+ chunks
+ .push(f.read(off, rsize as usize)?.ok_or(())?);
} // otherwise ignore the leftover
off += rsize as u64;
- },
+ }
WALRingType::Last => {
- if let Some((mut chunks, ringid_start)) = chunks.take() {
- chunks.push(f.read(off, rsize as usize)?.ok_or(())?);
+ if let Some((mut chunks, ringid_start)) = chunks.take()
+ {
+ chunks
+ .push(f.read(off, rsize as usize)?.ok_or(())?);
off += rsize as u64;
let mut payload = Vec::new();
- payload.resize(chunks.iter().fold(0, |acc, v| acc + v.len()), 0);
+ payload.resize(
+ chunks.iter().fold(0, |acc, v| acc + v.len()),
+ 0,
+ );
let mut ps = &mut payload[..];
for c in chunks {
ps[..c.len()].copy_from_slice(&*c);
@@ -380,11 +463,15 @@ impl WALLoader {
payload.into_boxed_slice(),
WALRingId {
start: ringid_start,
- end: (fid << file_pool.file_nbit) + off
- })?;
- } // otherwise ignore the leftover
- else { off += rsize as u64; }
- },
+ end: (fid << file_pool.file_nbit) + off,
+ },
+ )?;
+ }
+ // otherwise ignore the leftover
+ else {
+ off += rsize as u64;
+ }
+ }
WALRingType::Null => break,
}
let block_remain = block_size - (off & (block_size - 1));
@@ -396,10 +483,13 @@ impl WALLoader {
file_pool.remove_file(fid)?;
}
file_pool.reset();
- Ok(WALWriter::new(WALState {
- first_fid: 0,
- next: 0,
- file_nbit: file_pool.file_nbit,
- }, file_pool))
+ Ok(WALWriter::new(
+ WALState {
+ first_fid: 0,
+ next: 0,
+ file_nbit: file_pool.file_nbit,
+ },
+ file_pool,
+ ))
}
}