Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHORE] Add tracing for task names, materialize logs tracing, and pull logs count #2479

Merged
merged 3 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rust/worker/src/execution/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ mod tests {
#[async_trait]
impl Operator<f32, String> for MockOperator {
type Error = ();

fn get_name(&self) -> &'static str {
"MockOperator"
}

async fn run(&self, input: &f32) -> Result<String, Self::Error> {
// sleep to simulate work
tokio::time::sleep(tokio::time::Duration::from_millis(
Expand Down
6 changes: 6 additions & 0 deletions rust/worker/src/execution/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ where
// It would have been nice to do this with a default trait for result
// but that's not stable in rust yet.
async fn run(&self, input: &I) -> Result<O, Self::Error>;
fn get_name(&self) -> &'static str;
}

/// A task result is a wrapper around the result of a task.
Expand Down Expand Up @@ -56,6 +57,7 @@ pub(crate) type TaskMessage = Box<dyn TaskWrapper>;
/// erase the I, O types from the Task struct so that tasks.
#[async_trait]
pub(crate) trait TaskWrapper: Send + Debug {
fn get_name(&self) -> &'static str;
async fn run(&self);
fn id(&self) -> Uuid;
}
Expand All @@ -70,6 +72,10 @@ where
Input: Send + Sync + Debug,
Output: Send + Sync + Debug,
{
fn get_name(&self) -> &'static str {
self.operator.get_name()
}

async fn run(&self) {
let result = self.operator.run(&self.input).await;
let task_result = TaskResult {
Expand Down
11 changes: 10 additions & 1 deletion rust/worker/src/execution/operators/brute_force_knn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use thiserror::Error;
use tracing::trace;
use tracing::Instrument;

/// The brute force k-nearest neighbors operator is responsible for computing the k-nearest neighbors
/// of a given query vector against a set of vectors using brute force calculation.
Expand Down Expand Up @@ -115,6 +116,10 @@ impl ChromaError for BruteForceKnnOperatorError {
impl Operator<BruteForceKnnOperatorInput, BruteForceKnnOperatorOutput> for BruteForceKnnOperator {
type Error = BruteForceKnnOperatorError;

fn get_name(&self) -> &'static str {
"BruteForceKnnOperator"
}

async fn run(
&self,
input: &BruteForceKnnOperatorInput,
Expand All @@ -134,7 +139,11 @@ impl Operator<BruteForceKnnOperatorInput, BruteForceKnnOperatorOutput> for Brute
}
};
let log_materializer = LogMaterializer::new(record_segment_reader, input.log.clone(), None);
let logs = match log_materializer.materialize().await {
let logs = match log_materializer
.materialize()
.instrument(tracing::info_span!("Materialize logs"))
HammadB marked this conversation as resolved.
Show resolved Hide resolved
.await
{
Ok(logs) => logs,
Err(e) => {
return Err(BruteForceKnnOperatorError::LogMaterializationError(e));
Expand Down
7 changes: 7 additions & 0 deletions rust/worker/src/execution/operators/count_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ impl ChromaError for CountRecordsError {
#[async_trait]
impl Operator<CountRecordsInput, CountRecordsOutput> for CountRecordsOperator {
type Error = CountRecordsError;

fn get_name(&self) -> &'static str {
"CountRecordsOperator"
}

async fn run(
&self,
input: &CountRecordsInput,
Expand Down Expand Up @@ -204,6 +209,7 @@ mod tests {
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::{collections::HashMap, str::FromStr};
use tracing::Instrument;
use uuid::Uuid;

#[tokio::test]
Expand Down Expand Up @@ -286,6 +292,7 @@ mod tests {
let materializer = LogMaterializer::new(record_segment_reader, data, None);
let mat_records = materializer
.materialize()
.instrument(tracing::info_span!("Materialize logs"))
.await
.expect("Log materialization failed");
segment_writer
Expand Down
4 changes: 4 additions & 0 deletions rust/worker/src/execution/operators/flush_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub struct FlushS3Output {
impl Operator<FlushS3Input, FlushS3Output> for FlushS3Operator {
type Error = Box<dyn ChromaError>;

fn get_name(&self) -> &'static str {
"FlushS3Operator"
}

async fn run(&self, input: &FlushS3Input) -> Result<FlushS3Output, Self::Error> {
let record_segment_flusher = input.record_segment_writer.clone().commit();
let record_segment_flush_info = match record_segment_flusher {
Expand Down
4 changes: 4 additions & 0 deletions rust/worker/src/execution/operators/get_vectors_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ impl ChromaError for GetVectorsOperatorError {
impl Operator<GetVectorsOperatorInput, GetVectorsOperatorOutput> for GetVectorsOperator {
type Error = GetVectorsOperatorError;

fn get_name(&self) -> &'static str {
"GetVectorsOperator"
}

async fn run(
&self,
input: &GetVectorsOperatorInput,
Expand Down
11 changes: 10 additions & 1 deletion rust/worker/src/execution/operators/hnsw_knn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use async_trait::async_trait;
use std::collections::HashSet;
use std::sync::Arc;
use thiserror::Error;
use tracing::Instrument;

#[derive(Debug)]
pub struct HnswKnnOperator {}
Expand Down Expand Up @@ -107,6 +108,10 @@ impl HnswKnnOperator {
impl Operator<HnswKnnOperatorInput, HnswKnnOperatorOutput> for HnswKnnOperator {
type Error = Box<dyn ChromaError>;

fn get_name(&self) -> &'static str {
"HnswKnnOperator"
}

async fn run(
&self,
input: &HnswKnnOperatorInput,
Expand Down Expand Up @@ -140,7 +145,11 @@ impl Operator<HnswKnnOperatorInput, HnswKnnOperatorOutput> for HnswKnnOperator {
input.logs.clone(),
None,
);
let logs = match log_materializer.materialize().await {
let logs = match log_materializer
.materialize()
.instrument(tracing::info_span!("Materialize logs"))
.await
{
Ok(logs) => logs,
Err(e) => {
tracing::error!("[HnswKnnOperation]: Error materializing logs {:?}", e);
Expand Down
4 changes: 4 additions & 0 deletions rust/worker/src/execution/operators/merge_knn_results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ impl Operator<MergeKnnResultsOperatorInput, MergeKnnResultsOperatorOutput>
{
type Error = Box<dyn ChromaError>;

fn get_name(&self) -> &'static str {
"MergeKnnResultsOperator"
}

async fn run(
&self,
input: &MergeKnnResultsOperatorInput,
Expand Down
12 changes: 10 additions & 2 deletions rust/worker/src/execution/operators/merge_metadata_results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
use async_trait::async_trait;
use std::collections::HashSet;
use thiserror::Error;
use tracing::{error, trace};
use tracing::{error, trace, Instrument};

#[derive(Debug)]
pub struct MergeMetadataResultsOperator {}
Expand Down Expand Up @@ -89,6 +89,10 @@ impl Operator<MergeMetadataResultsOperatorInput, MergeMetadataResultsOperatorOut
{
type Error = MergeMetadataResultsOperatorError;

fn get_name(&self) -> &'static str {
"MergeMetadataResultsOperator"
}

async fn run(
&self,
input: &MergeMetadataResultsOperatorInput,
Expand Down Expand Up @@ -160,7 +164,11 @@ impl Operator<MergeMetadataResultsOperatorInput, MergeMetadataResultsOperatorOut
// Step 1: Materialize the logs.
let materializer =
LogMaterializer::new(record_segment_reader, input.filtered_log.clone(), None);
let mat_records = match materializer.materialize().await {
let mat_records = match materializer
.materialize()
.instrument(tracing::info_span!("Materialize logs"))
.await
{
Ok(records) => records,
Err(e) => {
return Err(MergeMetadataResultsOperatorError::LogMaterializationError(
Expand Down
12 changes: 11 additions & 1 deletion rust/worker/src/execution/operators/metadata_filtering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use roaring::RoaringBitmap;
use std::collections::{HashMap, HashSet};
use thiserror::Error;
use tonic::async_trait;
use tracing::Instrument;

#[derive(Debug)]
pub(crate) struct MetadataFilteringOperator {}
Expand Down Expand Up @@ -110,6 +111,11 @@ impl ChromaError for MetadataFilteringError {
#[async_trait]
impl Operator<MetadataFilteringInput, MetadataFilteringOutput> for MetadataFilteringOperator {
type Error = MetadataFilteringError;

fn get_name(&self) -> &'static str {
"MetadataFilteringOperator"
}

async fn run(
&self,
input: &MetadataFilteringInput,
Expand Down Expand Up @@ -147,7 +153,11 @@ impl Operator<MetadataFilteringInput, MetadataFilteringOutput> for MetadataFilte
// Step 1: Materialize the logs.
let materializer =
LogMaterializer::new(record_segment_reader, input.log_record.clone(), None);
let mat_records = match materializer.materialize().await {
let mat_records = match materializer
.materialize()
.instrument(tracing::info_span!("Materialize logs"))
.await
{
Ok(records) => records,
Err(e) => {
return Err(MetadataFilteringError::MetadataFilteringLogMaterializationError(e));
Expand Down
4 changes: 4 additions & 0 deletions rust/worker/src/execution/operators/normalize_vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ impl Operator<NormalizeVectorOperatorInput, NormalizeVectorOperatorOutput>
{
type Error = ();

fn get_name(&self) -> &'static str {
"NormalizeVectorOperator"
}

async fn run(
&self,
input: &NormalizeVectorOperatorInput,
Expand Down
4 changes: 4 additions & 0 deletions rust/worker/src/execution/operators/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ impl PartitionOperator {
impl Operator<PartitionInput, PartitionOutput> for PartitionOperator {
type Error = PartitionError;

fn get_name(&self) -> &'static str {
"PartitionOperator"
}

async fn run(&self, input: &PartitionInput) -> Result<PartitionOutput, PartitionError> {
let records = &input.records;
let partition_size = self.determine_partition_size(records.len(), input.max_partition_size);
Expand Down
5 changes: 5 additions & 0 deletions rust/worker/src/execution/operators/pull_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl PullLogsOutput {
impl Operator<PullLogsInput, PullLogsOutput> for PullLogsOperator {
type Error = PullLogsError;

fn get_name(&self) -> &'static str {
"PullLogsOperator"
}

async fn run(&self, input: &PullLogsInput) -> Result<PullLogsOutput, PullLogsError> {
// We expect the log to be cheaply cloneable, we need to clone it since we need
// a mutable reference to it. Not necessarily the best, but it works for our needs.
Expand Down Expand Up @@ -133,6 +137,7 @@ impl Operator<PullLogsInput, PullLogsOutput> for PullLogsOperator {
if input.num_records.is_some() && result.len() > input.num_records.unwrap() as usize {
result.truncate(input.num_records.unwrap() as usize);
}
tracing::info!("[PullLogsOperator]: Read {} records", result.len());
// Convert to DataChunk
let data_chunk = Chunk::new(result.into());
Ok(PullLogsOutput::new(data_chunk))
Expand Down
4 changes: 4 additions & 0 deletions rust/worker/src/execution/operators/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ impl ChromaError for RegisterError {
impl Operator<RegisterInput, RegisterOutput> for RegisterOperator {
type Error = RegisterError;

fn get_name(&self) -> &'static str {
"RegisterOperator"
}

async fn run(&self, input: &RegisterInput) -> Result<RegisterOutput, RegisterError> {
let mut sysdb = input.sysdb.clone();
let mut log = input.log.clone();
Expand Down
11 changes: 10 additions & 1 deletion rust/worker/src/execution/operators/write_segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::{
};
use async_trait::async_trait;
use thiserror::Error;
use tracing::Instrument;

#[derive(Error, Debug)]
pub enum WriteSegmentsOperatorError {
Expand Down Expand Up @@ -98,6 +99,10 @@ pub struct WriteSegmentsOutput {
impl Operator<WriteSegmentsInput, WriteSegmentsOutput> for WriteSegmentsOperator {
type Error = WriteSegmentsOperatorError;

fn get_name(&self) -> &'static str {
"WriteSegmentsOperator"
}

async fn run(&self, input: &WriteSegmentsInput) -> Result<WriteSegmentsOutput, Self::Error> {
tracing::debug!("Materializing N Records: {:?}", input.chunk.len());
// Prepare for log materialization.
Expand Down Expand Up @@ -138,7 +143,11 @@ impl Operator<WriteSegmentsInput, WriteSegmentsOutput> for WriteSegmentsOperator
Some(input.offset_id.clone()),
);
// Materialize the logs.
let res = match materializer.materialize().await {
let res = match materializer
.materialize()
.instrument(tracing::info_span!("Materialize logs"))
.await
{
Ok(records) => records,
Err(e) => {
tracing::error!("Error materializing records {}", e);
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/orchestration/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl HnswQueryOrchestrator {
Err(e) => {
match *e {
DistributedHNSWSegmentFromSegmentError::Uninitialized => {
tracing::error!("[HnswQueryOperation]: Error creating distributed hnsw segment reader {:?}", *e);
tracing::info!("[HnswQueryOperation]: Uninitialied reader {:?}", *e);
// no task, decrement the merge dependency count and return
// with an empty result
for (i, _) in self.query_vectors.iter().enumerate() {
Expand Down
5 changes: 4 additions & 1 deletion rust/worker/src/execution/worker_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::{dispatcher::TaskRequestMessage, operator::TaskMessage};
use crate::system::{Component, ComponentContext, ComponentRuntime, Handler, ReceiverForMessage};
use async_trait::async_trait;
use std::fmt::{Debug, Formatter, Result};
use tracing::{trace_span, Instrument, Span};

/// A worker thread is responsible for executing tasks
/// It sends requests to the dispatcher for new tasks.
Expand Down Expand Up @@ -56,7 +57,9 @@ impl Handler<TaskMessage> for WorkerThread {
type Result = ();

async fn handle(&mut self, task: TaskMessage, ctx: &ComponentContext<WorkerThread>) {
task.run().await;
let child_span =
trace_span!(parent: Span::current(), "Task execution", name = task.get_name());
task.run().instrument(child_span).await;
let req: TaskRequestMessage = TaskRequestMessage::new(ctx.receiver());
let res = self.dispatcher.send(req, None).await;
// TODO: task run should be able to error and we should send it as part of the result
Expand Down
23 changes: 9 additions & 14 deletions rust/worker/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,21 +143,16 @@ impl WorkerServer {

let mut proto_results_for_all = Vec::new();

let parse_vectors_span = trace_span!("Input vectors parsing");
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove this as its not useful and dumps the vectors to the event

let mut query_vectors = Vec::new();
let _ = parse_vectors_span.in_scope(|| {
for proto_query_vector in request.vectors {
let (query_vector, _encoding) = match proto_query_vector.try_into() {
Ok((vector, encoding)) => (vector, encoding),
Err(e) => {
return Err(Status::internal(format!("Error converting vector: {}", e)));
}
};
query_vectors.push(query_vector);
}
trace!("Parsed vectors {:?}", query_vectors);
Ok(())
});
for proto_query_vector in request.vectors {
let (query_vector, _encoding) = match proto_query_vector.try_into() {
Ok((vector, encoding)) => (vector, encoding),
Err(e) => {
return Err(Status::internal(format!("Error converting vector: {}", e)));
}
};
query_vectors.push(query_vector);
}

let dispatcher = match self.dispatcher {
Some(ref dispatcher) => dispatcher.clone(),
Expand Down
Loading