From 8839da1eb6d116f456066e6c40be2905c3b8c8e3 Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Mon, 17 Jun 2024 11:49:26 -0700 Subject: [PATCH] Mat logs should not populate invalid Adds --- rust/worker/src/segment/types.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/rust/worker/src/segment/types.rs b/rust/worker/src/segment/types.rs index d562d921b12..34ba72c8c9c 100644 --- a/rust/worker/src/segment/types.rs +++ b/rust/worker/src/segment/types.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::atomic::AtomicU32; use std::sync::Arc; @@ -281,6 +281,7 @@ impl<'me> LogMaterializer<'me> { // Populate entries that are present in the record segment. let mut existing_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new(); let mut new_id_to_materialized: HashMap<&str, MaterializedLogRecord> = HashMap::new(); + let mut invalid_adds: HashSet<&str> = HashSet::new(); match &self.record_segment_reader { Some(reader) => { for (log_record, _) in self.logs.iter() { @@ -294,6 +295,11 @@ impl<'me> LogMaterializer<'me> { return Err(LogMaterializerError::RecordSegmentError(e)); } }; + // Ignore loading Adds. + if exists && log_record.record.operation == Operation::Add { + invalid_adds.insert(log_record.record.id.as_str()); + continue; + } if exists { match reader .get_data_and_offset_id_for_user_id(log_record.record.id.as_str()) @@ -321,11 +327,10 @@ impl<'me> LogMaterializer<'me> { for (log_record, _) in self.logs.iter() { match log_record.record.operation { Operation::Add => { - // If user is trying to insert a key that already exists in - // storage then ignore. Also if it already existed in the log - // before then also ignore. - if !existing_id_to_materialized.contains_key(log_record.record.id.as_str()) - && !new_id_to_materialized.contains_key(log_record.record.id.as_str()) + // If user is trying to insert a key that is invalid then ignore. + // Also if it already existed in the log before then ignore. + if !new_id_to_materialized.contains_key(log_record.record.id.as_str()) + && !invalid_adds.contains(log_record.record.id.as_str()) { let next_offset_id = self .offset_id