[ENH] Add rust metadata grpc reader #2075

Closed · wants to merge 1 commit
2 changes: 2 additions & 0 deletions chromadb/segment/__init__.py
@@ -23,7 +23,9 @@ class SegmentType(Enum):
HNSW_LOCAL_MEMORY = "urn:chroma:segment/vector/hnsw-local-memory"
HNSW_LOCAL_PERSISTED = "urn:chroma:segment/vector/hnsw-local-persisted"
HNSW_DISTRIBUTED = "urn:chroma:segment/vector/hnsw-distributed"
# TODO: rename record to blockfile record
RECORD = "urn:chroma:segment/record"
BLOCKFILE_METADATA = "urn:chroma:segment/metadata/blockfile"
Contributor:

what is urn? i'm guessing this is just some common context in the rust codebase that i'm missing

Collaborator Author:

we use https://en.wikipedia.org/wiki/Uniform_Resource_Name for the name of the segment types 🤷
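For example, urn:chroma:segment/metadata/blockfile reads as: namespace chroma, resource class segment, scope metadata, implementation blockfile. (An informal reading of the naming convention, not a spec.)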



class SegmentImplementation(Component):
9 changes: 5 additions & 4 deletions chromadb/segment/impl/manager/distributed.py
@@ -30,6 +30,7 @@
SEGMENT_TYPE_IMPLS = {
SegmentType.SQLITE: "chromadb.segment.impl.metadata.sqlite.SqliteMetadataSegment",
SegmentType.HNSW_DISTRIBUTED: "chromadb.segment.impl.vector.grpc_segment.GrpcVectorSegment",
SegmentType.BLOCKFILE_METADATA: "chromadb.segment.impl.metadata.grpc_segment.GrpcMetadataSegment",
}
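SEGMENT_TYPE_IMPLS maps each segment type to the dotted import path of its implementing class; judging by the pattern, registering BLOCKFILE_METADATA here is how the distributed segment manager resolves and constructs the new GrpcMetadataSegment.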


@@ -64,11 +65,11 @@ def create_segments(self, collection: Collection) -> Sequence[Segment]:
vector_segment = _segment(
SegmentType.HNSW_DISTRIBUTED, SegmentScope.VECTOR, collection
)
# metadata_segment = _segment(
# SegmentType.SQLITE, SegmentScope.METADATA, collection
# )
metadata_segment = _segment(
SegmentType.BLOCKFILE_METADATA, SegmentScope.METADATA, collection
)
record_segment = _segment(SegmentType.RECORD, SegmentScope.RECORD, collection)
return [vector_segment, record_segment]
return [vector_segment, record_segment, metadata_segment]

@override
def delete_segments(self, collection_id: UUID) -> Sequence[UUID]:
12 changes: 6 additions & 6 deletions chromadb/segment/impl/metadata/grpc_segment.py
@@ -65,12 +65,12 @@ def get_metadata(
) -> Sequence[MetadataEmbeddingRecord]:
"""Query for embedding metadata."""

where_pb = self._where_to_proto(where)
where_document_pb = self._where_document_to_proto(where_document)
request: pb.QueryMetadataRequest = pb.QueryMetadataRequest(
segment_id=self._segment["id"].hex,
where=where_pb,
where_document=where_document_pb,
where=self._where_to_proto(where) if where is not None else None,
Contributor:
thank you for fixing my silly bugs!

where_document=self._where_document_to_proto(where_document)
if where_document is not None
else None,
ids=ids,
limit=limit,
offset=offset,
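The guard above matters because _where_to_proto and _where_document_to_proto assume a present filter and would presumably fail on None. A minimal standalone sketch of the same convert-only-when-present pattern, written in Rust with hypothetical names (not code from this PR):

// Sketch only: run the converter only when the optional filter is present,
// mirroring `self._where_to_proto(where) if where is not None else None` above.
#[derive(Debug)]
struct WhereProto(String);

fn to_proto(clause: &str) -> WhereProto {
    WhereProto(format!("proto({})", clause))
}

fn main() {
    let where_clause: Option<&str> = None;
    let where_pb: Option<WhereProto> = where_clause.map(to_proto);
    println!("{:?}", where_pb); // prints: None, the converter never ran
}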
@@ -89,7 +89,7 @@ def get_metadata(
result = self._from_proto(record)
results.append(result)

return []
return results

def _where_to_proto(self, where: Optional[Where]) -> pb.Where:
response = pb.Where()
@@ -263,7 +263,7 @@ def _where_to_proto(self, where: Optional[Where]) -> pb.Where:
sfc = pb.SingleDoubleComparison()
sfc.value = operand
sfc.generic_comparator = pb.GenericComparator.EQ
dc.double_list_operand.CopyFrom(sfc)
dc.single_double_operand.CopyFrom(sfc)

response.direct_comparison.CopyFrom(dc)
return response
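For context on the fix above: single_double_operand and double_list_operand appear to be alternative arms of the comparison payload, so the original CopyFrom stored the single-value EQ comparison under the list arm, where the reader would never look for it. (Inferred from the field names; the proto definition is not part of this diff.)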
2 changes: 1 addition & 1 deletion go/cmd/coordinator/cmd.go
@@ -54,7 +54,7 @@ func init() {
// Query service memberlist
Cmd.Flags().StringVar(&conf.QueryServiceMemberlistName, "query-memberlist-name", "query-service-memberlist", "Query service memberlist name")
Cmd.Flags().StringVar(&conf.QueryServicePodLabel, "query-pod-label", "query-service", "Query pod label")
Cmd.Flags().DurationVar(&conf.WatchInterval, "watch-interval", 60*time.Second, "Watch interval")
Cmd.Flags().DurationVar(&conf.WatchInterval, "watch-interval", 5*time.Second, "Watch interval")
HammadB marked this conversation as resolved.

// Compaction service Memberlist
Cmd.Flags().StringVar(&conf.CompactionServiceMemberlistName, "compaction-memberlist-name", "compaction-service-memberlist", "Compaction memberlist name")
3 changes: 2 additions & 1 deletion rust/worker/src/execution/operators/brute_force_knn.rs
@@ -30,7 +30,7 @@ pub struct BruteForceKnnOperatorInput {
/// The output of the brute force k-nearest neighbors operator.
/// # Parameters
/// * `data` - The vectors to query against. Only the vectors that are nearest neighbors are visible.
/// * `indices` - The indices of the nearest neighbors. This is a mask against the `query_vecs` input.
/// * `indices` - The indices of the nearest neighbors. This is a mask against the `data` input.
/// One row for each query vector.
/// * `distances` - The distances of the nearest neighbors.
/// One row for each query vector.
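A standalone illustration of the mask semantics described above, with toy data rather than the operator's real types:

fn main() {
    // Three candidate vectors; `indices` holds row indices into `data`,
    // one row of indices per query vector, as the doc comment describes.
    let data = vec![vec![0.0f32, 0.0], vec![1.0, 1.0], vec![5.0, 5.0]];
    let indices = vec![vec![0usize, 1]]; // the two nearest neighbors of query 0
    let neighbors: Vec<&Vec<f32>> = indices[0].iter().map(|&i| &data[i]).collect();
    assert_eq!(neighbors, vec![&data[0], &data[1]]);
}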
Expand Down Expand Up @@ -91,6 +91,7 @@ impl Operator<BruteForceKnnOperatorInput, BruteForceKnnOperatorOutput> for Brute
let embedding = match &log_record.record.embedding {
Some(embedding) => embedding,
None => {
// Implies that the record is a delete or update of irrelevant field
Collaborator Author:
we need to go add delete/update

continue;
}
};
204 changes: 204 additions & 0 deletions rust/worker/src/execution/operators/merge_metadata_results.rs
@@ -0,0 +1,204 @@
use crate::{
blockstore::provider::BlockfileProvider,
errors::{ChromaError, ErrorCodes},
execution::{data::data_chunk::Chunk, operator::Operator},
segment::record_segment::RecordSegmentReader,
types::{
update_metdata_to_metdata, LogRecord, Metadata, MetadataValueConversionError, Segment,
},
};
use async_trait::async_trait;
use thiserror::Error;

#[derive(Debug)]
pub struct MergeMetadataResultsOperator {}

impl MergeMetadataResultsOperator {
pub fn new() -> Box<Self> {
Box::new(MergeMetadataResultsOperator {})
}
}

#[derive(Debug)]
pub struct MergeMetadataResultsOperatorInput {
// The records that were found in the log based on the filter conditions
// TODO: Once we support update/delete this should be MaterializedLogRecord
filtered_log: Chunk<LogRecord>,
// The query ids that were not found in the log, that we need to pull from the record segment
remaining_query_ids: Vec<String>,
// The offset ids that were found in the log, from where/where_document results
filtered_index_offset_ids: Vec<u32>,
record_segment_definition: Segment,
blockfile_provider: BlockfileProvider,
}

impl MergeMetadataResultsOperatorInput {
pub fn new(
filtered_log: Chunk<LogRecord>,
remaining_query_ids: Vec<String>,
filtered_index_offset_ids: Vec<u32>,
record_segment_definition: Segment,
blockfile_provider: BlockfileProvider,
) -> Self {
Self {
filtered_log,
remaining_query_ids,
filtered_index_offset_ids,
record_segment_definition,
blockfile_provider,
}
}
}

#[derive(Debug)]
pub struct MergeMetadataResultsOperatorOutput {
pub ids: Vec<String>,
pub metadata: Vec<Option<Metadata>>,
pub documents: Vec<Option<String>>,
}

#[derive(Error, Debug)]
pub enum MergeMetadataResultsOperatorError {
#[error("Error creating Record Segment")]
RecordSegmentError,
#[error("Error reading Record Segment")]
RecordSegmentReadError,
#[error("Error converting metadata")]
MetadataConversionError(#[from] MetadataValueConversionError),
}

impl ChromaError for MergeMetadataResultsOperatorError {
fn code(&self) -> ErrorCodes {
match self {
MergeMetadataResultsOperatorError::RecordSegmentError => ErrorCodes::Internal,
MergeMetadataResultsOperatorError::RecordSegmentReadError => ErrorCodes::Internal,
MergeMetadataResultsOperatorError::MetadataConversionError(e) => e.code(),
}
}
}

pub type MergeMetadataResultsOperatorResult =
Result<MergeMetadataResultsOperatorOutput, MergeMetadataResultsOperatorError>;

#[async_trait]
impl Operator<MergeMetadataResultsOperatorInput, MergeMetadataResultsOperatorOutput>
for MergeMetadataResultsOperator
{
type Error = MergeMetadataResultsOperatorError;

async fn run(
&self,
input: &MergeMetadataResultsOperatorInput,
) -> MergeMetadataResultsOperatorResult {
let record_segment_reader = match RecordSegmentReader::from_segment(
&input.record_segment_definition,
&input.blockfile_provider,
)
.await
{
Ok(reader) => reader,
Err(_) => {
return Err(MergeMetadataResultsOperatorError::RecordSegmentError);
}
};

let mut ids: Vec<String> = Vec::new();
let mut metadata = Vec::new();
let mut documents = Vec::new();

// Hydrate the data from the record segment for filtered data
for index_offset_id in input.filtered_index_offset_ids.iter() {
let record = match record_segment_reader
.get_data_for_offset_id(*index_offset_id as u32)
.await
{
Ok(record) => record,
Err(e) => {
println!("Error reading Record Segment: {:?}", e);
return Err(MergeMetadataResultsOperatorError::RecordSegmentReadError);
}
};

let user_id = match record_segment_reader
.get_user_id_for_offset_id(*index_offset_id as u32)
.await
{
Ok(user_id) => user_id,
Err(e) => {
println!("Error reading Record Segment: {:?}", e);
return Err(MergeMetadataResultsOperatorError::RecordSegmentReadError);
}
};

ids.push(user_id.to_string());
metadata.push(record.metadata.clone());
match record.document {
Some(document) => documents.push(Some(document.to_string())),
None => documents.push(None),
}
}

// Hydrate the data from the record segment for the remaining data
for query_id in input.remaining_query_ids.iter() {
let offset_id = match record_segment_reader
.get_offset_id_for_user_id(query_id)
.await
{
Ok(offset_id) => offset_id,
Err(e) => {
println!("Error reading Record Segment: {:?}", e);
return Err(MergeMetadataResultsOperatorError::RecordSegmentReadError);
}
};

let record = match record_segment_reader
.get_data_for_offset_id(offset_id)
.await
{
Ok(record) => record,
Err(e) => {
println!("Error reading Record Segment: {:?}", e);
return Err(MergeMetadataResultsOperatorError::RecordSegmentReadError);
}
};

ids.push(record.id.to_string());
metadata.push(record.metadata.clone());
match record.document {
Some(document) => documents.push(Some(document.to_string())),
None => documents.push(None),
}
}

// Merge the data from the brute force results
for (log_entry, index) in input.filtered_log.iter() {
ids.push(log_entry.record.id.to_string());
let output_metadata = match &log_entry.record.metadata {
Some(log_metadata) => match update_metdata_to_metdata(log_metadata) {
Ok(metadata) => Some(metadata),
Err(e) => {
println!("Error converting log metadata: {:?}", e);
return Err(MergeMetadataResultsOperatorError::MetadataConversionError(
e,
));
}
},
None => {
println!("No metadata found for log entry");
None
}
};
metadata.push(output_metadata);
// TODO: document
documents.push(Some("log_placeholder".to_string()));
}

Ok(MergeMetadataResultsOperatorOutput {
ids,
metadata,
documents,
})
}
}
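To make the control flow of run easier to follow, here is a simplified, self-contained model of the three-way merge, with toy types standing in for the record segment reader and the log (not the real chroma APIs):

use std::collections::HashMap;

// Toy stand-in for a record segment row.
struct Record {
    id: &'static str,
    document: Option<&'static str>,
}

fn main() {
    // "Record segment": offset id -> record, plus user id -> offset id.
    let segment: HashMap<u32, Record> = HashMap::from([
        (0, Record { id: "a", document: Some("doc-a") }),
        (1, Record { id: "b", document: None }),
    ]);
    let user_to_offset: HashMap<&str, u32> = HashMap::from([("b", 1)]);

    let filtered_index_offset_ids = vec![0u32];    // hits from the metadata index
    let remaining_query_ids = vec!["b"];           // ids to hydrate from the segment
    let filtered_log = vec![("c", Some("doc-c"))]; // hits found directly in the log

    let mut ids = Vec::new();
    let mut documents = Vec::new();

    // 1. Hydrate index hits by offset id.
    for oid in &filtered_index_offset_ids {
        let r = &segment[oid];
        ids.push(r.id.to_string());
        documents.push(r.document.map(str::to_string));
    }
    // 2. Hydrate the remaining user ids by resolving their offset id first.
    for qid in &remaining_query_ids {
        let r = &segment[&user_to_offset[qid]];
        ids.push(r.id.to_string());
        documents.push(r.document.map(str::to_string));
    }
    // 3. Append records matched directly in the log.
    for (id, doc) in &filtered_log {
        ids.push(id.to_string());
        documents.push(doc.map(str::to_string));
    }

    assert_eq!(ids, vec!["a", "b", "c"]);
    assert_eq!(documents.len(), ids.len()); // ids and documents stay parallel
}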
1 change: 1 addition & 0 deletions rust/worker/src/execution/operators/mod.rs
@@ -2,6 +2,7 @@ pub(super) mod brute_force_knn;
pub(super) mod flush_s3;
pub(super) mod hnsw_knn;
pub(super) mod merge_knn_results;
pub(super) mod merge_metadata_results;
pub(super) mod normalize_vectors;
pub(super) mod partition;
pub(super) mod pull_log;
8 changes: 0 additions & 8 deletions rust/worker/src/execution/orchestration/hnsw.rs
@@ -382,10 +382,6 @@ impl HnswQueryOrchestrator {
let segment = match segments {
Ok(mut segments) => {
if segments.is_empty() {
println!(
"1. Record segment not found for collection: {:?}",
collection_id
);
return Err(Box::new(HnswSegmentQueryError::RecordSegmentNotFound(
*collection_id,
)));
@@ -398,10 +394,6 @@
};

if segment.r#type != SegmentType::Record {
println!(
"2. Record segment not found for collection: {:?}",
collection_id
);
return Err(Box::new(HnswSegmentQueryError::RecordSegmentNotFound(
*collection_id,
)));