Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Compaction error replies #2456

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions rust/worker/src/execution/orchestration/common.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
errors::{ChromaError, ErrorCodes},
sysdb::sysdb::{GetCollectionsError, GetSegmentsError, SysDb},
system::{Component, ComponentContext},
types::{Collection, Segment, SegmentType},
};
use thiserror::Error;
Expand Down Expand Up @@ -148,3 +149,32 @@ pub(super) async fn get_record_segment_by_collection_id(
}
Ok(segment)
}

/// Terminate the orchestrator with an error
/// This function sends an error to the result channel and cancels the orchestrator
/// so it stops processing
/// # Arguments
/// * `result_channel` - The result channel to send the error to
/// * `error` - The error to send
/// * `ctx` - The component context
/// # Panics
/// This function panics if the result channel is not set
pub(super) fn terminate_with_error<Output, C>(
mut result_channel: Option<tokio::sync::oneshot::Sender<Result<Output, Box<dyn ChromaError>>>>,
error: Box<dyn ChromaError>,
ctx: &ComponentContext<C>,
) where
C: Component,
{
let result_channel = result_channel
.take()
.expect("Invariant violation. Result channel is not set.");
match result_channel.send(Err(error)) {
Ok(_) => (),
Err(_) => {
tracing::error!("Result channel dropped before sending error");
}
}
// Cancel the orchestrator so it stops processing
ctx.cancellation_token.cancel();
}
89 changes: 46 additions & 43 deletions rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::execution::operators::write_segments::WriteSegmentsInput;
use crate::execution::operators::write_segments::WriteSegmentsOperator;
use crate::execution::operators::write_segments::WriteSegmentsOperatorError;
use crate::execution::operators::write_segments::WriteSegmentsOutput;
use crate::execution::orchestration::common::terminate_with_error;
use crate::index::hnsw_provider::HnswIndexProvider;
use crate::log::log::Log;
use crate::log::log::PullLogsError;
Expand Down Expand Up @@ -134,6 +135,18 @@ impl ChromaError for GetSegmentWritersError {
}
}

#[derive(Error, Debug)]
enum CompactionError {
#[error(transparent)]
SystemTimeError(#[from] std::time::SystemTimeError),
}

impl ChromaError for CompactionError {
fn code(&self) -> crate::errors::ErrorCodes {
crate::errors::ErrorCodes::Internal
}
}

// TODO: we need to improve this response
#[derive(Debug)]
pub struct CompactionResponse {
Expand Down Expand Up @@ -183,6 +196,7 @@ impl CompactOrchestrator {
async fn pull_logs(
&mut self,
self_address: Box<dyn ReceiverForMessage<TaskResult<PullLogsOutput, PullLogsError>>>,
ctx: &crate::system::ComponentContext<CompactOrchestrator>,
) {
self.state = ExecutionState::PullLogs;
let operator = PullLogsOperator::new(self.log.clone());
Expand All @@ -192,7 +206,11 @@ impl CompactOrchestrator {
// TODO: change protobuf definition to use u64 instead of i64
Ok(end_timestamp) => end_timestamp.as_nanos() as i64,
Err(e) => {
// Log an error and reply + return
terminate_with_error(
self.result_channel.take(),
Box::new(CompactionError::SystemTimeError(e)),
ctx,
);
return;
}
};
Expand Down Expand Up @@ -496,7 +514,7 @@ impl Component for CompactOrchestrator {
}

async fn on_start(&mut self, ctx: &crate::system::ComponentContext<Self>) -> () {
self.pull_logs(ctx.receiver()).await;
self.pull_logs(ctx.receiver(), ctx).await;
}
}

Expand All @@ -514,28 +532,20 @@ impl Handler<TaskResult<PullLogsOutput, PullLogsError>> for CompactOrchestrator
let records = match message {
Ok(result) => result.logs(),
Err(e) => {
// Log an error and return
let result_channel = match self.result_channel.take() {
Some(tx) => tx,
None => {
// Log an error
return;
}
};
let _ = result_channel.send(Err(Box::new(e)));
terminate_with_error(self.result_channel.take(), Box::new(e), ctx);
return;
}
};
println!("Pulled Records: {:?}", records.len());
tracing::info!("Pulled Records: {:?}", records.len());
let final_record_pulled = records.get(records.len() - 1);
match final_record_pulled {
Some(record) => {
self.pulled_log_offset = Some(record.log_offset);
println!("Pulled Logs Up To Offset: {:?}", self.pulled_log_offset);
tracing::info!("Pulled Logs Up To Offset: {:?}", self.pulled_log_offset);
self.partition(records, ctx.receiver()).await;
}
None => {
// Log an error and return
// TODO: Log an error and return
return;
}
}
Expand All @@ -549,25 +559,18 @@ impl Handler<TaskResult<PartitionOutput, PartitionError>> for CompactOrchestrato
async fn handle(
&mut self,
message: TaskResult<PartitionOutput, PartitionError>,
_ctx: &crate::system::ComponentContext<CompactOrchestrator>,
ctx: &crate::system::ComponentContext<CompactOrchestrator>,
) {
let message = message.into_inner();
let records = match message {
Ok(result) => result.records,
Err(e) => {
// Log an error and return
let result_channel = match self.result_channel.take() {
Some(tx) => tx,
None => {
// Log an error
return;
}
};
let _ = result_channel.send(Err(Box::new(e)));
tracing::error!("Error partitioning records: {:?}", e);
terminate_with_error(self.result_channel.take(), Box::new(e), ctx);
return;
}
};
self.write(records, _ctx.receiver()).await;
self.write(records, ctx.receiver()).await;
}
}

Expand All @@ -578,17 +581,17 @@ impl Handler<TaskResult<WriteSegmentsOutput, WriteSegmentsOperatorError>> for Co
async fn handle(
&mut self,
message: TaskResult<WriteSegmentsOutput, WriteSegmentsOperatorError>,
_ctx: &crate::system::ComponentContext<CompactOrchestrator>,
ctx: &crate::system::ComponentContext<CompactOrchestrator>,
) {
let message = message.into_inner();
println!("Write Segments Result: {:?}", message);
let output = match message {
Ok(output) => {
self.num_write_tasks -= 1;
output
}
Err(e) => {
// Log an error
tracing::error!("Error writing segments: {:?}", e);
terminate_with_error(self.result_channel.take(), Box::new(e), ctx);
return;
}
};
Expand All @@ -602,15 +605,17 @@ impl Handler<TaskResult<WriteSegmentsOutput, WriteSegmentsOperatorError>> for Co
let mut writer = output.metadata_segment_writer.clone();
match writer.write_to_blockfiles().await {
Ok(()) => (),
Err(_) => {
Err(e) => {
tracing::error!("Error writing metadata segment out to blockfiles: {:?}", e);
terminate_with_error(self.result_channel.take(), Box::new(e), ctx);
return;
}
}
self.flush_s3(
output.record_segment_writer,
output.hnsw_segment_writer,
output.metadata_segment_writer,
_ctx.receiver(),
ctx.receiver(),
)
.await;
}
Expand All @@ -624,7 +629,7 @@ impl Handler<TaskResult<FlushS3Output, Box<dyn ChromaError>>> for CompactOrchest
async fn handle(
&mut self,
message: TaskResult<FlushS3Output, Box<dyn ChromaError>>,
_ctx: &crate::system::ComponentContext<CompactOrchestrator>,
ctx: &crate::system::ComponentContext<CompactOrchestrator>,
) {
let message = message.into_inner();
match message {
Expand All @@ -633,12 +638,13 @@ impl Handler<TaskResult<FlushS3Output, Box<dyn ChromaError>>> for CompactOrchest
self.register(
self.pulled_log_offset.unwrap(),
msg.segment_flush_info,
_ctx.receiver(),
ctx.receiver(),
)
.await;
}
Err(e) => {
// Log an error
tracing::error!("Error flushing to S3: {:?}", e);
terminate_with_error(self.result_channel.take(), e, ctx);
}
}
}
Expand All @@ -651,17 +657,14 @@ impl Handler<TaskResult<RegisterOutput, RegisterError>> for CompactOrchestrator
async fn handle(
&mut self,
message: TaskResult<RegisterOutput, RegisterError>,
_ctx: &crate::system::ComponentContext<CompactOrchestrator>,
ctx: &crate::system::ComponentContext<CompactOrchestrator>,
) {
let message = message.into_inner();
// Return execution state to the compaction manager
let result_channel = match self.result_channel.take() {
Some(tx) => tx,
None => {
// Log an error
return;
}
};
let result_channel = self
.result_channel
.take()
.expect("Invariant violation. Result channel is not set.");

match message {
Ok(_) => {
Expand All @@ -673,8 +676,8 @@ impl Handler<TaskResult<RegisterOutput, RegisterError>> for CompactOrchestrator
let _ = result_channel.send(Ok(response));
}
Err(e) => {
// Log an error
let _ = result_channel.send(Err(Box::new(e)));
tracing::error!("Error registering compaction: {:?}", e);
terminate_with_error(Some(result_channel), Box::new(e), ctx);
}
}
}
Expand Down
50 changes: 24 additions & 26 deletions rust/worker/src/execution/orchestration/get_vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use crate::{
execution::{
data::data_chunk::Chunk,
dispatcher::Dispatcher,
operator::{wrap, TaskMessage, TaskResult},
operator::{wrap, TaskResult},
operators::{
get_vectors_operator::{
GetVectorsOperator, GetVectorsOperatorError, GetVectorsOperatorInput,
GetVectorsOperatorOutput,
},
pull_log::{PullLogsInput, PullLogsOperator, PullLogsOutput},
},
orchestration::common::terminate_with_error,
},
log::log::{Log, PullLogsError},
sysdb::sysdb::SysDb,
Expand Down Expand Up @@ -115,7 +116,11 @@ impl GetVectorsOrchestrator {
// TODO: change protobuf definition to use u64 instead of i64
Ok(end_timestamp) => end_timestamp.as_nanos() as i64,
Err(e) => {
self.terminate_with_error(Box::new(GetVectorsError::SystemTimeError(e)), ctx);
terminate_with_error(
self.result_channel.take(),
Box::new(GetVectorsError::SystemTimeError(e)),
ctx,
);
return;
}
};
Expand All @@ -140,7 +145,11 @@ impl GetVectorsOrchestrator {
match self.dispatcher.send(task, Some(Span::current())).await {
Ok(_) => (),
Err(e) => {
self.terminate_with_error(Box::new(GetVectorsError::TaskSendError(e)), ctx);
terminate_with_error(
self.result_channel.take(),
Box::new(GetVectorsError::TaskSendError(e)),
ctx,
);
}
}
}
Expand Down Expand Up @@ -172,25 +181,13 @@ impl GetVectorsOrchestrator {
match self.dispatcher.send(task, Some(Span::current())).await {
Ok(_) => (),
Err(e) => {
self.terminate_with_error(Box::new(GetVectorsError::TaskSendError(e)), ctx);
}
}
}

fn terminate_with_error(&mut self, error: Box<dyn ChromaError>, ctx: &ComponentContext<Self>) {
let result_channel = self
.result_channel
.take()
.expect("Invariant violation. Result channel is not set.");
match result_channel.send(Err(error)) {
Ok(_) => (),
Err(e) => {
// Log an error - this implied the listener was dropped
println!("[HnswQueryOrchestrator] Result channel dropped before sending error");
terminate_with_error(
self.result_channel.take(),
Box::new(GetVectorsError::TaskSendError(e)),
ctx,
);
}
}
// Cancel the orchestrator so it stops processing
ctx.cancellation_token.cancel();
}

/// Run the orchestrator and return the result.
Expand Down Expand Up @@ -225,15 +222,16 @@ impl Component for GetVectorsOrchestrator {
match get_hnsw_segment_by_id(self.sysdb.clone(), &self.hnsw_segment_id).await {
Ok(segment) => segment,
Err(e) => {
self.terminate_with_error(e, ctx);
terminate_with_error(self.result_channel.take(), e, ctx);
return;
}
};

let collection_id = match &hnsw_segment.collection {
Some(collection_id) => collection_id,
None => {
self.terminate_with_error(
terminate_with_error(
self.result_channel.take(),
Box::new(GetVectorsError::HnswSegmentHasNoCollection),
ctx,
);
Expand All @@ -244,7 +242,7 @@ impl Component for GetVectorsOrchestrator {
let collection = match get_collection_by_id(self.sysdb.clone(), collection_id).await {
Ok(collection) => collection,
Err(e) => {
self.terminate_with_error(e, ctx);
terminate_with_error(self.result_channel.take(), e, ctx);
return;
}
};
Expand All @@ -253,7 +251,7 @@ impl Component for GetVectorsOrchestrator {
match get_record_segment_by_collection_id(self.sysdb.clone(), collection_id).await {
Ok(segment) => segment,
Err(e) => {
self.terminate_with_error(e, ctx);
terminate_with_error(self.result_channel.take(), e, ctx);
return;
}
};
Expand Down Expand Up @@ -283,7 +281,7 @@ impl Handler<TaskResult<PullLogsOutput, PullLogsError>> for GetVectorsOrchestrat
self.get_vectors(ctx.receiver(), logs, ctx).await;
}
Err(e) => {
self.terminate_with_error(Box::new(e), ctx);
terminate_with_error(self.result_channel.take(), Box::new(e), ctx);
}
}
}
Expand Down Expand Up @@ -324,7 +322,7 @@ impl Handler<TaskResult<GetVectorsOperatorOutput, GetVectorsOperatorError>>
ctx.cancellation_token.cancel();
}
Err(e) => {
self.terminate_with_error(Box::new(e), ctx);
terminate_with_error(self.result_channel.take(), Box::new(e), ctx);
}
}
}
Expand Down
Loading
Loading