Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change required input ordering physical plan API to allow any NULLS FIRST / LAST and ASC / DESC #5772

Merged
merged 2 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ fn init() {
mod tests {
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortRequirement,
};

use super::*;
use crate::datasource::listing::PartitionedFile;
Expand Down Expand Up @@ -1131,8 +1134,10 @@ mod tests {
}

// model that it requires the output ordering of its input
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![self.input.output_ordering()]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![self
.output_ordering()
.map(make_sort_requirements_from_exprs)]
}

fn with_new_children(
Expand Down
32 changes: 15 additions & 17 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ use crate::physical_plan::{with_new_children_if_necessary, Distribution, Executi
use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{reverse_sort_options, DataFusionError};
use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete};
use datafusion_physical_expr::utils::{
make_sort_exprs_from_requirements, ordering_satisfy,
ordering_satisfy_requirement_concrete,
};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use itertools::{concat, izip};
use std::iter::zip;
Expand Down Expand Up @@ -471,17 +474,20 @@ fn ensure_sorting(
let physical_ordering = child.output_ordering();
match (required_ordering, physical_ordering) {
(Some(required_ordering), Some(physical_ordering)) => {
let is_ordering_satisfied = ordering_satisfy_concrete(
if !ordering_satisfy_requirement_concrete(
physical_ordering,
required_ordering,
&required_ordering,
|| child.equivalence_properties(),
);
if !is_ordering_satisfied {
) {
// Make sure we preserve the ordering requirements:
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
let sort_expr = required_ordering.to_vec();
let sort_expr = make_sort_exprs_from_requirements(&required_ordering);
add_sort_above(child, sort_expr)?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
if is_sort(child) {
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
} else {
*sort_onwards = None;
}
}
if let Some(tree) = sort_onwards {
// For window expressions, we can remove some sorts when we can
Expand All @@ -497,7 +503,8 @@ fn ensure_sorting(
}
(Some(required), None) => {
// Ordering requirement is not met, we should add a `SortExec` to the plan.
add_sort_above(child, required.to_vec())?;
let sort_expr = make_sort_exprs_from_requirements(&required);
add_sort_above(child, sort_expr)?;
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
}
(None, Some(_)) => {
Expand Down Expand Up @@ -592,7 +599,6 @@ fn analyze_window_sort_removal(
};

let mut first_should_reverse = None;
let mut physical_ordering_common = vec![];
for sort_any in sort_tree.get_leaves() {
let sort_output_ordering = sort_any.output_ordering();
// Variable `sort_any` will either be a `SortExec` or a
Expand All @@ -609,11 +615,6 @@ fn analyze_window_sort_removal(
DataFusionError::Plan("A SortExec should have output ordering".to_string())
})?;
if let Some(physical_ordering) = physical_ordering {
if physical_ordering_common.is_empty()
|| physical_ordering.len() < physical_ordering_common.len()
{
physical_ordering_common = physical_ordering.to_vec();
}
let (can_skip_sorting, should_reverse) = can_skip_sort(
window_expr[0].partition_by(),
required_ordering,
Expand Down Expand Up @@ -664,15 +665,13 @@ fn analyze_window_sort_removal(
new_child,
new_schema,
partition_keys.to_vec(),
Some(physical_ordering_common),
)?) as _
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
new_child,
new_schema,
partition_keys.to_vec(),
Some(physical_ordering_common),
)?) as _
};
return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
Expand Down Expand Up @@ -1889,7 +1888,6 @@ mod tests {
input.clone(),
input.schema(),
vec![],
Some(sort_exprs),
)
.unwrap(),
)
Expand Down
10 changes: 8 additions & 2 deletions datafusion/core/src/physical_plan/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ use crate::physical_plan::{
};

use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortRequirement,
};

/// join execution plan executes partitions in parallel and combines them into a set of
/// partitions.
Expand Down Expand Up @@ -225,8 +228,11 @@ impl ExecutionPlan for SortMergeJoinExec {
]
}

fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![Some(&self.left_sort_exprs), Some(&self.right_sort_exprs)]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![
Some(make_sort_requirements_from_exprs(&self.left_sort_exprs)),
Some(make_sort_requirements_from_exprs(&self.right_sort_exprs)),
]
}

fn output_partitioning(&self) -> Partitioning {
Expand Down
14 changes: 9 additions & 5 deletions datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ use hashbrown::{raw::RawTable, HashSet};

use datafusion_common::{utils::bisect, ScalarValue};
use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval};
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, PhysicalSortRequirement,
};

use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
Expand Down Expand Up @@ -399,11 +402,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
self.schema.clone()
}

fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![
Some(&self.left_required_sort_exprs),
Some(&self.right_required_sort_exprs),
]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
let left_required =
make_sort_requirements_from_exprs(&self.left_required_sort_exprs);
let right_required =
make_sort_requirements_from_exprs(&self.right_required_sort_exprs);
vec![Some(left_required), Some(right_required)]
}

fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
/// NOTE that checking `!is_empty()` does **not** check for a
/// required input ordering. Instead, the correct check is that at
/// least one entry must be `Some`
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![None; self.children().len()]
}

Expand Down Expand Up @@ -591,11 +591,11 @@ impl Distribution {

use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::{
expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
};
pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};

/// Applies an optional projection to a [`SchemaRef`], returning the
/// projected schema
Expand Down
29 changes: 0 additions & 29 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,33 +577,6 @@ impl DefaultPhysicalPlanner {

let logical_input_schema = input.schema();

let physical_sort_keys = if sort_keys.is_empty() {
None
} else {
let physical_input_schema = input_exec.schema();
let sort_keys = sort_keys
.iter()
.map(|(e, _)| match e {
Expr::Sort(expr::Sort {
expr,
asc,
nulls_first,
}) => create_physical_sort_expr(
expr,
logical_input_schema,
&physical_input_schema,
SortOptions {
descending: !*asc,
nulls_first: *nulls_first,
},
session_state.execution_props(),
),
_ => unreachable!(),
})
.collect::<Result<Vec<_>>>()?;
Some(sort_keys)
};

let physical_input_schema = input_exec.schema();
let window_expr = window_expr
.iter()
Expand All @@ -628,15 +601,13 @@ impl DefaultPhysicalPlanner {
input_exec,
physical_input_schema,
physical_partition_keys,
physical_sort_keys,
)?)
} else {
Arc::new(WindowAggExec::try_new(
window_expr,
input_exec,
physical_input_schema,
physical_partition_keys,
physical_sort_keys,
)?)
})
}
Expand Down
14 changes: 10 additions & 4 deletions datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ use crate::physical_plan::{
Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::{
make_sort_requirements_from_exprs, EquivalenceProperties, PhysicalSortRequirement,
};

/// Sort preserving merge execution plan
///
Expand Down Expand Up @@ -125,12 +127,16 @@ impl ExecutionPlan for SortPreservingMergeExec {
vec![Distribution::UnspecifiedDistribution]
}

fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![Some(&self.expr)]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![Some(make_sort_requirements_from_exprs(&self.expr))]
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
Some(&self.expr)
self.input.output_ordering()
}

fn maintains_input_order(&self) -> Vec<bool> {
vec![true]
}

fn equivalence_properties(&self) -> EquivalenceProperties {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::physical_plan::windows::calc_requirements;
use datafusion_physical_expr::window::{
PartitionBatchState, PartitionBatches, PartitionKey, PartitionWindowAggStates,
WindowAggState, WindowState,
};
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
use datafusion_physical_expr::{
EquivalenceProperties, PhysicalExpr, PhysicalSortRequirement,
};
use indexmap::IndexMap;
use log::debug;

Expand All @@ -71,8 +74,6 @@ pub struct BoundedWindowAggExec {
input_schema: SchemaRef,
/// Partition Keys
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Sort Keys
pub sort_keys: Option<Vec<PhysicalSortExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
}
Expand All @@ -84,7 +85,6 @@ impl BoundedWindowAggExec {
input: Arc<dyn ExecutionPlan>,
input_schema: SchemaRef,
partition_keys: Vec<Arc<dyn PhysicalExpr>>,
sort_keys: Option<Vec<PhysicalSortExpr>>,
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
Expand All @@ -94,7 +94,6 @@ impl BoundedWindowAggExec {
schema,
input_schema,
partition_keys,
sort_keys,
metrics: ExecutionPlanMetricsSet::new(),
})
}
Expand Down Expand Up @@ -123,7 +122,7 @@ impl BoundedWindowAggExec {
let mut result = vec![];
// All window exprs have the same partition by, so we just use the first one:
let partition_by = self.window_expr()[0].partition_by();
let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]);
let sort_keys = self.input.output_ordering().unwrap_or(&[]);
for item in partition_by {
if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
result.push(a.clone());
Expand Down Expand Up @@ -167,17 +166,18 @@ impl ExecutionPlan for BoundedWindowAggExec {
self.input().output_ordering()
}

fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
let sort_keys = self.sort_keys.as_deref();
vec![sort_keys]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
let partition_bys = self.window_expr()[0].partition_by();
let order_keys = self.window_expr()[0].order_by();
let requirements = calc_requirements(partition_bys, order_keys);
vec![requirements]
}

fn required_input_distribution(&self) -> Vec<Distribution> {
if self.partition_keys.is_empty() {
debug!("No partition defined for BoundedWindowAggExec!!!");
vec![Distribution::SinglePartition]
} else {
//TODO support PartitionCollections if there is no common partition columns in the window_expr
vec![Distribution::HashPartitioned(self.partition_keys.clone())]
}
}
Expand All @@ -199,7 +199,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
children[0].clone(),
self.input_schema.clone(),
self.partition_keys.clone(),
self.sort_keys.clone(),
)?))
}

Expand Down
Loading