Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid redundant pass-by-value in physical optimizer #12261

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ fn adjust_input_keys_ordering(
return reorder_partitioned_join_keys(
requirements,
on,
vec![],
&[],
&join_constructor,
)
.map(Transformed::yes);
Expand Down Expand Up @@ -373,7 +373,7 @@ fn adjust_input_keys_ordering(
return reorder_partitioned_join_keys(
requirements,
on,
sort_options.clone(),
sort_options,
&join_constructor,
)
.map(Transformed::yes);
Expand Down Expand Up @@ -421,7 +421,7 @@ fn adjust_input_keys_ordering(
fn reorder_partitioned_join_keys<F>(
mut join_plan: PlanWithKeyRequirements,
on: &[(PhysicalExprRef, PhysicalExprRef)],
sort_options: Vec<SortOptions>,
sort_options: &[SortOptions],
join_constructor: &F,
) -> Result<PlanWithKeyRequirements>
where
Expand Down
24 changes: 12 additions & 12 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ fn try_pushdown_through_hash_join(

if !join_allows_pushdown(
&projection_as_columns,
hash_join.schema(),
&hash_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
Expand Down Expand Up @@ -662,7 +662,7 @@ fn try_pushdown_through_hash_join(
};

let (new_left, new_right) = new_join_children(
projection_as_columns,
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
hash_join.left(),
Expand Down Expand Up @@ -700,15 +700,15 @@ fn try_swapping_with_cross_join(

if !join_allows_pushdown(
&projection_as_columns,
cross_join.schema(),
&cross_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
return Ok(None);
}

let (new_left, new_right) = new_join_children(
projection_as_columns,
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
cross_join.left(),
Expand Down Expand Up @@ -740,7 +740,7 @@ fn try_swapping_with_nested_loop_join(

if !join_allows_pushdown(
&projection_as_columns,
nl_join.schema(),
&nl_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
Expand All @@ -762,7 +762,7 @@ fn try_swapping_with_nested_loop_join(
};

let (new_left, new_right) = new_join_children(
projection_as_columns,
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
nl_join.left(),
Expand Down Expand Up @@ -796,7 +796,7 @@ fn try_swapping_with_sort_merge_join(

if !join_allows_pushdown(
&projection_as_columns,
sm_join.schema(),
&sm_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
Expand All @@ -813,7 +813,7 @@ fn try_swapping_with_sort_merge_join(
};

let (new_left, new_right) = new_join_children(
projection_as_columns,
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
sm_join.children()[0],
Expand Down Expand Up @@ -850,7 +850,7 @@ fn try_swapping_with_sym_hash_join(

if !join_allows_pushdown(
&projection_as_columns,
sym_join.schema(),
&sym_join.schema(),
far_right_left_col_ind,
far_left_right_col_ind,
) {
Expand Down Expand Up @@ -881,7 +881,7 @@ fn try_swapping_with_sym_hash_join(
};

let (new_left, new_right) = new_join_children(
projection_as_columns,
&projection_as_columns,
far_right_left_col_ind,
far_left_right_col_ind,
sym_join.left(),
Expand Down Expand Up @@ -1243,7 +1243,7 @@ fn new_indices_for_join_filter(
/// - Left or right table is not lost after the projection.
fn join_allows_pushdown(
projection_as_columns: &[(Column, String)],
join_schema: SchemaRef,
join_schema: &SchemaRef,
far_right_left_col_ind: i32,
far_left_right_col_ind: i32,
) -> bool {
Expand All @@ -1260,7 +1260,7 @@ fn join_allows_pushdown(
/// this function constructs the new [`ProjectionExec`]s that will come on top
/// of the original children of the join.
fn new_join_children(
projection_as_columns: Vec<(Column, String)>,
projection_as_columns: &[(Column, String)],
far_right_left_col_ind: i32,
far_left_right_col_ind: i32,
left_child: &Arc<dyn ExecutionPlan>,
Expand Down
26 changes: 14 additions & 12 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ fn pushdown_requirement_to_children(
if is_window(plan) {
let required_input_ordering = plan.required_input_ordering();
let request_child = required_input_ordering[0].as_deref().unwrap_or(&[]);
let child_plan = plan.children().swap_remove(0).clone();
let child_plan = plan.children().swap_remove(0);
match determine_children_requirement(parent_required, request_child, child_plan) {
RequirementsCompatibility::Satisfy => {
let req = (!request_child.is_empty()).then(|| request_child.to_vec());
Expand Down Expand Up @@ -225,7 +225,7 @@ fn pushdown_requirement_to_children(
Some(JoinSide::Left) => try_pushdown_requirements_to_join(
smj,
parent_required,
parent_required_expr,
&parent_required_expr,
JoinSide::Left,
),
Some(JoinSide::Right) => {
Expand All @@ -238,7 +238,7 @@ fn pushdown_requirement_to_children(
try_pushdown_requirements_to_join(
smj,
parent_required,
new_right_required_expr,
&new_right_required_expr,
JoinSide::Right,
)
}
Expand Down Expand Up @@ -321,7 +321,7 @@ fn pushdown_would_violate_requirements(
fn determine_children_requirement(
parent_required: LexRequirementRef,
request_child: LexRequirementRef,
child_plan: Arc<dyn ExecutionPlan>,
child_plan: &Arc<dyn ExecutionPlan>,
) -> RequirementsCompatibility {
if child_plan
.equivalence_properties()
Expand All @@ -344,7 +344,7 @@ fn determine_children_requirement(
fn try_pushdown_requirements_to_join(
smj: &SortMergeJoinExec,
parent_required: LexRequirementRef,
sort_expr: Vec<PhysicalSortExpr>,
sort_expr: &[PhysicalSortExpr],
push_side: JoinSide,
) -> Result<Option<Vec<Option<LexRequirement>>>> {
let left_eq_properties = smj.left().equivalence_properties();
Expand All @@ -356,25 +356,27 @@ fn try_pushdown_requirements_to_join(
let right_ordering = smj.right().output_ordering().unwrap_or(&[]);
let (new_left_ordering, new_right_ordering) = match push_side {
JoinSide::Left => {
let left_eq_properties =
left_eq_properties.clone().with_reorder(sort_expr.clone());
let left_eq_properties = left_eq_properties
.clone()
.with_reorder(Vec::from(sort_expr));
if left_eq_properties
.ordering_satisfy_requirement(&left_requirement.unwrap_or_default())
{
// After re-ordering requirement is still satisfied
(sort_expr.as_slice(), right_ordering)
(sort_expr, right_ordering)
} else {
return Ok(None);
}
}
JoinSide::Right => {
let right_eq_properties =
right_eq_properties.clone().with_reorder(sort_expr.clone());
let right_eq_properties = right_eq_properties
.clone()
.with_reorder(Vec::from(sort_expr));
if right_eq_properties
.ordering_satisfy_requirement(&right_requirement.unwrap_or_default())
{
// After re-ordering requirement is still satisfied
(left_ordering, sort_expr.as_slice())
(left_ordering, sort_expr)
} else {
return Ok(None);
}
Expand All @@ -397,7 +399,7 @@ fn try_pushdown_requirements_to_join(
let should_pushdown = smj_eqs.ordering_satisfy_requirement(parent_required);
Ok(should_pushdown.then(|| {
let mut required_input_ordering = smj.required_input_ordering();
let new_req = Some(PhysicalSortRequirement::from_sort_exprs(&sort_expr));
let new_req = Some(PhysicalSortRequirement::from_sort_exprs(sort_expr));
match push_side {
JoinSide::Left => {
required_input_ordering[0] = new_req;
Expand Down
14 changes: 6 additions & 8 deletions datafusion/core/src/physical_optimizer/topk_aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl TopKAggregation {
Some(Arc::new(new_aggr))
}

fn transform_sort(plan: Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
fn transform_sort(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
let sort = plan.as_any().downcast_ref::<SortExec>()?;

let children = sort.children();
Expand Down Expand Up @@ -142,13 +142,11 @@ impl PhysicalOptimizerRule for TopKAggregation {
) -> Result<Arc<dyn ExecutionPlan>> {
if config.optimizer.enable_topk_aggregation {
plan.transform_down(|plan| {
Ok(
if let Some(plan) = TopKAggregation::transform_sort(plan.clone()) {
Transformed::yes(plan)
} else {
Transformed::no(plan)
},
)
Ok(if let Some(plan) = TopKAggregation::transform_sort(&plan) {
Transformed::yes(plan)
} else {
Transformed::no(plan)
})
})
.data()
} else {
Expand Down