Skip to content

Commit

Permalink
Enhanced Pipeline Execution: Now Supporting Complex Query Plans for I…
Browse files Browse the repository at this point in the history
…mproved Performance (#132)

* Very initial test passing algorithm

* Working except a minor bug in interval calculations

* After clippy

* Plan

* initial implemantation

* Before prune check ability is added.

Order equivalence implementations will vanish after we send a seperate PR

* minor changes

* Fix bug, ordering equivalence random head

* minor changes

* Add ordering equivalence for sort merge join

* Improvement on tests

* Upstream changes

* Add ordering equivalence for sort merge join

* Fmt issues

* Update comment

* Add ordering equivalence support for hash join

* Make 1 file

* Code enhancements/comment improvements

* Add projection cast handling

* Fix output ordering for sort merge join

* projection bug fix

* Minor changes

* minor changes

* simplify sort_merge_join

* Update equivalence implementation

* Update test_utils.rs

* Update cast implementation

* More idiomatic code

* After merge

* Comments visisted

* Add key swap according to the children orders

* Refactoring

* After merge refactor

* Update sort_enforcement.rs

* Update datafusion/core/src/physical_optimizer/join_selection.rs

Co-authored-by: Mustafa Akur <[email protected]>

* Comments are applied

* Feature/determine prunability (#139)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

* Determine prunability of tables for join operations (#90)

* ready to review

* license added

* simplifications

* simplifications

* sort expr's are taken separately for each table

* we can return the sort info of the expression now

* check filter conditions

* simplifications

* simplifications

* functions are implemented for SortInfo calculations

* node specialized tableSide functions

* NotImplemented errors are added, test comments are added

* Comment change

* Simplify comparison node calculations

* Simplfications and better commenting

* is_prunable function is updated with new Prunability function

* Indices of sort expressions are updated with intermediate schema columns of the filter

* Unused function is removed

* Future-proof index updating

* An if let check is removed

* simplifications

* Simplifications

* simplifications

* Change if condition

---------

Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>

* fix the tables' unboundedness

---------

Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>

* Comment improvements and minor code improvements

* Splitting the order based join selection

* Update rat_exclude_files.txt

* Revert "Feature/determine prunability (#139)"

This reverts commit cf56105.

* Commented

---------

Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
Co-authored-by: Mustafa Akur <[email protected]>
Co-authored-by: Berkay Şahin <[email protected]>
  • Loading branch information
5 people committed Dec 15, 2023
1 parent 0187de8 commit 4abaa79
Show file tree
Hide file tree
Showing 10 changed files with 1,922 additions and 18 deletions.
1,365 changes: 1,365 additions & 0 deletions datafusion/core/src/physical_optimizer/join_pipeline_selection.rs

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ fn supports_swap(join_type: JoinType) -> bool {

/// This function returns the new join type we get after swapping the given
/// join's inputs.
fn swap_join_type(join_type: JoinType) -> JoinType {
pub(crate) fn swap_join_type(join_type: JoinType) -> JoinType {
match join_type {
JoinType::Inner => JoinType::Inner,
JoinType::Full => JoinType::Full,
Expand Down Expand Up @@ -177,7 +177,7 @@ fn swap_hash_join(
/// the output should not be impacted. This function creates the expressions
/// that will allow to swap back the values from the original left as the first
/// columns and those on the right next.
fn swap_reverting_projection(
pub(crate) fn swap_reverting_projection(
left_schema: &Schema,
right_schema: &Schema,
) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
Expand Down Expand Up @@ -227,8 +227,16 @@ impl PhysicalOptimizerRule for JoinSelection {
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// First, we inspect all joins in the plan and make necessary modifications
// to preserve output orderings when necessary:
let plan_with_hash_joins = crate::physical_optimizer::join_pipeline_selection::PlanWithCorrespondingHashJoin::new(plan);
let state = plan_with_hash_joins
.transform_up(&|p| crate::physical_optimizer::join_pipeline_selection::select_joins_to_preserve_order_subrule(p, config))?;
// Finalize the order-preserving join selection procedure by inspecting
// the root of the plan tree:
let plan = crate::physical_optimizer::join_pipeline_selection::finalize_order_preserving_joins_at_root(state, config)?;
let pipeline = PipelineStatePropagator::new(plan);
// First, we make pipeline-fixing modifications to joins so as to accommodate
// Next, we make pipeline-fixing modifications to joins so as to accommodate
// unbounded inputs. Each pipeline-fixing subrule, which is a function
// of type `PipelineFixerSubrule`, takes a single [`PipelineStatePropagator`]
// argument storing state variables that indicate the unboundedness status
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ mod sort_pushdown;
pub mod topk_aggregation;
mod utils;

mod join_pipeline_selection;
#[cfg(test)]
pub mod test_utils;

Expand Down
107 changes: 103 additions & 4 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,25 @@ use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGro
use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::joins::utils::{JoinFilter, JoinOn};
use crate::physical_plan::joins::utils::{ColumnIndex, JoinFilter, JoinOn};
use crate::physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::streaming::StreamingTableExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::create_window_expr;
use crate::physical_plan::{ExecutionPlan, InputOrderMode, Partitioning};
use crate::prelude::{CsvReadOptions, SessionContext};

use arrow_schema::{Schema, SchemaRef, SortOptions};
use datafusion_common::{JoinType, Statistics};
use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
use datafusion_common::{JoinType, ScalarValue, Statistics};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
use datafusion_expr::{AggregateFunction, Operator, WindowFrame, WindowFunction};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::intervals::test_utils::gen_conjunctive_numerical_expr;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};

use async_trait::async_trait;
Expand Down Expand Up @@ -357,3 +359,100 @@ pub fn sort_exec(
let sort_exprs = sort_exprs.into_iter().collect();
Arc::new(SortExec::new(sort_exprs, input))
}

pub fn prunable_filter(left_index: ColumnIndex, right_index: ColumnIndex) -> JoinFilter {
// Filter columns, ensure first batches will have matching rows.
let intermediate_schema = Schema::new(vec![
Field::new("0", DataType::Int32, true),
Field::new("1", DataType::Int32, true),
]);
let column_indices = vec![left_index, right_index];
let filter_expr = gen_conjunctive_numerical_expr(
col("0", &intermediate_schema).unwrap(),
col("1", &intermediate_schema).unwrap(),
(
Operator::Plus,
Operator::Minus,
Operator::Plus,
Operator::Plus,
),
ScalarValue::Int32(Some(0)),
ScalarValue::Int32(Some(3)),
ScalarValue::Int32(Some(0)),
ScalarValue::Int32(Some(3)),
(Operator::Gt, Operator::Lt),
);
JoinFilter::new(filter_expr, column_indices, intermediate_schema)
}

pub fn memory_exec_with_sort(
schema: &SchemaRef,
sort: Option<Vec<PhysicalSortExpr>>,
) -> Arc<dyn ExecutionPlan> {
let mem = MemoryExec::try_new(&[], schema.clone(), None).unwrap();
Arc::new(if let Some(sort) = sort {
mem.with_sort_information(sort)
} else {
mem
})
}

pub fn streaming_table_exec(
schema: &SchemaRef,
sort: Option<Vec<PhysicalSortExpr>>,
) -> Arc<dyn ExecutionPlan> {
Arc::new(
StreamingTableExec::try_new(schema.clone(), vec![], None, sort, true).unwrap(),
)
}

#[macro_export]
macro_rules! assert_optimized_orthogonal {
($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => {
let session_ctx = SessionContext::new();
let state = session_ctx.state();

let physical_plan = $PLAN;
let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();

let expected_plan_lines: Vec<&str> = $EXPECTED_PLAN_LINES
.iter().map(|s| *s).collect();

assert_eq!(
expected_plan_lines, actual,
"\n**Original Plan Mismatch\n\nexpected:\n\n{expected_plan_lines:#?}\nactual:\n\n{actual:#?}\n\n"
);

let expected_optimized_lines: Vec<&str> = $EXPECTED_OPTIMIZED_PLAN_LINES
.iter().map(|s| *s).collect();
//
// Run JoinSelection - EnforceSorting
let optimized_physical_plan = JoinSelection::new().optimize(physical_plan.clone(), state.config_options())?;
let optimized_physical_plan =
EnforceSorting::new().optimize(optimized_physical_plan, state.config_options())?;

assert_eq!(physical_plan.schema(), optimized_physical_plan.schema());

// Get string representation of the plan
let actual = get_plan_string(&optimized_physical_plan);
assert_eq!(
expected_optimized_lines, actual,
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n"
);
// Run EnforceSorting - JoinSelection
let optimized_physical_plan_2 =
EnforceSorting::new().optimize(physical_plan.clone(), state.config_options())?;
let optimized_physical_plan_2 = JoinSelection::new().optimize(optimized_physical_plan_2.clone(), state.config_options())?;

assert_eq!(physical_plan.schema(), optimized_physical_plan_2.schema());

// Get string representation of the plan
let actual = get_plan_string(&optimized_physical_plan_2);
assert_eq!(
expected_optimized_lines, actual,
"\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n"
);

};
}
14 changes: 13 additions & 1 deletion datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::fmt::Formatter;
use std::sync::Arc;

use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::joins::HashJoinExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
Expand Down Expand Up @@ -144,7 +145,6 @@ pub fn is_sort_preserving_merge(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_coalesce_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<CoalescePartitionsExec>()
}

/// Checks whether the given operator is a [`UnionExec`].
pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<UnionExec>()
Expand All @@ -154,3 +154,15 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<RepartitionExec>()
}

/// Checks whether the given operator is a [`HashJoinExec`].
pub fn is_hash_join(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<HashJoinExec>()
}

/// Utility function yielding a string representation of the given [`ExecutionPlan`].
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
let formatted = displayable(plan.as_ref()).indent(true).to_string();
let actual: Vec<&str> = formatted.trim().lines().collect();
actual.iter().map(|elem| elem.to_string()).collect()
}
90 changes: 90 additions & 0 deletions datafusion/physical-expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,96 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> Result<ArrayRef> {
let data = mutable.freeze();
Ok(make_array(data))
}
/// This function attempts to find a full match between required and provided
/// sorts, returning the indices and sort options of the matches found.
///
/// First, it normalizes the sort requirements and then checks for matches.
/// If no full match is found, it then checks against ordering equivalence properties.
/// If still no full match is found, it returns `None`.
pub fn get_indices_of_matching_sort_exprs_with_order_eq<
F: Fn() -> EquivalenceProperties,
F2: Fn() -> OrderingEquivalenceProperties,
>(
provided_sorts: &[PhysicalSortExpr],
required_columns: &[Column],
equal_properties: F,
ordering_equal_properties: F2,
) -> Option<(Vec<SortOptions>, Vec<usize>)> {
// Transform the required columns into a vector of Arc<PhysicalExpr>:
let required_exprs = required_columns
.iter()
.map(|required_column| Arc::new(required_column.clone()) as _)
.collect::<Vec<Arc<dyn PhysicalExpr>>>();

// Create a vector of `PhysicalSortRequirement`s from the required expressions:
let sort_requirement_on_requirements = required_exprs
.iter()
.map(|required_expr| PhysicalSortRequirement {
expr: required_expr.clone(),
options: None,
})
.collect::<Vec<_>>();

let order_eq_properties = ordering_equal_properties();
let eq_properties = equal_properties();

let normalized_required = normalize_sort_requirements(
&sort_requirement_on_requirements,
eq_properties.classes(),
&[],
);
let normalized_provided_requirements = normalize_sort_requirements(
&PhysicalSortRequirement::from_sort_exprs(provided_sorts.iter()),
eq_properties.classes(),
&[],
);

let provided_sorts = normalized_provided_requirements
.iter()
.map(|req| req.expr.clone())
.collect::<Vec<_>>();

let normalized_required_expr = normalized_required
.iter()
.map(|req| req.expr.clone())
.collect::<Vec<_>>();

let indices_of_equality =
get_indices_of_exprs_strict(&normalized_required_expr, &provided_sorts);
// If we found all the expressions, return early:
if indices_of_equality.len() == normalized_required_expr.len() {
return Some((
indices_of_equality
.iter()
.filter_map(|index| normalized_provided_requirements[*index].options)
.collect(),
indices_of_equality,
));
}

// We did not find all the expressions, consult ordering equivalence properties:
for class in order_eq_properties.classes() {
let head = class.head();
for ordering in class.others().iter().chain(std::iter::once(head)) {
let order_eq_class_exprs = convert_to_expr(ordering);
let indices_of_equality = get_indices_of_exprs_strict(
&normalized_required_expr,
&order_eq_class_exprs,
);
if indices_of_equality.len() == normalized_required_expr.len() {
return Some((
indices_of_equality
.iter()
.map(|index| ordering[*index].options)
.collect::<Vec<_>>(),
indices_of_equality,
));
}
}
}
// If no match found, return `None`:
None
}

/// Merge left and right sort expressions, checking for duplicates.
pub fn merge_vectors(
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/joins/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ impl ExecutionPlan for SortMergeJoinExec {
]
}

fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
Ok(children.iter().any(|c| *c))
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![
Some(PhysicalSortRequirement::from_sort_exprs(
Expand Down
Loading

0 comments on commit 4abaa79

Please sign in to comment.