aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock7
-rw-r--r--Cargo.toml3
-rw-r--r--examples/.gitignore2
-rw-r--r--examples/demo1.rs56
-rw-r--r--src/lib.rs45
5 files changed, 94 insertions, 19 deletions
diff --git a/Cargo.lock b/Cargo.lock
index d6e8d74..f20c53d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -81,6 +81,7 @@ name = "growth-ring"
version = "0.1.0"
dependencies = [
"crc",
+ "hex",
"lru",
"scan_fmt",
]
@@ -96,6 +97,12 @@ dependencies = [
]
[[package]]
+name = "hex"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35"
+
+[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index ee8d867..2915a84 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -11,6 +11,9 @@ crc = "1.8.1"
lru = "0.5.1"
scan_fmt = "0.2.5"
+[dev-dependencies]
+hex = "0.4.2"
+
[lib]
name = "growthring"
path = "src/lib.rs"
diff --git a/examples/.gitignore b/examples/.gitignore
new file mode 100644
index 0000000..7d977f4
--- /dev/null
+++ b/examples/.gitignore
@@ -0,0 +1,2 @@
+demo
+testdb
diff --git a/examples/demo1.rs b/examples/demo1.rs
new file mode 100644
index 0000000..0f50ec3
--- /dev/null
+++ b/examples/demo1.rs
@@ -0,0 +1,56 @@
+use growthring::{WALFile, WALStore, WALPos, WALBytes, WALLoader, WALWriter};
+
+struct WALFileFake {
+ filename: String
+}
+
+impl WALFile for WALFileFake {
+ fn allocate(&self, offset: WALPos, length: usize) {
+ println!("{}.allocate(offset=0x{:x}, end=0x{:x})", self.filename, offset, offset + length as u64);
+ }
+ fn write(&self, offset: WALPos, data: WALBytes) {
+ println!("{}.write(offset=0x{:x}, end=0x{:x}, data=0x{})", self.filename, offset, offset + data.len() as u64, hex::encode(data));
+ }
+ fn read(&self, offset: WALPos, length: usize) -> WALBytes {
+ Vec::new().into_boxed_slice()
+ }
+}
+
+struct WALStoreFake;
+impl WALStore for WALStoreFake {
+ fn open_file(&self, filename: &str, touch: bool) -> Option<Box<dyn WALFile>> {
+ println!("open_file(filename={}, touch={}", filename, touch);
+ let filename = filename.to_string();
+ Some(Box::new(WALFileFake{ filename }))
+ }
+ fn remove_file(&self, filename: &str) -> bool {
+ println!("remove_file(filename={})", filename);
+ true
+ }
+ fn enumerate_files(&self) -> Box<[String]> {
+ println!("enumerate_files()");
+ Vec::new().into_boxed_slice()
+ }
+ fn apply_payload(&self, payload: WALBytes) {
+ println!("apply_payload(payload=0x{})", hex::encode(payload))
+ }
+}
+
+fn test(records: Vec<String>, wal: &mut WALWriter<WALStoreFake>) {
+ let records: Vec<WALBytes> = records.into_iter().map(|s| s.into_bytes().into_boxed_slice()).collect();
+ let ret = wal.grow(&records);
+ for ring_id in ret.iter() {
+ println!("got ring id: {:?}", ring_id);
+ }
+}
+
+fn main() {
+ let store = WALStoreFake;
+ let mut wal = WALLoader::new(store, 9, 8, 1000).recover();
+ for _ in 0..3 {
+ test(["hi", "hello", "lol"].iter().map(|s| s.to_string()).collect::<Vec<String>>(), &mut wal)
+ }
+ for _ in 0..3 {
+ test(["a".repeat(10), "b".repeat(100), "c".repeat(1000)].iter().map(|s| s.to_string()).collect::<Vec<String>>(), &mut wal)
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 53f4e0f..1e1b653 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,7 +11,7 @@ enum WALRingType {
Last
}
-#[repr(C)]
+#[repr(packed)]
struct WALRingBlob {
crc32: u32,
rsize: u32,
@@ -19,11 +19,11 @@ struct WALRingBlob {
// payload follows
}
-type WALBytes = Box<[u8]>;
-type WALFileId = u64;
-type WALPos = u64;
+pub type WALBytes = Box<[u8]>;
+pub type WALFileId = u64;
+pub type WALPos = u64;
-#[derive(Eq, PartialEq, Copy, Clone)]
+#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub struct WALRingId {
start: WALPos,
end: WALPos
@@ -118,7 +118,7 @@ impl<F: WALStore> WALFilePool<F> {
// TODO: evict stale handles
fn write(&mut self, writes: Vec<(WALPos, WALBytes)>) {
// pre-allocate the file space
- let fid = writes[0].0 >> self.file_nbit;
+ let mut fid = writes[0].0 >> self.file_nbit;
let mut alloc_start = writes[0].0 & (self.file_size - 1);
let mut alloc_end = alloc_start + writes[0].1.len() as u64;
let mut h = self.get_file(fid, true);
@@ -129,13 +129,14 @@ impl<F: WALStore> WALFilePool<F> {
h = self.get_file(next_fid, true);
alloc_start = 0;
alloc_end = alloc_start + w.len() as u64;
+ fid = next_fid;
} else {
alloc_end += w.len() as u64;
}
}
h.allocate(alloc_start, (alloc_end - alloc_start) as usize);
for (off, w) in writes.into_iter() {
- self.get_file(off >> self.file_nbit, true).write(off, w);
+ self.get_file(off >> self.file_nbit, true).write(off & (self.file_size - 1), w);
}
}
@@ -175,7 +176,7 @@ impl<F: WALStore> WALWriter<F> {
/// Submit a sequence of records to WAL; WALStore/WALFile callbacks are invoked before the
/// function returns. The caller then has the knowledge of WAL writes so it should defer
/// actual data writes after WAL writes.
- pub fn grow(&mut self, records: &[WALBytes]) -> Box<[WALRingId]> {
+ pub fn grow<T: AsRef<[WALBytes]>>(&mut self, records: T) -> Box<[WALRingId]> {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = std::mem::size_of::<WALRingBlob>() as u32;
@@ -185,7 +186,7 @@ impl<F: WALStore> WALWriter<F> {
// the end of the unwritten data
let mut bbuff_cur = bbuff_start;
- for _rec in records {
+ for _rec in records.as_ref() {
let mut rec = &_rec[..];
let mut rsize = rec.len() as u32;
let mut started = false;
@@ -193,18 +194,22 @@ impl<F: WALStore> WALWriter<F> {
let remain = self.block_size - bbuff_cur;
if remain > msize {
let d = remain - msize;
+ let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64;
let blob = unsafe {std::mem::transmute::<*mut u8, &mut WALRingBlob>(
(&mut self.block_buffer[bbuff_cur as usize..]).as_mut_ptr())};
- let ring_start = self.state.next + (bbuff_cur - bbuff_start) as u64;
+ bbuff_cur += msize;
if d >= rsize {
// the remaining rec fits in the block
let payload = rec;
+ println!("rsize {} {}", rsize, d);
blob.crc32 = crc::crc32::checksum_ieee(payload);
blob.rsize = rsize;
blob.rtype = if started {WALRingType::Last} else {WALRingType::Full};
- rsize = 0;
- &mut self.block_buffer[bbuff_cur as usize..].copy_from_slice(payload);
+ &mut self.block_buffer[
+ bbuff_cur as usize..
+ bbuff_cur as usize + payload.len()].copy_from_slice(payload);
bbuff_cur += rsize;
+ rsize = 0;
} else {
// the remaining block can only accommodate partial rec
let payload = &rec[..d as usize];
@@ -214,9 +219,11 @@ impl<F: WALStore> WALWriter<F> {
started = true;
WALRingType::First
};
- rsize -= d;
- &mut self.block_buffer[bbuff_cur as usize..].copy_from_slice(payload);
+ &mut self.block_buffer[
+ bbuff_cur as usize..
+ bbuff_cur as usize + payload.len()].copy_from_slice(payload);
bbuff_cur += d;
+ rsize -= d;
rec = &rec[d as usize..];
}
let ring_end = self.state.next + (bbuff_cur - bbuff_start) as u64;
@@ -247,8 +254,8 @@ impl<F: WALStore> WALWriter<F> {
/// Inform the WALWriter that data writes (specified by a slice of (offset, length) tuples) are
/// complete so that it could automatically remove obsolete WAL files.
- pub fn peel(&mut self, records: &[WALRingId]) {
- for rec in records {
+ pub fn peel<T: AsRef<[WALRingId]>>(&mut self, records: T) {
+ for rec in records.as_ref() {
self.io_complete.push(*rec)
}
let orig_fid = self.state.first_fid;
@@ -267,14 +274,14 @@ impl<F: WALStore> WALWriter<F> {
}
}
-pub struct WALReader<F: WALStore> {
+pub struct WALLoader<F: WALStore> {
file_pool: WALFilePool<F>,
}
-impl<F: WALStore> WALReader<F> {
+impl<F: WALStore> WALLoader<F> {
pub fn new(store: F, file_nbit: u8, block_nbit: u8, cache_size: usize) -> Self {
let file_pool = WALFilePool::new(store, file_nbit, block_nbit, cache_size);
- WALReader{ file_pool }
+ WALLoader{ file_pool }
}
pub fn recover(mut self) -> WALWriter<F> {