[FEAT] Decouple pipeline building and running from new executor (#2522)
Co-authored-by: Colin Ho <[email protected]>
colin-ho and Colin Ho authored Jul 17, 2024
1 parent 11bd106 commit 474427a
Showing 3 changed files with 190 additions and 101 deletions.
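
In brief: run.rs previously walked the LocalPhysicalPlan and spawned channels and tasks as it recursed. This commit moves translation into a new pipeline.rs module that first builds an inert PipelineNode tree, then starts it in a separate step. A minimal sketch of the resulting call pattern, mirroring the new run_local body below (the channel arguments and the shape of the received values are taken from run.rs in this commit, not guaranteed beyond it):

    // Sketch only: `plan` and `psets` are assumed in scope, as in run_local.
    let pipeline = physical_plan_to_pipeline(&plan, &psets); // build: no work spawned yet
    let (sender, mut receiver) = create_channel(1, true);
    pipeline.start(sender); // run: wiring recurses from the consumer down to the sources
    while let Some(part) = receiver.recv().await {
        // `part` is a result-wrapped Arc<MicroPartition>, per run_local's return type
    }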
1 change: 1 addition & 0 deletions src/daft-local-execution/src/lib.rs
@@ -1,5 +1,6 @@
 mod channel;
 mod intermediate_ops;
+mod pipeline;
 mod run;
 mod sinks;
 mod sources;
185 changes: 185 additions & 0 deletions src/daft-local-execution/src/pipeline.rs
@@ -0,0 +1,185 @@
use std::{collections::HashMap, sync::Arc};

use daft_dsl::Expr;
use daft_micropartition::MicroPartition;
use daft_physical_plan::{
    Concat, Filter, InMemoryScan, Limit, LocalPhysicalPlan, PhysicalScan, Project,
    UnGroupedAggregate,
};
use daft_plan::populate_aggregation_stages;

use crate::{
    channel::MultiSender,
    intermediate_ops::{
        aggregate::AggregateOperator,
        filter::FilterOperator,
        intermediate_op::{run_intermediate_op, IntermediateOperator},
        project::ProjectOperator,
    },
    sinks::{
        aggregate::AggregateSink,
        concat::ConcatSink,
        limit::LimitSink,
        sink::{run_double_input_sink, run_single_input_sink, DoubleInputSink, SingleInputSink},
    },
    sources::{
        in_memory::InMemorySource,
        scan_task::ScanTaskSource,
        source::{run_source, Source},
    },
};

pub enum PipelineNode {
    Source {
        source: Arc<dyn Source>,
    },
    IntermediateOp {
        intermediate_op: Box<dyn IntermediateOperator>,
        child: Box<PipelineNode>,
    },
    SingleInputSink {
        sink: Box<dyn SingleInputSink>,
        child: Box<PipelineNode>,
    },
    DoubleInputSink {
        sink: Box<dyn DoubleInputSink>,
        left_child: Box<PipelineNode>,
        right_child: Box<PipelineNode>,
    },
}

impl PipelineNode {
    pub fn start(&self, sender: MultiSender) {
        match self {
            PipelineNode::Source { source } => {
                run_source(source.clone(), sender);
            }
            PipelineNode::IntermediateOp {
                intermediate_op,
                child,
            } => {
                let sender = run_intermediate_op(intermediate_op.clone(), sender);
                child.start(sender);
            }
            PipelineNode::SingleInputSink { sink, child } => {
                let sender = run_single_input_sink(sink.clone(), sender);
                child.start(sender);
            }
            PipelineNode::DoubleInputSink {
                sink,
                left_child,
                right_child,
            } => {
                let (left_sender, right_sender) = run_double_input_sink(sink.clone(), sender);
                left_child.start(left_sender);
                right_child.start(right_sender);
            }
        }
    }
}

pub fn physical_plan_to_pipeline(
    physical_plan: &LocalPhysicalPlan,
    psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
) -> PipelineNode {
    match physical_plan {
        LocalPhysicalPlan::PhysicalScan(PhysicalScan { scan_tasks, .. }) => {
            let scan_task_source = ScanTaskSource::new(scan_tasks.clone());
            PipelineNode::Source {
                source: Arc::new(scan_task_source),
            }
        }
        LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => {
            let partitions = psets.get(&info.cache_key).expect("Cache key not found");
            let in_memory_source = InMemorySource::new(partitions.clone());
            PipelineNode::Source {
                source: Arc::new(in_memory_source),
            }
        }
        LocalPhysicalPlan::Project(Project {
            input, projection, ..
        }) => {
            let proj_op = ProjectOperator::new(projection.clone());
            let child_node = physical_plan_to_pipeline(input, psets);
            PipelineNode::IntermediateOp {
                intermediate_op: Box::new(proj_op),
                child: Box::new(child_node),
            }
        }
        LocalPhysicalPlan::Filter(Filter {
            input, predicate, ..
        }) => {
            let filter_op = FilterOperator::new(predicate.clone());
            let child_node = physical_plan_to_pipeline(input, psets);
            PipelineNode::IntermediateOp {
                intermediate_op: Box::new(filter_op),
                child: Box::new(child_node),
            }
        }
        LocalPhysicalPlan::Limit(Limit {
            input, num_rows, ..
        }) => {
            let sink = LimitSink::new(*num_rows as usize);
            let child_node = physical_plan_to_pipeline(input, psets);
            PipelineNode::SingleInputSink {
                sink: Box::new(sink),
                child: Box::new(child_node),
            }
        }
        LocalPhysicalPlan::Concat(Concat { input, other, .. }) => {
            let sink = ConcatSink::new();
            let left_child = physical_plan_to_pipeline(input, psets);
            let right_child = physical_plan_to_pipeline(other, psets);
            PipelineNode::DoubleInputSink {
                sink: Box::new(sink),
                left_child: Box::new(left_child),
                right_child: Box::new(right_child),
            }
        }
        LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate {
            input,
            aggregations,
            schema,
            ..
        }) => {
            let (first_stage_aggs, second_stage_aggs, final_exprs) =
                populate_aggregation_stages(aggregations, schema, &[]);
            let first_stage_agg_op = AggregateOperator::new(
                first_stage_aggs
                    .values()
                    .cloned()
                    .map(|e| Arc::new(Expr::Agg(e.clone())))
                    .collect(),
                vec![],
            );
            let second_stage_agg_sink = AggregateSink::new(
                second_stage_aggs
                    .values()
                    .cloned()
                    .map(|e| Arc::new(Expr::Agg(e.clone())))
                    .collect(),
                vec![],
            );
            let final_stage_project = ProjectOperator::new(final_exprs);

            let child_node = physical_plan_to_pipeline(input, psets);
            let intermediate_agg_op_node = PipelineNode::IntermediateOp {
                intermediate_op: Box::new(first_stage_agg_op),
                child: Box::new(child_node),
            };

            let sink_node = PipelineNode::SingleInputSink {
                sink: Box::new(second_stage_agg_sink),
                child: Box::new(intermediate_agg_op_node),
            };

            PipelineNode::IntermediateOp {
                intermediate_op: Box::new(final_stage_project),
                child: Box::new(sink_node),
            }
        }
        _ => {
            unimplemented!("Physical plan not supported: {}", physical_plan.name());
        }
    }
}
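
One detail worth calling out in the new module: the UnGroupedAggregate arm uses populate_aggregation_stages to split one logical aggregation into three pipeline nodes. As an illustration of the value that arm returns (this restates the code above; it is not additional committed code):

    // Shape of the tree built by the UnGroupedAggregate arm:
    PipelineNode::IntermediateOp {
        intermediate_op: Box::new(final_stage_project),        // applies final_exprs
        child: Box::new(PipelineNode::SingleInputSink {
            sink: Box::new(second_stage_agg_sink),             // global merge of partials
            child: Box::new(PipelineNode::IntermediateOp {
                intermediate_op: Box::new(first_stage_agg_op), // per-partition partial aggs
                child: Box::new(child_node),                   // translated input subtree
            }),
        }),
    }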
105 changes: 4 additions & 101 deletions src/daft-local-execution/src/run.rs
@@ -1,13 +1,8 @@
 use std::{collections::HashMap, sync::Arc};
 
 use common_error::DaftResult;
-use daft_dsl::Expr;
 use daft_micropartition::MicroPartition;
-use daft_physical_plan::{
-    translate, Concat, Filter, InMemoryScan, Limit, LocalPhysicalPlan, PhysicalScan, Project,
-    UnGroupedAggregate,
-};
-use daft_plan::populate_aggregation_stages;
+use daft_physical_plan::{translate, LocalPhysicalPlan};
 
 #[cfg(feature = "python")]
 use {
@@ -16,20 +11,7 @@ use {
     pyo3::{pyclass, pymethods, IntoPy, PyObject, PyRef, PyRefMut, PyResult, Python},
 };

-use crate::{
-    channel::{create_channel, MultiSender},
-    intermediate_ops::{
-        aggregate::AggregateOperator, filter::FilterOperator, intermediate_op::run_intermediate_op,
-        project::ProjectOperator,
-    },
-    sinks::{
-        aggregate::AggregateSink,
-        concat::ConcatSink,
-        limit::LimitSink,
-        sink::{run_double_input_sink, run_single_input_sink},
-    },
-    sources::{in_memory::InMemorySource, scan_task::ScanTaskSource, source::run_source},
-};
+use crate::{channel::create_channel, pipeline::physical_plan_to_pipeline};

 #[cfg(feature = "python")]
 #[pyclass]
@@ -103,8 +85,9 @@ pub fn run_local(
 ) -> DaftResult<Box<dyn Iterator<Item = DaftResult<Arc<MicroPartition>>> + Send>> {
     let runtime = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime");
     let res = runtime.block_on(async {
+        let pipeline = physical_plan_to_pipeline(physical_plan, &psets);
         let (sender, mut receiver) = create_channel(1, true);
-        run_physical_plan(physical_plan, &psets, sender);
+        pipeline.start(sender);
         let mut result = vec![];
         while let Some(val) = receiver.recv().await {
             result.push(val);
@@ -113,83 +96,3 @@ pub fn run_local(
     });
     Ok(Box::new(res))
 }
-
-pub fn run_physical_plan(
-    physical_plan: &LocalPhysicalPlan,
-    psets: &HashMap<String, Vec<Arc<MicroPartition>>>,
-    sender: MultiSender,
-) {
-    match physical_plan {
-        LocalPhysicalPlan::PhysicalScan(PhysicalScan { scan_tasks, .. }) => {
-            run_source(Arc::new(ScanTaskSource::new(scan_tasks.clone())), sender);
-        }
-        LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => {
-            let partitions = psets.get(&info.cache_key).expect("Cache key not found");
-            run_source(Arc::new(InMemorySource::new(partitions.clone())), sender);
-        }
-        LocalPhysicalPlan::Project(Project {
-            input, projection, ..
-        }) => {
-            let proj_op = ProjectOperator::new(projection.clone());
-            let next_sender = run_intermediate_op(Box::new(proj_op), sender);
-            run_physical_plan(input, psets, next_sender);
-        }
-        LocalPhysicalPlan::Filter(Filter {
-            input, predicate, ..
-        }) => {
-            let filter_op = FilterOperator::new(predicate.clone());
-            let next_sender = run_intermediate_op(Box::new(filter_op), sender);
-            run_physical_plan(input, psets, next_sender);
-        }
-        LocalPhysicalPlan::Limit(Limit {
-            input, num_rows, ..
-        }) => {
-            let sink = LimitSink::new(*num_rows as usize);
-            let sink_sender = run_single_input_sink(Box::new(sink), sender);
-            run_physical_plan(input, psets, sink_sender);
-        }
-        LocalPhysicalPlan::Concat(Concat { input, other, .. }) => {
-            let sink = ConcatSink::new();
-            let (left_sender, right_sender) = run_double_input_sink(Box::new(sink), sender);
-
-            run_physical_plan(input, psets, left_sender);
-            run_physical_plan(other, psets, right_sender);
-        }
-        LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate {
-            input,
-            aggregations,
-            schema,
-            ..
-        }) => {
-            let (first_stage_aggs, second_stage_aggs, final_exprs) =
-                populate_aggregation_stages(aggregations, schema, &[]);
-
-            let final_stage_project = ProjectOperator::new(final_exprs);
-            let next_sender = run_intermediate_op(Box::new(final_stage_project), sender);
-
-            let second_stage_agg_sink = AggregateSink::new(
-                second_stage_aggs
-                    .values()
-                    .cloned()
-                    .map(|e| Arc::new(Expr::Agg(e.clone())))
-                    .collect(),
-                vec![],
-            );
-            let next_sender = run_single_input_sink(Box::new(second_stage_agg_sink), next_sender);
-
-            let first_stage_agg_op = AggregateOperator::new(
-                first_stage_aggs
-                    .values()
-                    .cloned()
-                    .map(|e| Arc::new(Expr::Agg(e.clone())))
-                    .collect(),
-                vec![],
-            );
-            let next_sender = run_intermediate_op(Box::new(first_stage_agg_op), next_sender);
-            run_physical_plan(input, psets, next_sender);
-        }
-        _ => {
-            unimplemented!("Physical plan not supported: {}", physical_plan.name());
-        }
-    }
-}
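
The payoff of the decoupling shows in this deletion: run_physical_plan interleaved plan traversal with channel wiring, so every new operator touched the runner. With the builder in place, supporting a new plan node should only require a match arm in pipeline.rs. A hypothetical sketch (Sample and SampleOperator are invented for illustration and do not exist in this commit):

    // Hypothetical match arm in physical_plan_to_pipeline; run.rs stays untouched.
    LocalPhysicalPlan::Sample(Sample { input, fraction, .. }) => {
        let sample_op = SampleOperator::new(*fraction); // invented operator type
        let child_node = physical_plan_to_pipeline(input, psets);
        PipelineNode::IntermediateOp {
            intermediate_op: Box::new(sample_op),
            child: Box::new(child_node),
        }
    }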
