Skip to content

Commit

Permalink
[fix] removed parallelization
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Sep 17, 2021
1 parent b6de17d commit ccb372c
Showing 1 changed file with 4 additions and 18 deletions.
22 changes: 4 additions & 18 deletions datafusion/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,6 @@ impl DefaultPhysicalPlanner {
async move {
let batch_size = ctx_state.config.batch_size;

// TODO make this configurable
let parallelism = 4;

let exec_plan: Result<Arc<dyn ExecutionPlan>> = match logical_plan {
LogicalPlan::TableScan {
source,
Expand Down Expand Up @@ -599,13 +596,8 @@ impl DefaultPhysicalPlanner {
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?) )
}
LogicalPlan::Union { inputs, .. } => {
let physical_plan_futures = inputs
.iter()
.map(|input| self.create_initial_plan(input, ctx_state))
.collect::<Vec<_>>();

let physical_plans = futures::stream::iter(physical_plan_futures)
.buffered(parallelism)
let physical_plans = futures::stream::iter(inputs)
.then(|lp| self.create_initial_plan(lp, ctx_state))
.try_collect::<Vec<_>>()
.await?;
Ok(Arc::new(UnionExec::new(physical_plans)) )
Expand Down Expand Up @@ -784,14 +776,8 @@ impl DefaultPhysicalPlanner {
Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema)))
}
LogicalPlan::Extension { node } => {
let physical_inputs_futures = node
.inputs()
.into_iter()
.map(|input_plan| self.create_initial_plan(input_plan, ctx_state))
.collect::<Vec<_>>();

let physical_inputs = futures::stream::iter(physical_inputs_futures)
.buffered(parallelism)
let physical_inputs = futures::stream::iter(node.inputs())
.then(|lp| self.create_initial_plan(lp, ctx_state))
.try_collect::<Vec<_>>()
.await?;

Expand Down

0 comments on commit ccb372c

Please sign in to comment.