summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDeterminant <[email protected]>2020-06-13 15:20:39 -0400
committerDeterminant <[email protected]>2020-06-13 15:20:39 -0400
commitd0e8ceeb250ce362d7d9bf2c6e5c297c716259cc (patch)
tree0a2d915d2c332c73e1fb1fa1eea2608936e26da9 /src
parentf02152a1c3ff0123efd2076fd9190feb81107b8e (diff)
...
Diffstat (limited to 'src')
-rw-r--r--src/wal.rs32
1 files changed, 25 insertions, 7 deletions
diff --git a/src/wal.rs b/src/wal.rs
index fe4f11d..c378e0a 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -61,6 +61,22 @@ impl PartialOrd for WALRingId {
}
}
+pub trait Record {
+ fn serialize(&self) -> WALBytes;
+}
+
+impl Record for WALBytes {
+ fn serialize(&self) -> WALBytes { self[..].into() }
+}
+
+impl Record for String {
+ fn serialize(&self) -> WALBytes { self.as_bytes().into() }
+}
+
+impl Record for str {
+ fn serialize(&self) -> WALBytes { self.as_bytes().into() }
+}
+
/// the state for a WAL writer
struct WALState {
/// the first file id of WAL
@@ -369,10 +385,10 @@ impl<F: WALStore> WALWriter<F> {
/// record is already logged. Then, after finalizing the changes encoded by that record to
/// the persistent storage, the caller can recycle the WAL files by invoking the given
/// `peel` with the given `WALRingId`s.
- pub fn grow<'a, T: AsRef<[WALBytes]>>(
+ pub fn grow<'a, R: Record + 'a>(
&'a mut self,
- records: T,
- ) -> Vec<impl Future<Output = Result<WALRingId, ()>> + 'a> {
+ records: Vec<R>,
+ ) -> Vec<impl Future<Output = Result<(R, WALRingId), ()>> + 'a> {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = self.msize as u32;
@@ -382,8 +398,9 @@ impl<F: WALStore> WALWriter<F> {
// the end of the unwritten data
let mut bbuff_cur = bbuff_start;
- for _rec in records.as_ref() {
- let mut rec = &_rec[..];
+ for rec in records.iter() {
+ let bytes = rec.serialize();
+ let mut rec = &bytes[..];
let mut rsize = rec.len() as u32;
let mut ring_start = None;
while rsize > 0 {
@@ -494,12 +511,13 @@ impl<F: WALStore> WALWriter<F> {
.collect();
let res = res
.into_iter()
- .map(|(ringid, blks)| {
+ .zip(records.into_iter())
+ .map(|((ringid, blks), rec)| {
future::try_join_all(
blks.into_iter().map(|idx| writes[idx].clone()),
)
.or_else(|_| future::ready(Err(())))
- .and_then(move |_| future::ready(Ok(ringid)))
+ .and_then(move |_| future::ready(Ok((rec, ringid))))
})
.collect();
res