Skip to content

Commit

Permalink
Mat logs should not populate invalid Adds
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketkedia committed Jun 17, 2024
1 parent ef75293 commit 506bae5
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions rust/worker/src/segment/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU32;
use std::sync::Arc;

Expand Down Expand Up @@ -281,6 +281,7 @@ impl<'me> LogMaterializer<'me> {
// Populate entries that are present in the record segment.
let mut existing_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new();
let mut new_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new();
let mut invalid_adds: HashSet<&str> = HashSet::new();
match &self.record_segment_reader {
Some(reader) => {
for (log_record, _) in self.logs.iter() {
Expand All @@ -294,6 +295,11 @@ impl<'me> LogMaterializer<'me> {
return Err(LogMaterializerError::RecordSegmentError(e));
}
};
// Ignore loading Adds.
if exists && log_record.record.operation == Operation::Add {
invalid_adds.insert(log_record.record.id.as_str());
continue;
}
if exists {
match reader
.get_data_and_offset_id_for_user_id(log_record.record.id.as_str())
Expand Down Expand Up @@ -321,11 +327,10 @@ impl<'me> LogMaterializer<'me> {
for (log_record, _) in self.logs.iter() {
match log_record.record.operation {
Operation::Add => {
// If user is trying to insert a key that already exists in
// storage then ignore. Also if it already existed in the log
// before then also ignore.
if !existing_id_to_materialized.contains_key(log_record.record.id.as_str())
&& !new_id_to_materialized.contains_key(log_record.record.id.as_str())
// If user is trying to insert a key that is invalid then ignore.
// Also if it already existed in the log before then ignore.
if !new_id_to_materialized.contains_key(log_record.record.id.as_str())
&& !invalid_adds.contains(log_record.record.id.as_str())
{
let next_offset_id = self
.offset_id
Expand Down

0 comments on commit 506bae5

Please sign in to comment.