Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve documentation and add Display impl to EquivalenceProperties #12590

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ mod sp_repartition_fuzz_tests {
// Define a and f are aliases
eq_properties.add_equal_conditions(col_a, col_f)?;
// Column e has constant value.
eq_properties = eq_properties.add_constants([ConstExpr::from(col_e)]);
eq_properties = eq_properties.with_constants([ConstExpr::from(col_e)]);

// Randomly order columns for sorting
let mut rng = StdRng::seed_from_u64(seed);
Expand Down
24 changes: 23 additions & 1 deletion datafusion/physical-expr-common/src/physical_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use std::any::Any;
use std::fmt::{Debug, Display};
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

Expand Down Expand Up @@ -223,3 +223,25 @@ pub fn down_cast_any_ref(any: &dyn Any) -> &dyn Any {
any
}
}

/// Returns [`Display`] able a list of [`PhysicalExpr`]
///
/// Example output: `[a + 1, b]`
pub fn format_physical_expr_list(exprs: &[Arc<dyn PhysicalExpr>]) -> impl Display + '_ {
struct DisplayWrapper<'a>(&'a [Arc<dyn PhysicalExpr>]);
impl<'a> Display for DisplayWrapper<'a> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut iter = self.0.iter();
write!(f, "[")?;
if let Some(expr) = iter.next() {
write!(f, "{}", expr)?;
}
for expr in iter {
write!(f, ", {}", expr)?;
}
write!(f, "]")?;
Ok(())
}
}
DisplayWrapper(exprs)
}
35 changes: 35 additions & 0 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Display;
use std::sync::Arc;

use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
Expand All @@ -27,6 +28,7 @@ use crate::{

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::JoinType;
use datafusion_physical_expr_common::physical_expr::format_physical_expr_list;

#[derive(Debug, Clone)]
/// A structure representing a expression known to be constant in a physical execution plan.
Expand Down Expand Up @@ -101,6 +103,19 @@ impl ConstExpr {
}
}

/// Display implementation for `ConstExpr`
///
/// Example `c` or `c(across_partitions)`
impl Display for ConstExpr {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.expr)?;
if self.across_partitions {
write!(f, "(across_partitions)")?;
}
Ok(())
}
}

impl From<Arc<dyn PhysicalExpr>> for ConstExpr {
fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
Self::new(expr)
Expand Down Expand Up @@ -224,6 +239,12 @@ impl EquivalenceClass {
}
}

impl Display for EquivalenceClass {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "[{}]", format_physical_expr_list(&self.exprs))
}
}

/// An `EquivalenceGroup` is a collection of `EquivalenceClass`es where each
/// class represents a distinct equivalence class in a relation.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -575,6 +596,20 @@ impl EquivalenceGroup {
}
}

impl Display for EquivalenceGroup {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "[")?;
let mut iter = self.iter();
if let Some(cls) = iter.next() {
write!(f, "{}", cls)?;
}
for cls in iter {
write!(f, ", {}", cls)?;
}
write!(f, "]")
}
}

#[cfg(test)]
mod tests {

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/equivalence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ mod tests {
// Define a and f are aliases
eq_properties.add_equal_conditions(col_a, col_f)?;
// Column e has constant value.
eq_properties = eq_properties.add_constants([ConstExpr::from(col_e)]);
eq_properties = eq_properties.with_constants([ConstExpr::from(col_e)]);

// Randomly order columns for sorting
let mut rng = StdRng::seed_from_u64(seed);
Expand Down
26 changes: 23 additions & 3 deletions datafusion/physical-expr/src/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Display;
use std::hash::Hash;
use std::sync::Arc;

use arrow_schema::SortOptions;

use crate::equivalence::add_offset_to_expr;
use crate::{LexOrdering, PhysicalExpr, PhysicalSortExpr};
use arrow_schema::SortOptions;

/// An `OrderingEquivalenceClass` object keeps track of different alternative
/// orderings than can describe a schema. For example, consider the following table:
Expand Down Expand Up @@ -104,6 +104,11 @@ impl OrderingEquivalenceClass {
self.remove_redundant_entries();
}

/// Adds a single ordering to the existing ordering equivalence class.
pub fn add_new_ordering(&mut self, ordering: LexOrdering) {
self.add_new_orderings([ordering]);
}

/// Removes redundant orderings from this equivalence class. For instance,
/// if we already have the ordering `[a ASC, b ASC, c DESC]`, then there is
/// no need to keep ordering `[a ASC, b ASC]` in the state.
Expand Down Expand Up @@ -219,6 +224,21 @@ fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) ->
false
}

impl Display for OrderingEquivalenceClass {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[")?;
let mut iter = self.orderings.iter();
if let Some(ordering) = iter.next() {
write!(f, "{}", PhysicalSortExpr::format_list(ordering))?;
}
for ordering in iter {
write!(f, "{}", PhysicalSortExpr::format_list(ordering))?;
}
write!(f, "]")?;
Ok(())
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -559,7 +579,7 @@ mod tests {
let constants = constants
.into_iter()
.map(|expr| ConstExpr::from(expr).with_across_partitions(true));
eq_properties = eq_properties.add_constants(constants);
eq_properties = eq_properties.with_constants(constants);

let reqs = convert_to_sort_exprs(&reqs);
assert_eq!(
Expand Down
Loading
Loading