From ccb372cf6fff09fac53447e0883e82671d20e644 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Fri, 17 Sep 2021 14:33:55 +0200 Subject: [PATCH] [fix] removed parallelization --- datafusion/src/physical_plan/planner.rs | 22 ++++------------------ 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 751ed41f1270..7f3e5bbc2082 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -309,9 +309,6 @@ impl DefaultPhysicalPlanner { async move { let batch_size = ctx_state.config.batch_size; - // TODO make this configurable - let parallelism = 4; - let exec_plan: Result> = match logical_plan { LogicalPlan::TableScan { source, @@ -599,13 +596,8 @@ impl DefaultPhysicalPlanner { Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?) ) } LogicalPlan::Union { inputs, .. } => { - let physical_plan_futures = inputs - .iter() - .map(|input| self.create_initial_plan(input, ctx_state)) - .collect::>(); - - let physical_plans = futures::stream::iter(physical_plan_futures) - .buffered(parallelism) + let physical_plans = futures::stream::iter(inputs) + .then(|lp| self.create_initial_plan(lp, ctx_state)) .try_collect::>() .await?; Ok(Arc::new(UnionExec::new(physical_plans)) ) @@ -784,14 +776,8 @@ impl DefaultPhysicalPlanner { Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema))) } LogicalPlan::Extension { node } => { - let physical_inputs_futures = node - .inputs() - .into_iter() - .map(|input_plan| self.create_initial_plan(input_plan, ctx_state)) - .collect::>(); - - let physical_inputs = futures::stream::iter(physical_inputs_futures) - .buffered(parallelism) + let physical_inputs = futures::stream::iter(node.inputs()) + .then(|lp| self.create_initial_plan(lp, ctx_state)) .try_collect::>() .await?;