From 5272007f364b8d43deab19f530e6d7a6ffe7b678 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Sun, 1 Sep 2024 14:59:32 +0200 Subject: [PATCH] Avoid redundant pass-by-value in physical optimizer (#12261) --- .../enforce_distribution.rs | 6 ++--- .../physical_optimizer/projection_pushdown.rs | 24 ++++++++--------- .../src/physical_optimizer/sort_pushdown.rs | 26 ++++++++++--------- .../physical_optimizer/topk_aggregation.rs | 14 +++++----- 4 files changed, 35 insertions(+), 35 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index ba6f7d0439c2..095590fe03f6 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -309,7 +309,7 @@ fn adjust_input_keys_ordering( return reorder_partitioned_join_keys( requirements, on, - vec![], + &[], &join_constructor, ) .map(Transformed::yes); @@ -373,7 +373,7 @@ fn adjust_input_keys_ordering( return reorder_partitioned_join_keys( requirements, on, - sort_options.clone(), + sort_options, &join_constructor, ) .map(Transformed::yes); @@ -421,7 +421,7 @@ fn adjust_input_keys_ordering( fn reorder_partitioned_join_keys( mut join_plan: PlanWithKeyRequirements, on: &[(PhysicalExprRef, PhysicalExprRef)], - sort_options: Vec, + sort_options: &[SortOptions], join_constructor: &F, ) -> Result where diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index b3f3f90154d0..e09332e8c36a 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -631,7 +631,7 @@ fn try_pushdown_through_hash_join( if !join_allows_pushdown( &projection_as_columns, - hash_join.schema(), + &hash_join.schema(), far_right_left_col_ind, far_left_right_col_ind, ) { @@ -662,7 +662,7 @@ fn try_pushdown_through_hash_join( }; let (new_left, new_right) = new_join_children( - projection_as_columns, + &projection_as_columns, far_right_left_col_ind, far_left_right_col_ind, hash_join.left(), @@ -700,7 +700,7 @@ fn try_swapping_with_cross_join( if !join_allows_pushdown( &projection_as_columns, - cross_join.schema(), + &cross_join.schema(), far_right_left_col_ind, far_left_right_col_ind, ) { @@ -708,7 +708,7 @@ fn try_swapping_with_cross_join( } let (new_left, new_right) = new_join_children( - projection_as_columns, + &projection_as_columns, far_right_left_col_ind, far_left_right_col_ind, cross_join.left(), @@ -740,7 +740,7 @@ fn try_swapping_with_nested_loop_join( if !join_allows_pushdown( &projection_as_columns, - nl_join.schema(), + &nl_join.schema(), far_right_left_col_ind, far_left_right_col_ind, ) { @@ -762,7 +762,7 @@ fn try_swapping_with_nested_loop_join( }; let (new_left, new_right) = new_join_children( - projection_as_columns, + &projection_as_columns, far_right_left_col_ind, far_left_right_col_ind, nl_join.left(), @@ -796,7 +796,7 @@ fn try_swapping_with_sort_merge_join( if !join_allows_pushdown( &projection_as_columns, - sm_join.schema(), + &sm_join.schema(), far_right_left_col_ind, far_left_right_col_ind, ) { @@ -813,7 +813,7 @@ fn try_swapping_with_sort_merge_join( }; let (new_left, new_right) = new_join_children( - projection_as_columns, + &projection_as_columns, far_right_left_col_ind, far_left_right_col_ind, sm_join.children()[0], @@ -850,7 +850,7 @@ fn try_swapping_with_sym_hash_join( if !join_allows_pushdown( &projection_as_columns, - sym_join.schema(), + &sym_join.schema(), far_right_left_col_ind, far_left_right_col_ind, ) { @@ -881,7 +881,7 @@ fn try_swapping_with_sym_hash_join( }; let (new_left, new_right) = new_join_children( - projection_as_columns, + &projection_as_columns, far_right_left_col_ind, far_left_right_col_ind, sym_join.left(), @@ -1243,7 +1243,7 @@ fn new_indices_for_join_filter( /// - Left or right table is not lost after the projection. fn join_allows_pushdown( projection_as_columns: &[(Column, String)], - join_schema: SchemaRef, + join_schema: &SchemaRef, far_right_left_col_ind: i32, far_left_right_col_ind: i32, ) -> bool { @@ -1260,7 +1260,7 @@ fn join_allows_pushdown( /// this function constructs the new [`ProjectionExec`]s that will come on top /// of the original children of the join. fn new_join_children( - projection_as_columns: Vec<(Column, String)>, + projection_as_columns: &[(Column, String)], far_right_left_col_ind: i32, far_left_right_col_ind: i32, left_child: &Arc, diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 9ab6802d18f1..41059ef7b1ef 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -165,7 +165,7 @@ fn pushdown_requirement_to_children( if is_window(plan) { let required_input_ordering = plan.required_input_ordering(); let request_child = required_input_ordering[0].as_deref().unwrap_or(&[]); - let child_plan = plan.children().swap_remove(0).clone(); + let child_plan = plan.children().swap_remove(0); match determine_children_requirement(parent_required, request_child, child_plan) { RequirementsCompatibility::Satisfy => { let req = (!request_child.is_empty()).then(|| request_child.to_vec()); @@ -225,7 +225,7 @@ fn pushdown_requirement_to_children( Some(JoinSide::Left) => try_pushdown_requirements_to_join( smj, parent_required, - parent_required_expr, + &parent_required_expr, JoinSide::Left, ), Some(JoinSide::Right) => { @@ -238,7 +238,7 @@ fn pushdown_requirement_to_children( try_pushdown_requirements_to_join( smj, parent_required, - new_right_required_expr, + &new_right_required_expr, JoinSide::Right, ) } @@ -321,7 +321,7 @@ fn pushdown_would_violate_requirements( fn determine_children_requirement( parent_required: LexRequirementRef, request_child: LexRequirementRef, - child_plan: Arc, + child_plan: &Arc, ) -> RequirementsCompatibility { if child_plan .equivalence_properties() @@ -344,7 +344,7 @@ fn determine_children_requirement( fn try_pushdown_requirements_to_join( smj: &SortMergeJoinExec, parent_required: LexRequirementRef, - sort_expr: Vec, + sort_expr: &[PhysicalSortExpr], push_side: JoinSide, ) -> Result>>> { let left_eq_properties = smj.left().equivalence_properties(); @@ -356,25 +356,27 @@ fn try_pushdown_requirements_to_join( let right_ordering = smj.right().output_ordering().unwrap_or(&[]); let (new_left_ordering, new_right_ordering) = match push_side { JoinSide::Left => { - let left_eq_properties = - left_eq_properties.clone().with_reorder(sort_expr.clone()); + let left_eq_properties = left_eq_properties + .clone() + .with_reorder(Vec::from(sort_expr)); if left_eq_properties .ordering_satisfy_requirement(&left_requirement.unwrap_or_default()) { // After re-ordering requirement is still satisfied - (sort_expr.as_slice(), right_ordering) + (sort_expr, right_ordering) } else { return Ok(None); } } JoinSide::Right => { - let right_eq_properties = - right_eq_properties.clone().with_reorder(sort_expr.clone()); + let right_eq_properties = right_eq_properties + .clone() + .with_reorder(Vec::from(sort_expr)); if right_eq_properties .ordering_satisfy_requirement(&right_requirement.unwrap_or_default()) { // After re-ordering requirement is still satisfied - (left_ordering, sort_expr.as_slice()) + (left_ordering, sort_expr) } else { return Ok(None); } @@ -397,7 +399,7 @@ fn try_pushdown_requirements_to_join( let should_pushdown = smj_eqs.ordering_satisfy_requirement(parent_required); Ok(should_pushdown.then(|| { let mut required_input_ordering = smj.required_input_ordering(); - let new_req = Some(PhysicalSortRequirement::from_sort_exprs(&sort_expr)); + let new_req = Some(PhysicalSortRequirement::from_sort_exprs(sort_expr)); match push_side { JoinSide::Left => { required_input_ordering[0] = new_req; diff --git a/datafusion/core/src/physical_optimizer/topk_aggregation.rs b/datafusion/core/src/physical_optimizer/topk_aggregation.rs index 82cf44ad7796..b2c8f640fda0 100644 --- a/datafusion/core/src/physical_optimizer/topk_aggregation.rs +++ b/datafusion/core/src/physical_optimizer/topk_aggregation.rs @@ -84,7 +84,7 @@ impl TopKAggregation { Some(Arc::new(new_aggr)) } - fn transform_sort(plan: Arc) -> Option> { + fn transform_sort(plan: &Arc) -> Option> { let sort = plan.as_any().downcast_ref::()?; let children = sort.children(); @@ -142,13 +142,11 @@ impl PhysicalOptimizerRule for TopKAggregation { ) -> Result> { if config.optimizer.enable_topk_aggregation { plan.transform_down(|plan| { - Ok( - if let Some(plan) = TopKAggregation::transform_sort(plan.clone()) { - Transformed::yes(plan) - } else { - Transformed::no(plan) - }, - ) + Ok(if let Some(plan) = TopKAggregation::transform_sort(&plan) { + Transformed::yes(plan) + } else { + Transformed::no(plan) + }) }) .data() } else {