aboutsummaryrefslogtreecommitdiff
path: root/src/wal.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/wal.rs')
-rw-r--r--src/wal.rs32
1 files changed, 25 insertions, 7 deletions
diff --git a/src/wal.rs b/src/wal.rs
index fe4f11d..c378e0a 100644
--- a/src/wal.rs
+++ b/src/wal.rs
@@ -61,6 +61,22 @@ impl PartialOrd for WALRingId {
}
}
+pub trait Record {
+ fn serialize(&self) -> WALBytes;
+}
+
+impl Record for WALBytes {
+ fn serialize(&self) -> WALBytes { self[..].into() }
+}
+
+impl Record for String {
+ fn serialize(&self) -> WALBytes { self.as_bytes().into() }
+}
+
+impl Record for str {
+ fn serialize(&self) -> WALBytes { self.as_bytes().into() }
+}
+
/// the state for a WAL writer
struct WALState {
/// the first file id of WAL
@@ -369,10 +385,10 @@ impl<F: WALStore> WALWriter<F> {
/// record is already logged. Then, after finalizing the changes encoded by that record to
/// the persistent storage, the caller can recycle the WAL files by invoking the given
/// `peel` with the given `WALRingId`s.
- pub fn grow<'a, T: AsRef<[WALBytes]>>(
+ pub fn grow<'a, R: Record + 'a>(
&'a mut self,
- records: T,
- ) -> Vec<impl Future<Output = Result<WALRingId, ()>> + 'a> {
+ records: Vec<R>,
+ ) -> Vec<impl Future<Output = Result<(R, WALRingId), ()>> + 'a> {
let mut res = Vec::new();
let mut writes = Vec::new();
let msize = self.msize as u32;
@@ -382,8 +398,9 @@ impl<F: WALStore> WALWriter<F> {
// the end of the unwritten data
let mut bbuff_cur = bbuff_start;
- for _rec in records.as_ref() {
- let mut rec = &_rec[..];
+ for rec in records.iter() {
+ let bytes = rec.serialize();
+ let mut rec = &bytes[..];
let mut rsize = rec.len() as u32;
let mut ring_start = None;
while rsize > 0 {
@@ -494,12 +511,13 @@ impl<F: WALStore> WALWriter<F> {
.collect();
let res = res
.into_iter()
- .map(|(ringid, blks)| {
+ .zip(records.into_iter())
+ .map(|((ringid, blks), rec)| {
future::try_join_all(
blks.into_iter().map(|idx| writes[idx].clone()),
)
.or_else(|_| future::ready(Err(())))
- .and_then(move |_| future::ready(Ok(ringid)))
+ .and_then(move |_| future::ready(Ok((rec, ringid))))
})
.collect();
res