Leverage need_data_exchange of ExecutionPlan
kyotoYaho committed Dec 13, 2022
1 parent a22916f commit 9dbcedc
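This change reworks how the DistributedPlanner decides where to cut query stages: instead of downcasting to CoalescePartitionsExec and SortPreservingMergeExec one operator at a time, it asks each operator via ExecutionPlan::need_data_exchange(). Operators that need an exchange get a shuffle writer stage plus an UnresolvedShuffleExec (a hash RepartitionExec is itself replaced by that shuffle boundary), while a RepartitionExec that needs no exchange is only tolerated for round-robin partitioning, which is dropped from the distributed plan; any other partitioning now fails with a NotImplemented error. The pinned arrow-datafusion submodule is also updated, presumably to pick up need_data_exchange.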
Showing 2 changed files with 72 additions and 78 deletions.
ballista/scheduler/src/planner.rs: 148 changes (71 additions, 77 deletions)
@@ -19,6 +19,7 @@
//!
//! This code is EXPERIMENTAL and still under development

+use datafusion::common::DataFusionError;
use std::collections::HashMap;
use std::sync::Arc;

@@ -27,9 +28,7 @@ use ballista_core::{
    execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
    serde::scheduler::PartitionLocation,
};
-use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::repartition::RepartitionExec;
-use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::{
    with_new_children_if_necessary, ExecutionPlan, Partitioning,
@@ -99,87 +98,82 @@ impl DistributedPlanner {
            stages.append(&mut child_stages);
        }

-        if let Some(_coalesce) = execution_plan
-            .as_any()
-            .downcast_ref::<CoalescePartitionsExec>()
-        {
-            let shuffle_writer = create_shuffle_writer(
-                job_id,
-                self.next_stage_id(),
-                children[0].clone(),
-                None,
-            )?;
-            let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
-                shuffle_writer.stage_id(),
-                shuffle_writer.schema(),
-                shuffle_writer.output_partitioning().partition_count(),
-                shuffle_writer
-                    .shuffle_output_partitioning()
-                    .map(|p| p.partition_count())
-                    .unwrap_or_else(|| {
-                        shuffle_writer.output_partitioning().partition_count()
-                    }),
-            ));
-            stages.push(shuffle_writer);
-            Ok((
-                with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
-                stages,
-            ))
-        } else if let Some(_sort_preserving_merge) = execution_plan
-            .as_any()
-            .downcast_ref::<SortPreservingMergeExec>(
-        ) {
-            let shuffle_writer = create_shuffle_writer(
-                job_id,
-                self.next_stage_id(),
-                children[0].clone(),
-                None,
-            )?;
-            let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
-                shuffle_writer.stage_id(),
-                shuffle_writer.schema(),
-                shuffle_writer.output_partitioning().partition_count(),
-                shuffle_writer
-                    .shuffle_output_partitioning()
-                    .map(|p| p.partition_count())
-                    .unwrap_or_else(|| {
-                        shuffle_writer.output_partitioning().partition_count()
-                    }),
-            ));
-            stages.push(shuffle_writer);
-            Ok((
-                with_new_children_if_necessary(execution_plan, vec![unresolved_shuffle])?,
-                stages,
-            ))
+        if execution_plan.need_data_exchange() {
+            // RepartitionExec is kind of a placeholder, we need to replace it with a shuffle_writer and unresolved_shuffle;
+            // While for other cases, we need to insert a shuffle_writer and unresolved_shuffle
+            if let Some(repart) =
+                execution_plan.as_any().downcast_ref::<RepartitionExec>()
+            {
+                match repart.output_partitioning() {
+                    Partitioning::Hash(_, _) => {
+                        let shuffle_writer = create_shuffle_writer(
+                            job_id,
+                            self.next_stage_id(),
+                            children[0].clone(),
+                            Some(repart.partitioning().to_owned()),
+                        )?;
+                        let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+                            shuffle_writer.stage_id(),
+                            shuffle_writer.schema(),
+                            shuffle_writer.output_partitioning().partition_count(),
+                            shuffle_writer
+                                .shuffle_output_partitioning()
+                                .map(|p| p.partition_count())
+                                .unwrap_or_else(|| {
+                                    shuffle_writer.output_partitioning().partition_count()
+                                }),
+                        ));
+                        stages.push(shuffle_writer);
+                        Ok((unresolved_shuffle, stages))
+                    }
+                    _ => Err(BallistaError::DataFusionError(
+                        DataFusionError::NotImplemented(format!(
+                            "Partitioning policy {:?} is not supported when data exchange is needed",
+                            repart.output_partitioning()
+                        )),
+                    )),
+                }
+            } else {
+                let shuffle_writer = create_shuffle_writer(
+                    job_id,
+                    self.next_stage_id(),
+                    children[0].clone(),
+                    None,
+                )?;
+                let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+                    shuffle_writer.stage_id(),
+                    shuffle_writer.schema(),
+                    shuffle_writer.output_partitioning().partition_count(),
+                    shuffle_writer
+                        .shuffle_output_partitioning()
+                        .map(|p| p.partition_count())
+                        .unwrap_or_else(|| {
+                            shuffle_writer.output_partitioning().partition_count()
+                        }),
+                ));
+                stages.push(shuffle_writer);
+                Ok((
+                    with_new_children_if_necessary(
+                        execution_plan,
+                        vec![unresolved_shuffle],
+                    )?,
+                    stages,
+                ))
+            }
        } else if let Some(repart) =
            execution_plan.as_any().downcast_ref::<RepartitionExec>()
        {
            match repart.output_partitioning() {
-                Partitioning::Hash(_, _) => {
-                    let shuffle_writer = create_shuffle_writer(
-                        job_id,
-                        self.next_stage_id(),
-                        children[0].clone(),
-                        Some(repart.partitioning().to_owned()),
-                    )?;
-                    let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
-                        shuffle_writer.stage_id(),
-                        shuffle_writer.schema(),
-                        shuffle_writer.output_partitioning().partition_count(),
-                        shuffle_writer
-                            .shuffle_output_partitioning()
-                            .map(|p| p.partition_count())
-                            .unwrap_or_else(|| {
-                                shuffle_writer.output_partitioning().partition_count()
-                            }),
-                    ));
-                    stages.push(shuffle_writer);
-                    Ok((unresolved_shuffle, stages))
-                }
-                _ => {
-                    // remove any non-hash repartition from the distributed plan
+                Partitioning::RoundRobinBatch(_) => {
+                    // remove round robin repartition from the distributed plan
                    Ok((children[0].clone(), stages))
                }
+                _ => Err(BallistaError::DataFusionError(
+                    DataFusionError::NotImplemented(format!(
+                        "Partitioning policy {:?} is not supported when data exchange is not needed",
+                        repart.output_partitioning()
+                    )),
+                )),
            }
        } else if let Some(window) =
            execution_plan.as_any().downcast_ref::<WindowAggExec>()
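The planner.rs diff above constructs the same UnresolvedShuffleExec from a freshly created ShuffleWriterExec in four separate places. A helper along the following lines is a hypothetical refactor, not part of this commit, and uses only calls that already appear in the diff; it captures the repeated construction, including the fallback from the shuffle output partitioning to the writer's own partition count.

    use std::sync::Arc;

    use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};
    use datafusion::physical_plan::ExecutionPlan;

    // Hypothetical helper (not in this commit): wrap a shuffle writer stage in the
    // UnresolvedShuffleExec that downstream operators will read from.
    fn unresolved_shuffle_for(shuffle_writer: &ShuffleWriterExec) -> Arc<UnresolvedShuffleExec> {
        // Prefer the shuffle output partitioning when one is set; otherwise fall
        // back to the writer's own output partition count, as the diff does.
        let output_partition_count = shuffle_writer
            .shuffle_output_partitioning()
            .map(|p| p.partition_count())
            .unwrap_or_else(|| shuffle_writer.output_partitioning().partition_count());

        Arc::new(UnresolvedShuffleExec::new(
            shuffle_writer.stage_id(),
            shuffle_writer.schema(),
            shuffle_writer.output_partitioning().partition_count(),
            output_partition_count,
        ))
    }

Each call site would then create the unresolved shuffle with unresolved_shuffle_for(&shuffle_writer) before pushing shuffle_writer onto stages.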
dependencies/arrow-datafusion: 2 changes (1 addition, 1 deletion)
Submodule arrow-datafusion updated from 3b3b36 to 259855
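To see the shape of the new decision logic in isolation, here is a self-contained toy model. It is not Ballista or DataFusion code: Node and Placement are invented stand-ins, and only the branching mirrors the planner change above, where operators that need a data exchange get a shuffle boundary, a hash RepartitionExec becomes that boundary itself, a round-robin RepartitionExec is dropped, and everything else stays in its stage.

    // Toy model (not Ballista or DataFusion code) of the decision this commit
    // centralizes: instead of downcasting to CoalescePartitionsExec or
    // SortPreservingMergeExec individually, each operator is asked whether it
    // needs a data exchange, and only RepartitionExec is special-cased.

    #[derive(Debug)]
    enum Node {
        CoalescePartitions,
        SortPreservingMerge,
        HashRepartition { partitions: usize },
        RoundRobinRepartition,
        Projection,
    }

    impl Node {
        // Stand-in for ExecutionPlan::need_data_exchange(): operators whose output
        // depends on redistributing or merging all input partitions need a shuffle
        // boundary in a distributed plan.
        fn need_data_exchange(&self) -> bool {
            matches!(
                self,
                Node::CoalescePartitions
                    | Node::SortPreservingMerge
                    | Node::HashRepartition { .. }
            )
        }
    }

    #[derive(Debug)]
    enum Placement {
        ReplaceWithShuffle, // hash repartition becomes the shuffle boundary itself
        InsertShuffleBelow, // operator stays, but reads from a new shuffle stage
        DropOperator,       // round-robin repartition adds nothing once distributed
        Keep,               // everything else stays in the current stage
    }

    fn place(node: &Node) -> Placement {
        if node.need_data_exchange() {
            match node {
                Node::HashRepartition { .. } => Placement::ReplaceWithShuffle,
                _ => Placement::InsertShuffleBelow,
            }
        } else if matches!(node, Node::RoundRobinRepartition) {
            Placement::DropOperator
        } else {
            Placement::Keep
        }
    }

    fn main() {
        for node in [
            Node::CoalescePartitions,
            Node::SortPreservingMerge,
            Node::HashRepartition { partitions: 4 },
            Node::RoundRobinRepartition,
            Node::Projection,
        ] {
            println!("{:?} -> {:?}", node, place(&node));
        }
    }

Running it prints one placement per node, matching the outcomes the new planner branches produce.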
