From f30093029e2064beef955ee845a9b19485bff507 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 23 Sep 2024 09:04:23 -0400 Subject: [PATCH] Improve documentation and add Display impl to EquivalenceProperties --- .../sort_preserving_repartition_fuzz.rs | 2 +- .../physical-expr-common/src/physical_expr.rs | 24 ++- .../physical-expr/src/equivalence/class.rs | 35 +++++ .../physical-expr/src/equivalence/mod.rs | 2 +- .../physical-expr/src/equivalence/ordering.rs | 26 +++- .../src/equivalence/properties.rs | 138 +++++++++++++++--- datafusion/physical-plan/src/filter.rs | 4 +- datafusion/physical-plan/src/windows/mod.rs | 2 +- 8 files changed, 200 insertions(+), 33 deletions(-) diff --git a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs index ceae13a469f0..408cadc35f48 100644 --- a/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/sort_preserving_repartition_fuzz.rs @@ -80,7 +80,7 @@ mod sp_repartition_fuzz_tests { // Define a and f are aliases eq_properties.add_equal_conditions(col_a, col_f)?; // Column e has constant value. - eq_properties = eq_properties.add_constants([ConstExpr::from(col_e)]); + eq_properties = eq_properties.with_constants([ConstExpr::from(col_e)]); // Randomly order columns for sorting let mut rng = StdRng::seed_from_u64(seed); diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index a443a65eaa8f..cc725cf2cefb 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -16,7 +16,7 @@ // under the License. 
use std::any::Any; -use std::fmt::{Debug, Display}; +use std::fmt::{Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -223,3 +223,25 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any { any } } + +/// Returns a [`Display`]able wrapper around a list of [`PhysicalExpr`]s +/// +/// Example output: `[a + 1, b]` +pub fn format_physical_expr_list(exprs: &[Arc<dyn PhysicalExpr>]) -> impl Display + '_ { + struct DisplayWrapper<'a>(&'a [Arc<dyn PhysicalExpr>]); + impl<'a> Display for DisplayWrapper<'a> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut iter = self.0.iter(); + write!(f, "[")?; + if let Some(expr) = iter.next() { + write!(f, "{}", expr)?; + } + for expr in iter { + write!(f, ", {}", expr)?; + } + write!(f, "]")?; + Ok(()) + } + } + DisplayWrapper(exprs) +} diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 0296b7a247d6..12f3df0214a4 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::fmt::Display; use std::sync::Arc; use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping}; @@ -27,6 +28,7 @@ use crate::{ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType; +use datafusion_physical_expr_common::physical_expr::format_physical_expr_list; #[derive(Debug, Clone)] /// A structure representing a expression known to be constant in a physical execution plan.
@@ -101,6 +103,19 @@ impl ConstExpr { } } +/// Display implementation for `ConstExpr` +/// +/// Example `c` or `c(across_partitions)` +impl Display for ConstExpr { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.expr)?; + if self.across_partitions { + write!(f, "(across_partitions)")?; + } + Ok(()) + } +} + impl From<Arc<dyn PhysicalExpr>> for ConstExpr { fn from(expr: Arc<dyn PhysicalExpr>) -> Self { Self::new(expr) @@ -224,6 +239,12 @@ impl EquivalenceClass { } } +impl Display for EquivalenceClass { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "[{}]", format_physical_expr_list(&self.exprs)) + } +} + /// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each /// class represents a distinct equivalence class in a relation. #[derive(Debug, Clone)] @@ -575,6 +596,20 @@ impl EquivalenceGroup { } } +impl Display for EquivalenceGroup { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "[")?; + let mut iter = self.iter(); + if let Some(cls) = iter.next() { + write!(f, "{}", cls)?; + } + for cls in iter { + write!(f, ", {}", cls)?; + } + write!(f, "]") + } +} + #[cfg(test)] mod tests { diff --git a/datafusion/physical-expr/src/equivalence/mod.rs b/datafusion/physical-expr/src/equivalence/mod.rs index d862eda5018e..f76a17ff793e 100644 --- a/datafusion/physical-expr/src/equivalence/mod.rs +++ b/datafusion/physical-expr/src/equivalence/mod.rs @@ -207,7 +207,7 @@ mod tests { // Define a and f are aliases eq_properties.add_equal_conditions(col_a, col_f)?; // Column e has constant value.
- eq_properties = eq_properties.add_constants([ConstExpr::from(col_e)]); + eq_properties = eq_properties.with_constants([ConstExpr::from(col_e)]); // Randomly order columns for sorting let mut rng = StdRng::seed_from_u64(seed); diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs index 49a0de7252ab..65423033d5e0 100644 --- a/datafusion/physical-expr/src/equivalence/ordering.rs +++ b/datafusion/physical-expr/src/equivalence/ordering.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. +use std::fmt::Display; use std::hash::Hash; use std::sync::Arc; -use arrow_schema::SortOptions; - use crate::equivalence::add_offset_to_expr; use crate::{LexOrdering, PhysicalExpr, PhysicalSortExpr}; +use arrow_schema::SortOptions; /// An `OrderingEquivalenceClass` object keeps track of different alternative /// orderings than can describe a schema. For example, consider the following table: @@ -104,6 +104,11 @@ impl OrderingEquivalenceClass { self.remove_redundant_entries(); } + /// Adds a single ordering to the existing ordering equivalence class. + pub fn add_new_ordering(&mut self, ordering: LexOrdering) { + self.add_new_orderings([ordering]); + } + /// Removes redundant orderings from this equivalence class. For instance, /// if we already have the ordering `[a ASC, b ASC, c DESC]`, then there is /// no need to keep ordering `[a ASC, b ASC]` in the state. 
@@ -219,6 +224,21 @@ fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) -> false } +impl Display for OrderingEquivalenceClass { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "[")?; + let mut iter = self.orderings.iter(); + if let Some(ordering) = iter.next() { + write!(f, "{}", PhysicalSortExpr::format_list(ordering))?; + } + for ordering in iter { + write!(f, ", {}", PhysicalSortExpr::format_list(ordering))?; + } + write!(f, "]")?; + Ok(()) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -559,7 +579,7 @@ mod tests { let constants = constants .into_iter() .map(|expr| ConstExpr::from(expr).with_across_partitions(true)); - eq_properties = eq_properties.add_constants(constants); + eq_properties = eq_properties.with_constants(constants); let reqs = convert_to_sort_exprs(&reqs); assert_eq!( diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index a5d54ee56cff..c260a8314f97 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::fmt::Display; use std::hash::{Hash, Hasher}; use std::sync::Arc; @@ -41,11 +42,16 @@ use datafusion_physical_expr_common::utils::ExprPropertiesNode; use indexmap::{IndexMap, IndexSet}; use itertools::Itertools; -/// A `EquivalenceProperties` object stores useful information related to a schema. +/// An `EquivalenceProperties` object stores information known about the output +/// of a plan node, that can be used to optimize the plan. +/// /// Currently, it keeps track of: -/// - Equivalent expressions, e.g expressions that have same value. -/// - Valid sort expressions (orderings) for the schema. -/// - Constants expressions (e.g expressions that are known to have constant values).
+/// - Sort expressions (orderings) +/// - Equivalent expressions: expressions that are known to have the same value. +/// - Constant expressions: expressions that are known to contain a single +/// constant value. +/// +/// # Example equivalent sort expressions /// /// Consider table below: /// @@ -60,9 +66,13 @@ use itertools::Itertools; /// └---┴---┘ /// ``` /// -/// where both `a ASC` and `b DESC` can describe the table ordering. With -/// `EquivalenceProperties`, we can keep track of these different valid sort -/// expressions and treat `a ASC` and `b DESC` on an equal footing. +/// In this case, both `a ASC` and `b DESC` can describe the table ordering. +/// `EquivalenceProperties` tracks these different valid sort expressions and +/// treats `a ASC` and `b DESC` on an equal footing. For example, if the query +/// specifies the output sorted by EITHER `a ASC` or `b DESC`, the sort can be +/// avoided. +/// +/// # Example equivalent expressions /// /// Similarly, consider the table below: /// @@ -77,11 +87,39 @@ use itertools::Itertools; /// └---┴---┘ /// ``` /// -/// where columns `a` and `b` always have the same value. We keep track of such -/// equivalences inside this object. With this information, we can optimize -/// things like partitioning. For example, if the partition requirement is -/// `Hash(a)` and output partitioning is `Hash(b)`, then we can deduce that -/// the existing partitioning satisfies the requirement. +/// In this case, columns `a` and `b` always have the same value. We keep track +/// of such equivalences inside this object. With this information, DataFusion +/// can optimize operations such as partitioning. For example, if the partition +/// requirement is `Hash(a)` and output partitioning is `Hash(b)`, then +/// DataFusion avoids repartitioning the data as the existing partitioning +/// satisfies the requirement.
+/// +/// # Code Example +/// ``` +/// # use std::sync::Arc; +/// # use arrow_schema::{Schema, Field, DataType, SchemaRef}; +/// # use datafusion_physical_expr::{ConstExpr, EquivalenceProperties}; +/// # use datafusion_physical_expr::expressions::col; +/// use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; +/// # let schema: SchemaRef = Arc::new(Schema::new(vec![ +/// # Field::new("a", DataType::Int32, false), +/// # Field::new("b", DataType::Int32, false), +/// # Field::new("c", DataType::Int32, false), +/// # ])); +/// # let col_a = col("a", &schema).unwrap(); +/// # let col_b = col("b", &schema).unwrap(); +/// # let col_c = col("c", &schema).unwrap(); +/// // This object represents data that is sorted by a ASC, c DESC +/// // with a single constant value of b +/// let mut eq_properties = EquivalenceProperties::new(schema) +/// .with_constants(vec![ConstExpr::from(col_b)]); +/// eq_properties.add_new_ordering(vec![ +/// PhysicalSortExpr::new_default(col_a).asc(), +/// PhysicalSortExpr::new_default(col_c).desc(), +/// ]); +/// +/// assert_eq!(eq_properties.to_string(), "order: [a@0 ASC,c@2 DESC], const: [b@1]") +/// ``` #[derive(Debug, Clone)] pub struct EquivalenceProperties { /// Collection of equivalence classes that store expressions with the same @@ -164,7 +202,7 @@ impl EquivalenceProperties { pub fn extend(mut self, other: Self) -> Self { self.eq_group.extend(other.eq_group); self.oeq_class.extend(other.oeq_class); - self.add_constants(other.constants) + self.with_constants(other.constants) } /// Clears (empties) the ordering equivalence class within this object. @@ -193,6 +231,11 @@ impl EquivalenceProperties { self.oeq_class.add_new_orderings(orderings); } + /// Adds a single ordering to the existing ordering equivalence class. + pub fn add_new_ordering(&mut self, ordering: LexOrdering) { + self.add_new_orderings([ordering]); + } + /// Incorporates the given equivalence group to into the existing /// equivalence group within. 
pub fn add_equivalence_group(&mut self, other_eq_group: EquivalenceGroup) { @@ -231,7 +274,13 @@ impl EquivalenceProperties { } /// Track/register physical expressions with constant values. - pub fn add_constants( + #[deprecated(since = "43.0.0", note = "Use [`with_constants`] instead")] + pub fn add_constants(self, constants: impl IntoIterator<Item = ConstExpr>) -> Self { + self.with_constants(constants) + } + + /// Track/register physical expressions with constant values. + pub fn with_constants( mut self, constants: impl IntoIterator<Item = ConstExpr>, ) -> Self { @@ -427,7 +476,7 @@ impl EquivalenceProperties { // we add column `a` as constant to the algorithm state. This enables us // to deduce that `(b + c) ASC` is satisfied, given `a` is constant. eq_properties = eq_properties - .add_constants(std::iter::once(ConstExpr::from(normalized_req.expr))); + .with_constants(std::iter::once(ConstExpr::from(normalized_req.expr))); } true } @@ -923,7 +972,7 @@ impl EquivalenceProperties { // an implementation strategy confined to this function. for (PhysicalSortExpr { expr, .. }, idx) in &ordered_exprs { eq_properties = - eq_properties.add_constants(std::iter::once(ConstExpr::from(expr))); + eq_properties.with_constants(std::iter::once(ConstExpr::from(expr))); search_indices.shift_remove(idx); } // Add new ordered section to the state. @@ -1049,6 +1098,41 @@ impl EquivalenceProperties { } } +/// More readable display version of the `EquivalenceProperties`.
+/// +/// Format (empty sections are omitted): +/// ```text +/// order: [[a ASC, b ASC], [a ASC, c ASC]], eq: [[a = b], [a = c]], const: [a = 1] +/// ``` +impl Display for EquivalenceProperties { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.eq_group.is_empty() + && self.oeq_class.is_empty() + && self.constants.is_empty() + { + return write!(f, "No properties"); + } + // `sep` stays empty until a section is written, avoiding a leading ", " + let mut sep = ""; + if !self.oeq_class.is_empty() { + write!(f, "order: {}", self.oeq_class)?; + sep = ", "; + } + if !self.eq_group.is_empty() { + write!(f, "{}eq: {}", sep, self.eq_group)?; + sep = ", "; + } + if !self.constants.is_empty() { + write!(f, "{}const: [", sep)?; + for (i, c) in self.constants.iter().enumerate() { + write!(f, "{}{}", if i == 0 { "" } else { ", " }, c)?; + } + write!(f, "]")?; + } + Ok(()) + } +} + /// Calculates the properties of a given [`ExprPropertiesNode`]. /// /// Order information can be retrieved as: @@ -1476,10 +1560,10 @@ pub fn join_equivalence_properties( } match join_type { JoinType::LeftAnti | JoinType::LeftSemi => { - result = result.add_constants(left_constants); + result = result.with_constants(left_constants); } JoinType::RightAnti | JoinType::RightSemi => { - result = result.add_constants(right_constants); + result = result.with_constants(right_constants); } _ => {} } @@ -2288,7 +2372,7 @@ mod tests { let col_h = &col("h", &test_schema)?; // Add column h as constant - eq_properties = eq_properties.add_constants(vec![ConstExpr::from(col_h)]); + eq_properties = eq_properties.with_constants(vec![ConstExpr::from(col_h)]); let test_cases = vec![ // TEST CASE 1 @@ -2562,13 +2646,13 @@ mod tests { for [left, right] in &case.equal_conditions { properties.add_equal_conditions(left, right)?
} - properties.add_constants( + properties.with_constants( case.constants.iter().cloned().map(ConstExpr::from), ) }, // Constants before equal conditions { - let mut properties = base_properties.clone().add_constants( + let mut properties = base_properties.clone().with_constants( case.constants.iter().cloned().map(ConstExpr::from), ); for [left, right] in &case.equal_conditions { @@ -2600,6 +2684,12 @@ mod tests { Ok(()) } + /// Return a new schema with the same types, but new field names + /// + /// The new field names are the old field names with `text` appended. + /// + /// For example, the schema "a", "b", "c" becomes "a1", "b1", "c1" + /// if `text` is "1". fn append_fields(schema: &SchemaRef, text: &str) -> SchemaRef { Arc::new(Schema::new( schema @@ -2955,7 +3045,7 @@ mod tests { .map(|expr| ConstExpr::new(Arc::clone(expr))) .collect::<Vec<_>>(); let mut lhs = EquivalenceProperties::new(Arc::clone(first_schema)); - lhs = lhs.add_constants(first_constants); + lhs = lhs.with_constants(first_constants); lhs.add_new_orderings(first_orderings); let second_orderings = second_child_orderings @@ -2967,7 +3057,7 @@ mod tests { .map(|expr| ConstExpr::new(Arc::clone(expr))) .collect::<Vec<_>>(); let mut rhs = EquivalenceProperties::new(Arc::clone(second_schema)); - rhs = rhs.add_constants(second_constants); + rhs = rhs.with_constants(second_constants); rhs.add_new_orderings(second_orderings); let union_expected_orderings = union_orderings @@ -2979,7 +3069,7 @@ mod tests { .map(|expr| ConstExpr::new(Arc::clone(expr))) .collect::<Vec<_>>(); let mut union_expected_eq = EquivalenceProperties::new(Arc::clone(&schema)); - union_expected_eq = union_expected_eq.add_constants(union_constants); + union_expected_eq = union_expected_eq.with_constants(union_constants); union_expected_eq.add_new_orderings(union_expected_orderings); let actual_union_eq = calculate_union_binary(lhs, rhs)?; diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index
3da0f21156d9..417d2098b083 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -255,11 +255,11 @@ impl FilterExec { ConstExpr::new(expr).with_across_partitions(true) }); // this is for statistics - eq_properties = eq_properties.add_constants(constants); + eq_properties = eq_properties.with_constants(constants); // this is for logical constant (for example: a = '1', then a could be marked as a constant) // to do: how to deal with multiple situation to represent = (for example c1 between 0 and 0) eq_properties = - eq_properties.add_constants(Self::extend_constants(input, predicate)); + eq_properties.with_constants(Self::extend_constants(input, predicate)); let mut output_partitioning = input.output_partitioning().clone(); // If contains projection, update the PlanProperties. diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 981a8e285166..6c1018148c62 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -576,7 +576,7 @@ pub fn get_window_mode( })); // Treat partition by exprs as constant. During analysis of requirements are satisfied. let const_exprs = partitionby_exprs.iter().map(ConstExpr::from); - let partition_by_eqs = input_eqs.add_constants(const_exprs); + let partition_by_eqs = input_eqs.with_constants(const_exprs); let order_by_reqs = PhysicalSortRequirement::from_sort_exprs(orderby_keys); let reverse_order_by_reqs = PhysicalSortRequirement::from_sort_exprs(&reverse_order_bys(orderby_keys));