diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 9819ae795b74..e06f947ad5e7 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -199,9 +199,16 @@ impl DFSchema { pub fn with_functional_dependencies( mut self, functional_dependencies: FunctionalDependencies, - ) -> Self { - self.functional_dependencies = functional_dependencies; - self + ) -> Result { + if functional_dependencies.is_valid(self.fields.len()) { + self.functional_dependencies = functional_dependencies; + Ok(self) + } else { + _plan_err!( + "Invalid functional dependency: {:?}", + functional_dependencies + ) + } } /// Create a new schema that contains the fields from this schema followed by the fields diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 4587677e7726..1cb1751d713e 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -24,6 +24,7 @@ use std::ops::Deref; use std::vec::IntoIter; use crate::error::_plan_err; +use crate::utils::{merge_and_order_indices, set_difference}; use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result}; use sqlparser::ast::TableConstraint; @@ -271,6 +272,29 @@ impl FunctionalDependencies { self.deps.extend(other.deps); } + /// Sanity checks if functional dependencies are valid. For example, if + /// there are 10 fields, we cannot receive any index further than 9. + pub fn is_valid(&self, n_field: usize) -> bool { + self.deps.iter().all( + |FunctionalDependence { + source_indices, + target_indices, + .. + }| { + source_indices + .iter() + .max() + .map(|&max_index| max_index < n_field) + .unwrap_or(true) + && target_indices + .iter() + .max() + .map(|&max_index| max_index < n_field) + .unwrap_or(true) + }, + ) + } + /// Adds the `offset` value to `source_indices` and `target_indices` for /// each functional dependency. pub fn add_offset(&mut self, offset: usize) { @@ -442,44 +466,56 @@ pub fn aggregate_functional_dependencies( } in &func_dependencies.deps { // Keep source indices in a `HashSet` to prevent duplicate entries: - let mut new_source_indices = HashSet::new(); + let mut new_source_indices = vec![]; + let mut new_source_field_names = vec![]; let source_field_names = source_indices .iter() .map(|&idx| aggr_input_fields[idx].qualified_name()) .collect::>(); + for (idx, group_by_expr_name) in group_by_expr_names.iter().enumerate() { // When one of the input determinant expressions matches with // the GROUP BY expression, add the index of the GROUP BY // expression as a new determinant key: if source_field_names.contains(group_by_expr_name) { - new_source_indices.insert(idx); + new_source_indices.push(idx); + new_source_field_names.push(group_by_expr_name.clone()); } } + let existing_target_indices = + get_target_functional_dependencies(aggr_input_schema, group_by_expr_names); + let new_target_indices = get_target_functional_dependencies( + aggr_input_schema, + &new_source_field_names, + ); + let mode = if existing_target_indices == new_target_indices + && new_target_indices.is_some() + { + // If dependency covers all GROUP BY expressions, mode will be `Single`: + Dependency::Single + } else { + // Otherwise, existing mode is preserved: + *mode + }; // All of the composite indices occur in the GROUP BY expression: if new_source_indices.len() == source_indices.len() { aggregate_func_dependencies.push( FunctionalDependence::new( - new_source_indices.into_iter().collect(), + new_source_indices, target_indices.clone(), *nullable, ) - // input uniqueness stays the same when GROUP BY matches with input functional dependence determinants - .with_mode(*mode), + .with_mode(mode), ); } } + // If we have a single GROUP BY key, we can guarantee uniqueness after // aggregation: if group_by_expr_names.len() == 1 { // If `source_indices` contain 0, delete this functional dependency // as it will be added anyway with mode `Dependency::Single`: - if let Some(idx) = aggregate_func_dependencies - .iter() - .position(|item| item.source_indices.contains(&0)) - { - // Delete the functional dependency that contains zeroth idx: - aggregate_func_dependencies.remove(idx); - } + aggregate_func_dependencies.retain(|item| !item.source_indices.contains(&0)); // Add a new functional dependency associated with the whole table: aggregate_func_dependencies.push( // Use nullable property of the group by expression @@ -527,8 +563,61 @@ pub fn get_target_functional_dependencies( combined_target_indices.extend(target_indices.iter()); } } - (!combined_target_indices.is_empty()) - .then_some(combined_target_indices.iter().cloned().collect::>()) + (!combined_target_indices.is_empty()).then_some({ + let mut result = combined_target_indices.into_iter().collect::>(); + result.sort(); + result + }) +} + +/// Returns indices for the minimal subset of GROUP BY expressions that are +/// functionally equivalent to the original set of GROUP BY expressions. +pub fn get_required_group_by_exprs_indices( + schema: &DFSchema, + group_by_expr_names: &[String], +) -> Option> { + let dependencies = schema.functional_dependencies(); + let field_names = schema + .fields() + .iter() + .map(|item| item.qualified_name()) + .collect::>(); + let mut groupby_expr_indices = group_by_expr_names + .iter() + .map(|group_by_expr_name| { + field_names + .iter() + .position(|field_name| field_name == group_by_expr_name) + }) + .collect::>>()?; + + groupby_expr_indices.sort(); + for FunctionalDependence { + source_indices, + target_indices, + .. + } in &dependencies.deps + { + if source_indices + .iter() + .all(|source_idx| groupby_expr_indices.contains(source_idx)) + { + // If all source indices are among GROUP BY expression indices, we + // can remove target indices from GROUP BY expression indices and + // use source indices instead. + groupby_expr_indices = set_difference(&groupby_expr_indices, target_indices); + groupby_expr_indices = + merge_and_order_indices(groupby_expr_indices, source_indices); + } + } + groupby_expr_indices + .iter() + .map(|idx| { + group_by_expr_names + .iter() + .position(|name| &field_names[*idx] == name) + }) + .collect() } /// Updates entries inside the `entries` vector with their corresponding diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs index 6df89624fc51..ed547782e4a5 100644 --- a/datafusion/common/src/lib.rs +++ b/datafusion/common/src/lib.rs @@ -56,8 +56,9 @@ pub use file_options::file_type::{ }; pub use file_options::FileTypeWriterOptions; pub use functional_dependencies::{ - aggregate_functional_dependencies, get_target_functional_dependencies, Constraint, - Constraints, Dependency, FunctionalDependence, FunctionalDependencies, + aggregate_functional_dependencies, get_required_group_by_exprs_indices, + get_target_functional_dependencies, Constraint, Constraints, Dependency, + FunctionalDependence, FunctionalDependencies, }; pub use join_type::{JoinConstraint, JoinSide, JoinType}; pub use param_value::ParamValues; diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 52b5157b7313..c40dd522a457 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -23,44 +23,43 @@ mod parquet; use std::any::Any; use std::sync::Arc; +use crate::arrow::datatypes::{Schema, SchemaRef}; +use crate::arrow::record_batch::RecordBatch; +use crate::arrow::util::pretty; +use crate::datasource::{provider_as_source, MemTable, TableProvider}; +use crate::error::Result; +use crate::execution::{ + context::{SessionState, TaskContext}, + FunctionRegistry, +}; +use crate::logical_expr::utils::find_window_exprs; +use crate::logical_expr::{ + col, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, TableType, +}; +use crate::physical_plan::{ + collect, collect_partitioned, execute_stream, execute_stream_partitioned, + ExecutionPlan, SendableRecordBatchStream, +}; +use crate::prelude::SessionContext; + use arrow::array::{Array, ArrayRef, Int64Array, StringArray}; use arrow::compute::{cast, concat}; use arrow::csv::WriterBuilder; use arrow::datatypes::{DataType, Field}; -use async_trait::async_trait; use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ - DataFusionError, FileType, FileTypeWriterOptions, ParamValues, SchemaError, - UnnestOptions, + Column, DFSchema, DataFusionError, FileType, FileTypeWriterOptions, ParamValues, + SchemaError, UnnestOptions, }; use datafusion_expr::dml::CopyOptions; - -use datafusion_common::{Column, DFSchema}; use datafusion_expr::{ avg, count, is_null, max, median, min, stddev, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE, }; -use crate::arrow::datatypes::Schema; -use crate::arrow::datatypes::SchemaRef; -use crate::arrow::record_batch::RecordBatch; -use crate::arrow::util::pretty; -use crate::datasource::{provider_as_source, MemTable, TableProvider}; -use crate::error::Result; -use crate::execution::{ - context::{SessionState, TaskContext}, - FunctionRegistry, -}; -use crate::logical_expr::{ - col, utils::find_window_exprs, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, - Partitioning, TableType, -}; -use crate::physical_plan::SendableRecordBatchStream; -use crate::physical_plan::{collect, collect_partitioned}; -use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan}; -use crate::prelude::SessionContext; +use async_trait::async_trait; /// Contains options that control how data is /// written out from a DataFrame @@ -1343,24 +1342,43 @@ impl TableProvider for DataFrameTableProvider { mod tests { use std::vec; - use arrow::array::Int32Array; - use arrow::datatypes::DataType; + use super::*; + use crate::execution::context::SessionConfig; + use crate::physical_plan::{ColumnarValue, Partitioning, PhysicalExpr}; + use crate::test_util::{register_aggregate_csv, test_table, test_table_with_name}; + use crate::{assert_batches_sorted_eq, execution::context::SessionContext}; + use arrow::array::{self, Int32Array}; + use arrow::datatypes::DataType; + use datafusion_common::{Constraint, Constraints, ScalarValue}; use datafusion_expr::{ avg, cast, count, count_distinct, create_udf, expr, lit, max, min, sum, - BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame, - WindowFunction, + BinaryExpr, BuiltInWindowFunction, Operator, ScalarFunctionImplementation, + Volatility, WindowFrame, WindowFunction, }; use datafusion_physical_expr::expressions::Column; - - use crate::execution::context::SessionConfig; - use crate::physical_plan::ColumnarValue; - use crate::physical_plan::Partitioning; - use crate::physical_plan::PhysicalExpr; - use crate::test_util::{register_aggregate_csv, test_table, test_table_with_name}; - use crate::{assert_batches_sorted_eq, execution::context::SessionContext}; - - use super::*; + use datafusion_physical_plan::get_plan_string; + + pub fn table_with_constraints() -> Arc { + let dual_schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ])); + let batch = RecordBatch::try_new( + dual_schema.clone(), + vec![ + Arc::new(array::Int32Array::from(vec![1])), + Arc::new(array::StringArray::from(vec!["a"])), + ], + ) + .unwrap(); + let provider = MemTable::try_new(dual_schema, vec![vec![batch]]) + .unwrap() + .with_constraints(Constraints::new_unverified(vec![Constraint::PrimaryKey( + vec![0], + )])); + Arc::new(provider) + } async fn assert_logical_expr_schema_eq_physical_expr_schema( df: DataFrame, @@ -1557,6 +1575,262 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_aggregate_with_pk() -> Result<()> { + // create the dataframe + let config = SessionConfig::new().with_target_partitions(1); + let ctx = SessionContext::new_with_config(config); + + let table1 = table_with_constraints(); + let df = ctx.read_table(table1)?; + let col_id = Expr::Column(datafusion_common::Column { + relation: None, + name: "id".to_string(), + }); + let col_name = Expr::Column(datafusion_common::Column { + relation: None, + name: "name".to_string(), + }); + + // group by contains id column + let group_expr = vec![col_id.clone()]; + let aggr_expr = vec![]; + let df = df.aggregate(group_expr, aggr_expr)?; + + // expr list contains id, name + let expr_list = vec![col_id, col_name]; + let df = df.select(expr_list)?; + let physical_plan = df.clone().create_physical_plan().await?; + let expected = vec![ + "AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]", + " MemoryExec: partitions=1, partition_sizes=[1]", + ]; + // Get string representation of the plan + let actual = get_plan_string(&physical_plan); + assert_eq!( + expected, actual, + "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + // Since id and name are functionally dependant, we can use name among expression + // even if it is not part of the group by expression. + let df_results = collect(physical_plan, ctx.task_ctx()).await?; + + #[rustfmt::skip] + assert_batches_sorted_eq!( + ["+----+------+", + "| id | name |", + "+----+------+", + "| 1 | a |", + "+----+------+",], + &df_results + ); + + Ok(()) + } + + #[tokio::test] + async fn test_aggregate_with_pk2() -> Result<()> { + // create the dataframe + let config = SessionConfig::new().with_target_partitions(1); + let ctx = SessionContext::new_with_config(config); + + let table1 = table_with_constraints(); + let df = ctx.read_table(table1)?; + let col_id = Expr::Column(datafusion_common::Column { + relation: None, + name: "id".to_string(), + }); + let col_name = Expr::Column(datafusion_common::Column { + relation: None, + name: "name".to_string(), + }); + + // group by contains id column + let group_expr = vec![col_id.clone()]; + let aggr_expr = vec![]; + let df = df.aggregate(group_expr, aggr_expr)?; + + let condition1 = Expr::BinaryExpr(BinaryExpr::new( + Box::new(col_id.clone()), + Operator::Eq, + Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))), + )); + let condition2 = Expr::BinaryExpr(BinaryExpr::new( + Box::new(col_name), + Operator::Eq, + Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))), + )); + // Predicate refers to id, and name fields + let predicate = Expr::BinaryExpr(BinaryExpr::new( + Box::new(condition1), + Operator::And, + Box::new(condition2), + )); + let df = df.filter(predicate)?; + let physical_plan = df.clone().create_physical_plan().await?; + + let expected = vec![ + "CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: id@0 = 1 AND name@1 = a", + " AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]", + " MemoryExec: partitions=1, partition_sizes=[1]", + ]; + // Get string representation of the plan + let actual = get_plan_string(&physical_plan); + assert_eq!( + expected, actual, + "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + // Since id and name are functionally dependant, we can use name among expression + // even if it is not part of the group by expression. + let df_results = collect(physical_plan, ctx.task_ctx()).await?; + + #[rustfmt::skip] + assert_batches_sorted_eq!( + ["+----+------+", + "| id | name |", + "+----+------+", + "| 1 | a |", + "+----+------+",], + &df_results + ); + + Ok(()) + } + + #[tokio::test] + async fn test_aggregate_with_pk3() -> Result<()> { + // create the dataframe + let config = SessionConfig::new().with_target_partitions(1); + let ctx = SessionContext::new_with_config(config); + + let table1 = table_with_constraints(); + let df = ctx.read_table(table1)?; + let col_id = Expr::Column(datafusion_common::Column { + relation: None, + name: "id".to_string(), + }); + let col_name = Expr::Column(datafusion_common::Column { + relation: None, + name: "name".to_string(), + }); + + // group by contains id column + let group_expr = vec![col_id.clone()]; + let aggr_expr = vec![]; + // group by id, + let df = df.aggregate(group_expr, aggr_expr)?; + + let condition1 = Expr::BinaryExpr(BinaryExpr::new( + Box::new(col_id.clone()), + Operator::Eq, + Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))), + )); + // Predicate refers to id field + let predicate = condition1; + // id=0 + let df = df.filter(predicate)?; + // Select expression refers to id, and name columns. + // id, name + let df = df.select(vec![col_id.clone(), col_name.clone()])?; + let physical_plan = df.clone().create_physical_plan().await?; + + let expected = vec![ + "CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: id@0 = 1", + " AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]", + " MemoryExec: partitions=1, partition_sizes=[1]", + ]; + // Get string representation of the plan + let actual = get_plan_string(&physical_plan); + assert_eq!( + expected, actual, + "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + // Since id and name are functionally dependant, we can use name among expression + // even if it is not part of the group by expression. + let df_results = collect(physical_plan, ctx.task_ctx()).await?; + + #[rustfmt::skip] + assert_batches_sorted_eq!( + ["+----+------+", + "| id | name |", + "+----+------+", + "| 1 | a |", + "+----+------+",], + &df_results + ); + + Ok(()) + } + + #[tokio::test] + async fn test_aggregate_with_pk4() -> Result<()> { + // create the dataframe + let config = SessionConfig::new().with_target_partitions(1); + let ctx = SessionContext::new_with_config(config); + + let table1 = table_with_constraints(); + let df = ctx.read_table(table1)?; + let col_id = Expr::Column(datafusion_common::Column { + relation: None, + name: "id".to_string(), + }); + + // group by contains id column + let group_expr = vec![col_id.clone()]; + let aggr_expr = vec![]; + // group by id, + let df = df.aggregate(group_expr, aggr_expr)?; + + let condition1 = Expr::BinaryExpr(BinaryExpr::new( + Box::new(col_id.clone()), + Operator::Eq, + Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))), + )); + // Predicate refers to id field + let predicate = condition1; + // id=1 + let df = df.filter(predicate)?; + // Select expression refers to id column. + // id + let df = df.select(vec![col_id.clone()])?; + let physical_plan = df.clone().create_physical_plan().await?; + + // In this case aggregate shouldn't be expanded, since these + // columns are not used. + let expected = vec![ + "CoalesceBatchesExec: target_batch_size=8192", + " FilterExec: id@0 = 1", + " AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]", + " MemoryExec: partitions=1, partition_sizes=[1]", + ]; + // Get string representation of the plan + let actual = get_plan_string(&physical_plan); + assert_eq!( + expected, actual, + "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + // Since id and name are functionally dependant, we can use name among expression + // even if it is not part of the group by expression. + let df_results = collect(physical_plan, ctx.task_ctx()).await?; + + #[rustfmt::skip] + assert_batches_sorted_eq!( + [ "+----+", + "| id |", + "+----+", + "| 1 |", + "+----+",], + &df_results + ); + + Ok(()) + } + #[tokio::test] async fn test_distinct() -> Result<()> { let t = test_table().await?; diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 10f4574020bf..c6b8e0e01b4f 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -1323,6 +1323,91 @@ async fn unnest_array_agg() -> Result<()> { Ok(()) } +#[tokio::test] +async fn unnest_with_redundant_columns() -> Result<()> { + let mut shape_id_builder = UInt32Builder::new(); + let mut tag_id_builder = UInt32Builder::new(); + + for shape_id in 1..=3 { + for tag_id in 1..=3 { + shape_id_builder.append_value(shape_id as u32); + tag_id_builder.append_value((shape_id * 10 + tag_id) as u32); + } + } + + let batch = RecordBatch::try_from_iter(vec![ + ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef), + ("tag_id", Arc::new(tag_id_builder.finish()) as ArrayRef), + ])?; + + let ctx = SessionContext::new(); + ctx.register_batch("shapes", batch)?; + let df = ctx.table("shapes").await?; + + let results = df.clone().collect().await?; + let expected = vec![ + "+----------+--------+", + "| shape_id | tag_id |", + "+----------+--------+", + "| 1 | 11 |", + "| 1 | 12 |", + "| 1 | 13 |", + "| 2 | 21 |", + "| 2 | 22 |", + "| 2 | 23 |", + "| 3 | 31 |", + "| 3 | 32 |", + "| 3 | 33 |", + "+----------+--------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + // Doing an `array_agg` by `shape_id` produces: + let df = df + .clone() + .aggregate( + vec![col("shape_id")], + vec![array_agg(col("shape_id")).alias("shape_id2")], + )? + .unnest_column("shape_id2")? + .select(vec![col("shape_id")])?; + + let optimized_plan = df.clone().into_optimized_plan()?; + let expected = vec![ + "Projection: shapes.shape_id [shape_id:UInt32]", + " Unnest: shape_id2 [shape_id:UInt32, shape_id2:UInt32;N]", + " Aggregate: groupBy=[[shapes.shape_id]], aggr=[[ARRAY_AGG(shapes.shape_id) AS shape_id2]] [shape_id:UInt32, shape_id2:List(Field { name: \"item\", data_type: UInt32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} });N]", + " TableScan: shapes projection=[shape_id] [shape_id:UInt32]", + ]; + + let formatted = optimized_plan.display_indent_schema().to_string(); + let actual: Vec<&str> = formatted.trim().lines().collect(); + assert_eq!( + expected, actual, + "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" + ); + + let results = df.collect().await?; + let expected = [ + "+----------+", + "| shape_id |", + "+----------+", + "| 1 |", + "| 1 |", + "| 1 |", + "| 2 |", + "| 2 |", + "| 2 |", + "| 3 |", + "| 3 |", + "| 3 |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &results); + + Ok(()) +} + async fn create_test_table(name: &str) -> Result { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Utf8, false), diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index c4ff9fe95435..be2c45b901fa 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -50,9 +50,9 @@ use crate::{ use arrow::datatypes::{DataType, Schema, SchemaRef}; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::{ - plan_datafusion_err, plan_err, Column, DFField, DFSchema, DFSchemaRef, - DataFusionError, FileType, OwnedTableReference, Result, ScalarValue, TableReference, - ToDFSchema, UnnestOptions, + get_target_functional_dependencies, plan_datafusion_err, plan_err, Column, DFField, + DFSchema, DFSchemaRef, DataFusionError, FileType, OwnedTableReference, Result, + ScalarValue, TableReference, ToDFSchema, UnnestOptions, }; /// Default table name for unnamed table @@ -904,8 +904,27 @@ impl LogicalPlanBuilder { group_expr: impl IntoIterator>, aggr_expr: impl IntoIterator>, ) -> Result { - let group_expr = normalize_cols(group_expr, &self.plan)?; + let mut group_expr = normalize_cols(group_expr, &self.plan)?; let aggr_expr = normalize_cols(aggr_expr, &self.plan)?; + + // Rewrite groupby exprs according to functional dependencies + let group_by_expr_names = group_expr + .iter() + .map(|group_by_expr| group_by_expr.display_name()) + .collect::>>()?; + let schema = self.plan.schema(); + if let Some(target_indices) = + get_target_functional_dependencies(schema, &group_by_expr_names) + { + for idx in target_indices { + let field = schema.field(idx); + let expr = + Expr::Column(Column::new(field.qualifier().cloned(), field.name())); + if !group_expr.contains(&expr) { + group_expr.push(expr); + } + } + } Aggregate::try_new(Arc::new(self.plan), group_expr, aggr_expr) .map(LogicalPlan::Aggregate) .map(Self::from) @@ -1166,8 +1185,8 @@ pub fn build_join_schema( ); let mut metadata = left.metadata().clone(); metadata.extend(right.metadata().clone()); - DFSchema::new_with_metadata(fields, metadata) - .map(|schema| schema.with_functional_dependencies(func_dependencies)) + let schema = DFSchema::new_with_metadata(fields, metadata)?; + schema.with_functional_dependencies(func_dependencies) } /// Errors if one or more expressions have equal names. @@ -1491,7 +1510,7 @@ pub fn unnest_with_options( let df_schema = DFSchema::new_with_metadata(fields, metadata)?; // We can use the existing functional dependencies: let deps = input_schema.functional_dependencies().clone(); - let schema = Arc::new(df_schema.with_functional_dependencies(deps)); + let schema = Arc::new(df_schema.with_functional_dependencies(deps)?); Ok(LogicalPlan::Unnest(Unnest { input: Arc::new(input), diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index d85e0b5b0a40..dfd4fbf65d8e 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -946,7 +946,7 @@ impl LogicalPlan { // We can use the existing functional dependencies as is: .with_functional_dependencies( input.schema().functional_dependencies().clone(), - ), + )?, ); Ok(LogicalPlan::Unnest(Unnest { @@ -1834,8 +1834,9 @@ pub fn projection_schema(input: &LogicalPlan, exprs: &[Expr]) -> Result Schema { Schema::new(vec![ @@ -3164,15 +3165,20 @@ digraph { ) .unwrap(); assert!(!filter.is_scalar()); - let unique_schema = - Arc::new(schema.as_ref().clone().with_functional_dependencies( - FunctionalDependencies::new_from_constraints( - Some(&Constraints::new_unverified(vec![Constraint::Unique( - vec![0], - )])), - 1, - ), - )); + let unique_schema = Arc::new( + schema + .as_ref() + .clone() + .with_functional_dependencies( + FunctionalDependencies::new_from_constraints( + Some(&Constraints::new_unverified(vec![Constraint::Unique( + vec![0], + )])), + 1, + ), + ) + .unwrap(), + ); let scan = Arc::new(LogicalPlan::TableScan(TableScan { table_name: TableReference::bare("tab"), source, diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs index 1027e97d061a..dd9449198796 100644 --- a/datafusion/expr/src/type_coercion/binary.rs +++ b/datafusion/expr/src/type_coercion/binary.rs @@ -116,7 +116,7 @@ fn signature(lhs: &DataType, op: &Operator, rhs: &DataType) -> Result }) } AtArrow | ArrowAt => { - // ArrowAt and AtArrow check for whether one array ic contained in another. + // ArrowAt and AtArrow check for whether one array is contained in another. // The result type is boolean. Signature::comparison defines this signature. // Operation has nothing to do with comparison array_coercion(lhs, rhs).map(Signature::comparison).ok_or_else(|| { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index c30c734fcf1f..abdd7f5f57f6 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -17,6 +17,10 @@ //! Expression utilities +use std::cmp::Ordering; +use std::collections::HashSet; +use std::sync::Arc; + use crate::expr::{Alias, Sort, WindowFunction}; use crate::expr_rewriter::strip_outer_reference; use crate::logical_plan::Aggregate; @@ -25,16 +29,15 @@ use crate::{ and, BinaryExpr, Cast, Expr, ExprSchemable, Filter, GroupingSet, LogicalPlan, Operator, TryCast, }; + use arrow::datatypes::{DataType, TimeUnit}; use datafusion_common::tree_node::{TreeNode, VisitRecursion}; use datafusion_common::{ internal_err, plan_datafusion_err, plan_err, Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, TableReference, }; + use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, WildcardAdditionalOptions}; -use std::cmp::Ordering; -use std::collections::HashSet; -use std::sync::Arc; /// The value to which `COUNT(*)` is expanded to in /// `COUNT()` expressions @@ -433,7 +436,7 @@ pub fn expand_qualified_wildcard( let qualified_schema = DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())? // We can use the functional dependencies as is, since it only stores indices: - .with_functional_dependencies(schema.functional_dependencies().clone()); + .with_functional_dependencies(schema.functional_dependencies().clone())?; let excluded_columns = if let Some(WildcardAdditionalOptions { opt_exclude, opt_except, @@ -730,11 +733,7 @@ fn agg_cols(agg: &Aggregate) -> Vec { .collect() } -fn exprlist_to_fields_aggregate( - exprs: &[Expr], - plan: &LogicalPlan, - agg: &Aggregate, -) -> Result> { +fn exprlist_to_fields_aggregate(exprs: &[Expr], agg: &Aggregate) -> Result> { let agg_cols = agg_cols(agg); let mut fields = vec![]; for expr in exprs { @@ -743,7 +742,7 @@ fn exprlist_to_fields_aggregate( // resolve against schema of input to aggregate fields.push(expr.to_field(agg.input.schema())?); } - _ => fields.push(expr.to_field(plan.schema())?), + _ => fields.push(expr.to_field(&agg.schema)?), } } Ok(fields) @@ -760,15 +759,7 @@ pub fn exprlist_to_fields<'a>( // `GROUPING(person.state)` so in order to resolve `person.state` in this case we need to // look at the input to the aggregate instead. let fields = match plan { - LogicalPlan::Aggregate(agg) => { - Some(exprlist_to_fields_aggregate(&exprs, plan, agg)) - } - LogicalPlan::Window(window) => match window.input.as_ref() { - LogicalPlan::Aggregate(agg) => { - Some(exprlist_to_fields_aggregate(&exprs, plan, agg)) - } - _ => None, - }, + LogicalPlan::Aggregate(agg) => Some(exprlist_to_fields_aggregate(&exprs, agg)), _ => None, }; if let Some(fields) = fields { @@ -1240,10 +1231,9 @@ pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> DFSchema { #[cfg(test)] mod tests { use super::*; - use crate::expr_vec_fmt; use crate::{ - col, cube, expr, grouping_set, lit, rollup, AggregateFunction, WindowFrame, - WindowFunction, + col, cube, expr, expr_vec_fmt, grouping_set, lit, rollup, AggregateFunction, + WindowFrame, WindowFunction, }; #[test] diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs index 8bee2951541d..7ae9f7edf5e5 100644 --- a/datafusion/optimizer/src/optimize_projections.rs +++ b/datafusion/optimizer/src/optimize_projections.rs @@ -15,33 +15,42 @@ // specific language governing permissions and limitations // under the License. -//! Optimizer rule to prune unnecessary Columns from the intermediate schemas inside the [LogicalPlan]. -//! This rule -//! - Removes unnecessary columns that are not showed at the output, and that are not used during computation. -//! - Adds projection to decrease table column size before operators that benefits from less memory at its input. -//! - Removes unnecessary [LogicalPlan::Projection] from the [LogicalPlan]. +//! Optimizer rule to prune unnecessary columns from intermediate schemas +//! inside the [`LogicalPlan`]. This rule: +//! - Removes unnecessary columns that do not appear at the output and/or are +//! not used during any computation step. +//! - Adds projections to decrease table column size before operators that +//! benefit from a smaller memory footprint at its input. +//! - Removes unnecessary [`LogicalPlan::Projection`]s from the [`LogicalPlan`]. + +use std::collections::HashSet; +use std::sync::Arc; + use crate::optimizer::ApplyOrder; -use datafusion_common::{Column, DFSchema, DFSchemaRef, JoinType, Result}; -use datafusion_expr::expr::{Alias, ScalarFunction}; +use crate::{OptimizerConfig, OptimizerRule}; + +use arrow::datatypes::SchemaRef; +use datafusion_common::{ + get_required_group_by_exprs_indices, Column, DFSchema, DFSchemaRef, JoinType, Result, +}; +use datafusion_expr::expr::{Alias, ScalarFunction, ScalarFunctionDefinition}; use datafusion_expr::{ logical_plan::LogicalPlan, projection_schema, Aggregate, BinaryExpr, Cast, Distinct, - Expr, Projection, ScalarFunctionDefinition, TableScan, Window, + Expr, GroupingSet, Projection, TableScan, Window, }; + use hashbrown::HashMap; use itertools::{izip, Itertools}; -use std::collections::HashSet; -use std::sync::Arc; - -use crate::{OptimizerConfig, OptimizerRule}; -/// A rule for optimizing logical plans by removing unused Columns/Fields. +/// A rule for optimizing logical plans by removing unused columns/fields. /// -/// `OptimizeProjections` is an optimizer rule that identifies and eliminates columns from a logical plan -/// that are not used in any downstream operations. This can improve query performance and reduce unnecessary -/// data processing. +/// `OptimizeProjections` is an optimizer rule that identifies and eliminates +/// columns from a logical plan that are not used by downstream operations. +/// This can improve query performance and reduce unnecessary data processing. /// -/// The rule analyzes the input logical plan, determines the necessary column indices, and then removes any -/// unnecessary columns. Additionally, it eliminates any unnecessary projections in the plan. +/// The rule analyzes the input logical plan, determines the necessary column +/// indices, and then removes any unnecessary columns. It also removes any +/// unnecessary projections from the plan tree. #[derive(Default)] pub struct OptimizeProjections {} @@ -58,8 +67,8 @@ impl OptimizerRule for OptimizeProjections { plan: &LogicalPlan, config: &dyn OptimizerConfig, ) -> Result> { - // All of the fields at the output are necessary. - let indices = require_all_indices(plan); + // All output fields are necessary: + let indices = (0..plan.schema().fields().len()).collect::>(); optimize_projections(plan, config, &indices) } @@ -72,30 +81,35 @@ impl OptimizerRule for OptimizeProjections { } } -/// Removes unnecessary columns (e.g Columns that are not referred at the output schema and -/// Columns that are not used during any computation, expression evaluation) from the logical plan and its inputs. +/// Removes unnecessary columns (e.g. columns that do not appear in the output +/// schema and/or are not used during any computation step such as expression +/// evaluation) from the logical plan and its inputs. /// -/// # Arguments +/// # Parameters /// -/// - `plan`: A reference to the input `LogicalPlan` to be optimized. -/// - `_config`: A reference to the optimizer configuration (not currently used). -/// - `indices`: A slice of column indices that represent the necessary column indices for downstream operations. +/// - `plan`: A reference to the input `LogicalPlan` to optimize. +/// - `config`: A reference to the optimizer configuration. +/// - `indices`: A slice of column indices that represent the necessary column +/// indices for downstream operations. /// /// # Returns /// -/// - `Ok(Some(LogicalPlan))`: An optimized `LogicalPlan` with unnecessary columns removed. -/// - `Ok(None)`: If the optimization process results in a logical plan that doesn't require further propagation. -/// - `Err(error)`: If an error occurs during the optimization process. +/// A `Result` object with the following semantics: +/// +/// - `Ok(Some(LogicalPlan))`: An optimized `LogicalPlan` without unnecessary +/// columns. +/// - `Ok(None)`: Signal that the given logical plan did not require any change. +/// - `Err(error)`: An error occured during the optimization process. fn optimize_projections( plan: &LogicalPlan, - _config: &dyn OptimizerConfig, + config: &dyn OptimizerConfig, indices: &[usize], ) -> Result> { // `child_required_indices` stores // - indices of the columns required for each child // - a flag indicating whether putting a projection above children is beneficial for the parent. // As an example LogicalPlan::Filter benefits from small tables. Hence for filter child this flag would be `true`. - let child_required_indices: Option, bool)>> = match plan { + let child_required_indices: Vec<(Vec, bool)> = match plan { LogicalPlan::Sort(_) | LogicalPlan::Filter(_) | LogicalPlan::Repartition(_) @@ -103,36 +117,32 @@ fn optimize_projections( | LogicalPlan::Union(_) | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Distinct(Distinct::On(_)) => { - // Re-route required indices from the parent + column indices referred by expressions in the plan - // to the child. - // All of these operators benefits from small tables at their inputs. Hence projection_beneficial flag is `true`. + // Pass index requirements from the parent as well as column indices + // that appear in this plan's expressions to its child. All these + // operators benefit from "small" inputs, so the projection_beneficial + // flag is `true`. let exprs = plan.expressions(); - let child_req_indices = plan - .inputs() + plan.inputs() .into_iter() .map(|input| { - let required_indices = - get_all_required_indices(indices, input, exprs.iter())?; - Ok((required_indices, true)) + get_all_required_indices(indices, input, exprs.iter()) + .map(|idxs| (idxs, true)) }) - .collect::>>()?; - Some(child_req_indices) + .collect::>()? } LogicalPlan::Limit(_) | LogicalPlan::Prepare(_) => { - // Re-route required indices from the parent + column indices referred by expressions in the plan - // to the child. - // Limit, Prepare doesn't benefit from small column numbers. Hence projection_beneficial flag is `false`. + // Pass index requirements from the parent as well as column indices + // that appear in this plan's expressions to its child. These operators + // do not benefit from "small" inputs, so the projection_beneficial + // flag is `false`. let exprs = plan.expressions(); - let child_req_indices = plan - .inputs() + plan.inputs() .into_iter() .map(|input| { - let required_indices = - get_all_required_indices(indices, input, exprs.iter())?; - Ok((required_indices, false)) + get_all_required_indices(indices, input, exprs.iter()) + .map(|idxs| (idxs, false)) }) - .collect::>>()?; - Some(child_req_indices) + .collect::>()? } LogicalPlan::Copy(_) | LogicalPlan::Ddl(_) @@ -141,81 +151,99 @@ fn optimize_projections( | LogicalPlan::Analyze(_) | LogicalPlan::Subquery(_) | LogicalPlan::Distinct(Distinct::All(_)) => { - // Require all of the fields of the Dml, Ddl, Copy, Explain, Analyze, Subquery, Distinct::All input(s). - // Their child plan can be treated as final plan. Otherwise expected schema may not match. - // TODO: For some subquery variants we may not need to require all indices for its input. - // such as Exists. - let child_requirements = plan - .inputs() + // These plans require all their fields, and their children should + // be treated as final plans -- otherwise, we may have schema a + // mismatch. + // TODO: For some subquery variants (e.g. a subquery arising from an + // EXISTS expression), we may not need to require all indices. + plan.inputs() .iter() - .map(|input| { - // Require all of the fields for each input. - // No projection since all of the fields at the child is required - (require_all_indices(input), false) - }) - .collect::>(); - Some(child_requirements) + .map(|input| ((0..input.schema().fields().len()).collect_vec(), false)) + .collect::>() } LogicalPlan::EmptyRelation(_) | LogicalPlan::Statement(_) | LogicalPlan::Values(_) | LogicalPlan::Extension(_) | LogicalPlan::DescribeTable(_) => { - // EmptyRelation, Values, DescribeTable, Statement has no inputs stop iteration - - // TODO: Add support for extension - // It is not known how to direct requirements to children for LogicalPlan::Extension. - // Safest behaviour is to stop propagation. - None + // These operators have no inputs, so stop the optimization process. + // TODO: Add support for `LogicalPlan::Extension`. + return Ok(None); } LogicalPlan::Projection(proj) => { return if let Some(proj) = merge_consecutive_projections(proj)? { - rewrite_projection_given_requirements(&proj, _config, indices)? - .map(|res| Ok(Some(res))) - // Even if projection cannot be optimized, return merged version - .unwrap_or_else(|| Ok(Some(LogicalPlan::Projection(proj)))) + Ok(Some( + rewrite_projection_given_requirements(&proj, config, indices)? + // Even if we cannot optimize the projection, merge if possible: + .unwrap_or_else(|| LogicalPlan::Projection(proj)), + )) } else { - rewrite_projection_given_requirements(proj, _config, indices) + rewrite_projection_given_requirements(proj, config, indices) }; } LogicalPlan::Aggregate(aggregate) => { - // Split parent requirements to group by and aggregate sections - let group_expr_len = aggregate.group_expr_len()?; - let (_group_by_reqs, mut aggregate_reqs): (Vec, Vec) = - indices.iter().partition(|&&idx| idx < group_expr_len); - // Offset aggregate indices so that they point to valid indices at the `aggregate.aggr_expr` - aggregate_reqs - .iter_mut() - .for_each(|idx| *idx -= group_expr_len); - - // Group by expressions are same - let new_group_bys = aggregate.group_expr.clone(); - - // Only use absolutely necessary aggregate expressions required by parent. + // Split parent requirements to GROUP BY and aggregate sections: + let n_group_exprs = aggregate.group_expr_len()?; + let (group_by_reqs, mut aggregate_reqs): (Vec, Vec) = + indices.iter().partition(|&&idx| idx < n_group_exprs); + // Offset aggregate indices so that they point to valid indices at + // `aggregate.aggr_expr`: + for idx in aggregate_reqs.iter_mut() { + *idx -= n_group_exprs; + } + + // Get absolutely necessary GROUP BY fields: + let group_by_expr_existing = aggregate + .group_expr + .iter() + .map(|group_by_expr| group_by_expr.display_name()) + .collect::>>()?; + let new_group_bys = if let Some(simplest_groupby_indices) = + get_required_group_by_exprs_indices( + aggregate.input.schema(), + &group_by_expr_existing, + ) { + // Some of the fields in the GROUP BY may be required by the + // parent even if these fields are unnecessary in terms of + // functional dependency. + let required_indices = + merge_slices(&simplest_groupby_indices, &group_by_reqs); + get_at_indices(&aggregate.group_expr, &required_indices) + } else { + aggregate.group_expr.clone() + }; + + // Only use the absolutely necessary aggregate expressions required + // by the parent: let mut new_aggr_expr = get_at_indices(&aggregate.aggr_expr, &aggregate_reqs); let all_exprs_iter = new_group_bys.iter().chain(new_aggr_expr.iter()); - let necessary_indices = - indices_referred_by_exprs(&aggregate.input, all_exprs_iter)?; + let schema = aggregate.input.schema(); + let necessary_indices = indices_referred_by_exprs(schema, all_exprs_iter)?; let aggregate_input = if let Some(input) = - optimize_projections(&aggregate.input, _config, &necessary_indices)? + optimize_projections(&aggregate.input, config, &necessary_indices)? { input } else { aggregate.input.as_ref().clone() }; - // Simplify input of the aggregation by adding a projection so that its input only contains - // absolutely necessary columns for the aggregate expressions. Please no that we use aggregate.input.schema() - // because necessary_indices refers to fields in this schema. - let necessary_exprs = - get_required_exprs(aggregate.input.schema(), &necessary_indices); - let (aggregate_input, _is_added) = - add_projection_on_top_if_helpful(aggregate_input, necessary_exprs, true)?; - - // Aggregate always needs at least one aggregate expression. - // With a nested count we don't require any column as input, but still need to create a correct aggregate - // The aggregate may be optimized out later (select count(*) from (select count(*) from [...]) always returns 1 + // Simplify the input of the aggregation by adding a projection so + // that its input only contains absolutely necessary columns for + // the aggregate expressions. Note that necessary_indices refer to + // fields in `aggregate.input.schema()`. + let necessary_exprs = get_required_exprs(schema, &necessary_indices); + let (aggregate_input, _) = + add_projection_on_top_if_helpful(aggregate_input, necessary_exprs)?; + + // Aggregations always need at least one aggregate expression. + // With a nested count, we don't require any column as input, but + // still need to create a correct aggregate, which may be optimized + // out later. As an example, consider the following query: + // + // SELECT COUNT(*) FROM (SELECT COUNT(*) FROM [...]) + // + // which always returns 1. if new_aggr_expr.is_empty() && new_group_bys.is_empty() && !aggregate.aggr_expr.is_empty() @@ -223,7 +251,8 @@ fn optimize_projections( new_aggr_expr = vec![aggregate.aggr_expr[0].clone()]; } - // Create new aggregate plan with updated input, and absolutely necessary fields. + // Create a new aggregate plan with the updated input and only the + // absolutely necessary fields: return Aggregate::try_new( Arc::new(aggregate_input), new_group_bys, @@ -232,43 +261,48 @@ fn optimize_projections( .map(|aggregate| Some(LogicalPlan::Aggregate(aggregate))); } LogicalPlan::Window(window) => { - // Split parent requirements to child and window expression sections. + // Split parent requirements to child and window expression sections: let n_input_fields = window.input.schema().fields().len(); let (child_reqs, mut window_reqs): (Vec, Vec) = indices.iter().partition(|&&idx| idx < n_input_fields); - // Offset window expr indices so that they point to valid indices at the `window.window_expr` - window_reqs - .iter_mut() - .for_each(|idx| *idx -= n_input_fields); + // Offset window expression indices so that they point to valid + // indices at `window.window_expr`: + for idx in window_reqs.iter_mut() { + *idx -= n_input_fields; + } - // Only use window expressions that are absolutely necessary by parent requirements. + // Only use window expressions that are absolutely necessary according + // to parent requirements: let new_window_expr = get_at_indices(&window.window_expr, &window_reqs); - // All of the required column indices at the input of the window by parent, and window expression requirements. + // Get all the required column indices at the input, either by the + // parent or window expression requirements. let required_indices = get_all_required_indices( &child_reqs, &window.input, new_window_expr.iter(), )?; let window_child = if let Some(new_window_child) = - optimize_projections(&window.input, _config, &required_indices)? + optimize_projections(&window.input, config, &required_indices)? { new_window_child } else { window.input.as_ref().clone() }; - // When no window expression is necessary, just use window input. (Remove window operator) + return if new_window_expr.is_empty() { + // When no window expression is necessary, use the input directly: Ok(Some(window_child)) } else { // Calculate required expressions at the input of the window. - // Please note that we use `old_child`, because `required_indices` refers to `old_child`. + // Please note that we use `old_child`, because `required_indices` + // refers to `old_child`. let required_exprs = get_required_exprs(window.input.schema(), &required_indices); - let (window_child, _is_added) = - add_projection_on_top_if_helpful(window_child, required_exprs, true)?; - let window = Window::try_new(new_window_expr, Arc::new(window_child))?; - Ok(Some(LogicalPlan::Window(window))) + let (window_child, _) = + add_projection_on_top_if_helpful(window_child, required_exprs)?; + Window::try_new(new_window_expr, Arc::new(window_child)) + .map(|window| Some(LogicalPlan::Window(window))) }; } LogicalPlan::Join(join) => { @@ -280,136 +314,137 @@ fn optimize_projections( get_all_required_indices(&left_req_indices, &join.left, exprs.iter())?; let right_indices = get_all_required_indices(&right_req_indices, &join.right, exprs.iter())?; - // Join benefits from small columns numbers at its input (decreases memory usage) - // Hence each child benefits from projection. - Some(vec![(left_indices, true), (right_indices, true)]) + // Joins benefit from "small" input tables (lower memory usage). + // Therefore, each child benefits from projection: + vec![(left_indices, true), (right_indices, true)] } LogicalPlan::CrossJoin(cross_join) => { let left_len = cross_join.left.schema().fields().len(); let (left_child_indices, right_child_indices) = split_join_requirements(left_len, indices, &JoinType::Inner); - // Join benefits from small columns numbers at its input (decreases memory usage) - // Hence each child benefits from projection. - Some(vec![ - (left_child_indices, true), - (right_child_indices, true), - ]) + // Joins benefit from "small" input tables (lower memory usage). + // Therefore, each child benefits from projection: + vec![(left_child_indices, true), (right_child_indices, true)] } LogicalPlan::TableScan(table_scan) => { - let projection_fields = table_scan.projected_schema.fields(); let schema = table_scan.source.schema(); - // We expect to find all of the required indices of the projected schema fields. - // among original schema. If at least one of them cannot be found. Use all of the fields in the file. - // (No projection at the source) - let projection = indices - .iter() - .map(|&idx| { - schema.fields().iter().position(|field_source| { - projection_fields[idx].field() == field_source - }) - }) - .collect::>>(); + // Get indices referred to in the original (schema with all fields) + // given projected indices. + let projection = with_indices(&table_scan.projection, schema, |map| { + indices.iter().map(|&idx| map[idx]).collect() + }); - return Ok(Some(LogicalPlan::TableScan(TableScan::try_new( + return TableScan::try_new( table_scan.table_name.clone(), table_scan.source.clone(), - projection, + Some(projection), table_scan.filters.clone(), table_scan.fetch, - )?))); + ) + .map(|table| Some(LogicalPlan::TableScan(table))); } }; - let child_required_indices = - if let Some(child_required_indices) = child_required_indices { - child_required_indices - } else { - // Stop iteration, cannot propagate requirement down below this operator. - return Ok(None); - }; - let new_inputs = izip!(child_required_indices, plan.inputs().into_iter()) .map(|((required_indices, projection_beneficial), child)| { - let (input, mut is_changed) = if let Some(new_input) = - optimize_projections(child, _config, &required_indices)? + let (input, is_changed) = if let Some(new_input) = + optimize_projections(child, config, &required_indices)? { (new_input, true) } else { (child.clone(), false) }; let project_exprs = get_required_exprs(child.schema(), &required_indices); - let (input, is_projection_added) = add_projection_on_top_if_helpful( - input, - project_exprs, - projection_beneficial, - )?; - is_changed |= is_projection_added; - Ok(is_changed.then_some(input)) + let (input, proj_added) = if projection_beneficial { + add_projection_on_top_if_helpful(input, project_exprs)? + } else { + (input, false) + }; + Ok((is_changed || proj_added).then_some(input)) }) - .collect::>>>()?; - // All of the children are same in this case, no need to change plan + .collect::>>()?; if new_inputs.iter().all(|child| child.is_none()) { + // All children are the same in this case, no need to change the plan: Ok(None) } else { - // At least one of the children is changed. + // At least one of the children is changed: let new_inputs = izip!(new_inputs, plan.inputs()) - // If new_input is `None`, this means child is not changed. Hence use `old_child` during construction. + // If new_input is `None`, this means child is not changed, so use + // `old_child` during construction: .map(|(new_input, old_child)| new_input.unwrap_or_else(|| old_child.clone())) .collect::>(); - let res = plan.with_new_inputs(&new_inputs)?; - Ok(Some(res)) + plan.with_new_inputs(&new_inputs).map(Some) } } -/// Merge Consecutive Projections +/// This function applies the given function `f` to the projection indices +/// `proj_indices` if they exist. Otherwise, applies `f` to a default set +/// of indices according to `schema`. +fn with_indices( + proj_indices: &Option>, + schema: SchemaRef, + mut f: F, +) -> Vec +where + F: FnMut(&[usize]) -> Vec, +{ + match proj_indices { + Some(indices) => f(indices.as_slice()), + None => { + let range: Vec = (0..schema.fields.len()).collect(); + f(range.as_slice()) + } + } +} + +/// Merges consecutive projections. /// /// Given a projection `proj`, this function attempts to merge it with a previous -/// projection if it exists and if the merging is beneficial. Merging is considered -/// beneficial when expressions in the current projection are non-trivial and referred to -/// more than once in its input fields. This can act as a caching mechanism for non-trivial -/// computations. +/// projection if it exists and if merging is beneficial. Merging is considered +/// beneficial when expressions in the current projection are non-trivial and +/// appear more than once in its input fields. This can act as a caching mechanism +/// for non-trivial computations. /// -/// # Arguments +/// # Parameters /// /// * `proj` - A reference to the `Projection` to be merged. /// /// # Returns /// -/// A `Result` containing an `Option` of the merged `Projection`. If merging is not beneficial -/// it returns `Ok(None)`. +/// A `Result` object with the following semantics: +/// +/// - `Ok(Some(Projection))`: Merge was beneficial and successful. Contains the +/// merged projection. +/// - `Ok(None)`: Signals that merge is not beneficial (and has not taken place). +/// - `Err(error)`: An error occured during the function call. fn merge_consecutive_projections(proj: &Projection) -> Result> { - let prev_projection = if let LogicalPlan::Projection(prev) = proj.input.as_ref() { - prev - } else { + let LogicalPlan::Projection(prev_projection) = proj.input.as_ref() else { return Ok(None); }; - // Count usages (referral counts) of each projection expression in its input fields - let column_referral_map: HashMap = proj - .expr - .iter() - .flat_map(|expr| expr.to_columns()) - .fold(HashMap::new(), |mut map, cols| { - cols.into_iter() - .for_each(|col| *map.entry(col).or_default() += 1); - map - }); - - // Merging these projections is not beneficial, e.g - // If an expression is not trivial and it is referred more than 1, consecutive projections will be - // beneficial as caching mechanism for non-trivial computations. - // See discussion in: https://github.com/apache/arrow-datafusion/issues/8296 - if column_referral_map.iter().any(|(col, usage)| { - *usage > 1 + // Count usages (referrals) of each projection expression in its input fields: + let mut column_referral_map = HashMap::::new(); + for columns in proj.expr.iter().flat_map(|expr| expr.to_columns()) { + for col in columns.into_iter() { + *column_referral_map.entry(col.clone()).or_default() += 1; + } + } + + // If an expression is non-trivial and appears more than once, consecutive + // projections will benefit from a compute-once approach. For details, see: + // https://github.com/apache/arrow-datafusion/issues/8296 + if column_referral_map.into_iter().any(|(col, usage)| { + usage > 1 && !is_expr_trivial( &prev_projection.expr - [prev_projection.schema.index_of_column(col).unwrap()], + [prev_projection.schema.index_of_column(&col).unwrap()], ) }) { return Ok(None); } - // If all of the expression of the top projection can be rewritten. Rewrite expressions and create a new projection + // If all the expression of the top projection can be rewritten, do so and + // create a new projection: let new_exprs = proj .expr .iter() @@ -429,183 +464,252 @@ fn merge_consecutive_projections(proj: &Projection) -> Result } } -/// Trim Expression -/// -/// Trim the given expression by removing any unnecessary layers of abstraction. +/// Trim the given expression by removing any unnecessary layers of aliasing. /// If the expression is an alias, the function returns the underlying expression. -/// Otherwise, it returns the original expression unchanged. -/// -/// # Arguments +/// Otherwise, it returns the given expression as is. /// -/// * `expr` - The input expression to be trimmed. +/// Without trimming, we can end up with unnecessary indirections inside expressions +/// during projection merges. /// -/// # Returns -/// -/// The trimmed expression. If the input is an alias, the underlying expression is returned. -/// -/// Without trimming, during projection merge we can end up unnecessary indirections inside the expressions. /// Consider: /// -/// Projection (a1 + b1 as sum1) -/// --Projection (a as a1, b as b1) -/// ----Source (a, b) +/// ```text +/// Projection(a1 + b1 as sum1) +/// --Projection(a as a1, b as b1) +/// ----Source(a, b) +/// ``` /// -/// After merge we want to produce +/// After merge, we want to produce: /// -/// Projection (a + b as sum1) +/// ```text +/// Projection(a + b as sum1) /// --Source(a, b) +/// ``` /// -/// Without trimming we would end up +/// Without trimming, we would end up with: /// -/// Projection (a as a1 + b as b1 as sum1) +/// ```text +/// Projection((a as a1 + b as b1) as sum1) /// --Source(a, b) +/// ``` fn trim_expr(expr: Expr) -> Expr { match expr { - Expr::Alias(alias) => *alias.expr, + Expr::Alias(alias) => trim_expr(*alias.expr), _ => expr, } } -// Check whether expression is trivial (e.g it doesn't include computation.) +// Check whether `expr` is trivial; i.e. it doesn't imply any computation. fn is_expr_trivial(expr: &Expr) -> bool { matches!(expr, Expr::Column(_) | Expr::Literal(_)) } -// Exit early when None is seen. +// Exit early when there is no rewrite to do. macro_rules! rewrite_expr_with_check { ($expr:expr, $input:expr) => { - if let Some(val) = rewrite_expr($expr, $input)? { - val + if let Some(value) = rewrite_expr($expr, $input)? { + value } else { return Ok(None); } }; } -// Rewrites expression using its input projection (Merges consecutive projection expressions). -/// Rewrites an projections expression using its input projection -/// (Helper during merging consecutive projection expressions). +/// Rewrites a projection expression using the projection before it (i.e. its input) +/// This is a subroutine to the `merge_consecutive_projections` function. /// -/// # Arguments +/// # Parameters /// -/// * `expr` - A reference to the expression to be rewritten. -/// * `input` - A reference to the input (itself a projection) of the projection expression. +/// * `expr` - A reference to the expression to rewrite. +/// * `input` - A reference to the input of the projection expression (itself +/// a projection). /// /// # Returns /// -/// A `Result` containing an `Option` of the rewritten expression. If the rewrite is successful, -/// it returns `Ok(Some)` with the modified expression. If the expression cannot be rewritten -/// it returns `Ok(None)`. +/// A `Result` object with the following semantics: +/// +/// - `Ok(Some(Expr))`: Rewrite was successful. Contains the rewritten result. +/// - `Ok(None)`: Signals that `expr` can not be rewritten. +/// - `Err(error)`: An error occured during the function call. fn rewrite_expr(expr: &Expr, input: &Projection) -> Result> { - Ok(match expr { + let result = match expr { Expr::Column(col) => { - // Find index of column + // Find index of column: let idx = input.schema.index_of_column(col)?; - Some(input.expr[idx].clone()) + input.expr[idx].clone() } - Expr::BinaryExpr(binary) => { - let lhs = trim_expr(rewrite_expr_with_check!(&binary.left, input)); - let rhs = trim_expr(rewrite_expr_with_check!(&binary.right, input)); - Some(Expr::BinaryExpr(BinaryExpr::new( - Box::new(lhs), - binary.op, - Box::new(rhs), - ))) - } - Expr::Alias(alias) => { - let new_expr = trim_expr(rewrite_expr_with_check!(&alias.expr, input)); - Some(Expr::Alias(Alias::new( - new_expr, - alias.relation.clone(), - alias.name.clone(), - ))) - } - Expr::Literal(_val) => Some(expr.clone()), + Expr::BinaryExpr(binary) => Expr::BinaryExpr(BinaryExpr::new( + Box::new(trim_expr(rewrite_expr_with_check!(&binary.left, input))), + binary.op, + Box::new(trim_expr(rewrite_expr_with_check!(&binary.right, input))), + )), + Expr::Alias(alias) => Expr::Alias(Alias::new( + trim_expr(rewrite_expr_with_check!(&alias.expr, input)), + alias.relation.clone(), + alias.name.clone(), + )), + Expr::Literal(_) => expr.clone(), Expr::Cast(cast) => { let new_expr = rewrite_expr_with_check!(&cast.expr, input); - Some(Expr::Cast(Cast::new( - Box::new(new_expr), - cast.data_type.clone(), - ))) + Expr::Cast(Cast::new(Box::new(new_expr), cast.data_type.clone())) } Expr::ScalarFunction(scalar_fn) => { - let fun = if let ScalarFunctionDefinition::BuiltIn(fun) = scalar_fn.func_def { - fun - } else { + // TODO: Support UDFs. + let ScalarFunctionDefinition::BuiltIn(fun) = scalar_fn.func_def else { return Ok(None); }; - scalar_fn + return Ok(scalar_fn .args .iter() .map(|expr| rewrite_expr(expr, input)) - .collect::>>>()? - .map(|new_args| Expr::ScalarFunction(ScalarFunction::new(fun, new_args))) + .collect::>>()? + .map(|new_args| { + Expr::ScalarFunction(ScalarFunction::new(fun, new_args)) + })); } - _ => { - // Unsupported type to merge in consecutive projections - None - } - }) + // Unsupported type for consecutive projection merge analysis. + _ => return Ok(None), + }; + Ok(Some(result)) } -/// Retrieves a set of outer-referenced columns from an expression. -/// Please note that `expr.to_columns()` API doesn't return these columns. +/// Retrieves a set of outer-referenced columns by the given expression, `expr`. +/// Note that the `Expr::to_columns()` function doesn't return these columns. /// -/// # Arguments +/// # Parameters /// -/// * `expr` - The expression to be analyzed for outer-referenced columns. +/// * `expr` - The expression to analyze for outer-referenced columns. /// /// # Returns /// -/// A `HashSet` containing columns that are referenced by the expression. -fn outer_columns(expr: &Expr) -> HashSet { +/// If the function can safely infer all outer-referenced columns, returns a +/// `Some(HashSet)` containing these columns. Otherwise, returns `None`. +fn outer_columns(expr: &Expr) -> Option> { let mut columns = HashSet::new(); - outer_columns_helper(expr, &mut columns); - columns + outer_columns_helper(expr, &mut columns).then_some(columns) } -/// Helper function to accumulate outer-referenced columns referred by the `expr`. +/// A recursive subroutine that accumulates outer-referenced columns by the +/// given expression, `expr`. /// -/// # Arguments +/// # Parameters /// -/// * `expr` - The expression to be analyzed for outer-referenced columns. -/// * `columns` - A mutable reference to a `HashSet` where the detected columns are collected. -fn outer_columns_helper(expr: &Expr, columns: &mut HashSet) { +/// * `expr` - The expression to analyze for outer-referenced columns. +/// * `columns` - A mutable reference to a `HashSet` where detected +/// columns are collected. +/// +/// Returns `true` if it can safely collect all outer-referenced columns. +/// Otherwise, returns `false`. +fn outer_columns_helper(expr: &Expr, columns: &mut HashSet) -> bool { match expr { Expr::OuterReferenceColumn(_, col) => { columns.insert(col.clone()); + true } Expr::BinaryExpr(binary_expr) => { - outer_columns_helper(&binary_expr.left, columns); - outer_columns_helper(&binary_expr.right, columns); + outer_columns_helper(&binary_expr.left, columns) + && outer_columns_helper(&binary_expr.right, columns) } Expr::ScalarSubquery(subquery) => { - for expr in &subquery.outer_ref_columns { - outer_columns_helper(expr, columns); - } + let exprs = subquery.outer_ref_columns.iter(); + outer_columns_helper_multi(exprs, columns) } Expr::Exists(exists) => { - for expr in &exists.subquery.outer_ref_columns { - outer_columns_helper(expr, columns); + let exprs = exists.subquery.outer_ref_columns.iter(); + outer_columns_helper_multi(exprs, columns) + } + Expr::Alias(alias) => outer_columns_helper(&alias.expr, columns), + Expr::InSubquery(insubquery) => { + let exprs = insubquery.subquery.outer_ref_columns.iter(); + outer_columns_helper_multi(exprs, columns) + } + Expr::IsNotNull(expr) | Expr::IsNull(expr) => outer_columns_helper(expr, columns), + Expr::Cast(cast) => outer_columns_helper(&cast.expr, columns), + Expr::Sort(sort) => outer_columns_helper(&sort.expr, columns), + Expr::AggregateFunction(aggregate_fn) => { + outer_columns_helper_multi(aggregate_fn.args.iter(), columns) + && aggregate_fn + .order_by + .as_ref() + .map_or(true, |obs| outer_columns_helper_multi(obs.iter(), columns)) + && aggregate_fn + .filter + .as_ref() + .map_or(true, |filter| outer_columns_helper(filter, columns)) + } + Expr::WindowFunction(window_fn) => { + outer_columns_helper_multi(window_fn.args.iter(), columns) + && outer_columns_helper_multi(window_fn.order_by.iter(), columns) + && outer_columns_helper_multi(window_fn.partition_by.iter(), columns) + } + Expr::GroupingSet(groupingset) => match groupingset { + GroupingSet::GroupingSets(multi_exprs) => multi_exprs + .iter() + .all(|e| outer_columns_helper_multi(e.iter(), columns)), + GroupingSet::Cube(exprs) | GroupingSet::Rollup(exprs) => { + outer_columns_helper_multi(exprs.iter(), columns) } + }, + Expr::ScalarFunction(scalar_fn) => { + outer_columns_helper_multi(scalar_fn.args.iter(), columns) } - Expr::Alias(alias) => { - outer_columns_helper(&alias.expr, columns); + Expr::Like(like) => { + outer_columns_helper(&like.expr, columns) + && outer_columns_helper(&like.pattern, columns) } - _ => {} + Expr::InList(in_list) => { + outer_columns_helper(&in_list.expr, columns) + && outer_columns_helper_multi(in_list.list.iter(), columns) + } + Expr::Case(case) => { + let when_then_exprs = case + .when_then_expr + .iter() + .flat_map(|(first, second)| [first.as_ref(), second.as_ref()]); + outer_columns_helper_multi(when_then_exprs, columns) + && case + .expr + .as_ref() + .map_or(true, |expr| outer_columns_helper(expr, columns)) + && case + .else_expr + .as_ref() + .map_or(true, |expr| outer_columns_helper(expr, columns)) + } + Expr::Column(_) | Expr::Literal(_) | Expr::Wildcard { .. } => true, + _ => false, } } -/// Generates the required expressions(Column) that resides at `indices` of the `input_schema`. +/// A recursive subroutine that accumulates outer-referenced columns by the +/// given expressions (`exprs`). +/// +/// # Parameters +/// +/// * `exprs` - The expressions to analyze for outer-referenced columns. +/// * `columns` - A mutable reference to a `HashSet` where detected +/// columns are collected. +/// +/// Returns `true` if it can safely collect all outer-referenced columns. +/// Otherwise, returns `false`. +fn outer_columns_helper_multi<'a>( + mut exprs: impl Iterator, + columns: &mut HashSet, +) -> bool { + exprs.all(|e| outer_columns_helper(e, columns)) +} + +/// Generates the required expressions (columns) that reside at `indices` of +/// the given `input_schema`. /// /// # Arguments /// /// * `input_schema` - A reference to the input schema. -/// * `indices` - A slice of `usize` indices specifying which columns are required. +/// * `indices` - A slice of `usize` indices specifying required columns. /// /// # Returns /// -/// A vector of `Expr::Column` expressions, that sits at `indices` of the `input_schema`. +/// A vector of `Expr::Column` expressions residing at `indices` of the `input_schema`. fn get_required_exprs(input_schema: &Arc, indices: &[usize]) -> Vec { let fields = input_schema.fields(); indices @@ -614,58 +718,70 @@ fn get_required_exprs(input_schema: &Arc, indices: &[usize]) -> Vec>( - input: &LogicalPlan, - exprs: I, +/// A [`Result`] object containing the indices of all required fields in +/// `input_schema` to calculate all `exprs` successfully. +fn indices_referred_by_exprs<'a>( + input_schema: &DFSchemaRef, + exprs: impl Iterator, ) -> Result> { - let new_indices = exprs - .flat_map(|expr| indices_referred_by_expr(input.schema(), expr)) + let indices = exprs + .map(|expr| indices_referred_by_expr(input_schema, expr)) + .collect::>>()?; + Ok(indices + .into_iter() .flatten() - // Make sure no duplicate entries exists and indices are ordered. + // Make sure no duplicate entries exist and indices are ordered: .sorted() .dedup() - .collect::>(); - Ok(new_indices) + .collect()) } -/// Get indices of the necessary fields referred by the `expr` among input schema. +/// Get indices of the fields referred to by the given expression `expr` within +/// the given schema (`input_schema`). /// -/// # Arguments +/// # Parameters /// -/// * `input_schema`: The input schema to search for indices referred by expr. -/// * `expr`: An expression for which we want to find necessary field indices at the input schema. +/// * `input_schema`: The input schema to analyze for index requirements. +/// * `expr`: An expression for which we want to find necessary field indices. /// /// # Returns /// -/// A [Result] object that contains the required field indices of the `input_schema`, to be able to calculate -/// the `expr` successfully. +/// A [`Result`] object containing the indices of all required fields in +/// `input_schema` to calculate `expr` successfully. fn indices_referred_by_expr( input_schema: &DFSchemaRef, expr: &Expr, ) -> Result> { let mut cols = expr.to_columns()?; - // Get outer referenced columns (expr.to_columns() doesn't return these columns). - cols.extend(outer_columns(expr)); - cols.iter() - .filter(|&col| input_schema.has_column(col)) - .map(|col| input_schema.index_of_column(col)) - .collect::>>() + // Get outer-referenced columns: + if let Some(outer_cols) = outer_columns(expr) { + cols.extend(outer_cols); + } else { + // Expression is not known to contain outer columns or not. Hence, do + // not assume anything and require all the schema indices at the input: + return Ok((0..input_schema.fields().len()).collect()); + } + Ok(cols + .iter() + .flat_map(|col| input_schema.index_of_column(col)) + .collect()) } -/// Get all required indices for the input (indices required by parent + indices referred by `exprs`) +/// Gets all required indices for the input; i.e. those required by the parent +/// and those referred to by `exprs`. /// -/// # Arguments +/// # Parameters /// /// * `parent_required_indices` - A slice of indices required by the parent plan. /// * `input` - The input logical plan to analyze for index requirements. @@ -673,30 +789,28 @@ fn indices_referred_by_expr( /// /// # Returns /// -/// A `Result` containing a vector of `usize` indices containing all required indices. -fn get_all_required_indices<'a, I: Iterator>( +/// A `Result` containing a vector of `usize` indices containing all the required +/// indices. +fn get_all_required_indices<'a>( parent_required_indices: &[usize], input: &LogicalPlan, - exprs: I, + exprs: impl Iterator, ) -> Result> { - let referred_indices = indices_referred_by_exprs(input, exprs)?; - Ok(merge_vectors(parent_required_indices, &referred_indices)) + indices_referred_by_exprs(input.schema(), exprs) + .map(|indices| merge_slices(parent_required_indices, &indices)) } -/// Retrieves a list of expressions at specified indices from a slice of expressions. +/// Retrieves the expressions at specified indices within the given slice. Ignores +/// any invalid indices. /// -/// This function takes a slice of expressions `exprs` and a slice of `usize` indices `indices`. -/// It returns a new vector containing the expressions from `exprs` that correspond to the provided indices (with bound check). +/// # Parameters /// -/// # Arguments -/// -/// * `exprs` - A slice of expressions from which expressions are to be retrieved. -/// * `indices` - A slice of `usize` indices specifying the positions of the expressions to be retrieved. +/// * `exprs` - A slice of expressions to index into. +/// * `indices` - A slice of indices specifying the positions of expressions sought. /// /// # Returns /// -/// A vector of expressions that correspond to the specified indices. If any index is out of bounds, -/// the associated expression is skipped in the result. +/// A vector of expressions corresponding to specified indices. fn get_at_indices(exprs: &[Expr], indices: &[usize]) -> Vec { indices .iter() @@ -705,158 +819,148 @@ fn get_at_indices(exprs: &[Expr], indices: &[usize]) -> Vec { .collect() } -/// Merges two slices of `usize` values into a single vector with sorted (ascending) and deduplicated elements. -/// -/// # Arguments -/// -/// * `lhs` - The first slice of `usize` values to be merged. -/// * `rhs` - The second slice of `usize` values to be merged. -/// -/// # Returns -/// -/// A vector of `usize` values containing the merged, sorted, and deduplicated elements from `lhs` and `rhs`. -/// As an example merge of [3, 2, 4] and [3, 6, 1] will produce [1, 2, 3, 6] -fn merge_vectors(lhs: &[usize], rhs: &[usize]) -> Vec { - let mut merged = lhs.to_vec(); - merged.extend(rhs); - // Make sure to run sort before dedup. - // Dedup removes consecutive same entries - // If sort is run before it, all duplicates are removed. - merged.sort(); - merged.dedup(); - merged +/// Merges two slices into a single vector with sorted (ascending) and +/// deduplicated elements. For example, merging `[3, 2, 4]` and `[3, 6, 1]` +/// will produce `[1, 2, 3, 6]`. +fn merge_slices(left: &[T], right: &[T]) -> Vec { + // Make sure to sort before deduping, which removes the duplicates: + left.iter() + .cloned() + .chain(right.iter().cloned()) + .sorted() + .dedup() + .collect() } -/// Splits requirement indices for a join into left and right children based on the join type. +/// Splits requirement indices for a join into left and right children based on +/// the join type. /// -/// This function takes the length of the left child, a slice of requirement indices, and the type -/// of join (e.g., INNER, LEFT, RIGHT, etc.) as arguments. Depending on the join type, it divides -/// the requirement indices into those that apply to the left child and those that apply to the right child. +/// This function takes the length of the left child, a slice of requirement +/// indices, and the type of join (e.g. `INNER`, `LEFT`, `RIGHT`) as arguments. +/// Depending on the join type, it divides the requirement indices into those +/// that apply to the left child and those that apply to the right child. /// -/// - For INNER, LEFT, RIGHT, and FULL joins, the requirements are split between left and right children. -/// The right child indices are adjusted to point to valid positions in the right child by subtracting -/// the length of the left child. +/// - For `INNER`, `LEFT`, `RIGHT` and `FULL` joins, the requirements are split +/// between left and right children. The right child indices are adjusted to +/// point to valid positions within the right child by subtracting the length +/// of the left child. /// -/// - For LEFT ANTI, LEFT SEMI, RIGHT SEMI, and RIGHT ANTI joins, all requirements are re-routed to either -/// the left child or the right child directly, depending on the join type. +/// - For `LEFT ANTI`, `LEFT SEMI`, `RIGHT SEMI` and `RIGHT ANTI` joins, all +/// requirements are re-routed to either the left child or the right child +/// directly, depending on the join type. /// -/// # Arguments +/// # Parameters /// /// * `left_len` - The length of the left child. /// * `indices` - A slice of requirement indices. -/// * `join_type` - The type of join (e.g., INNER, LEFT, RIGHT, etc.). +/// * `join_type` - The type of join (e.g. `INNER`, `LEFT`, `RIGHT`). /// /// # Returns /// -/// A tuple containing two vectors of `usize` indices: the first vector represents the requirements for -/// the left child, and the second vector represents the requirements for the right child. The indices -/// are appropriately split and adjusted based on the join type. +/// A tuple containing two vectors of `usize` indices: The first vector represents +/// the requirements for the left child, and the second vector represents the +/// requirements for the right child. The indices are appropriately split and +/// adjusted based on the join type. fn split_join_requirements( left_len: usize, indices: &[usize], join_type: &JoinType, ) -> (Vec, Vec) { match join_type { - // In these cases requirements split to left and right child. + // In these cases requirements are split between left/right children: JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { - let (left_child_reqs, mut right_child_reqs): (Vec, Vec) = + let (left_reqs, mut right_reqs): (Vec, Vec) = indices.iter().partition(|&&idx| idx < left_len); - // Decrease right side index by `left_len` so that they point to valid positions in the right child. - right_child_reqs.iter_mut().for_each(|idx| *idx -= left_len); - (left_child_reqs, right_child_reqs) + // Decrease right side indices by `left_len` so that they point to valid + // positions within the right child: + for idx in right_reqs.iter_mut() { + *idx -= left_len; + } + (left_reqs, right_reqs) } // All requirements can be re-routed to left child directly. JoinType::LeftAnti | JoinType::LeftSemi => (indices.to_vec(), vec![]), - // All requirements can be re-routed to right side directly. (No need to change index, join schema is right child schema.) + // All requirements can be re-routed to right side directly. + // No need to change index, join schema is right child schema. JoinType::RightSemi | JoinType::RightAnti => (vec![], indices.to_vec()), } } -/// Adds a projection on top of a logical plan if it is beneficial and reduces the number of columns for the parent operator. +/// Adds a projection on top of a logical plan if doing so reduces the number +/// of columns for the parent operator. /// -/// This function takes a `LogicalPlan`, a list of projection expressions, and a flag indicating whether -/// the projection is beneficial. If the projection is beneficial and reduces the number of columns in -/// the plan, a new `LogicalPlan` with the projection is created and returned, along with a `true` flag. -/// If the projection is unnecessary or doesn't reduce the number of columns, the original plan is returned -/// with a `false` flag. +/// This function takes a `LogicalPlan` and a list of projection expressions. +/// If the projection is beneficial (it reduces the number of columns in the +/// plan) a new `LogicalPlan` with the projection is created and returned, along +/// with a `true` flag. If the projection doesn't reduce the number of columns, +/// the original plan is returned with a `false` flag. /// -/// # Arguments +/// # Parameters /// /// * `plan` - The input `LogicalPlan` to potentially add a projection to. /// * `project_exprs` - A list of expressions for the projection. -/// * `projection_beneficial` - A flag indicating whether the projection is beneficial. /// /// # Returns /// -/// A `Result` containing a tuple with two values: the resulting `LogicalPlan` (with or without -/// the added projection) and a `bool` flag indicating whether the projection was added (`true`) or not (`false`). +/// A `Result` containing a tuple with two values: The resulting `LogicalPlan` +/// (with or without the added projection) and a `bool` flag indicating if a +/// projection was added (`true`) or not (`false`). fn add_projection_on_top_if_helpful( plan: LogicalPlan, project_exprs: Vec, - projection_beneficial: bool, ) -> Result<(LogicalPlan, bool)> { - // Make sure projection decreases table column size, otherwise it is unnecessary. - if !projection_beneficial || project_exprs.len() >= plan.schema().fields().len() { + // Make sure projection decreases the number of columns, otherwise it is unnecessary. + if project_exprs.len() >= plan.schema().fields().len() { Ok((plan, false)) } else { - let new_plan = Projection::try_new(project_exprs, Arc::new(plan)) - .map(LogicalPlan::Projection)?; - Ok((new_plan, true)) + Projection::try_new(project_exprs, Arc::new(plan)) + .map(|proj| (LogicalPlan::Projection(proj), true)) } } -/// Collects and returns a vector of all indices of the fields in the schema of a logical plan. +/// Rewrite the given projection according to the fields required by its +/// ancestors. /// -/// # Arguments +/// # Parameters /// -/// * `plan` - A reference to the `LogicalPlan` for which indices are required. +/// * `proj` - A reference to the original projection to rewrite. +/// * `config` - A reference to the optimizer configuration. +/// * `indices` - A slice of indices representing the columns required by the +/// ancestors of the given projection. /// /// # Returns /// -/// A vector of `usize` indices representing all fields in the schema of the provided logical plan. -fn require_all_indices(plan: &LogicalPlan) -> Vec { - (0..plan.schema().fields().len()).collect() -} - -/// Rewrite Projection Given Required fields by its parent(s). -/// -/// # Arguments -/// -/// * `proj` - A reference to the original projection to be rewritten. -/// * `_config` - A reference to the optimizer configuration (unused in the function). -/// * `indices` - A slice of indices representing the required columns by the parent(s) of projection. -/// -/// # Returns +/// A `Result` object with the following semantics: /// -/// A `Result` containing an `Option` of the rewritten logical plan. If the -/// rewrite is successful, it returns `Some` with the optimized logical plan. -/// If the logical plan remains unchanged it returns `Ok(None)`. +/// - `Ok(Some(LogicalPlan))`: Contains the rewritten projection +/// - `Ok(None)`: No rewrite necessary. +/// - `Err(error)`: An error occured during the function call. fn rewrite_projection_given_requirements( proj: &Projection, - _config: &dyn OptimizerConfig, + config: &dyn OptimizerConfig, indices: &[usize], ) -> Result> { let exprs_used = get_at_indices(&proj.expr, indices); - let required_indices = indices_referred_by_exprs(&proj.input, exprs_used.iter())?; + let required_indices = + indices_referred_by_exprs(proj.input.schema(), exprs_used.iter())?; return if let Some(input) = - optimize_projections(&proj.input, _config, &required_indices)? + optimize_projections(&proj.input, config, &required_indices)? { if &projection_schema(&input, &exprs_used)? == input.schema() { Ok(Some(input)) } else { - let new_proj = Projection::try_new(exprs_used, Arc::new(input))?; - let new_proj = LogicalPlan::Projection(new_proj); - Ok(Some(new_proj)) + Projection::try_new(exprs_used, Arc::new(input)) + .map(|proj| Some(LogicalPlan::Projection(proj))) } } else if exprs_used.len() < proj.expr.len() { - // Projection expression used is different than the existing projection - // In this case, even if child doesn't change we should update projection to use less columns. + // Projection expression used is different than the existing projection. + // In this case, even if the child doesn't change, we should update the + // projection to use fewer columns: if &projection_schema(&proj.input, &exprs_used)? == proj.input.schema() { Ok(Some(proj.input.as_ref().clone())) } else { - let new_proj = Projection::try_new(exprs_used, proj.input.clone())?; - let new_proj = LogicalPlan::Projection(new_proj); - Ok(Some(new_proj)) + Projection::try_new(exprs_used, proj.input.clone()) + .map(|proj| Some(LogicalPlan::Projection(proj))) } } else { // Projection doesn't change. @@ -866,16 +970,16 @@ fn rewrite_projection_given_requirements( #[cfg(test)] mod tests { + use std::sync::Arc; + use crate::optimize_projections::OptimizeProjections; + use crate::test::{assert_optimized_plan_eq, test_table_scan}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::{Result, TableReference}; use datafusion_expr::{ binary_expr, col, count, lit, logical_plan::builder::LogicalPlanBuilder, table_scan, Expr, LogicalPlan, Operator, }; - use std::sync::Arc; - - use crate::test::*; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq(Arc::new(OptimizeProjections::new()), plan, expected) @@ -920,6 +1024,20 @@ mod tests { \n TableScan: test projection=[a]"; assert_optimized_plan_equal(&plan, expected) } + + #[test] + fn merge_nested_alias() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a").alias("alias1").alias("alias2")])? + .project(vec![col("alias2").alias("alias")])? + .build()?; + + let expected = "Projection: test.a AS alias\ + \n TableScan: test projection=[a]"; + assert_optimized_plan_equal(&plan, expected) + } + #[test] fn test_nested_count() -> Result<()> { let schema = Schema::new(vec![Field::new("foo", DataType::Int32, false)]); diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 7af46ed70adf..0dc34cb809eb 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -17,6 +17,10 @@ //! Query optimizer traits +use std::collections::HashSet; +use std::sync::Arc; +use std::time::Instant; + use crate::common_subexpr_eliminate::CommonSubexprEliminate; use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery; use crate::eliminate_cross_join::EliminateCrossJoin; @@ -41,15 +45,14 @@ use crate::simplify_expressions::SimplifyExpressions; use crate::single_distinct_to_groupby::SingleDistinctToGroupBy; use crate::unwrap_cast_in_comparison::UnwrapCastInComparison; use crate::utils::log_plan; -use chrono::{DateTime, Utc}; + use datafusion_common::alias::AliasGenerator; use datafusion_common::config::ConfigOptions; use datafusion_common::{DataFusionError, Result}; -use datafusion_expr::LogicalPlan; +use datafusion_expr::logical_plan::LogicalPlan; + +use chrono::{DateTime, Utc}; use log::{debug, warn}; -use std::collections::HashSet; -use std::sync::Arc; -use std::time::Instant; /// `OptimizerRule` transforms one [`LogicalPlan`] into another which /// computes the same results, but in a potentially more efficient @@ -447,17 +450,18 @@ pub(crate) fn assert_schema_is_the_same( #[cfg(test)] mod tests { + use std::sync::{Arc, Mutex}; + + use super::ApplyOrder; use crate::optimizer::Optimizer; use crate::test::test_table_scan; use crate::{OptimizerConfig, OptimizerContext, OptimizerRule}; + use datafusion_common::{ plan_err, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, }; use datafusion_expr::logical_plan::EmptyRelation; use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection}; - use std::sync::{Arc, Mutex}; - - use super::ApplyOrder; #[test] fn skip_failing_rule() { diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index 4172881c0aad..d857c6154ea9 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -15,8 +15,11 @@ // specific language governing permissions and limitations // under the License. +use std::any::Any; +use std::collections::HashMap; +use std::sync::Arc; + use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit}; -use chrono::{DateTime, NaiveDateTime, Utc}; use datafusion_common::config::ConfigOptions; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF}; @@ -28,9 +31,8 @@ use datafusion_sql::sqlparser::ast::Statement; use datafusion_sql::sqlparser::dialect::GenericDialect; use datafusion_sql::sqlparser::parser::Parser; use datafusion_sql::TableReference; -use std::any::Any; -use std::collections::HashMap; -use std::sync::Arc; + +use chrono::{DateTime, NaiveDateTime, Utc}; #[cfg(test)] #[ctor::ctor] diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs index 15f720d75652..a0819e4aaf8e 100644 --- a/datafusion/sql/src/select.rs +++ b/datafusion/sql/src/select.rs @@ -25,10 +25,7 @@ use crate::utils::{ }; use datafusion_common::Column; -use datafusion_common::{ - get_target_functional_dependencies, not_impl_err, plan_err, DFSchemaRef, - DataFusionError, Result, -}; +use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; use datafusion_expr::expr::Alias; use datafusion_expr::expr_rewriter::{ normalize_col, normalize_col_with_schemas_and_ambiguity_check, @@ -534,14 +531,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { group_by_exprs: &[Expr], aggr_exprs: &[Expr], ) -> Result<(LogicalPlan, Vec, Option)> { - let group_by_exprs = - get_updated_group_by_exprs(group_by_exprs, select_exprs, input.schema())?; - // create the aggregate plan let plan = LogicalPlanBuilder::from(input.clone()) - .aggregate(group_by_exprs.clone(), aggr_exprs.to_vec())? + .aggregate(group_by_exprs.to_vec(), aggr_exprs.to_vec())? .build()?; + let group_by_exprs = if let LogicalPlan::Aggregate(agg) = &plan { + &agg.group_expr + } else { + unreachable!(); + }; + // in this next section of code we are re-writing the projection to refer to columns // output by the aggregate plan. For example, if the projection contains the expression // `SUM(a)` then we replace that with a reference to a column `SUM(a)` produced by @@ -550,7 +550,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // combine the original grouping and aggregate expressions into one list (note that // we do not add the "having" expression since that is not part of the projection) let mut aggr_projection_exprs = vec![]; - for expr in &group_by_exprs { + for expr in group_by_exprs { match expr { Expr::GroupingSet(GroupingSet::Rollup(exprs)) => { aggr_projection_exprs.extend_from_slice(exprs) @@ -659,61 +659,3 @@ fn match_window_definitions( } Ok(()) } - -/// Update group by exprs, according to functional dependencies -/// The query below -/// -/// SELECT sn, amount -/// FROM sales_global -/// GROUP BY sn -/// -/// cannot be calculated, because it has a column(`amount`) which is not -/// part of group by expression. -/// However, if we know that, `sn` is determinant of `amount`. We can -/// safely, determine value of `amount` for each distinct `sn`. For these cases -/// we rewrite the query above as -/// -/// SELECT sn, amount -/// FROM sales_global -/// GROUP BY sn, amount -/// -/// Both queries, are functionally same. \[Because, (`sn`, `amount`) and (`sn`) -/// defines the identical groups. \] -/// This function updates group by expressions such that select expressions that are -/// not in group by expression, are added to the group by expressions if they are dependent -/// of the sub-set of group by expressions. -fn get_updated_group_by_exprs( - group_by_exprs: &[Expr], - select_exprs: &[Expr], - schema: &DFSchemaRef, -) -> Result> { - let mut new_group_by_exprs = group_by_exprs.to_vec(); - let fields = schema.fields(); - let group_by_expr_names = group_by_exprs - .iter() - .map(|group_by_expr| group_by_expr.display_name()) - .collect::>>()?; - // Get targets that can be used in a select, even if they do not occur in aggregation: - if let Some(target_indices) = - get_target_functional_dependencies(schema, &group_by_expr_names) - { - // Calculate dependent fields names with determinant GROUP BY expression: - let associated_field_names = target_indices - .iter() - .map(|idx| fields[*idx].qualified_name()) - .collect::>(); - // Expand GROUP BY expressions with select expressions: If a GROUP - // BY expression is a determinant key, we can use its dependent - // columns in select statements also. - for expr in select_exprs { - let expr_name = format!("{}", expr); - if !new_group_by_exprs.contains(expr) - && associated_field_names.contains(&expr_name) - { - new_group_by_exprs.push(expr.clone()); - } - } - } - - Ok(new_group_by_exprs) -} diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index 1d6d7dc671fa..5248ac8c8531 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -3211,6 +3211,21 @@ SELECT s.sn, s.amount, 2*s.sn 3 200 6 4 100 8 +# we should be able to re-write group by expression +# using functional dependencies for complex expressions also. +# In this case, we use 2*s.amount instead of s.amount. +query IRI +SELECT s.sn, 2*s.amount, 2*s.sn + FROM sales_global_with_pk AS s + GROUP BY sn + ORDER BY sn +---- +0 60 0 +1 100 2 +2 150 4 +3 400 6 +4 200 8 + query IRI SELECT s.sn, s.amount, 2*s.sn FROM sales_global_with_pk_alternate AS s @@ -3364,7 +3379,7 @@ SELECT column1, COUNT(*) as column2 FROM (VALUES (['a', 'b'], 1), (['c', 'd', 'e # primary key should be aware from which columns it is associated -statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.sn could not be resolved from available columns: l.sn, SUM\(l.amount\) +statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression r.sn could not be resolved from available columns: l.sn, l.zip_code, l.country, l.ts, l.currency, l.amount, SUM\(l.amount\) SELECT l.sn, r.sn, SUM(l.amount), r.amount FROM sales_global_with_pk AS l JOIN sales_global_with_pk AS r @@ -3456,7 +3471,7 @@ ORDER BY r.sn 4 100 2022-01-03T10:00:00 # after join, new window expressions shouldn't be associated with primary keys -statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression rn1 could not be resolved from available columns: r.sn, SUM\(r.amount\) +statement error DataFusion error: Error during planning: Projection references non-aggregate values: Expression rn1 could not be resolved from available columns: r.sn, r.ts, r.amount, SUM\(r.amount\) SELECT r.sn, SUM(r.amount), rn1 FROM (SELECT r.ts, r.sn, r.amount, @@ -3784,6 +3799,192 @@ AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, b@1 as b], aggr=[SUM(multip ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 ------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true +statement ok +set datafusion.execution.target_partitions = 1; + +query TT +EXPLAIN SELECT c, sum1 + FROM + (SELECT c, b, a, SUM(d) as sum1 + FROM multiple_ordered_table_with_pk + GROUP BY c) +GROUP BY c; +---- +logical_plan +Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, sum1]], aggr=[[]] +--Projection: multiple_ordered_table_with_pk.c, SUM(multiple_ordered_table_with_pk.d) AS sum1 +----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]] +------TableScan: multiple_ordered_table_with_pk projection=[c, d] +physical_plan +AggregateExec: mode=Single, gby=[c@0 as c, sum1@1 as sum1], aggr=[], ordering_mode=PartiallySorted([0]) +--ProjectionExec: expr=[c@0 as c, SUM(multiple_ordered_table_with_pk.d)@1 as sum1] +----AggregateExec: mode=Single, gby=[c@0 as c], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], has_header=true + +query TT +EXPLAIN SELECT c, sum1, SUM(b) OVER() as sumb + FROM + (SELECT c, b, a, SUM(d) as sum1 + FROM multiple_ordered_table_with_pk + GROUP BY c); +---- +logical_plan +Projection: multiple_ordered_table_with_pk.c, sum1, SUM(multiple_ordered_table_with_pk.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS sumb +--WindowAggr: windowExpr=[[SUM(CAST(multiple_ordered_table_with_pk.b AS Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] +----Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b, SUM(multiple_ordered_table_with_pk.d) AS sum1 +------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]] +--------TableScan: multiple_ordered_table_with_pk projection=[b, c, d] +physical_plan +ProjectionExec: expr=[c@0 as c, sum1@2 as sum1, SUM(multiple_ordered_table_with_pk.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@3 as sumb] +--WindowAggExec: wdw=[SUM(multiple_ordered_table_with_pk.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(multiple_ordered_table_with_pk.b) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }] +----ProjectionExec: expr=[c@0 as c, b@1 as b, SUM(multiple_ordered_table_with_pk.d)@2 as sum1] +------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true + +query TT +EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1 + FROM + (SELECT c, b, a, SUM(d) as sum1 + FROM multiple_ordered_table_with_pk + GROUP BY c) as lhs + JOIN + (SELECT c, b, a, SUM(d) as sum1 + FROM multiple_ordered_table_with_pk + GROUP BY c) as rhs + ON lhs.b=rhs.b; +---- +logical_plan +Projection: lhs.c, rhs.c, lhs.sum1, rhs.sum1 +--Inner Join: lhs.b = rhs.b +----SubqueryAlias: lhs +------Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b, SUM(multiple_ordered_table_with_pk.d) AS sum1 +--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]] +----------TableScan: multiple_ordered_table_with_pk projection=[b, c, d] +----SubqueryAlias: rhs +------Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b, SUM(multiple_ordered_table_with_pk.d) AS sum1 +--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.b]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]] +----------TableScan: multiple_ordered_table_with_pk projection=[b, c, d] +physical_plan +ProjectionExec: expr=[c@0 as c, c@3 as c, sum1@2 as sum1, sum1@5 as sum1] +--CoalesceBatchesExec: target_batch_size=2 +----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(b@1, b@1)] +------ProjectionExec: expr=[c@0 as c, b@1 as b, SUM(multiple_ordered_table_with_pk.d)@2 as sum1] +--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) +----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true +------ProjectionExec: expr=[c@0 as c, b@1 as b, SUM(multiple_ordered_table_with_pk.d)@2 as sum1] +--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0]) +----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, d], output_ordering=[c@1 ASC NULLS LAST], has_header=true + +query TT +EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1 + FROM + (SELECT c, b, a, SUM(d) as sum1 + FROM multiple_ordered_table_with_pk + GROUP BY c) as lhs + CROSS JOIN + (SELECT c, b, a, SUM(d) as sum1 + FROM multiple_ordered_table_with_pk + GROUP BY c) as rhs; +---- +logical_plan +Projection: lhs.c, rhs.c, lhs.sum1, rhs.sum1 +--CrossJoin: +----SubqueryAlias: lhs +------Projection: multiple_ordered_table_with_pk.c, SUM(multiple_ordered_table_with_pk.d) AS sum1 +--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]] +----------TableScan: multiple_ordered_table_with_pk projection=[c, d] +----SubqueryAlias: rhs +------Projection: multiple_ordered_table_with_pk.c, SUM(multiple_ordered_table_with_pk.d) AS sum1 +--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]] +----------TableScan: multiple_ordered_table_with_pk projection=[c, d] +physical_plan +ProjectionExec: expr=[c@0 as c, c@2 as c, sum1@1 as sum1, sum1@3 as sum1] +--CrossJoinExec +----ProjectionExec: expr=[c@0 as c, SUM(multiple_ordered_table_with_pk.d)@1 as sum1] +------AggregateExec: mode=Single, gby=[c@0 as c], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], has_header=true +----ProjectionExec: expr=[c@0 as c, SUM(multiple_ordered_table_with_pk.d)@1 as sum1] +------AggregateExec: mode=Single, gby=[c@0 as c], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted +--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], has_header=true + +# we do not generate physical plan for Repartition yet (e.g Distribute By queries). +query TT +EXPLAIN SELECT a, b, sum1 +FROM (SELECT c, b, a, SUM(d) as sum1 + FROM multiple_ordered_table_with_pk + GROUP BY c) +DISTRIBUTE BY a +---- +logical_plan +Repartition: DistributeBy(a) +--Projection: multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b, SUM(multiple_ordered_table_with_pk.d) AS sum1 +----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]] +------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d] + +# union with aggregate +query TT +EXPLAIN SELECT c, a, SUM(d) as sum1 + FROM multiple_ordered_table_with_pk + GROUP BY c +UNION ALL + SELECT c, a, SUM(d) as sum1 + FROM multiple_ordered_table_with_pk + GROUP BY c +---- +logical_plan +Union +--Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, SUM(multiple_ordered_table_with_pk.d) AS sum1 +----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]] +------TableScan: multiple_ordered_table_with_pk projection=[a, c, d] +--Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, SUM(multiple_ordered_table_with_pk.d) AS sum1 +----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]] +------TableScan: multiple_ordered_table_with_pk projection=[a, c, d] +physical_plan +UnionExec +--ProjectionExec: expr=[c@0 as c, a@1 as a, SUM(multiple_ordered_table_with_pk.d)@2 as sum1] +----AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true +--ProjectionExec: expr=[c@0 as c, a@1 as a, SUM(multiple_ordered_table_with_pk.d)@2 as sum1] +----AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true + +# table scan should be simplified. +query TT +EXPLAIN SELECT c, a, SUM(d) as sum1 + FROM multiple_ordered_table_with_pk + GROUP BY c +---- +logical_plan +Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, SUM(multiple_ordered_table_with_pk.d) AS sum1 +--Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]] +----TableScan: multiple_ordered_table_with_pk projection=[a, c, d] +physical_plan +ProjectionExec: expr=[c@0 as c, a@1 as a, SUM(multiple_ordered_table_with_pk.d)@2 as sum1] +--AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted +----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true + +# limit should be simplified +query TT +EXPLAIN SELECT * + FROM (SELECT c, a, SUM(d) as sum1 + FROM multiple_ordered_table_with_pk + GROUP BY c + LIMIT 5) +---- +logical_plan +Projection: multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, SUM(multiple_ordered_table_with_pk.d) AS sum1 +--Limit: skip=0, fetch=5 +----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a]], aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]] +------TableScan: multiple_ordered_table_with_pk projection=[a, c, d] +physical_plan +ProjectionExec: expr=[c@0 as c, a@1 as a, SUM(multiple_ordered_table_with_pk.d)@2 as sum1] +--GlobalLimitExec: skip=0, fetch=5 +----AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted +------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true + +statement ok +set datafusion.execution.target_partitions = 8; + # Tests for single distinct to group by optimization rule statement ok CREATE TABLE t(x int) AS VALUES (1), (2), (1);