Skip to content

Commit

Permalink
[Bifrost] Improve merge operator performance by coalescing metadata u…
Browse files Browse the repository at this point in the history
…pdates

~20% P100 latency improvement when CPU bound according to bifrost-benchpress. Why didn't I do this from the get-go is beyond me!

It'd be best to move away from the Vec<> structure to a fixed size struct to avoid searching but it'd break backward compatibility.
  • Loading branch information
AhmedSoliman committed Jul 19, 2024
1 parent 66e6136 commit 9f32462
Showing 1 changed file with 49 additions and 7 deletions.
56 changes: 49 additions & 7 deletions crates/bifrost/src/providers/local_loglet/log_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use super::LogStoreError;
pub struct LogStateUpdates {
/// SmallVec is used to avoid heap allocation for the common case of small
/// number of updates.
updates: SmallVec<[LogStateUpdate; 1]>,
updates: SmallVec<[LogStateUpdate; 3]>,
}

/// Represents a single update to the log state.
Expand All @@ -46,26 +46,65 @@ impl LogStateUpdates {
}

pub fn update_release_pointer(mut self, release_pointer: LogletOffset) -> Self {
// update existing release pointer if exists, otherwise, add to the vec.
for update in &mut self.updates {
if let LogStateUpdate::ReleasePointer(ref mut existing) = update {
*existing = (*existing).max(release_pointer.into());
return self;
}
}

self.updates
.push(LogStateUpdate::ReleasePointer(release_pointer.into()));
self
}

pub fn update_trim_point(mut self, trim_point: LogletOffset) -> Self {
// update existing release pointer if exists, otherwise, add to the vec.
for update in &mut self.updates {
if let LogStateUpdate::TrimPoint(ref mut existing) = update {
*existing = (*existing).max(trim_point.into());
return self;
}
}
self.updates
.push(LogStateUpdate::TrimPoint(trim_point.into()));
self
}

pub fn seal(mut self) -> Self {
// do nothing if we will seal already
for update in &self.updates {
if matches!(update, LogStateUpdate::Seal) {
return self;
}
}
self.updates.push(LogStateUpdate::Seal);
self
}

pub fn merge(mut self, rhs: LogStateUpdates) -> Self {
for update in rhs.updates {
match update {
LogStateUpdate::ReleasePointer(release_pointer) => {
self = self.update_release_pointer(LogletOffset::from(release_pointer));
}
LogStateUpdate::TrimPoint(trim_point) => {
self = self.update_trim_point(LogletOffset::from(trim_point));
}
LogStateUpdate::Seal => {
self = self.seal();
}
}
}
self
}
}

impl LogStateUpdates {
pub fn to_bytes(&self) -> Result<Bytes, LogStoreError> {
let mut buf = BytesMut::default();
// trying to avoid buffer resizing by having plenty of space for serialization
let mut buf = BytesMut::with_capacity(std::mem::size_of::<Self>() * 2);
self.encode(&mut buf)?;
Ok(buf.freeze())
}
Expand All @@ -90,7 +129,8 @@ pub struct LogState {

impl LogState {
pub fn to_bytes(&self) -> Result<Bytes, LogStoreError> {
let mut buf = BytesMut::default();
// trying to avoid buffer resizing by having plenty of space for serialization
let mut buf = BytesMut::with_capacity(std::mem::size_of::<Self>() * 2);
StorageCodec::encode(self, &mut buf)?;
Ok(buf.freeze())
}
Expand Down Expand Up @@ -172,19 +212,21 @@ pub fn log_state_partial_merge(
warn!(key = ?key, "Merge is only supported for log-state");
return None;
}
let mut merged = LogStateUpdates::with_capacity(operands.len());
// assuming one entry per LogStateUpdate variant at most since everything can be merged.
let mut merged = LogStateUpdates::with_capacity(3);

for op in operands {
let updates = LogStateUpdates::from_slice(op);
let mut updates = match updates {
let updates = match updates {
Err(e) => {
error!(key = ?key,"Failed to decode log state updates: {}", e);
return None;
}
Ok(updates) => updates,
};

// todo (asoli): actually merge updates
merged.updates.append(&mut updates.updates);
// deduplicates all operations
merged = merged.merge(updates);
}
match merged.to_bytes() {
Ok(bytes) => Some(bytes.into()),
Expand Down

0 comments on commit 9f32462

Please sign in to comment.