Skip to content

Commit

Permalink
[PERF] Make pull logs an I/O operator so it runs on main runtime (#2641)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
- This moves pull logs to use the IO operator functionality introduced
in #2603
	 - Renamed IoOperator to IO since its a bit redundant. 
 - New functionality
	 - None

## Test plan
*How are these changes tested?*
Existing tests cover this change
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
None
  • Loading branch information
HammadB authored Aug 8, 2024
1 parent e4de2da commit cdd7518
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
4 changes: 2 additions & 2 deletions rust/worker/src/execution/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Dispatcher {
/// - task: The task to enqueue
async fn enqueue_task(&mut self, task: TaskMessage) {
match task.get_type() {
OperatorType::IoOperator => {
OperatorType::IO => {
tokio::spawn(async move {
task.run().await;
});
Expand Down Expand Up @@ -292,7 +292,7 @@ mod tests {
}

fn get_type(&self) -> OperatorType {
OperatorType::IoOperator
OperatorType::IO
}
}

Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/execution/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use thiserror::Error;
use uuid::Uuid;

pub(crate) enum OperatorType {
IoOperator,
IO,
Other,
}

Expand Down
12 changes: 7 additions & 5 deletions rust/worker/src/execution/operators/pull_log.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::execution::operator::Operator;
use crate::log::log::Log;
use crate::log::log::PullLogsError;
use crate::execution::operator::{Operator, OperatorType};
use crate::log::log::{Log, PullLogsError};
use async_trait::async_trait;
use chroma_types::Chunk;
use chroma_types::LogRecord;
use chroma_types::{Chunk, LogRecord};
use uuid::Uuid;

/// The pull logs operator is responsible for reading logs from the log service.
Expand Down Expand Up @@ -92,6 +90,10 @@ impl Operator<PullLogsInput, PullLogsOutput> for PullLogsOperator {
"PullLogsOperator"
}

fn get_type(&self) -> OperatorType {
OperatorType::IO
}

async fn run(&self, input: &PullLogsInput) -> Result<PullLogsOutput, PullLogsError> {
// We expect the log to be cheaply cloneable, we need to clone it since we need
// a mutable reference to it. Not necessarily the best, but it works for our needs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,6 @@ impl Operator<RecordSegmentPrefetchIoInput, RecordSegmentPrefetchIoOutput>
}

fn get_type(&self) -> OperatorType {
OperatorType::IoOperator
OperatorType::IO
}
}

0 comments on commit cdd7518

Please sign in to comment.