Skip to content

Commit

Permalink
refactor OrderPreservationContext using `TreeNode.transform_with_pa…
Browse files Browse the repository at this point in the history
…yload()`
  • Loading branch information
peter-toth committed Jan 2, 2024
1 parent 5d1f644 commit 269d8ba
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 243 deletions.
41 changes: 26 additions & 15 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,29 @@ pub trait TreeNode: Sized + Clone {
/// (bottom-up) traversals. The `f_down` and `f_up` closures take payloads that they
/// propagate down and up during the transformation.
///
/// The `f_down` closure takes `FD` type payload from its parent and returns `Vec<FD>`
/// type payload to propagate down to its children. One `FD` element is propagated
/// down to each child.
/// The `f_down` closure takes
/// -`PD` type payload from its parent
/// and returns
/// - `PC` type payload to pass to `f_up` and
/// - `Vec<FD>` type payload to propagate down to its children
/// (one `FD` element is propagated down to each child).
///
/// The `f_up` closure takes `FU` type payload from its children collected into a
/// `Vec<FU>` and returns `FU` type payload to propagate up to its parent.
fn transform_with_payload<FD, PD, FU, PU>(
/// The `f_up` closure takes
/// - `PC` type payload from `f_down` and
/// - `Vec<PU>` type payload collected from its children
/// and returns
/// - `FU` type payload to propagate up to its parent.
fn transform_with_payload<FD, PD, PC, FU, PU>(
self,
f_down: &mut FD,
payload_down: PD,
f_up: &mut FU,
) -> Result<(Self, PU)>
where
FD: FnMut(Self, PD) -> Result<(Transformed<Self>, Vec<PD>)>,
FU: FnMut(Self, Vec<PU>) -> Result<(Transformed<Self>, PU)>,
FD: FnMut(Self, PD) -> Result<(Transformed<Self>, Vec<PD>, PC)>,
FU: FnMut(Self, PC, Vec<PU>) -> Result<(Transformed<Self>, PU)>,
{
let (new_node, new_payload_down) = f_down(self, payload_down)?;
let (new_node, new_payload_down, payload_current) = f_down(self, payload_down)?;
let mut new_payload_down_iter = new_payload_down.into_iter();
let mut payload_up = vec![];
let node_with_new_children = new_node.into().map_children(|node| {
Expand All @@ -150,7 +156,8 @@ pub trait TreeNode: Sized + Clone {
payload_up.push(p);
Ok(new_node)
})?;
let (new_node, new_payload_up) = f_up(node_with_new_children, payload_up)?;
let (new_node, new_payload_up) =
f_up(node_with_new_children, payload_current, payload_up)?;
Ok((new_node.into(), new_payload_up))
}

Expand Down Expand Up @@ -179,9 +186,11 @@ pub trait TreeNode: Sized + Clone {
/// Transforms the tree using `f` pre-preorder (top-down) traversal. The `f_down`
/// closure takes payloads that it propagates down during the transformation.
///
/// The `f_down` closure takes `FD` type payload from its parent and returns `Vec<FD>`
/// type payload to propagate down to its children. One `FD` element is propagated
/// down to each child.
/// The `f_down` closure takes
/// - `P` type payload from its parent
/// and returns
/// - `Vec<P>` type payload to propagate down to its children
/// (one `P` element is propagated down to each child).
fn transform_down_with_payload<F, P>(self, f: &mut F, payload: P) -> Result<Self>
where
F: FnMut(Self, P) -> Result<(Transformed<Self>, Vec<P>)>,
Expand Down Expand Up @@ -222,8 +231,10 @@ pub trait TreeNode: Sized + Clone {
/// Transforms the tree using `f_up` post-order traversal. The `f_up` closure takes
/// payloads that it propagates up during the transformation.
///
/// The `f_up` closure takes `FU` type payload from its children collected into a
/// `Vec<FU>` and returns `FU` type payload to propagate up to its parent.
/// The `f_up` closure takes
/// - `Vec<P>` type payload collected from its children
/// and returns
/// - `P` type payload to propagate up to its parent.
fn transform_up_with_payload<F, P>(self, f: &mut F) -> Result<(Self, P)>
where
F: FnMut(Self, Vec<P>) -> Result<(Transformed<Self>, P)>,
Expand Down
23 changes: 15 additions & 8 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::replace_with_order_preserving_variants::{
replace_with_order_preserving_variants, OrderPreservationContext,
propagate_order_maintaining_connections_down,
replace_with_order_preserving_variants_up,
};
use crate::physical_optimizer::sort_pushdown::pushdown_requirement_to_children;
use crate::physical_optimizer::utils::{
Expand Down Expand Up @@ -275,20 +276,26 @@ impl PhysicalOptimizerRule for EnforceSorting {
adjusted.plan
};

let plan_with_pipeline_fixer = OrderPreservationContext::new(new_plan);
let updated_plan =
plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| {
replace_with_order_preserving_variants(
plan_with_pipeline_fixer,
let (updated_plan, _) = new_plan.transform_with_payload(
&mut |plan, ordering_connection| {
propagate_order_maintaining_connections_down(plan, ordering_connection)
},
false,
&mut |plan, ordering_connection, order_preserving_children| {
replace_with_order_preserving_variants_up(
plan,
ordering_connection,
order_preserving_children,
false,
true,
config,
)
})?;
},
)?;

// Execute a top-down traversal to exploit sort push-down opportunities
// missed by the bottom-up traversal:
let plan = updated_plan.plan.transform_down_with_payload(
let plan = updated_plan.transform_down_with_payload(
&mut |mut plan, required_ordering: Option<Vec<PhysicalSortRequirement>>| {
let parent_required = required_ordering.as_deref().unwrap_or(&[]);
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
Expand Down
Loading

0 comments on commit 269d8ba

Please sign in to comment.