From d8a71dd7e8ab639e106cf64080a3ec523bf19ab8 Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Mon, 17 Jun 2024 17:36:00 -0700 Subject: [PATCH] [ENH] Handle metadata deletes + fix bugs related to Updates/deletes in the metadata writer (#2344) ## Description of changes *Summarize the changes made by this PR.* - Improvements & Bug fixes - Handles metadata deletes - Full text writer adds (token, freq) pair even if freq is 0. Fixes this. - Full text writer does not remove postings list of documents that have been deleted. Fixes this. - Fix for test_query_without_add ## Test plan - [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes None --- chromadb/proto/convert.py | 1 + chromadb/test/property/test_embeddings.py | 17 +- rust/worker/src/blockstore/arrow/blockfile.rs | 2 +- .../positional_posting_list_value.rs | 2 + .../operators/merge_metadata_results.rs | 7 +- .../src/execution/orchestration/hnsw.rs | 29 + rust/worker/src/index/fulltext/types.rs | 111 +++- rust/worker/src/segment/metadata_segment.rs | 542 ++++++++++-------- rust/worker/src/segment/record_segment.rs | 11 +- rust/worker/src/segment/types.rs | 244 +++++++- rust/worker/src/types/metadata.rs | 60 +- 11 files changed, 657 insertions(+), 369 deletions(-) diff --git a/chromadb/proto/convert.py b/chromadb/proto/convert.py index b52296e546c..ebc74d5ab00 100644 --- a/chromadb/proto/convert.py +++ b/chromadb/proto/convert.py @@ -189,6 +189,7 @@ def to_proto_metadata_update_value( return proto.UpdateMetadataValue(int_value=value) elif isinstance(value, float): return proto.UpdateMetadataValue(float_value=value) + # None is used to delete the metadata key. elif value is None: return proto.UpdateMetadataValue() else: diff --git a/chromadb/test/property/test_embeddings.py b/chromadb/test/property/test_embeddings.py index 3768205e14e..fa062ed394b 100644 --- a/chromadb/test/property/test_embeddings.py +++ b/chromadb/test/property/test_embeddings.py @@ -26,6 +26,7 @@ ) from collections import defaultdict import chromadb.test.property.invariants as invariants +from chromadb.test.conftest import reset import numpy as np @@ -75,7 +76,7 @@ def __init__(self, api: ServerAPI): @initialize(collection=collection_st) # type: ignore def initialize(self, collection: strategies.Collection): - self.api.reset() + reset(self.api) self.collection = self.api.create_collection( name=collection.name, metadata=collection.metadata, @@ -306,7 +307,7 @@ def test_embeddings_state(caplog: pytest.LogCaptureFixture, api: ServerAPI) -> N def test_multi_add(api: ServerAPI) -> None: - api.reset() + reset(api) coll = api.create_collection(name="foo") coll.add(ids=["a"], embeddings=[[0.0]]) assert coll.count() == 1 @@ -325,7 +326,7 @@ def test_multi_add(api: ServerAPI) -> None: def test_dup_add(api: ServerAPI) -> None: - api.reset() + reset(api) coll = api.create_collection(name="foo") with pytest.raises(errors.DuplicateIDError): coll.add(ids=["a", "a"], embeddings=[[0.0], [1.1]]) @@ -334,7 +335,7 @@ def test_dup_add(api: ServerAPI) -> None: def test_query_without_add(api: ServerAPI) -> None: - api.reset() + reset(api) coll = api.create_collection(name="foo") fields: Include = ["documents", "metadatas", "embeddings", "distances"] N = np.random.randint(1, 2000) @@ -349,7 +350,7 @@ def test_query_without_add(api: ServerAPI) -> None: def test_get_non_existent(api: ServerAPI) -> None: - api.reset() + reset(api) coll = api.create_collection(name="foo") result = coll.get(ids=["a"], include=["documents", "metadatas", 
"embeddings"]) assert len(result["ids"]) == 0 @@ -361,7 +362,7 @@ def test_get_non_existent(api: ServerAPI) -> None: # TODO: Use SQL escaping correctly internally @pytest.mark.xfail(reason="We don't properly escape SQL internally, causing problems") def test_escape_chars_in_ids(api: ServerAPI) -> None: - api.reset() + reset(api) id = "\x1f" coll = api.create_collection(name="foo") coll.add(ids=[id], embeddings=[[0.0]]) @@ -381,7 +382,7 @@ def test_escape_chars_in_ids(api: ServerAPI) -> None: ], ) def test_delete_empty_fails(api: ServerAPI, kwargs: dict): - api.reset() + reset(api) coll = api.create_collection(name="foo") with pytest.raises(Exception) as e: coll.delete(**kwargs) @@ -404,7 +405,7 @@ def test_delete_empty_fails(api: ServerAPI, kwargs: dict): ], ) def test_delete_success(api: ServerAPI, kwargs: dict): - api.reset() + reset(api) coll = api.create_collection(name="foo") # Should not raise coll.delete(**kwargs) diff --git a/rust/worker/src/blockstore/arrow/blockfile.rs b/rust/worker/src/blockstore/arrow/blockfile.rs index 606362c3558..04da4756047 100644 --- a/rust/worker/src/blockstore/arrow/blockfile.rs +++ b/rust/worker/src/blockstore/arrow/blockfile.rs @@ -523,7 +523,7 @@ mod tests { log::config::{self, GrpcLogConfig}, segment::DataRecord, storage::{local::LocalStorage, Storage}, - types::{update_metdata_to_metdata, MetadataValue}, + types::MetadataValue, }; use arrow::array::Int32Array; use proptest::prelude::*; diff --git a/rust/worker/src/blockstore/positional_posting_list_value.rs b/rust/worker/src/blockstore/positional_posting_list_value.rs index 55a742cdc76..5cddeb36044 100644 --- a/rust/worker/src/blockstore/positional_posting_list_value.rs +++ b/rust/worker/src/blockstore/positional_posting_list_value.rs @@ -113,6 +113,8 @@ impl PositionalPostingListBuilder { return Err(PositionalPostingListBuilderError::DocIdDoesNotExist); } + // Safe to unwrap here since this is called for >= 2nd time a token + // exists in the document. self.positions.get_mut(&doc_id).unwrap().extend(positions); Ok(()) } diff --git a/rust/worker/src/execution/operators/merge_metadata_results.rs b/rust/worker/src/execution/operators/merge_metadata_results.rs index f2268830c84..dc58d03ebad 100644 --- a/rust/worker/src/execution/operators/merge_metadata_results.rs +++ b/rust/worker/src/execution/operators/merge_metadata_results.rs @@ -6,11 +6,8 @@ use crate::{ record_segment::{RecordSegmentReader, RecordSegmentReaderCreationError}, LogMaterializer, LogMaterializerError, }, - types::{ - update_metdata_to_metdata, LogRecord, Metadata, MetadataValueConversionError, Operation, - Segment, - }, - utils::{merge_sorted_vecs_conjunction, merge_sorted_vecs_disjunction}, + types::{LogRecord, Metadata, MetadataValueConversionError, Operation, Segment}, + utils::merge_sorted_vecs_conjunction, }; use async_trait::async_trait; use std::{ diff --git a/rust/worker/src/execution/orchestration/hnsw.rs b/rust/worker/src/execution/orchestration/hnsw.rs index cd7ba55d412..33dcc1c3d01 100644 --- a/rust/worker/src/execution/orchestration/hnsw.rs +++ b/rust/worker/src/execution/orchestration/hnsw.rs @@ -429,6 +429,28 @@ impl HnswQueryOrchestrator { } } + fn terminate_with_empty_response(&mut self, ctx: &ComponentContext) { + let result_channel = self + .result_channel + .take() + .expect("Invariant violation. 
Result channel is not set."); + let mut empty_resp = vec![]; + for _ in 0..self.query_vectors.len() { + empty_resp.push(vec![]); + } + match result_channel.send(Ok(empty_resp)) { + Ok(_) => (), + Err(e) => { + // Log an error - this implied the listener was dropped + tracing::error!( + "[HnswQueryOrchestrator] Result channel dropped before sending empty response" + ); + } + } + // Cancel the orchestrator so it stops processing + ctx.cancellation_token.cancel(); + } + fn terminate_with_error(&mut self, error: Box, ctx: &ComponentContext) { let result_channel = self .result_channel @@ -501,6 +523,13 @@ impl Component for HnswQueryOrchestrator { } }; + // If segment is uninitialized and dimension is not set then we assume + // that this is a query before any add so return empty response. + if hnsw_segment.file_path.len() <= 0 && collection.dimension.is_none() { + self.terminate_with_empty_response(ctx); + return; + } + // Validate that the collection has a dimension set. Downstream steps will rely on this // so that they can unwrap the dimension without checking for None if collection.dimension.is_none() { diff --git a/rust/worker/src/index/fulltext/types.rs b/rust/worker/src/index/fulltext/types.rs index e9d854aa07f..0d4a624c51b 100644 --- a/rust/worker/src/index/fulltext/types.rs +++ b/rust/worker/src/index/fulltext/types.rs @@ -19,8 +19,6 @@ use super::tokenizer::ChromaTokenStream; #[derive(Error, Debug)] pub enum FullTextIndexError { - #[error("Multiple tokens found in frequencies blockfile")] - MultipleTokenFrequencies, #[error("Empty value in positional posting list")] EmptyValueInPositionalPostingList, #[error("Invariant violation")] @@ -37,9 +35,24 @@ impl ChromaError for FullTextIndexError { } } +pub(crate) struct UncommittedPostings { + // token -> {doc -> [start positions]} + positional_postings: HashMap, + // (token, doc) pairs that should be deleted from storage. + deleted_token_doc_pairs: Vec<(String, i32)>, +} + +impl UncommittedPostings { + pub(crate) fn new() -> Self { + Self { + positional_postings: HashMap::new(), + deleted_token_doc_pairs: Vec::new(), + } + } +} + #[derive(Clone)] pub(crate) struct FullTextIndexWriter<'me> { - // We use this to implement updates which require read-then-write semantics. full_text_index_reader: Option>, posting_lists_blockfile_writer: BlockfileWriter, frequencies_blockfile_writer: BlockfileWriter, @@ -49,11 +62,12 @@ pub(crate) struct FullTextIndexWriter<'me> { // a lightweight lock instead. This is needed currently to // keep holding the lock across an await point. // term -> positional posting list builder for that term - uncommitted: Arc>>, + uncommitted_postings: Arc>, // TODO(Sanket): Move off this tokio::sync::mutex and use // a lightweight lock instead. This is needed currently to // keep holding the lock across an await point. - // Value of this map is a tuple because we also need to keep the old frequency + // Value of this map is a tuple (old freq and new freq) + // because we also need to keep the old frequency // around. The reason is (token, freq) is the key in the blockfile hence // when freq changes, we need to delete the old (token, freq) key. 
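// E.g. (hypothetical numbers): if a token's committed frequency is 3 and one
// new occurrence arrives, the pair holds old = 3 and new = 4; on flush the
// stale composite key ("cat", 3) is deleted and ("cat", 4) written in its
// place, which is why both values must be kept until commit.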
uncommitted_frequencies: Arc>>, @@ -71,7 +85,7 @@ impl<'me> FullTextIndexWriter<'me> { posting_lists_blockfile_writer, frequencies_blockfile_writer, tokenizer: Arc::new(Mutex::new(tokenizer)), - uncommitted: Arc::new(tokio::sync::Mutex::new(HashMap::new())), + uncommitted_postings: Arc::new(tokio::sync::Mutex::new(UncommittedPostings::new())), uncommitted_frequencies: Arc::new(tokio::sync::Mutex::new(HashMap::new())), } } @@ -85,9 +99,12 @@ impl<'me> FullTextIndexWriter<'me> { Some(_) => return Ok(()), None => { let frequency = match &self.full_text_index_reader { + // Readers are uninitialized until the first compaction finishes + // so there is a case when this is none hence not an error. None => 0, Some(reader) => match reader.get_frequencies_for_token(token).await { Ok(frequency) => frequency, + // New token so start with frequency of 0. Err(_) => 0, }, }; @@ -95,8 +112,8 @@ impl<'me> FullTextIndexWriter<'me> { .insert(token.to_string(), (frequency as i32, frequency as i32)); } } - let mut uncommitted = self.uncommitted.lock().await; - match uncommitted.get(token) { + let mut uncommitted_postings = self.uncommitted_postings.lock().await; + match uncommitted_postings.positional_postings.get(token) { Some(_) => { // This should never happen -- if uncommitted has the token, then // uncommitted_frequencies should have had it as well. @@ -108,9 +125,12 @@ impl<'me> FullTextIndexWriter<'me> { None => { let mut builder = PositionalPostingListBuilder::new(); let results = match &self.full_text_index_reader { + // Readers are uninitialized until the first compaction finishes + // so there is a case when this is none hence not an error. None => vec![], Some(reader) => match reader.get_all_results_for_token(token).await { Ok(results) => results, + // New token so start with empty postings list. Err(_) => vec![], }, }; @@ -123,7 +143,9 @@ impl<'me> FullTextIndexWriter<'me> { } } } - uncommitted.insert(token.to_string(), builder); + uncommitted_postings + .positional_postings + .insert(token.to_string(), builder); } } Ok(()) @@ -145,11 +167,16 @@ impl<'me> FullTextIndexWriter<'me> { self.populate_frequencies_and_posting_lists_from_previous_version(token.text.as_str()) .await?; let mut uncommitted_frequencies = self.uncommitted_frequencies.lock().await; + // The entry should always exist because self.populate_frequencies_and_posting_lists_from_previous_version + // will have created it if this token is new to the system. uncommitted_frequencies .entry(token.text.to_string()) .and_modify(|e| (*e).0 += 1); - let mut uncommitted = self.uncommitted.lock().await; - let builder = uncommitted + let mut uncommitted_postings = self.uncommitted_postings.lock().await; + // For a new token, the uncommitted list will not contain any entry so insert + // an empty builder in that case. + let builder = uncommitted_postings + .positional_postings .entry(token.text.to_string()) .or_insert(PositionalPostingListBuilder::new()); @@ -198,10 +225,19 @@ impl<'me> FullTextIndexWriter<'me> { return Err(FullTextIndexError::InvariantViolation); } } - let mut uncommitted = self.uncommitted.lock().await; - match uncommitted.get_mut(token.text.as_str()) { + let mut uncommitted_postings = self.uncommitted_postings.lock().await; + match uncommitted_postings + .positional_postings + .get_mut(token.text.as_str()) + { Some(builder) => match builder.delete_doc_id(offset_id as i32) { - Ok(_) => {} + Ok(_) => { + // Track all the deleted (token, doc) pairs. This is needed + // to remove the old postings list for this pair from storage. 
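+ // E.g. (hypothetical ids): deleting offset id 42 whose document
+ // tokenizes to ["hello", "world"] queues ("hello", 42) and
+ // ("world", 42); write_to_blockfiles later drains these pairs and
+ // deletes each one from the posting lists blockfile before any new
+ // lists are written.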
+ uncommitted_postings + .deleted_token_doc_pairs + .push((token.text.clone(), offset_id as i32)); + } Err(e) => { // This is a fatal invariant violation: we've been asked to // delete a document which doesn't appear in the positional posting list. @@ -234,10 +270,24 @@ impl<'me> FullTextIndexWriter<'me> { Ok(()) } - // TODO(Sanket): Handle document and metadata deletes. pub async fn write_to_blockfiles(&mut self) -> Result<(), FullTextIndexError> { - let mut uncommitted = self.uncommitted.lock().await; - for (key, mut value) in uncommitted.drain() { + let mut uncommitted_postings = self.uncommitted_postings.lock().await; + // Delete (token, doc) pairs from blockfile first. Note that the ordering is + // important here i.e. we need to delete before inserting the new postings + // list otherwise we could incorrectly delete posting lists that shouldn't be deleted. + for (token, offset_id) in uncommitted_postings.deleted_token_doc_pairs.drain(..) { + match self + .posting_lists_blockfile_writer + .delete::(token.as_str(), offset_id as u32) + .await + { + Ok(_) => {} + Err(e) => { + return Err(FullTextIndexError::BlockfileWriteError(e)); + } + } + } + for (key, mut value) in uncommitted_postings.positional_postings.drain() { let built_list = value.build(); for doc_id in built_list.doc_ids.iter() { match doc_id { @@ -275,15 +325,19 @@ impl<'me> FullTextIndexWriter<'me> { } } // Insert the new frequency. + // Add only if the frequency is not zero. This can happen in case of document + // deletes. // TODO we just have token -> frequency here. Should frequency be the key or should we use an empty key and make it the value? - match self - .frequencies_blockfile_writer - .set(key.as_str(), value.0 as u32, 0) - .await - { - Ok(_) => {} - Err(e) => { - return Err(FullTextIndexError::BlockfileWriteError(e)); + if value.0 > 0 { + match self + .frequencies_blockfile_writer + .set(key.as_str(), value.0 as u32, 0) + .await + { + Ok(_) => {} + Err(e) => { + return Err(FullTextIndexError::BlockfileWriteError(e)); + } } } } @@ -385,9 +439,12 @@ impl<'me> FullTextIndexReader<'me> { return Ok(vec![]); } if res.len() > 1 { - return Err(FullTextIndexError::MultipleTokenFrequencies); + panic!("Invariant violation. Multiple frequency values found for a token."); } let res = res[0]; + if res.1 <= 0 { + panic!("Invariant violation. Zero frequency token found."); + } // Throw away the "value" since we store frequencies in the keys. token_frequencies.push((token.text.to_string(), res.1)); } @@ -509,7 +566,7 @@ impl<'me> FullTextIndexReader<'me> { return Ok(0); } if res.len() > 1 { - return Err(FullTextIndexError::MultipleTokenFrequencies); + panic!("Invariant violation. Multiple frequency values found for a token."); } Ok(res[0].1) } diff --git a/rust/worker/src/segment/metadata_segment.rs b/rust/worker/src/segment/metadata_segment.rs index ce3c0c0be69..63c01f51c05 100644 --- a/rust/worker/src/segment/metadata_segment.rs +++ b/rust/worker/src/segment/metadata_segment.rs @@ -434,6 +434,8 @@ impl<'log_records> SegmentWriter<'log_records> for MetadataSegmentWriter<'_> { let segment_offset_id = record.0.offset_id; match record.0.final_operation { Operation::Add => { + // We can ignore record.0.metadata_to_be_deleted + // for fresh adds. TODO on whether to propagate error. 
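+ // E.g. (hypothetical values): an add carrying metadata {"a": 1, "b": None}
+ // materializes as metadata_to_be_merged = {"a": 1} and
+ // metadata_to_be_deleted = {"b"}; since nothing has been indexed yet for a
+ // fresh add, there is nothing to remove and only the merged map below
+ // needs to be applied.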
match &record.0.metadata_to_be_merged { Some(metadata) => { for (key, value) in metadata.iter() { @@ -584,282 +586,318 @@ impl<'log_records> SegmentWriter<'log_records> for MetadataSegmentWriter<'_> { None => {} }, Operation::Update => { - let old_metadata = match &record.0.data_record { - Some(data_record) => match &data_record.metadata { - Some(metadata) => Some(metadata), - None => None, - }, - None => None, - }; - let metadata_updates = match &record.0.metadata_to_be_merged { - Some(metadata) => Some(metadata), - None => None, - }; - match metadata_updates { - None => {} - Some(metadata_updates) => { - for (key, value) in metadata_updates.iter() { - match value { - MetadataValue::Str(value) => { - match &self.string_metadata_index_writer { - Some(writer) => { - match old_metadata { - // TODO this doesn't handle deletes - Some(old_metadata) => match old_metadata - .get(key) - { - Some(old_value) => match old_value { - MetadataValue::Str(old_value) => { - match writer - .update( - key, - old_value.as_str().into(), - value.as_str().into(), - segment_offset_id, - ) - .await - { - Ok(_) => {} - Err(e) => { - tracing::error!( - "Error updating string metadata index: {}", - e - ) - } - } - } - _ => { - tracing::error!( - "Invariant violation: previous value is not a string" - ); - } - }, - None => { - match writer - .set( - key, - value.as_str(), - segment_offset_id, - ) - .await - { - Ok(_) => {} - Err(e) => { - tracing::error!( - "Error setting string metadata index: {}", - e - ) - } - } - } - }, - None => {} - }; + let metadata_delta = record.0.metadata_delta(); + // Updates. + for (update_key, (old_value, new_value)) in metadata_delta.metadata_to_update { + match new_value { + MetadataValue::Str(new_val_str) => match old_value { + MetadataValue::Str(old_val_str) => { + match &self.string_metadata_index_writer { + Some(writer) => { + match writer + .update( + update_key, + old_val_str.as_str().into(), + new_val_str.as_str().into(), + segment_offset_id, + ) + .await + { + Ok(()) => {} + Err(e) => { + return Err(ApplyMaterializedLogError::BlockfileUpdateError); + } } - None => { - tracing::error!( - "No writer found for string metadata index" + } + None => { + panic!("Invariant violation. String metadata index writer should be set"); + } + } + } + _ => { + return Err(ApplyMaterializedLogError::MetadataUpdateNotValid); + } + }, + MetadataValue::Float(new_val_float) => match old_value { + MetadataValue::Float(old_val_float) => { + match &self.f32_metadata_index_writer { + Some(writer) => { + match writer + .update( + update_key, + (*old_val_float as f32).into(), + (*new_val_float as f32).into(), + segment_offset_id, + ) + .await + { + Ok(()) => {} + Err(e) => { + return Err(ApplyMaterializedLogError::BlockfileUpdateError); + } + } + } + None => { + panic!("Invariant violation. Float metadata index writer should be set"); + } + } + } + _ => { + return Err(ApplyMaterializedLogError::MetadataUpdateNotValid); + } + }, + MetadataValue::Int(new_val_int) => match old_value { + MetadataValue::Int(old_val_int) => { + match &self.u32_metadata_index_writer { + Some(writer) => { + match writer + .update( + update_key, + (*old_val_int as u32).into(), + (*new_val_int as u32).into(), + segment_offset_id, + ) + .await + { + Ok(()) => {} + Err(e) => { + return Err(ApplyMaterializedLogError::BlockfileUpdateError); + } + } + } + None => { + panic!("Invariant violation. 
u32 metadata index writer should be set"); + } + } + } + _ => { + return Err(ApplyMaterializedLogError::MetadataUpdateNotValid); + } + }, + MetadataValue::Bool(new_val_bool) => match old_value { + MetadataValue::Bool(old_val_bool) => { + match &self.bool_metadata_index_writer { + Some(writer) => { + match writer + .update( + update_key, + (*old_val_bool).into(), + (*new_val_bool).into(), + segment_offset_id, + ) + .await + { + Ok(()) => {} + Err(e) => { + return Err(ApplyMaterializedLogError::BlockfileUpdateError); + } + } + } + None => { + panic!("Invariant violation. Bool metadata index writer should be set"); + } + } + } + _ => { + return Err(ApplyMaterializedLogError::MetadataUpdateNotValid); + } + }, + } + } + // Inserts. + for (insert_key, new_value) in metadata_delta.metadata_to_insert { + match new_value { + MetadataValue::Str(new_val_str) => { + match &self.string_metadata_index_writer { + Some(writer) => { + match writer + .set( + insert_key, + new_val_str.as_str(), + segment_offset_id, + ) + .await + { + Ok(()) => {} + Err(e) => { + return Err( + ApplyMaterializedLogError::BlockfileSetError, ); } } } - MetadataValue::Float(value) => { - match &self.f32_metadata_index_writer { - Some(writer) => { - match old_metadata { - // TODO this doesn't handle deletes - Some(old_metadata) => { - match old_metadata.get(key) { - Some(old_value) => match old_value { - MetadataValue::Float(old_value) => { - match writer - .update( - key, - (*old_value as f32) - .into(), - (*value as f32).into(), - segment_offset_id, - ) - .await - { - Ok(_) => {} - Err(e) => { - tracing::error!( - "Error updating f32 metadata index: {}", - e - ) - } - } - } - _ => { - tracing::error!( - "Invariant violation: previous value is not a float" - ); - } - }, - None => { - match writer - .set( - key, - *value as f32, - segment_offset_id, - ) - .await - { - Ok(_) => {} - Err(e) => { - tracing::error!( - "Error setting f32 metadata index: {}", - e - ) - } - } - } - } - } - None => {} - }; + None => { + panic!("Invariant violation. String metadata index writer should be set"); + } + } + } + MetadataValue::Float(new_val_float) => { + match &self.f32_metadata_index_writer { + Some(writer) => { + match writer + .set( + insert_key, + *new_val_float as f32, + segment_offset_id, + ) + .await + { + Ok(()) => {} + Err(e) => { + return Err( + ApplyMaterializedLogError::BlockfileSetError, + ); } - None => { - tracing::error!( - "No writer found for f32 metadata index" + } + } + None => { + panic!("Invariant violation. 
Float metadata index writer should be set"); + } + } + } + MetadataValue::Int(new_val_int) => { + match &self.u32_metadata_index_writer { + Some(writer) => { + match writer + .set(insert_key, *new_val_int as u32, segment_offset_id) + .await + { + Ok(()) => {} + Err(e) => { + return Err( + ApplyMaterializedLogError::BlockfileSetError, ); } } } - MetadataValue::Bool(value) => { - match &self.bool_metadata_index_writer { - Some(writer) => { - match old_metadata { - // TODO this doesn't handle deletes - Some(old_metadata) => { - match old_metadata.get(key) { - Some(old_value) => match old_value { - MetadataValue::Bool(old_value) => { - match writer - .update( - key, - (*old_value).into(), - (*value).into(), - segment_offset_id, - ) - .await - { - Ok(_) => {} - Err(e) => { - tracing::error!( - "Error updating bool metadata index: {}", - e - ) - } - } - } - _ => { - tracing::error!( - "Invariant violation: previous value is not a bool" - ); - } - }, - None => { - match writer - .set( - key, - *value, - segment_offset_id, - ) - .await - { - Ok(_) => {} - Err(e) => { - tracing::error!( - "Error setting bool metadata index: {}", - e - ) - } - } - } - } - } - None => {} - }; + None => { + panic!("Invariant violation. Int metadata index writer should be set"); + } + } + } + MetadataValue::Bool(new_val_bool) => { + match &self.bool_metadata_index_writer { + Some(writer) => { + match writer + .set(insert_key, *new_val_bool, segment_offset_id) + .await + { + Ok(()) => {} + Err(e) => { + return Err( + ApplyMaterializedLogError::BlockfileSetError, + ); } - None => { - tracing::error!( - "No writer found for bool metadata index" + } + } + None => { + panic!("Invariant violation. Bool metadata index writer should be set"); + } + } + } + } + } + // Deletes. + for (delete_key, old_value) in metadata_delta.metadata_to_delete { + match old_value { + MetadataValue::Str(old_val_str) => { + match &self.string_metadata_index_writer { + Some(writer) => { + match writer + .delete( + delete_key, + old_val_str.as_str(), + segment_offset_id, + ) + .await + { + Ok(()) => {} + Err(e) => { + return Err( + ApplyMaterializedLogError::BlockfileDeleteError, ); } } } - MetadataValue::Int(value) => { - match &self.u32_metadata_index_writer { - Some(writer) => { - match old_metadata { - // TODO this doesn't handle deletes - Some(old_metadata) => { - match old_metadata.get(key) { - Some(old_value) => match old_value { - MetadataValue::Int(old_value) => { - match writer - .update( - key, - (*old_value as u32) - .into(), - (*value as u32).into(), - segment_offset_id, - ) - .await - { - Ok(_) => {} - Err(e) => { - tracing::error!( - "Error updating u32 metadata index: {}", - e - ) - } - } - } - _ => { - tracing::error!( - "Invariant violation: previous value is not an int" - ); - } - }, - None => { - match writer - .set( - key, - *value as u32, - segment_offset_id, - ) - .await - { - Ok(_) => {} - Err(e) => { - tracing::error!( - "Error setting u32 metadata index: {}", - e - ) - } - } - } - } - } - None => {} - }; + None => { + panic!("Invariant violation. String metadata index writer should be set"); + } + } + } + MetadataValue::Float(old_val_float) => { + match &self.f32_metadata_index_writer { + Some(writer) => { + match writer + .delete( + delete_key, + *old_val_float as f32, + segment_offset_id, + ) + .await + { + Ok(()) => {} + Err(e) => { + return Err( + ApplyMaterializedLogError::BlockfileDeleteError, + ); + } + } + } + None => { + panic!("Invariant violation. 
Float metadata index writer should be set"); + } + } + } + MetadataValue::Int(old_val_int) => { + match &self.u32_metadata_index_writer { + Some(writer) => { + match writer + .delete( + delete_key, + *old_val_int as u32, + segment_offset_id, + ) + .await + { + Ok(()) => {} + Err(e) => { + return Err( + ApplyMaterializedLogError::BlockfileDeleteError, + ); } - None => { - tracing::error!( - "No writer found for u32 metadata index" + } + } + None => { + panic!("Invariant violation. Int metadata index writer should be set"); + } + } + } + MetadataValue::Bool(old_val_bool) => { + match &self.bool_metadata_index_writer { + Some(writer) => { + match writer + .set(delete_key, *old_val_bool, segment_offset_id) + .await + { + Ok(()) => {} + Err(e) => { + return Err( + ApplyMaterializedLogError::BlockfileDeleteError, ); } } } + None => { + panic!("Invariant violation. Bool metadata index writer should be set"); + } } } } - }; + } + } + Operation::Upsert => { + panic!("Invariant violation. There should be no upserts in materialized log"); } - _ => todo!(), } } Ok(()) diff --git a/rust/worker/src/segment/record_segment.rs b/rust/worker/src/segment/record_segment.rs index 4b0b6554f20..3ce1421659b 100644 --- a/rust/worker/src/segment/record_segment.rs +++ b/rust/worker/src/segment/record_segment.rs @@ -4,10 +4,7 @@ use crate::blockstore::provider::{BlockfileProvider, CreateError, OpenError}; use crate::blockstore::{BlockfileFlusher, BlockfileReader, BlockfileWriter}; use crate::errors::{ChromaError, ErrorCodes}; use crate::execution::data::data_chunk::Chunk; -use crate::types::{ - merge_update_metadata, update_metdata_to_metdata, LogRecord, Metadata, MetadataValue, - Operation, Segment, SegmentType, -}; +use crate::types::{Operation, Segment, SegmentType}; use async_trait::async_trait; use std::collections::HashMap; use std::fmt::{self, Debug, Formatter}; @@ -294,8 +291,12 @@ pub enum ApplyMaterializedLogError { BlockfileSetError, #[error("Error deleting from blockfile")] BlockfileDeleteError, + #[error("Error updating blockfile")] + BlockfileUpdateError, #[error("Embedding not set in the user write")] EmbeddingNotSet, + #[error("Metadata update not valid")] + MetadataUpdateNotValid, } impl ChromaError for ApplyMaterializedLogError { @@ -303,6 +304,8 @@ impl ChromaError for ApplyMaterializedLogError { match self { ApplyMaterializedLogError::BlockfileSetError => ErrorCodes::Internal, ApplyMaterializedLogError::BlockfileDeleteError => ErrorCodes::Internal, + ApplyMaterializedLogError::BlockfileUpdateError => ErrorCodes::Internal, + ApplyMaterializedLogError::MetadataUpdateNotValid => ErrorCodes::Internal, ApplyMaterializedLogError::EmbeddingNotSet => ErrorCodes::InvalidArgument, } } diff --git a/rust/worker/src/segment/types.rs b/rust/worker/src/segment/types.rs index c5af9dcff43..2ccb07c2610 100644 --- a/rust/worker/src/segment/types.rs +++ b/rust/worker/src/segment/types.rs @@ -1,24 +1,108 @@ -use std::collections::{HashMap, HashSet}; -use std::sync::atomic::AtomicU32; -use std::sync::Arc; - -use crate::blockstore::key::KeyWrapper; use crate::errors::{ChromaError, ErrorCodes}; use crate::execution::data::data_chunk::Chunk; -use crate::index::metadata::types::MetadataIndexError; use crate::types::{ - merge_update_metadata, update_metdata_to_metdata, BooleanOperator, LogRecord, Metadata, - MetadataType, MetadataValue, MetadataValueConversionError, Operation, OperationRecord, Where, - WhereClauseComparator, WhereComparison, + DeletedMetadata, LogRecord, Metadata, MetadataDelta, MetadataValue, + 
MetadataValueConversionError, Operation, OperationRecord, UpdateMetadata, UpdateMetadataValue, }; -use crate::utils::{merge_sorted_vecs_conjunction, merge_sorted_vecs_disjunction}; use async_trait::async_trait; -use futures::future::BoxFuture; -use roaring::RoaringBitmap; +use std::collections::{HashMap, HashSet}; +use std::sync::atomic::AtomicU32; +use std::sync::Arc; use thiserror::Error; use super::record_segment::{ApplyMaterializedLogError, RecordSegmentReader}; +// Materializes metadata from update metadata, populating the delete list +// and upsert list. +pub(crate) fn materialize_update_metadata( + update_metdata: &UpdateMetadata, +) -> Result<(Metadata, DeletedMetadata), MetadataValueConversionError> { + let mut metadata = Metadata::new(); + let mut deleted_metadata = DeletedMetadata::new(); + for (key, value) in update_metdata { + match value { + UpdateMetadataValue::None => { + deleted_metadata.insert(key.clone()); + continue; + } + _ => {} + } + // Should be a valid conversion for not None values. + let res = value.try_into(); + match res { + Ok(value) => { + metadata.insert(key.clone(), value); + } + Err(err) => { + return Err(err); + } + } + } + Ok((metadata, deleted_metadata)) +} + +// Merges update metadata to base metadata, updating +// the delete list and upsert list. +pub(crate) fn merge_update_metadata( + base_metadata: (&Option, &Option), + update_metadata: &Option, +) -> Result<(Option, Option), MetadataValueConversionError> { + let mut merged_metadata = HashMap::new(); + let mut deleted_metadata = DeletedMetadata::new(); + match base_metadata.0 { + Some(base_mt) => { + merged_metadata = base_mt.clone(); + } + None => (), + } + match base_metadata.1 { + Some(deleted_mt) => { + deleted_metadata = deleted_mt.clone(); + } + None => (), + } + match update_metadata { + Some(update_metadata) => { + match materialize_update_metadata(update_metadata) { + Ok((metadata, deleted_mt)) => { + // Overwrite with new kv. + for (key, value) in metadata { + merged_metadata.insert(key.clone(), value); + // Also remove from deleted list. This is important + // because it can happen that the user deleted and then + // reinserted the key. + deleted_metadata.remove(&key); + } + // apply the deletes. + for key in deleted_mt { + deleted_metadata.insert(key.clone()); + // Again important to remove from this map since the user + // could have previously update the key (and is now deleting it). + merged_metadata.remove(&key); + } + } + Err(e) => { + return Err(e); + } + }; + } + None => (), + } + let mut final_mt; + if merged_metadata.is_empty() { + final_mt = None; + } else { + final_mt = Some(merged_metadata); + } + let mut final_deleted; + if deleted_metadata.is_empty() { + final_deleted = None; + } else { + final_deleted = Some(deleted_metadata); + } + Ok((final_mt, final_deleted)) +} + #[derive(Error, Debug)] pub enum LogMaterializerError { #[error("Error materializing document metadata {0}")] @@ -67,9 +151,12 @@ pub(crate) struct MaterializedLogRecord<'referred_data> { // This is the metadata obtained by combining all the operations // present in the log for this id. // E.g. if has log has [Insert(a: h), Update(a: b, c: d), Update(a: e, f: g)] then this - // will contain (a: e, c: d, f: g). This is None if the final operation - // above is Delete. + // will contain (a: e, c: d, f: g). pub(crate) metadata_to_be_merged: Option, + // Keys from the metadata that the user wants to delete. This is guaranteed + // to be disjoint from metadata_to_be_merged i.e. 
there won't be keys
+    // present in both places.
+    pub(crate) metadata_to_be_deleted: Option<HashSet<String>>,
     // This is the final document obtained from the last non-null operation.
     // E.g. if log has [Insert(str0), Update(str1), Update(str2), Update()] then this will contain
     // str2. None if final operation is Delete.
@@ -151,9 +238,70 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
             }
             None => {}
         }
+        match self.metadata_to_be_deleted.as_ref() {
+            Some(metadata) => {
+                for key in metadata {
+                    final_metadata.remove(key);
+                }
+            }
+            None => {}
+        }
         final_metadata
     }

+    pub(crate) fn metadata_delta(&'referred_data self) -> MetadataDelta<'referred_data> {
+        let mut metadata_delta = MetadataDelta::new();
+        let mut base_metadata: HashMap<&str, &MetadataValue> = HashMap::new();
+        match &self.data_record {
+            Some(data_record) => match &data_record.metadata {
+                Some(meta) => {
+                    for (meta_key, meta_val) in meta {
+                        base_metadata.insert(meta_key, meta_val);
+                    }
+                }
+                None => (),
+            },
+            None => (),
+        };
+        // Populate updates.
+        match &self.metadata_to_be_merged {
+            Some(meta) => {
+                for (meta_key, meta_val) in meta {
+                    match base_metadata.get(meta_key.as_str()) {
+                        Some(old_value) => {
+                            metadata_delta
+                                .metadata_to_update
+                                .insert(meta_key.as_str(), (old_value, meta_val));
+                        }
+                        None => {
+                            metadata_delta
+                                .metadata_to_insert
+                                .insert(meta_key.as_str(), meta_val);
+                        }
+                    }
+                }
+            }
+            None => (),
+        };
+        // Populate deletes.
+        match &self.metadata_to_be_deleted {
+            Some(meta) => {
+                for key in meta {
+                    match base_metadata.get(key.as_str()) {
+                        Some(old_value) => {
+                            metadata_delta
+                                .metadata_to_delete
+                                .insert(key.as_str(), old_value);
+                        }
+                        None => {}
+                    }
+                }
+            }
+            None => (),
+        }
+        metadata_delta
+    }
+
     // Returns references to metadata present in the materialized log record.
     pub(crate) fn merged_metadata_ref(&self) -> HashMap<&str, &MetadataValue> {
         let mut final_metadata: HashMap<&str, &MetadataValue> = HashMap::new();
@@ -176,6 +324,15 @@ impl<'referred_data> MaterializedLogRecord<'referred_data> {
             }
             None => (),
         };
+        // Remove the deleted metadata keys.
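+        // E.g. a data record carrying {"a": 1, "b": 2} with
+        // metadata_to_be_deleted = {"b"} yields {"a": 1} here.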
+ match &self.metadata_to_be_deleted { + Some(meta) => { + for key in meta { + final_metadata.remove(key.as_str()); + } + } + None => (), + } final_metadata } @@ -202,6 +359,7 @@ impl<'referred_data> From<(DataRecord<'referred_data>, u32)> user_id: None, final_operation: Operation::Add, metadata_to_be_merged: None, + metadata_to_be_deleted: None, final_document: None, final_embedding: None, } @@ -222,14 +380,22 @@ impl<'referred_data> TryFrom<(&'referred_data OperationRecord, u32, &'referred_d let log_record = log_operation_info.0; let offset_id = log_operation_info.1; let user_id = log_operation_info.2; - let metadata = match &log_record.metadata { - Some(metadata) => match update_metdata_to_metdata(metadata) { - Ok(m) => Some(m), + let merged_metadata; + let deleted_metadata; + match &log_record.metadata { + Some(metadata) => match materialize_update_metadata(metadata) { + Ok(m) => { + merged_metadata = Some(m.0); + deleted_metadata = Some(m.1); + } Err(e) => { return Err(LogMaterializerError::MetadataMaterializationError(e)); } }, - None => None, + None => { + merged_metadata = None; + deleted_metadata = None; + } }; let document = match &log_record.document { @@ -249,7 +415,8 @@ impl<'referred_data> TryFrom<(&'referred_data OperationRecord, u32, &'referred_d offset_id, user_id: Some(user_id), final_operation: Operation::Add, - metadata_to_be_merged: metadata, + metadata_to_be_merged: merged_metadata, + metadata_to_be_deleted: deleted_metadata, final_document: document, final_embedding: embedding, }) @@ -376,7 +543,7 @@ impl<'me> LogMaterializer<'me> { // If the delete is for a record that is currently not in the // record segment, then we can just NOT process these records // at all. On the other hand if it is for a record that is currently - // in the record segment then we'll have to pass it as a delete + // in segment then we'll have to pass it as a delete // to the compactor so that it can be deleted. 
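// E.g. a log of [Add("x"), Delete("x")] where "x" never reached the record
// segment is dropped entirely (first branch below), whereas a Delete("y")
// for a "y" that already exists in the record segment is materialized with
// final_operation = Delete so the compactor removes it from storage.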
if new_id_to_materialized.contains_key(log_record.record.id.as_str()) { new_id_to_materialized.remove(log_record.record.id.as_str()); @@ -393,6 +560,7 @@ impl<'me> LogMaterializer<'me> { record_from_map.final_document = None; record_from_map.final_embedding = None; record_from_map.metadata_to_be_merged = None; + record_from_map.metadata_to_be_deleted = None; record_from_map.user_id = None; } } @@ -415,11 +583,17 @@ impl<'me> LogMaterializer<'me> { }, }; - record_from_map.metadata_to_be_merged = match merge_update_metadata( - &record_from_map.metadata_to_be_merged, + match merge_update_metadata( + ( + &record_from_map.metadata_to_be_merged, + &record_from_map.metadata_to_be_deleted, + ), &log_record.record.metadata, ) { - Ok(meta) => meta, + Ok(meta) => { + record_from_map.metadata_to_be_merged = meta.0; + record_from_map.metadata_to_be_deleted = meta.1; + } Err(e) => { return Err(LogMaterializerError::MetadataMaterializationError(e)); } @@ -444,11 +618,17 @@ impl<'me> LogMaterializer<'me> { let record_from_map = existing_id_to_materialized .get_mut(log_record.record.id.as_str()) .unwrap(); - record_from_map.metadata_to_be_merged = match merge_update_metadata( - &record_from_map.metadata_to_be_merged, + match merge_update_metadata( + ( + &record_from_map.metadata_to_be_merged, + &record_from_map.metadata_to_be_deleted, + ), &log_record.record.metadata, ) { - Ok(meta) => meta, + Ok(meta) => { + record_from_map.metadata_to_be_merged = meta.0; + record_from_map.metadata_to_be_deleted = meta.1; + } Err(e) => { return Err(LogMaterializerError::MetadataMaterializationError(e)); } @@ -469,11 +649,17 @@ impl<'me> LogMaterializer<'me> { let record_from_map = new_id_to_materialized .get_mut(log_record.record.id.as_str()) .unwrap(); - record_from_map.metadata_to_be_merged = match merge_update_metadata( - &record_from_map.metadata_to_be_merged, + match merge_update_metadata( + ( + &record_from_map.metadata_to_be_merged, + &record_from_map.metadata_to_be_deleted, + ), &log_record.record.metadata, ) { - Ok(meta) => meta, + Ok(meta) => { + record_from_map.metadata_to_be_merged = meta.0; + record_from_map.metadata_to_be_deleted = meta.1; + } Err(e) => { return Err(LogMaterializerError::MetadataMaterializationError(e)); } diff --git a/rust/worker/src/types/metadata.rs b/rust/worker/src/types/metadata.rs index a0407d34b1b..43f471b20fd 100644 --- a/rust/worker/src/types/metadata.rs +++ b/rust/worker/src/types/metadata.rs @@ -2,7 +2,7 @@ use crate::{ chroma_proto, errors::{ChromaError, ErrorCodes}, }; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use thiserror::Error; #[derive(Clone, Debug, PartialEq)] @@ -271,6 +271,7 @@ Metadata */ pub(crate) type Metadata = HashMap; +pub(crate) type DeletedMetadata = HashSet; impl TryFrom for Metadata { type Error = MetadataValueConversionError; @@ -289,22 +290,23 @@ impl TryFrom for Metadata { } } -pub(crate) fn update_metdata_to_metdata( - update_metdata: &UpdateMetadata, -) -> Result { - let mut metadata = Metadata::new(); - for (key, value) in update_metdata { - let res = value.try_into(); - match res { - Ok(value) => { - metadata.insert(key.clone(), value); - } - Err(err) => { - return Err(err); - } +pub(crate) struct MetadataDelta<'referred_data> { + pub(crate) metadata_to_update: HashMap< + &'referred_data str, + (&'referred_data MetadataValue, &'referred_data MetadataValue), + >, + pub(crate) metadata_to_delete: HashMap<&'referred_data str, &'referred_data MetadataValue>, + pub(crate) metadata_to_insert: HashMap<&'referred_data str, 
&'referred_data MetadataValue>, +} + +impl<'referred_data> MetadataDelta<'referred_data> { + pub(crate) fn new() -> Self { + Self { + metadata_to_update: HashMap::new(), + metadata_to_delete: HashMap::new(), + metadata_to_insert: HashMap::new(), } } - Ok(metadata) } /* @@ -707,34 +709,6 @@ impl TryFrom for WhereDocumentOperator { } } -pub(crate) fn merge_update_metadata( - base_metadata: &Option, - update_metadata: &Option, -) -> Result, MetadataValueConversionError> { - let mut merged_metadata = HashMap::new(); - if base_metadata.is_some() { - for (key, value) in base_metadata.as_ref().unwrap() { - merged_metadata.insert(key.clone(), value.clone()); - } - } - if update_metadata.is_some() { - match update_metdata_to_metdata(update_metadata.as_ref().unwrap()) { - Ok(metadata) => { - for (key, value) in metadata { - merged_metadata.insert(key, value); - } - } - Err(e) => { - return Err(e); - } - }; - } - if merged_metadata.is_empty() { - return Ok(None); - } - Ok(Some(merged_metadata)) -} - #[cfg(test)] mod tests { use super::*;
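    // A minimal sketch of the delete-marker convention introduced in this PR:
    // UpdateMetadataValue::None marks a key for deletion, while every other
    // variant converts into a concrete MetadataValue. Hypothetical test, not
    // part of the original patch; it assumes the TryFrom<&UpdateMetadataValue>
    // conversion defined above.
    #[test]
    fn update_metadata_none_marks_deletion_sketch() {
        let mut update: UpdateMetadata = HashMap::new();
        update.insert("keep".to_string(), UpdateMetadataValue::Int(1));
        update.insert("drop".to_string(), UpdateMetadataValue::None);

        let mut metadata = Metadata::new();
        let mut deleted = DeletedMetadata::new();
        for (key, value) in &update {
            match value {
                // None is the tombstone: record the key for deletion.
                UpdateMetadataValue::None => {
                    deleted.insert(key.clone());
                }
                // Any concrete value converts into a MetadataValue.
                _ => {
                    metadata.insert(key.clone(), value.try_into().unwrap());
                }
            }
        }
        assert_eq!(metadata.get("keep"), Some(&MetadataValue::Int(1)));
        assert!(deleted.contains("drop"));
        // The two maps are disjoint by construction, mirroring
        // metadata_to_be_merged / metadata_to_be_deleted in the materializer.
        assert!(!metadata.contains_key("drop"));
    }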