From 39fbd8e88ed58866059a9fa0f75cd6028fbe631d Mon Sep 17 00:00:00 2001 From: Dharan Aditya Date: Sun, 23 Jun 2024 15:48:57 +0530 Subject: [PATCH] Convert Average to UDAF #10942 (#10964) * add avg udaf * remove avg from expr * add test stub * migrate avg udaf * change avg udaf signature remove avg phy expr * fix tests * fix state_fields fn * fix ut in phy-plan aggr * refactor Average to Avg * refactor Average to Avg * fix type coercion tests * fix example and logic tests * fix py expr failing ut * update docs * fix failing tests * formatting examples * remove duplicate code and fix uts * addressing PR comments * add ut for logical avg window * fix physical plan roundtrip_window test case --- .../examples/dataframe_subquery.rs | 1 + .../examples/simplify_udaf_expression.rs | 38 +- .../examples/simplify_udwf_expression.rs | 10 +- datafusion/core/src/dataframe/mod.rs | 12 +- datafusion/core/tests/dataframe/mod.rs | 8 +- .../user_defined/user_defined_aggregates.rs | 3 +- .../user_defined_scalar_functions.rs | 4 +- datafusion/expr/src/aggregate_function.rs | 22 - datafusion/expr/src/expr.rs | 7 - datafusion/expr/src/expr_fn.rs | 12 - datafusion/expr/src/expr_rewriter/order_by.rs | 8 +- datafusion/expr/src/test/function_stub.rs | 80 +++- .../expr/src/type_coercion/aggregates.rs | 86 ++-- .../src}/average.rs | 410 +++++++++--------- datafusion/functions-aggregate/src/lib.rs | 3 + .../optimizer/src/analyzer/type_coercion.rs | 39 +- .../optimizer/src/common_subexpr_eliminate.rs | 34 +- .../optimizer/tests/optimizer_integration.rs | 10 +- .../physical-expr/src/aggregate/build_in.rs | 74 +--- .../src/aggregate/groups_accumulator/mod.rs | 1 + datafusion/physical-expr/src/aggregate/mod.rs | 1 - .../physical-expr/src/expressions/mod.rs | 2 - .../physical-plan/src/aggregates/mod.rs | 73 +++- datafusion/proto/proto/datafusion.proto | 2 +- datafusion/proto/src/generated/pbjson.rs | 3 - datafusion/proto/src/generated/prost.rs | 4 +- .../proto/src/logical_plan/from_proto.rs | 1 - datafusion/proto/src/logical_plan/to_proto.rs | 2 - .../proto/src/physical_plan/to_proto.rs | 35 +- .../tests/cases/roundtrip_logical_plan.rs | 15 +- .../tests/cases/roundtrip_physical_plan.rs | 52 ++- datafusion/sql/tests/sql_integration.rs | 14 +- .../sqllogictest/test_files/aggregate.slt | 14 +- datafusion/sqllogictest/test_files/errors.slt | 2 +- .../sqllogictest/test_files/group_by.slt | 4 +- .../optimizer_group_by_constant.slt | 4 +- .../sqllogictest/test_files/predicates.slt | 4 +- .../sqllogictest/test_files/tpch/q1.slt.part | 10 +- .../sqllogictest/test_files/tpch/q17.slt.part | 14 +- .../sqllogictest/test_files/tpch/q22.slt.part | 10 +- datafusion/sqllogictest/test_files/window.slt | 26 +- .../tests/cases/consumer_integration.rs | 4 +- .../tests/cases/roundtrip_logical_plan.rs | 2 +- 43 files changed, 564 insertions(+), 596 deletions(-) rename datafusion/{physical-expr/src/aggregate => functions-aggregate/src}/average.rs (76%) diff --git a/datafusion-examples/examples/dataframe_subquery.rs b/datafusion-examples/examples/dataframe_subquery.rs index 9fb61008b9f6..e798751b3353 100644 --- a/datafusion-examples/examples/dataframe_subquery.rs +++ b/datafusion-examples/examples/dataframe_subquery.rs @@ -19,6 +19,7 @@ use arrow_schema::DataType; use std::sync::Arc; use datafusion::error::Result; +use datafusion::functions_aggregate::average::avg; use datafusion::prelude::*; use datafusion::test_util::arrow_test_data; use datafusion_common::ScalarValue; diff --git a/datafusion-examples/examples/simplify_udaf_expression.rs b/datafusion-examples/examples/simplify_udaf_expression.rs index d2c8c6a86c7c..aedc511c62fe 100644 --- a/datafusion-examples/examples/simplify_udaf_expression.rs +++ b/datafusion-examples/examples/simplify_udaf_expression.rs @@ -15,21 +15,21 @@ // specific language governing permissions and limitations // under the License. -use arrow_schema::{Field, Schema}; -use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; -use datafusion_expr::function::{AggregateFunctionSimplification, StateFieldsArgs}; -use datafusion_expr::simplify::SimplifyInfo; - use std::{any::Any, sync::Arc}; +use arrow_schema::{Field, Schema}; + use datafusion::arrow::{array::Float32Array, record_batch::RecordBatch}; use datafusion::error::Result; +use datafusion::functions_aggregate::average::avg_udaf; +use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility}; use datafusion::{assert_batches_eq, prelude::*}; use datafusion_common::cast::as_float64_array; +use datafusion_expr::function::{AggregateFunctionSimplification, StateFieldsArgs}; +use datafusion_expr::simplify::SimplifyInfo; use datafusion_expr::{ - expr::{AggregateFunction, AggregateFunctionDefinition}, - function::AccumulatorArgs, - Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature, + expr::AggregateFunction, function::AccumulatorArgs, Accumulator, AggregateUDF, + AggregateUDFImpl, GroupsAccumulator, Signature, }; /// This example shows how to use the AggregateUDFImpl::simplify API to simplify/replace user @@ -92,18 +92,16 @@ impl AggregateUDFImpl for BetterAvgUdaf { // with build-in aggregate function to illustrate the use let simplify = |aggregate_function: datafusion_expr::expr::AggregateFunction, _: &dyn SimplifyInfo| { - Ok(Expr::AggregateFunction(AggregateFunction { - func_def: AggregateFunctionDefinition::BuiltIn( - // yes it is the same Avg, `BetterAvgUdaf` was just a - // marketing pitch :) - datafusion_expr::aggregate_function::AggregateFunction::Avg, - ), - args: aggregate_function.args, - distinct: aggregate_function.distinct, - filter: aggregate_function.filter, - order_by: aggregate_function.order_by, - null_treatment: aggregate_function.null_treatment, - })) + Ok(Expr::AggregateFunction(AggregateFunction::new_udf( + avg_udaf(), + // yes it is the same Avg, `BetterAvgUdaf` was just a + // marketing pitch :) + aggregate_function.args, + aggregate_function.distinct, + aggregate_function.filter, + aggregate_function.order_by, + aggregate_function.null_treatment, + ))) }; Some(Box::new(simplify)) diff --git a/datafusion-examples/examples/simplify_udwf_expression.rs b/datafusion-examples/examples/simplify_udwf_expression.rs index 4e8d03c38e00..a17e45dba2a3 100644 --- a/datafusion-examples/examples/simplify_udwf_expression.rs +++ b/datafusion-examples/examples/simplify_udwf_expression.rs @@ -18,12 +18,14 @@ use std::any::Any; use arrow_schema::DataType; + use datafusion::execution::context::SessionContext; +use datafusion::functions_aggregate::average::avg_udaf; use datafusion::{error::Result, execution::options::CsvReadOptions}; use datafusion_expr::function::WindowFunctionSimplification; use datafusion_expr::{ - expr::WindowFunction, simplify::SimplifyInfo, AggregateFunction, Expr, - PartitionEvaluator, Signature, Volatility, WindowUDF, WindowUDFImpl, + expr::WindowFunction, simplify::SimplifyInfo, Expr, PartitionEvaluator, Signature, + Volatility, WindowUDF, WindowUDFImpl, }; /// This UDWF will show how to use the WindowUDFImpl::simplify() API @@ -71,9 +73,7 @@ impl WindowUDFImpl for SimplifySmoothItUdf { let simplify = |window_function: datafusion_expr::expr::WindowFunction, _: &dyn SimplifyInfo| { Ok(Expr::WindowFunction(WindowFunction { - fun: datafusion_expr::WindowFunctionDefinition::AggregateFunction( - AggregateFunction::Avg, - ), + fun: datafusion_expr::WindowFunctionDefinition::AggregateUDF(avg_udaf()), args: window_function.args, partition_by: window_function.partition_by, order_by: window_function.order_by, diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 90cde087689a..e1fc8273e6ff 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -48,13 +48,11 @@ use datafusion_common::config::{CsvOptions, FormatOptions, JsonOptions}; use datafusion_common::{ plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions, }; -use datafusion_expr::lit; +use datafusion_expr::{case, is_null, lit}; use datafusion_expr::{ - avg, max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, - UNNAMED_TABLE, + max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE, }; -use datafusion_expr::{case, is_null}; -use datafusion_functions_aggregate::expr_fn::{count, median, stddev, sum}; +use datafusion_functions_aggregate::expr_fn::{avg, count, median, stddev, sum}; use async_trait::async_trait; @@ -561,7 +559,7 @@ impl DataFrame { /// # async fn main() -> Result<()> { /// let ctx = SessionContext::new(); /// let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await? - /// // Return a single row (a, b) for each distinct value of a + /// // Return a single row (a, b) for each distinct value of a /// .distinct_on(vec![col("a")], vec![col("a"), col("b")], None)?; /// # Ok(()) /// # } @@ -2045,7 +2043,7 @@ mod tests { assert_batches_sorted_eq!( ["+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+", - "| c1 | MIN(aggregate_test_100.c12) | MAX(aggregate_test_100.c12) | AVG(aggregate_test_100.c12) | sum(aggregate_test_100.c12) | count(aggregate_test_100.c12) | count(DISTINCT aggregate_test_100.c12) |", + "| c1 | MIN(aggregate_test_100.c12) | MAX(aggregate_test_100.c12) | avg(aggregate_test_100.c12) | sum(aggregate_test_100.c12) | count(aggregate_test_100.c12) | count(DISTINCT aggregate_test_100.c12) |", "+----+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-------------------------------+----------------------------------------+", "| a | 0.02182578039211991 | 0.9800193410444061 | 0.48754517466109415 | 10.238448667882977 | 21 | 21 |", "| b | 0.04893135681998029 | 0.9185813970744787 | 0.41040709263815384 | 7.797734760124923 | 19 | 19 |", diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index c018ebec4b63..700bc53af72e 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -52,11 +52,11 @@ use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::expr::{GroupingSet, Sort}; use datafusion_expr::var_provider::{VarProvider, VarType}; use datafusion_expr::{ - array_agg, avg, cast, col, exists, expr, in_subquery, lit, max, out_ref_col, - placeholder, scalar_subquery, when, wildcard, Expr, ExprSchemable, WindowFrame, - WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, + array_agg, cast, col, exists, expr, in_subquery, lit, max, out_ref_col, placeholder, + scalar_subquery, when, wildcard, Expr, ExprSchemable, WindowFrame, WindowFrameBound, + WindowFrameUnits, WindowFunctionDefinition, }; -use datafusion_functions_aggregate::expr_fn::{count, sum}; +use datafusion_functions_aggregate::expr_fn::{avg, count, sum}; #[tokio::test] async fn test_count_wildcard_on_sort() -> Result<()> { diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs index 66cdeb575a15..d591c662d877 100644 --- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs +++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs @@ -48,7 +48,8 @@ use datafusion_expr::{ create_udaf, function::AccumulatorArgs, AggregateUDFImpl, GroupsAccumulator, SimpleAggregateUDF, }; -use datafusion_physical_expr::expressions::AvgAccumulator; +use datafusion_functions_aggregate::average::AvgAccumulator; + /// Test to show the contents of the setup #[tokio::test] async fn test_setup() { diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs index a81fc9159e52..5e3c44c039ab 100644 --- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs +++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs @@ -51,7 +51,7 @@ async fn csv_query_custom_udf_with_cast() -> Result<()> { let actual = plan_and_collect(&ctx, sql).await.unwrap(); let expected = [ "+------------------------------------------+", - "| AVG(custom_sqrt(aggregate_test_100.c11)) |", + "| avg(custom_sqrt(aggregate_test_100.c11)) |", "+------------------------------------------+", "| 0.6584408483418835 |", "+------------------------------------------+", @@ -69,7 +69,7 @@ async fn csv_query_avg_sqrt() -> Result<()> { let actual = plan_and_collect(&ctx, sql).await.unwrap(); let expected = [ "+------------------------------------------+", - "| AVG(custom_sqrt(aggregate_test_100.c12)) |", + "| avg(custom_sqrt(aggregate_test_100.c12)) |", "+------------------------------------------+", "| 0.6706002946036459 |", "+------------------------------------------+", diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs index 967ccc0b0866..7663dcd6105a 100644 --- a/datafusion/expr/src/aggregate_function.rs +++ b/datafusion/expr/src/aggregate_function.rs @@ -37,8 +37,6 @@ pub enum AggregateFunction { Min, /// Maximum Max, - /// Average - Avg, /// Aggregation into an array ArrayAgg, /// N'th value in a group according to some ordering @@ -55,7 +53,6 @@ impl AggregateFunction { match self { Min => "MIN", Max => "MAX", - Avg => "AVG", ArrayAgg => "ARRAY_AGG", NthValue => "NTH_VALUE", Correlation => "CORR", @@ -75,9 +72,7 @@ impl FromStr for AggregateFunction { fn from_str(name: &str) -> Result { Ok(match name { // general - "avg" => AggregateFunction::Avg, "max" => AggregateFunction::Max, - "mean" => AggregateFunction::Avg, "min" => AggregateFunction::Min, "array_agg" => AggregateFunction::ArrayAgg, "nth_value" => AggregateFunction::NthValue, @@ -123,7 +118,6 @@ impl AggregateFunction { AggregateFunction::Correlation => { correlation_return_type(&coerced_data_types[0]) } - AggregateFunction::Avg => avg_return_type(&coerced_data_types[0]), AggregateFunction::ArrayAgg => Ok(DataType::List(Arc::new(Field::new( "item", coerced_data_types[0].clone(), @@ -135,19 +129,6 @@ impl AggregateFunction { } } -/// Returns the internal sum datatype of the avg aggregate function. -pub fn sum_type_of_avg(input_expr_types: &[DataType]) -> Result { - // Note that this function *must* return the same type that the respective physical expression returns - // or the execution panics. - let fun = AggregateFunction::Avg; - let coerced_data_types = crate::type_coercion::aggregates::coerce_types( - &fun, - input_expr_types, - &fun.signature(), - )?; - avg_sum_type(&coerced_data_types[0]) -} - impl AggregateFunction { /// the signatures supported by the function `fun`. pub fn signature(&self) -> Signature { @@ -168,9 +149,6 @@ impl AggregateFunction { .collect::>(); Signature::uniform(1, valid, Volatility::Immutable) } - AggregateFunction::Avg => { - Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable) - } AggregateFunction::NthValue => Signature::any(2, Volatility::Immutable), AggregateFunction::Correlation => { Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable) diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 0de4a87b941b..409ccf7b47c7 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -2280,7 +2280,6 @@ mod test { "nth_value", "min", "max", - "avg", ]; for name in names { let fun = find_df_window_func(name).unwrap(); @@ -2309,12 +2308,6 @@ mod test { aggregate_function::AggregateFunction::Min )) ); - assert_eq!( - find_df_window_func("avg"), - Some(WindowFunctionDefinition::AggregateFunction( - aggregate_function::AggregateFunction::Avg - )) - ); assert_eq!( find_df_window_func("cume_dist"), Some(WindowFunctionDefinition::BuiltInWindowFunction( diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index deac20555771..8b0213fd52fd 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -183,18 +183,6 @@ pub fn array_agg(expr: Expr) -> Expr { )) } -/// Create an expression to represent the avg() aggregate function -pub fn avg(expr: Expr) -> Expr { - Expr::AggregateFunction(AggregateFunction::new( - aggregate_function::AggregateFunction::Avg, - vec![expr], - false, - None, - None, - None, - )) -} - /// Return a new expression with bitwise AND pub fn bitwise_and(left: Expr, right: Expr) -> Expr { Expr::BinaryExpr(BinaryExpr::new( diff --git a/datafusion/expr/src/expr_rewriter/order_by.rs b/datafusion/expr/src/expr_rewriter/order_by.rs index eb38fee7cad0..4b56ca3d1c2e 100644 --- a/datafusion/expr/src/expr_rewriter/order_by.rs +++ b/datafusion/expr/src/expr_rewriter/order_by.rs @@ -156,8 +156,8 @@ mod test { use arrow::datatypes::{DataType, Field, Schema}; use crate::{ - avg, cast, col, lit, logical_plan::builder::LogicalTableSource, min, try_cast, - LogicalPlanBuilder, + cast, col, lit, logical_plan::builder::LogicalTableSource, min, + test::function_stub::avg, try_cast, LogicalPlanBuilder, }; use super::*; @@ -246,9 +246,9 @@ mod test { expected: sort(col("c1") + col("MIN(t.c2)")), }, TestCase { - desc: r#"avg(c3) --> "AVG(t.c3)" as average (column *named* "AVG(t.c3)", aliased)"#, + desc: r#"avg(c3) --> "avg(t.c3)" as average (column *named* "avg(t.c3)", aliased)"#, input: sort(avg(col("c3"))), - expected: sort(col("AVG(t.c3)").alias("average")), + expected: sort(col("avg(t.c3)").alias("average")), }, ]; diff --git a/datafusion/expr/src/test/function_stub.rs b/datafusion/expr/src/test/function_stub.rs index ac98ee9747cc..14a6522ebe91 100644 --- a/datafusion/expr/src/test/function_stub.rs +++ b/datafusion/expr/src/test/function_stub.rs @@ -21,6 +21,14 @@ use std::any::Any; +use arrow::datatypes::{ + DataType, Field, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, +}; + +use datafusion_common::{exec_err, not_impl_err, Result}; + +use crate::type_coercion::aggregates::{avg_return_type, coerce_avg_type, NUMERICS}; +use crate::Volatility::Immutable; use crate::{ expr::AggregateFunction, function::{AccumulatorArgs, StateFieldsArgs}, @@ -28,10 +36,6 @@ use crate::{ Accumulator, AggregateUDFImpl, Expr, GroupsAccumulator, ReversedUDAF, Signature, Volatility, }; -use arrow::datatypes::{ - DataType, Field, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, -}; -use datafusion_common::{exec_err, not_impl_err, Result}; macro_rules! create_func { ($UDAF:ty, $AGGREGATE_UDF_FN:ident) => { @@ -82,6 +86,19 @@ pub fn count(expr: Expr) -> Expr { )) } +create_func!(Avg, avg_udaf); + +pub fn avg(expr: Expr) -> Expr { + Expr::AggregateFunction(AggregateFunction::new_udf( + avg_udaf(), + vec![expr], + false, + None, + None, + None, + )) +} + /// Stub `sum` used for optimizer testing #[derive(Debug)] pub struct Sum { @@ -273,3 +290,58 @@ impl AggregateUDFImpl for Count { ReversedUDAF::Identical } } + +/// Testing stub implementation of avg aggregate +#[derive(Debug)] +pub struct Avg { + signature: Signature, + aliases: Vec, +} + +impl Avg { + pub fn new() -> Self { + Self { + aliases: vec![String::from("mean")], + signature: Signature::uniform(1, NUMERICS.to_vec(), Immutable), + } + } +} + +impl Default for Avg { + fn default() -> Self { + Self::new() + } +} + +impl AggregateUDFImpl for Avg { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "avg" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + avg_return_type(self.name(), &arg_types[0]) + } + + fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result> { + not_impl_err!("no impl for stub") + } + + fn state_fields(&self, _args: StateFieldsArgs) -> Result> { + not_impl_err!("no impl for stub") + } + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + coerce_avg_type(self.name(), arg_types) + } +} diff --git a/datafusion/expr/src/type_coercion/aggregates.rs b/datafusion/expr/src/type_coercion/aggregates.rs index 428fc99070d2..7448af250284 100644 --- a/datafusion/expr/src/type_coercion/aggregates.rs +++ b/datafusion/expr/src/type_coercion/aggregates.rs @@ -17,14 +17,15 @@ use std::ops::Deref; -use crate::{AggregateFunction, Signature, TypeSignature}; - use arrow::datatypes::{ DataType, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, }; + use datafusion_common::{internal_err, plan_err, Result}; +use crate::{AggregateFunction, Signature, TypeSignature}; + pub static STRINGS: &[DataType] = &[DataType::Utf8, DataType::LargeUtf8]; pub static SIGNED_INTEGERS: &[DataType] = &[ @@ -101,26 +102,6 @@ pub fn coerce_types( // unpack the dictionary to get the value get_min_max_result_type(input_types) } - AggregateFunction::Avg => { - // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc - // smallint, int, bigint, real, double precision, decimal, or interval - let v = match &input_types[0] { - Decimal128(p, s) => Decimal128(*p, *s), - Decimal256(p, s) => Decimal256(*p, *s), - d if d.is_numeric() => Float64, - Dictionary(_, v) => { - return coerce_types(agg_fun, &[v.as_ref().clone()], signature) - } - _ => { - return plan_err!( - "The function {:?} does not support inputs of type {:?}.", - agg_fun, - input_types[0] - ) - } - }; - Ok(vec![v]) - } AggregateFunction::Correlation => { if !is_correlation_support_arg_type(&input_types[0]) { return plan_err!( @@ -262,7 +243,7 @@ pub fn correlation_return_type(arg_type: &DataType) -> Result { } /// function return type of an average -pub fn avg_return_type(arg_type: &DataType) -> Result { +pub fn avg_return_type(func_name: &str, arg_type: &DataType) -> Result { match arg_type { DataType::Decimal128(precision, scale) => { // in the spark, the result type is DECIMAL(min(38,precision+4), min(38,scale+4)). @@ -280,9 +261,9 @@ pub fn avg_return_type(arg_type: &DataType) -> Result { } arg_type if NUMERICS.contains(arg_type) => Ok(DataType::Float64), DataType::Dictionary(_, dict_value_type) => { - avg_return_type(dict_value_type.as_ref()) + avg_return_type(func_name, dict_value_type.as_ref()) } - other => plan_err!("AVG does not support {other:?}"), + other => plan_err!("{func_name} does not support {other:?}"), } } @@ -358,10 +339,29 @@ pub fn is_integer_arg_type(arg_type: &DataType) -> bool { arg_type.is_integer() } +pub fn coerce_avg_type(func_name: &str, arg_types: &[DataType]) -> Result> { + // Supported types smallint, int, bigint, real, double precision, decimal, or interval + // Refer to https://www.postgresql.org/docs/8.2/functions-aggregate.html doc + fn coerced_type(func_name: &str, data_type: &DataType) -> Result { + return match &data_type { + DataType::Decimal128(p, s) => Ok(DataType::Decimal128(*p, *s)), + DataType::Decimal256(p, s) => Ok(DataType::Decimal256(*p, *s)), + d if d.is_numeric() => Ok(DataType::Float64), + DataType::Dictionary(_, v) => return coerced_type(func_name, v.as_ref()), + _ => { + return plan_err!( + "The function {:?} does not support inputs of type {:?}.", + func_name, + data_type + ) + } + }; + } + Ok(vec![coerced_type(func_name, &arg_types[0])?]) +} #[cfg(test)] mod tests { use super::*; - #[test] fn test_aggregate_coerce_types() { // test input args with error number input types @@ -371,16 +371,6 @@ mod tests { let result = coerce_types(&fun, &input_types, &signature); assert_eq!("Error during planning: The function MIN expects 1 arguments, but 2 were provided", result.unwrap_err().strip_backtrace()); - let fun = AggregateFunction::Avg; - // test input args is invalid data type for avg - let input_types = vec![DataType::Utf8]; - let signature = fun.signature(); - let result = coerce_types(&fun, &input_types, &signature); - assert_eq!( - "Error during planning: The function Avg does not support inputs of type Utf8.", - result.unwrap_err().strip_backtrace() - ); - // test count, array_agg, approx_distinct, min, max. // the coerced types is same with input types let funs = vec![ @@ -401,30 +391,6 @@ mod tests { assert_eq!(*input_type, result.unwrap()); } } - - // test avg - let fun = AggregateFunction::Avg; - let signature = fun.signature(); - let r = coerce_types(&fun, &[DataType::Int32], &signature).unwrap(); - assert_eq!(r[0], DataType::Float64); - let r = coerce_types(&fun, &[DataType::Float32], &signature).unwrap(); - assert_eq!(r[0], DataType::Float64); - let r = coerce_types(&fun, &[DataType::Decimal128(20, 3)], &signature).unwrap(); - assert_eq!(r[0], DataType::Decimal128(20, 3)); - let r = coerce_types(&fun, &[DataType::Decimal256(20, 3)], &signature).unwrap(); - assert_eq!(r[0], DataType::Decimal256(20, 3)); - } - - #[test] - fn test_avg_return_data_type() -> Result<()> { - let data_type = DataType::Decimal128(10, 5); - let result_type = avg_return_type(&data_type)?; - assert_eq!(DataType::Decimal128(14, 9), result_type); - - let data_type = DataType::Decimal128(36, 10); - let result_type = avg_return_type(&data_type)?; - assert_eq!(DataType::Decimal128(38, 14), result_type); - Ok(()) } #[test] diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/functions-aggregate/src/average.rs similarity index 76% rename from datafusion/physical-expr/src/aggregate/average.rs rename to datafusion/functions-aggregate/src/average.rs index 80fcc9b70c5f..1dc1f10afce6 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -15,77 +15,85 @@ // specific language governing permissions and limitations // under the License. -//! Defines physical expressions that can evaluated at runtime during query execution +//! Defines `Avg` & `Mean` aggregate & accumulators -use arrow::array::{AsArray, PrimitiveBuilder}; +use arrow::array::{ + self, Array, ArrayRef, ArrowNativeTypeOp, ArrowNumericType, ArrowPrimitiveType, + AsArray, PrimitiveArray, PrimitiveBuilder, UInt64Array, +}; +use arrow::compute::sum; +use arrow::datatypes::{ + i256, ArrowNativeType, DataType, Decimal128Type, Decimal256Type, DecimalType, Field, + Float64Type, UInt64Type, +}; +use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue}; +use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; +use datafusion_expr::type_coercion::aggregates::{avg_return_type, coerce_avg_type}; +use datafusion_expr::utils::format_state_name; +use datafusion_expr::Volatility::Immutable; +use datafusion_expr::{ + Accumulator, AggregateUDFImpl, EmitTo, GroupsAccumulator, ReversedUDAF, Signature, +}; +use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState; +use datafusion_physical_expr_common::aggregate::utils::DecimalAverager; use log::debug; - use std::any::Any; use std::fmt::Debug; use std::sync::Arc; -use crate::aggregate::groups_accumulator::accumulate::NullState; -use crate::aggregate::utils::down_cast_any_ref; -use crate::expressions::format_state_name; -use crate::{AggregateExpr, PhysicalExpr}; -use arrow::compute::sum; -use arrow::datatypes::{DataType, Decimal128Type, Float64Type, UInt64Type}; -use arrow::{ - array::{ArrayRef, UInt64Array}, - datatypes::Field, -}; -use arrow_array::types::{Decimal256Type, DecimalType}; -use arrow_array::{ - Array, ArrowNativeTypeOp, ArrowNumericType, ArrowPrimitiveType, PrimitiveArray, -}; -use arrow_buffer::{i256, ArrowNativeType}; -use datafusion_common::{not_impl_err, Result, ScalarValue}; -use datafusion_expr::type_coercion::aggregates::avg_return_type; -use datafusion_expr::{Accumulator, EmitTo, GroupsAccumulator}; +make_udaf_expr_and_func!( + Avg, + avg, + expression, + "Returns the avg of a group of values.", + avg_udaf +); -use super::utils::DecimalAverager; - -/// AVG aggregate expression -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Avg { - name: String, - expr: Arc, - input_data_type: DataType, - result_data_type: DataType, + signature: Signature, + aliases: Vec, } impl Avg { - /// Create a new AVG aggregate function - pub fn new( - expr: Arc, - name: impl Into, - data_type: DataType, - ) -> Self { - let result_data_type = avg_return_type(&data_type).unwrap(); - + pub fn new() -> Self { Self { - name: name.into(), - expr, - input_data_type: data_type, - result_data_type, + signature: Signature::user_defined(Immutable), + aliases: vec![String::from("mean")], } } } -impl AggregateExpr for Avg { - /// Return a reference to Any that can be used for downcasting +impl Default for Avg { + fn default() -> Self { + Self::new() + } +} + +impl AggregateUDFImpl for Avg { fn as_any(&self) -> &dyn Any { self } - fn field(&self) -> Result { - Ok(Field::new(&self.name, self.result_data_type.clone(), true)) + fn name(&self) -> &str { + "avg" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + avg_return_type(self.name(), &arg_types[0]) } - fn create_accumulator(&self) -> Result> { + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + if acc_args.is_distinct { + return exec_err!("avg(DISTINCT) aggregations are not available"); + } use DataType::*; // instantiate specialized accumulator based for the type - match (&self.input_data_type, &self.result_data_type) { + match (acc_args.input_type, acc_args.data_type) { (Float64, Float64) => Ok(Box::::default()), ( Decimal128(sum_precision, sum_scale), @@ -110,59 +118,47 @@ impl AggregateExpr for Avg { target_precision: *target_precision, target_scale: *target_scale, })), - _ => not_impl_err!( + _ => exec_err!( "AvgAccumulator for ({} --> {})", - self.input_data_type, - self.result_data_type + acc_args.input_type, + acc_args.data_type ), } } - fn state_fields(&self) -> Result> { + fn state_fields(&self, args: StateFieldsArgs) -> Result> { Ok(vec![ Field::new( - format_state_name(&self.name, "count"), + format_state_name(args.name, "count"), DataType::UInt64, true, ), Field::new( - format_state_name(&self.name, "sum"), - self.input_data_type.clone(), + format_state_name(args.name, "sum"), + args.input_type.clone(), true, ), ]) } - fn expressions(&self) -> Vec> { - vec![self.expr.clone()] - } - - fn name(&self) -> &str { - &self.name - } - - fn reverse_expr(&self) -> Option> { - Some(Arc::new(self.clone())) - } - - fn create_sliding_accumulator(&self) -> Result> { - self.create_accumulator() - } - - fn groups_accumulator_supported(&self) -> bool { - use DataType::*; - - matches!(&self.result_data_type, Float64 | Decimal128(_, _)) + fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool { + matches!( + args.data_type, + DataType::Float64 | DataType::Decimal128(_, _) + ) } - fn create_groups_accumulator(&self) -> Result> { + fn create_groups_accumulator( + &self, + args: AccumulatorArgs, + ) -> Result> { use DataType::*; // instantiate specialized accumulator based for the type - match (&self.input_data_type, &self.result_data_type) { + match (args.input_type, args.data_type) { (Float64, Float64) => { Ok(Box::new(AvgGroupsAccumulator::::new( - &self.input_data_type, - &self.result_data_type, + args.input_type, + args.data_type, |sum: f64, count: u64| Ok(sum / count as f64), ))) } @@ -180,8 +176,8 @@ impl AggregateExpr for Avg { move |sum: i128, count: u64| decimal_averager.avg(sum, count as i128); Ok(Box::new(AvgGroupsAccumulator::::new( - &self.input_data_type, - &self.result_data_type, + args.input_type, + args.data_type, avg_fn, ))) } @@ -201,32 +197,40 @@ impl AggregateExpr for Avg { }; Ok(Box::new(AvgGroupsAccumulator::::new( - &self.input_data_type, - &self.result_data_type, + args.input_type, + args.data_type, avg_fn, ))) } _ => not_impl_err!( "AvgGroupsAccumulator for ({} --> {})", - self.input_data_type, - self.result_data_type + args.input_type, + args.data_type ), } } -} -impl PartialEq for Avg { - fn eq(&self, other: &dyn Any) -> bool { - down_cast_any_ref(other) - .downcast_ref::() - .map(|x| { - self.name == x.name - && self.input_data_type == x.input_data_type - && self.result_data_type == x.result_data_type - && self.expr.eq(&x.expr) - }) - .unwrap_or(false) + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn create_sliding_accumulator( + &self, + args: AccumulatorArgs, + ) -> Result> { + self.accumulator(args) + } + + fn reverse_expr(&self) -> ReversedUDAF { + ReversedUDAF::Identical + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + if arg_types.len() != 1 { + return exec_err!("{} expects exactly one argument.", self.name()); + } + coerce_avg_type(self.name(), arg_types) } } @@ -238,13 +242,6 @@ pub struct AvgAccumulator { } impl Accumulator for AvgAccumulator { - fn state(&mut self) -> Result> { - Ok(vec![ - ScalarValue::from(self.count), - ScalarValue::Float64(self.sum), - ]) - } - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = values[0].as_primitive::(); self.count += (values.len() - values.null_count()) as u64; @@ -255,13 +252,21 @@ impl Accumulator for AvgAccumulator { Ok(()) } - fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = values[0].as_primitive::(); - self.count -= (values.len() - values.null_count()) as u64; - if let Some(x) = sum(values) { - self.sum = Some(self.sum.unwrap() - x); - } - Ok(()) + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::Float64( + self.sum.map(|f| f / self.count as f64), + )) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } + + fn state(&mut self) -> Result> { + Ok(vec![ + ScalarValue::from(self.count), + ScalarValue::Float64(self.sum), + ]) } fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { @@ -275,23 +280,23 @@ impl Accumulator for AvgAccumulator { } Ok(()) } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::Float64( - self.sum.map(|f| f / self.count as f64), - )) + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = values[0].as_primitive::(); + self.count -= (values.len() - values.null_count()) as u64; + if let Some(x) = sum(values) { + self.sum = Some(self.sum.unwrap() - x); + } + Ok(()) } + fn supports_retract_batch(&self) -> bool { true } - - fn size(&self) -> usize { - std::mem::size_of_val(self) - } } /// An accumulator to compute the average for decimals -struct DecimalAvgAccumulator { +#[derive(Debug)] +struct DecimalAvgAccumulator { sum: Option, count: u64, sum_scale: i8, @@ -300,56 +305,12 @@ struct DecimalAvgAccumulator { target_scale: i8, } -impl Debug for DecimalAvgAccumulator { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("DecimalAvgAccumulator") - .field("sum", &self.sum) - .field("count", &self.count) - .field("sum_scale", &self.sum_scale) - .field("sum_precision", &self.sum_precision) - .field("target_precision", &self.target_precision) - .field("target_scale", &self.target_scale) - .finish() - } -} - -impl Accumulator for DecimalAvgAccumulator { - fn state(&mut self) -> Result> { - Ok(vec![ - ScalarValue::from(self.count), - ScalarValue::new_primitive::( - self.sum, - &T::TYPE_CONSTRUCTOR(self.sum_precision, self.sum_scale), - )?, - ]) - } - +impl Accumulator for DecimalAvgAccumulator { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let values = values[0].as_primitive::(); - self.count += (values.len() - values.null_count()) as u64; - if let Some(x) = sum(values) { - let v = self.sum.get_or_insert(T::Native::default()); - self.sum = Some(v.add_wrapping(x)); - } - Ok(()) - } - fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = values[0].as_primitive::(); - self.count -= (values.len() - values.null_count()) as u64; if let Some(x) = sum(values) { - self.sum = Some(self.sum.unwrap().sub_wrapping(x)); - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - // counts are summed - self.count += sum(states[0].as_primitive::()).unwrap_or_default(); - - // sums are summed - if let Some(x) = sum(states[1].as_primitive::()) { let v = self.sum.get_or_insert(T::Native::default()); self.sum = Some(v.add_wrapping(x)); } @@ -374,13 +335,44 @@ impl Accumulator for DecimalAvgAccumulator &T::TYPE_CONSTRUCTOR(self.target_precision, self.target_scale), ) } - fn supports_retract_batch(&self) -> bool { - true - } fn size(&self) -> usize { std::mem::size_of_val(self) } + + fn state(&mut self) -> Result> { + Ok(vec![ + ScalarValue::from(self.count), + ScalarValue::new_primitive::( + self.sum, + &T::TYPE_CONSTRUCTOR(self.sum_precision, self.sum_scale), + )?, + ]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + // counts are summed + self.count += sum(states[0].as_primitive::()).unwrap_or_default(); + + // sums are summed + if let Some(x) = sum(states[1].as_primitive::()) { + let v = self.sum.get_or_insert(T::Native::default()); + self.sum = Some(v.add_wrapping(x)); + } + Ok(()) + } + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = values[0].as_primitive::(); + self.count -= (values.len() - values.null_count()) as u64; + if let Some(x) = sum(values) { + self.sum = Some(self.sum.unwrap().sub_wrapping(x)); + } + Ok(()) + } + + fn supports_retract_batch(&self) -> bool { + true + } } /// An accumulator to compute the average of `[PrimitiveArray]`. @@ -444,7 +436,7 @@ where &mut self, values: &[ArrayRef], group_indices: &[usize], - opt_filter: Option<&arrow_array::BooleanArray>, + opt_filter: Option<&array::BooleanArray>, total_num_groups: usize, ) -> Result<()> { assert_eq!(values.len(), 1, "single argument to update_batch"); @@ -469,45 +461,6 @@ where Ok(()) } - fn merge_batch( - &mut self, - values: &[ArrayRef], - group_indices: &[usize], - opt_filter: Option<&arrow_array::BooleanArray>, - total_num_groups: usize, - ) -> Result<()> { - assert_eq!(values.len(), 2, "two arguments to merge_batch"); - // first batch is counts, second is partial sums - let partial_counts = values[0].as_primitive::(); - let partial_sums = values[1].as_primitive::(); - // update counts with partial counts - self.counts.resize(total_num_groups, 0); - self.null_state.accumulate( - group_indices, - partial_counts, - opt_filter, - total_num_groups, - |group_index, partial_count| { - self.counts[group_index] += partial_count; - }, - ); - - // update sums - self.sums.resize(total_num_groups, T::default_value()); - self.null_state.accumulate( - group_indices, - partial_sums, - opt_filter, - total_num_groups, - |group_index, new_value: ::Native| { - let sum = &mut self.sums[group_index]; - *sum = sum.add_wrapping(new_value); - }, - ); - - Ok(()) - } - fn evaluate(&mut self, emit_to: EmitTo) -> Result { let counts = emit_to.take_needed(&mut self.counts); let sums = emit_to.take_needed(&mut self.sums); @@ -536,7 +489,7 @@ where .into_iter() .zip(counts.into_iter()) .map(|(sum, count)| (self.avg_fn)(sum, count)) - .collect::>>()?; + .collect::>>()?; PrimitiveArray::new(averages.into(), Some(nulls)) // no copy .with_data_type(self.return_data_type.clone()) }; @@ -562,6 +515,45 @@ where ]) } + fn merge_batch( + &mut self, + values: &[ArrayRef], + group_indices: &[usize], + opt_filter: Option<&array::BooleanArray>, + total_num_groups: usize, + ) -> Result<()> { + assert_eq!(values.len(), 2, "two arguments to merge_batch"); + // first batch is counts, second is partial sums + let partial_counts = values[0].as_primitive::(); + let partial_sums = values[1].as_primitive::(); + // update counts with partial counts + self.counts.resize(total_num_groups, 0); + self.null_state.accumulate( + group_indices, + partial_counts, + opt_filter, + total_num_groups, + |group_index, partial_count| { + self.counts[group_index] += partial_count; + }, + ); + + // update sums + self.sums.resize(total_num_groups, T::default_value()); + self.null_state.accumulate( + group_indices, + partial_sums, + opt_filter, + total_num_groups, + |group_index, new_value: ::Native| { + let sum = &mut self.sums[group_index]; + *sum = sum.add_wrapping(new_value); + }, + ); + + Ok(()) + } + fn size(&self) -> usize { self.counts.capacity() * std::mem::size_of::() + self.sums.capacity() * std::mem::size_of::() diff --git a/datafusion/functions-aggregate/src/lib.rs b/datafusion/functions-aggregate/src/lib.rs index 260d6dab31b9..6b38c4ed61c8 100644 --- a/datafusion/functions-aggregate/src/lib.rs +++ b/datafusion/functions-aggregate/src/lib.rs @@ -69,6 +69,7 @@ pub mod variance; pub mod approx_median; pub mod approx_percentile_cont; pub mod approx_percentile_cont_with_weight; +pub mod average; pub mod bit_and_or_xor; pub mod bool_and_or; pub mod string_agg; @@ -86,6 +87,7 @@ pub mod expr_fn { pub use super::approx_median::approx_median; pub use super::approx_percentile_cont::approx_percentile_cont; pub use super::approx_percentile_cont_with_weight::approx_percentile_cont_with_weight; + pub use super::average::avg; pub use super::bit_and_or_xor::bit_and; pub use super::bit_and_or_xor::bit_or; pub use super::bit_and_or_xor::bit_xor; @@ -147,6 +149,7 @@ pub fn all_default_aggregate_functions() -> Vec> { bit_and_or_xor::bit_xor_udaf(), bool_and_or::bool_and_udaf(), bool_and_or::bool_or_udaf(), + average::avg_udaf(), ] } diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index acc21f14f44d..4fecb243fb41 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -815,13 +815,14 @@ mod test { use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue}; use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction}; use datafusion_expr::logical_plan::{EmptyRelation, Projection}; + use datafusion_expr::test::function_stub::avg_udaf; use datafusion_expr::{ - cast, col, create_udaf, is_true, lit, AccumulatorFactoryFunction, - AggregateFunction, AggregateUDF, BinaryExpr, Case, ColumnarValue, Expr, - ExprSchemable, Filter, LogicalPlan, Operator, ScalarUDF, ScalarUDFImpl, - Signature, SimpleAggregateUDF, Subquery, Volatility, + cast, col, create_udaf, is_true, lit, AccumulatorFactoryFunction, AggregateUDF, + BinaryExpr, Case, ColumnarValue, Expr, ExprSchemable, Filter, LogicalPlan, + Operator, ScalarUDF, ScalarUDFImpl, Signature, SimpleAggregateUDF, Subquery, + Volatility, }; - use datafusion_physical_expr::expressions::AvgAccumulator; + use datafusion_functions_aggregate::average::AvgAccumulator; use crate::analyzer::type_coercion::{ coerce_case_expression, TypeCoercion, TypeCoercionRewriter, @@ -1003,31 +1004,29 @@ mod test { #[test] fn agg_function_case() -> Result<()> { let empty = empty(); - let fun: AggregateFunction = AggregateFunction::Avg; - let agg_expr = Expr::AggregateFunction(expr::AggregateFunction::new( - fun, - vec![lit(12i64)], + let agg_expr = Expr::AggregateFunction(expr::AggregateFunction::new_udf( + avg_udaf(), + vec![lit(12f64)], false, None, None, None, )); let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?); - let expected = "Projection: AVG(CAST(Int64(12) AS Float64))\n EmptyRelation"; + let expected = "Projection: avg(Float64(12))\n EmptyRelation"; assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)?; let empty = empty_with_type(DataType::Int32); - let fun: AggregateFunction = AggregateFunction::Avg; - let agg_expr = Expr::AggregateFunction(expr::AggregateFunction::new( - fun, - vec![col("a")], + let agg_expr = Expr::AggregateFunction(expr::AggregateFunction::new_udf( + avg_udaf(), + vec![cast(col("a"), DataType::Float64)], false, None, None, None, )); let plan = LogicalPlan::Projection(Projection::try_new(vec![agg_expr], empty)?); - let expected = "Projection: AVG(CAST(a AS Float64))\n EmptyRelation"; + let expected = "Projection: avg(CAST(a AS Float64))\n EmptyRelation"; assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)?; Ok(()) } @@ -1035,9 +1034,8 @@ mod test { #[test] fn agg_function_invalid_input_avg() -> Result<()> { let empty = empty(); - let fun: AggregateFunction = AggregateFunction::Avg; - let agg_expr = Expr::AggregateFunction(expr::AggregateFunction::new( - fun, + let agg_expr = Expr::AggregateFunction(expr::AggregateFunction::new_udf( + avg_udaf(), vec![lit("1")], false, None, @@ -1048,10 +1046,7 @@ mod test { .err() .unwrap() .strip_backtrace(); - assert_eq!( - "Error during planning: No function matches the given name and argument types 'AVG(Utf8)'. You might need to add explicit type casts.\n\tCandidate functions:\n\tAVG(Int8/Int16/Int32/Int64/UInt8/UInt16/UInt32/UInt64/Float32/Float64)", - err - ); + assert!(err.starts_with("Error during planning: Error during planning: Coercion from [Utf8] to the signature Uniform(1, [Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64]) failed.")); Ok(()) } diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 5f54354df4f2..66c23fdced63 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -1010,15 +1010,15 @@ mod test { use datafusion_expr::logical_plan::{table_scan, JoinType}; - use datafusion_expr::{avg, lit, logical_plan::builder::LogicalPlanBuilder}; use datafusion_expr::{ grouping_set, AccumulatorFactoryFunction, AggregateUDF, Signature, SimpleAggregateUDF, Volatility, }; + use datafusion_expr::{lit, logical_plan::builder::LogicalPlanBuilder}; use crate::optimizer::OptimizerContext; use crate::test::*; - use datafusion_expr::test::function_stub::sum; + use datafusion_expr::test::function_stub::{avg, sum}; use super::*; @@ -1065,8 +1065,8 @@ mod test { expr_to_identifier(&expr, &mut HashMap::new(), &mut id_array, ExprMask::Normal)?; let expected = vec![ - (8, "{(sum(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{sum(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}}"), - (6, "{sum(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"), + (8, "{(sum(a + Int32(1)) - avg(c)) * Int32(2)|{Int32(2)}|{sum(a + Int32(1)) - avg(c)|{avg(c)|{c}}|{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}}"), + (6, "{sum(a + Int32(1)) - avg(c)|{avg(c)|{c}}|{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"), (3, ""), (2, "{a + Int32(1)|{Int32(1)}|{a}}"), (0, ""), @@ -1090,13 +1090,13 @@ mod test { )?; let expected = vec![ - (8, "{(sum(a + Int32(1)) - AVG(c)) * Int32(2)|{Int32(2)}|{sum(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}}"), - (6, "{sum(a + Int32(1)) - AVG(c)|{AVG(c)|{c}}|{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"), + (8, "{(sum(a + Int32(1)) - avg(c)) * Int32(2)|{Int32(2)}|{sum(a + Int32(1)) - avg(c)|{avg(c)|{c}}|{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}}"), + (6, "{sum(a + Int32(1)) - avg(c)|{avg(c)|{c}}|{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}}"), (3, "{sum(a + Int32(1))|{a + Int32(1)|{Int32(1)}|{a}}}"), (2, "{a + Int32(1)|{Int32(1)}|{a}}"), (0, ""), (1, ""), - (5, "{AVG(c)|{c}}"), + (5, "{avg(c)|{c}}"), (4, ""), (7, "") ] @@ -1203,8 +1203,8 @@ mod test { )? .build()?; - let expected = "Projection: __common_expr_1 AS col1, __common_expr_1 AS col2, col3, __common_expr_3 AS AVG(test.c), __common_expr_2 AS col4, __common_expr_2 AS col5, col6, __common_expr_4 AS my_agg(test.c)\ - \n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS __common_expr_1, my_agg(test.a) AS __common_expr_2, AVG(test.b) AS col3, AVG(test.c) AS __common_expr_3, my_agg(test.b) AS col6, my_agg(test.c) AS __common_expr_4]]\ + let expected = "Projection: __common_expr_1 AS col1, __common_expr_1 AS col2, col3, __common_expr_3 AS avg(test.c), __common_expr_2 AS col4, __common_expr_2 AS col5, col6, __common_expr_4 AS my_agg(test.c)\ + \n Aggregate: groupBy=[[]], aggr=[[avg(test.a) AS __common_expr_1, my_agg(test.a) AS __common_expr_2, avg(test.b) AS col3, avg(test.c) AS __common_expr_3, my_agg(test.b) AS col6, my_agg(test.c) AS __common_expr_4]]\ \n TableScan: test"; assert_optimized_plan_eq(expected, plan, None); @@ -1222,8 +1222,8 @@ mod test { )? .build()?; - let expected = "Projection: Int32(1) + __common_expr_1 AS AVG(test.a), Int32(1) - __common_expr_1 AS AVG(test.a), Int32(1) + __common_expr_2 AS my_agg(test.a), Int32(1) - __common_expr_2 AS my_agg(test.a)\ - \n Aggregate: groupBy=[[]], aggr=[[AVG(test.a) AS __common_expr_1, my_agg(test.a) AS __common_expr_2]]\ + let expected = "Projection: Int32(1) + __common_expr_1 AS avg(test.a), Int32(1) - __common_expr_1 AS avg(test.a), Int32(1) + __common_expr_2 AS my_agg(test.a), Int32(1) - __common_expr_2 AS my_agg(test.a)\ + \n Aggregate: groupBy=[[]], aggr=[[avg(test.a) AS __common_expr_1, my_agg(test.a) AS __common_expr_2]]\ \n TableScan: test"; assert_optimized_plan_eq(expected, plan, None); @@ -1239,7 +1239,7 @@ mod test { )? .build()?; - let expected ="Aggregate: groupBy=[[]], aggr=[[AVG(__common_expr_1) AS col1, my_agg(__common_expr_1) AS col2]]\ + let expected ="Aggregate: groupBy=[[]], aggr=[[avg(__common_expr_1) AS col1, my_agg(__common_expr_1) AS col2]]\ \n Projection: UInt32(1) + test.a AS __common_expr_1, test.a, test.b, test.c\ \n TableScan: test"; @@ -1256,7 +1256,7 @@ mod test { )? .build()?; - let expected = "Aggregate: groupBy=[[__common_expr_1 AS UInt32(1) + test.a]], aggr=[[AVG(__common_expr_1) AS col1, my_agg(__common_expr_1) AS col2]]\ + let expected = "Aggregate: groupBy=[[__common_expr_1 AS UInt32(1) + test.a]], aggr=[[avg(__common_expr_1) AS col1, my_agg(__common_expr_1) AS col2]]\ \n Projection: UInt32(1) + test.a AS __common_expr_1, test.a, test.b, test.c\ \n TableScan: test"; @@ -1277,8 +1277,8 @@ mod test { )? .build()?; - let expected = "Projection: UInt32(1) + test.a, UInt32(1) + __common_expr_2 AS col1, UInt32(1) - __common_expr_2 AS col2, __common_expr_4 AS AVG(UInt32(1) + test.a), UInt32(1) + __common_expr_3 AS col3, UInt32(1) - __common_expr_3 AS col4, __common_expr_5 AS my_agg(UInt32(1) + test.a)\ - \n Aggregate: groupBy=[[__common_expr_1 AS UInt32(1) + test.a]], aggr=[[AVG(__common_expr_1) AS __common_expr_2, my_agg(__common_expr_1) AS __common_expr_3, AVG(__common_expr_1 AS UInt32(1) + test.a) AS __common_expr_4, my_agg(__common_expr_1 AS UInt32(1) + test.a) AS __common_expr_5]]\ + let expected = "Projection: UInt32(1) + test.a, UInt32(1) + __common_expr_2 AS col1, UInt32(1) - __common_expr_2 AS col2, __common_expr_4 AS avg(UInt32(1) + test.a), UInt32(1) + __common_expr_3 AS col3, UInt32(1) - __common_expr_3 AS col4, __common_expr_5 AS my_agg(UInt32(1) + test.a)\ + \n Aggregate: groupBy=[[__common_expr_1 AS UInt32(1) + test.a]], aggr=[[avg(__common_expr_1) AS __common_expr_2, my_agg(__common_expr_1) AS __common_expr_3, avg(__common_expr_1 AS UInt32(1) + test.a) AS __common_expr_4, my_agg(__common_expr_1 AS UInt32(1) + test.a) AS __common_expr_5]]\ \n Projection: UInt32(1) + test.a AS __common_expr_1, test.a, test.b, test.c\ \n TableScan: test"; @@ -1304,8 +1304,8 @@ mod test { )? .build()?; - let expected = "Projection: table.test.col.a, UInt32(1) + __common_expr_2 AS AVG(UInt32(1) + table.test.col.a), __common_expr_2 AS AVG(UInt32(1) + table.test.col.a)\ - \n Aggregate: groupBy=[[table.test.col.a]], aggr=[[AVG(__common_expr_1 AS UInt32(1) + table.test.col.a) AS __common_expr_2]]\ + let expected = "Projection: table.test.col.a, UInt32(1) + __common_expr_2 AS avg(UInt32(1) + table.test.col.a), __common_expr_2 AS avg(UInt32(1) + table.test.col.a)\ + \n Aggregate: groupBy=[[table.test.col.a]], aggr=[[avg(__common_expr_1 AS UInt32(1) + table.test.col.a) AS __common_expr_2]]\ \n Projection: UInt32(1) + table.test.col.a AS __common_expr_1, table.test.col.a\ \n TableScan: table.test"; diff --git a/datafusion/optimizer/tests/optimizer_integration.rs b/datafusion/optimizer/tests/optimizer_integration.rs index 70308183306e..c501d5aaa4bf 100644 --- a/datafusion/optimizer/tests/optimizer_integration.rs +++ b/datafusion/optimizer/tests/optimizer_integration.rs @@ -25,6 +25,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::{plan_err, Result}; use datafusion_expr::test::function_stub::sum_udaf; use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF}; +use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::count::count_udaf; use datafusion_optimizer::analyzer::Analyzer; use datafusion_optimizer::optimizer::Optimizer; @@ -64,16 +65,16 @@ fn subquery_filter_with_cast() -> Result<()> { // regression test for https://github.com/apache/datafusion/issues/3760 let sql = "SELECT col_int32 FROM test \ WHERE col_int32 > (\ - SELECT AVG(col_int32) FROM test \ + SELECT avg(col_int32) FROM test \ WHERE col_utf8 BETWEEN '2002-05-08' \ AND (cast('2002-05-08' as date) + interval '5 days')\ )"; let plan = test_sql(sql)?; let expected = "Projection: test.col_int32\ - \n Inner Join: Filter: CAST(test.col_int32 AS Float64) > __scalar_sq_1.AVG(test.col_int32)\ + \n Inner Join: Filter: CAST(test.col_int32 AS Float64) > __scalar_sq_1.avg(test.col_int32)\ \n TableScan: test projection=[col_int32]\ \n SubqueryAlias: __scalar_sq_1\ - \n Aggregate: groupBy=[[]], aggr=[[AVG(CAST(test.col_int32 AS Float64))]]\ + \n Aggregate: groupBy=[[]], aggr=[[avg(CAST(test.col_int32 AS Float64))]]\ \n Projection: test.col_int32\ \n Filter: test.col_utf8 >= Utf8(\"2002-05-08\") AND test.col_utf8 <= Utf8(\"2002-05-13\")\ \n TableScan: test projection=[col_int32, col_utf8]"; @@ -326,7 +327,8 @@ fn test_sql(sql: &str) -> Result { let statement = &ast[0]; let context_provider = MyContextProvider::default() .with_udaf(sum_udaf()) - .with_udaf(count_udaf()); + .with_udaf(count_udaf()) + .with_udaf(avg_udaf()); let sql_to_rel = SqlToRel::new(&context_provider); let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index 53cfcfb033a1..2b80af9bf3bc 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -33,7 +33,6 @@ use arrow::datatypes::Schema; use datafusion_common::{exec_err, not_impl_err, Result}; use datafusion_expr::AggregateFunction; -use crate::aggregate::average::Avg; use crate::expressions::{self, Literal}; use crate::{AggregateExpr, PhysicalExpr, PhysicalSortExpr}; @@ -108,12 +107,6 @@ pub fn create_aggregate_expr( name, data_type, )), - (AggregateFunction::Avg, false) => { - Arc::new(Avg::new(input_phy_exprs[0].clone(), name, data_type)) - } - (AggregateFunction::Avg, true) => { - return not_impl_err!("AVG(DISTINCT) aggregations are not available"); - } (AggregateFunction::Correlation, false) => { Arc::new(expressions::Correlation::new( input_phy_exprs[0].clone(), @@ -155,7 +148,7 @@ mod tests { use datafusion_common::plan_err; use datafusion_expr::{type_coercion, Signature}; - use crate::expressions::{try_cast, ArrayAgg, Avg, DistinctArrayAgg, Max, Min}; + use crate::expressions::{try_cast, ArrayAgg, DistinctArrayAgg, Max, Min}; use super::*; #[test] @@ -269,44 +262,6 @@ mod tests { Ok(()) } - #[test] - fn test_sum_avg_expr() -> Result<()> { - let funcs = vec![AggregateFunction::Avg]; - let data_types = vec![ - DataType::UInt32, - DataType::UInt64, - DataType::Int32, - DataType::Int64, - DataType::Float32, - DataType::Float64, - ]; - for fun in funcs { - for data_type in &data_types { - let input_schema = - Schema::new(vec![Field::new("c1", data_type.clone(), true)]); - let input_phy_exprs: Vec> = vec![Arc::new( - expressions::Column::new_with_schema("c1", &input_schema).unwrap(), - )]; - let result_agg_phy_exprs = create_physical_agg_expr_for_test( - &fun, - false, - &input_phy_exprs[0..1], - &input_schema, - "c1", - )?; - if fun == AggregateFunction::Avg { - assert!(result_agg_phy_exprs.as_any().is::()); - assert_eq!("c1", result_agg_phy_exprs.name()); - assert_eq!( - Field::new("c1", DataType::Float64, true), - result_agg_phy_exprs.field().unwrap() - ); - }; - } - } - Ok(()) - } - #[test] fn test_min_max() -> Result<()> { let observed = AggregateFunction::Min.return_type(&[DataType::Utf8])?; @@ -328,33 +283,6 @@ mod tests { Ok(()) } - #[test] - fn test_avg_return_type() -> Result<()> { - let observed = AggregateFunction::Avg.return_type(&[DataType::Float32])?; - assert_eq!(DataType::Float64, observed); - - let observed = AggregateFunction::Avg.return_type(&[DataType::Float64])?; - assert_eq!(DataType::Float64, observed); - - let observed = AggregateFunction::Avg.return_type(&[DataType::Int32])?; - assert_eq!(DataType::Float64, observed); - - let observed = - AggregateFunction::Avg.return_type(&[DataType::Decimal128(10, 6)])?; - assert_eq!(DataType::Decimal128(14, 10), observed); - - let observed = - AggregateFunction::Avg.return_type(&[DataType::Decimal128(36, 6)])?; - assert_eq!(DataType::Decimal128(38, 10), observed); - Ok(()) - } - - #[test] - fn test_avg_no_utf8() { - let observed = AggregateFunction::Avg.return_type(&[DataType::Utf8]); - assert!(observed.is_err()); - } - // Helper function // Create aggregate expr with type coercion fn create_physical_agg_expr_for_test( diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs index 73d810ec056d..1944e2b2d415 100644 --- a/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs +++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/mod.rs @@ -19,6 +19,7 @@ mod adapter; pub use adapter::GroupsAccumulatorAdapter; // Backward compatibility +#[allow(unused_imports)] pub(crate) mod accumulate { pub use datafusion_physical_expr_common::aggregate::groups_accumulator::accumulate::NullState; } diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs index f64c5b1fb260..7cb50fc25222 100644 --- a/datafusion/physical-expr/src/aggregate/mod.rs +++ b/datafusion/physical-expr/src/aggregate/mod.rs @@ -20,7 +20,6 @@ pub use datafusion_physical_expr_common::aggregate::AggregateExpr; pub(crate) mod array_agg; pub(crate) mod array_agg_distinct; pub(crate) mod array_agg_ordered; -pub(crate) mod average; pub(crate) mod correlation; pub(crate) mod covariance; pub(crate) mod grouping; diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index 0020aa5f55b2..59de4d08745c 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -38,8 +38,6 @@ pub mod helpers { pub use crate::aggregate::array_agg::ArrayAgg; pub use crate::aggregate::array_agg_distinct::DistinctArrayAgg; pub use crate::aggregate::array_agg_ordered::OrderSensitiveArrayAgg; -pub use crate::aggregate::average::Avg; -pub use crate::aggregate::average::AvgAccumulator; pub use crate::aggregate::build_in::create_aggregate_expr; pub use crate::aggregate::correlation::Correlation; pub use crate::aggregate::grouping::Grouping; diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index b7d8d60f4f35..4c187f03f36b 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1177,7 +1177,7 @@ mod tests { use crate::coalesce_batches::CoalesceBatchesExec; use crate::coalesce_partitions::CoalescePartitionsExec; use crate::common; - use crate::expressions::{col, Avg}; + use crate::expressions::col; use crate::memory::MemoryExec; use crate::test::assert_is_pending; use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; @@ -1194,6 +1194,7 @@ mod tests { use datafusion_execution::memory_pool::FairSpillPool; use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion_expr::expr::Sort; + use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::count::count_udaf; use datafusion_functions_aggregate::median::median_udaf; use datafusion_physical_expr::expressions::{ @@ -1485,11 +1486,17 @@ mod tests { groups: vec![vec![false]], }; - let aggregates: Vec> = vec![Arc::new(Avg::new( - col("b", &input_schema)?, - "AVG(b)".to_string(), - DataType::Float64, - ))]; + let aggregates: Vec> = vec![create_aggregate_expr( + &avg_udaf(), + &[col("b", &input_schema)?], + &[datafusion_expr::col("b")], + &[], + &[], + &input_schema, + "AVG(b)", + false, + false, + )?]; let task_ctx = if spill { // set to an appropriate value to trigger spill @@ -1819,11 +1826,17 @@ mod tests { vec![test_median_agg_expr(&input_schema)?]; // use fast-path in `row_hash.rs`. - let aggregates_v2: Vec> = vec![Arc::new(Avg::new( - col("b", &input_schema)?, - "AVG(b)".to_string(), - DataType::Float64, - ))]; + let aggregates_v2: Vec> = vec![create_aggregate_expr( + &avg_udaf(), + &[col("b", &input_schema)?], + &[datafusion_expr::col("b")], + &[], + &[], + &input_schema, + "AVG(b)", + false, + false, + )?]; for (version, groups, aggregates) in [ (0, groups_none, aggregates_v0), @@ -1873,15 +1886,21 @@ mod tests { async fn test_drop_cancel_without_groups() -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); + Arc::new(Schema::new(vec![Field::new("a", DataType::Float64, true)])); let groups = PhysicalGroupBy::default(); - let aggregates: Vec> = vec![Arc::new(Avg::new( - col("a", &schema)?, - "AVG(a)".to_string(), - DataType::Float64, - ))]; + let aggregates: Vec> = vec![create_aggregate_expr( + &avg_udaf(), + &[col("a", &schema)?], + &[datafusion_expr::col("a")], + &[], + &[], + &schema, + "AVG(a)", + false, + false, + )?]; let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); @@ -1908,18 +1927,24 @@ mod tests { async fn test_drop_cancel_with_groups() -> Result<()> { let task_ctx = Arc::new(TaskContext::default()); let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Float32, true), - Field::new("b", DataType::Float32, true), + Field::new("a", DataType::Float64, true), + Field::new("b", DataType::Float64, true), ])); let groups = PhysicalGroupBy::new_single(vec![(col("a", &schema)?, "a".to_string())]); - let aggregates: Vec> = vec![Arc::new(Avg::new( - col("b", &schema)?, - "AVG(b)".to_string(), - DataType::Float64, - ))]; + let aggregates: Vec> = vec![create_aggregate_expr( + &avg_udaf(), + &[col("b", &schema)?], + &[datafusion_expr::col("b")], + &[], + &[], + &schema, + "AVG(b)", + false, + false, + )?]; let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); let refs = blocking_exec.refs(); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 50356d5b6052..8d73643f9c33 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -475,7 +475,7 @@ enum AggregateFunction { MIN = 0; MAX = 1; // SUM = 2; - AVG = 3; + // AVG = 3; // COUNT = 4; // APPROX_DISTINCT = 5; ARRAY_AGG = 6; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 8cca0fe4a876..10d3f54dc83c 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -534,7 +534,6 @@ impl serde::Serialize for AggregateFunction { let variant = match self { Self::Min => "MIN", Self::Max => "MAX", - Self::Avg => "AVG", Self::ArrayAgg => "ARRAY_AGG", Self::Correlation => "CORRELATION", Self::Grouping => "GROUPING", @@ -552,7 +551,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { const FIELDS: &[&str] = &[ "MIN", "MAX", - "AVG", "ARRAY_AGG", "CORRELATION", "GROUPING", @@ -599,7 +597,6 @@ impl<'de> serde::Deserialize<'de> for AggregateFunction { match value { "MIN" => Ok(AggregateFunction::Min), "MAX" => Ok(AggregateFunction::Max), - "AVG" => Ok(AggregateFunction::Avg), "ARRAY_AGG" => Ok(AggregateFunction::ArrayAgg), "CORRELATION" => Ok(AggregateFunction::Correlation), "GROUPING" => Ok(AggregateFunction::Grouping), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 56f14982923d..9504d26d44c8 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1929,7 +1929,7 @@ pub enum AggregateFunction { Min = 0, Max = 1, /// SUM = 2; - Avg = 3, + /// AVG = 3; /// COUNT = 4; /// APPROX_DISTINCT = 5; ArrayAgg = 6, @@ -1971,7 +1971,6 @@ impl AggregateFunction { match self { AggregateFunction::Min => "MIN", AggregateFunction::Max => "MAX", - AggregateFunction::Avg => "AVG", AggregateFunction::ArrayAgg => "ARRAY_AGG", AggregateFunction::Correlation => "CORRELATION", AggregateFunction::Grouping => "GROUPING", @@ -1983,7 +1982,6 @@ impl AggregateFunction { match value { "MIN" => Some(Self::Min), "MAX" => Some(Self::Max), - "AVG" => Some(Self::Avg), "ARRAY_AGG" => Some(Self::ArrayAgg), "CORRELATION" => Some(Self::Correlation), "GROUPING" => Some(Self::Grouping), diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index ba0e708218cf..420e59688d30 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -139,7 +139,6 @@ impl From for AggregateFunction { match agg_fun { protobuf::AggregateFunction::Min => Self::Min, protobuf::AggregateFunction::Max => Self::Max, - protobuf::AggregateFunction::Avg => Self::Avg, protobuf::AggregateFunction::ArrayAgg => Self::ArrayAgg, protobuf::AggregateFunction::Correlation => Self::Correlation, protobuf::AggregateFunction::Grouping => Self::Grouping, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 08999effa4b1..d12bfd9ffd4d 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -110,7 +110,6 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction { match value { AggregateFunction::Min => Self::Min, AggregateFunction::Max => Self::Max, - AggregateFunction::Avg => Self::Avg, AggregateFunction::ArrayAgg => Self::ArrayAgg, AggregateFunction::Correlation => Self::Correlation, AggregateFunction::Grouping => Self::Grouping, @@ -374,7 +373,6 @@ pub fn serialize_expr( AggregateFunction::ArrayAgg => protobuf::AggregateFunction::ArrayAgg, AggregateFunction::Min => protobuf::AggregateFunction::Min, AggregateFunction::Max => protobuf::AggregateFunction::Max, - AggregateFunction::Avg => protobuf::AggregateFunction::Avg, AggregateFunction::Correlation => { protobuf::AggregateFunction::Correlation } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index a9d3736dee08..4554e529c322 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -23,7 +23,7 @@ use datafusion::datasource::file_format::parquet::ParquetSink; use datafusion::physical_expr::window::{NthValueKind, SlidingAggregateWindowExpr}; use datafusion::physical_expr::{PhysicalSortExpr, ScalarFunctionExpr}; use datafusion::physical_plan::expressions::{ - ArrayAgg, Avg, BinaryExpr, CaseExpr, CastExpr, Column, Correlation, CumeDist, + ArrayAgg, BinaryExpr, CaseExpr, CastExpr, Column, Correlation, CumeDist, DistinctArrayAgg, Grouping, InListExpr, IsNotNullExpr, IsNullExpr, Literal, Max, Min, NegativeExpr, NotExpr, NthValue, NthValueAgg, Ntile, OrderSensitiveArrayAgg, Rank, RankType, RowNumber, TryCastExpr, WindowShift, @@ -165,21 +165,28 @@ pub fn serialize_physical_window_expr( } else if let Some(plain_aggr_window_expr) = expr.downcast_ref::() { - let AggrFn { inner, distinct } = - aggr_expr_to_aggr_fn(plain_aggr_window_expr.get_aggregate_expr().as_ref())?; + let aggr_expr = plain_aggr_window_expr.get_aggregate_expr(); + if let Some(a) = aggr_expr.as_any().downcast_ref::() { + physical_window_expr_node::WindowFunction::UserDefinedAggrFunction( + a.fun().name().to_string(), + ) + } else { + let AggrFn { inner, distinct } = aggr_expr_to_aggr_fn( + plain_aggr_window_expr.get_aggregate_expr().as_ref(), + )?; - if distinct { - // TODO - return not_impl_err!( - "Distinct aggregate functions not supported in window expressions" - ); - } + if distinct { + return not_impl_err!( + "Distinct aggregate functions not supported in window expressions" + ); + } - if !window_frame.start_bound.is_unbounded() { - return Err(DataFusionError::Internal(format!("Invalid PlainAggregateWindowExpr = {window_expr:?} with WindowFrame = {window_frame:?}"))); - } + if !window_frame.start_bound.is_unbounded() { + return Err(DataFusionError::Internal(format!("Invalid PlainAggregateWindowExpr = {window_expr:?} with WindowFrame = {window_frame:?}"))); + } - physical_window_expr_node::WindowFunction::AggrFunction(inner as i32) + physical_window_expr_node::WindowFunction::AggrFunction(inner as i32) + } } else if let Some(sliding_aggr_window_expr) = expr.downcast_ref::() { @@ -251,8 +258,6 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> Result { protobuf::AggregateFunction::Min } else if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::Max - } else if aggr_expr.downcast_ref::().is_some() { - protobuf::AggregateFunction::Avg } else if aggr_expr.downcast_ref::().is_some() { protobuf::AggregateFunction::Correlation } else if aggr_expr.downcast_ref::().is_some() { diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index b3966c3f0204..1d61a65f0be4 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -59,6 +59,7 @@ use datafusion_expr::{ TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF, WindowUDFImpl, }; +use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::expr_fn::{ bit_and, bit_or, bit_xor, bool_and, bool_or, }; @@ -2163,7 +2164,16 @@ fn roundtrip_window() { vec![col("col1")], vec![col("col1")], vec![col("col2")], - row_number_frame, + row_number_frame.clone(), + None, + )); + + let text_expr7 = Expr::WindowFunction(expr::WindowFunction::new( + WindowFunctionDefinition::AggregateUDF(avg_udaf()), + vec![col("col1")], + vec![], + vec![], + row_number_frame.clone(), None, )); @@ -2174,5 +2184,6 @@ fn roundtrip_window() { roundtrip_expr_test(test_expr3, ctx.clone()); roundtrip_expr_test(test_expr4, ctx.clone()); roundtrip_expr_test(test_expr5, ctx.clone()); - roundtrip_expr_test(test_expr6, ctx); + roundtrip_expr_test(test_expr6, ctx.clone()); + roundtrip_expr_test(text_expr7, ctx); } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index eb3313239544..03c72cfc32b1 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -47,7 +47,7 @@ use datafusion::physical_plan::aggregates::{ use datafusion::physical_plan::analyze::AnalyzeExec; use datafusion::physical_plan::empty::EmptyExec; use datafusion::physical_plan::expressions::{ - binary, cast, col, in_list, like, lit, Avg, BinaryExpr, Column, NotExpr, NthValue, + binary, cast, col, in_list, like, lit, BinaryExpr, Column, NotExpr, NthValue, PhysicalSortExpr, }; use datafusion::physical_plan::filter::FilterExec; @@ -60,6 +60,7 @@ use datafusion::physical_plan::placeholder_row::PlaceholderRowExec; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::udaf::create_aggregate_expr; use datafusion::physical_plan::union::{InterleaveExec, UnionExec}; use datafusion::physical_plan::windows::{ BuiltInWindowExpr, PlainAggregateWindowExpr, WindowAggExec, @@ -79,6 +80,7 @@ use datafusion_expr::{ Accumulator, AccumulatorFactoryFunction, AggregateUDF, ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, SimpleAggregateUDF, WindowFrame, WindowFrameBound, }; +use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::string_agg::StringAgg; use datafusion_proto::physical_plan::{ AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec, @@ -282,11 +284,17 @@ fn roundtrip_window() -> Result<()> { )); let plain_aggr_window_expr = Arc::new(PlainAggregateWindowExpr::new( - Arc::new(Avg::new( - cast(col("b", &schema)?, &schema, DataType::Float64)?, - "AVG(b)".to_string(), - DataType::Float64, - )), + create_aggregate_expr( + &avg_udaf(), + &[cast(col("b", &schema)?, &schema, DataType::Float64)?], + &[], + &[], + &[], + &schema, + "avg(b)", + false, + false, + )?, &[], &[], Arc::new(WindowFrame::new(None)), @@ -342,11 +350,17 @@ fn rountrip_aggregate() -> Result<()> { let test_cases: Vec>> = vec![ // AVG - vec![Arc::new(Avg::new( - cast(col("b", &schema)?, &schema, DataType::Float64)?, - "AVG(b)".to_string(), - DataType::Float64, - ))], + vec![create_aggregate_expr( + &avg_udaf(), + &[col("b", &schema)?], + &[], + &[], + &[], + &schema, + "AVG(b)", + false, + false, + )?], // NTH_VALUE vec![Arc::new(NthValueAgg::new( col("b", &schema)?, @@ -398,11 +412,17 @@ fn rountrip_aggregate_with_limit() -> Result<()> { let groups: Vec<(Arc, String)> = vec![(col("a", &schema)?, "unused".to_string())]; - let aggregates: Vec> = vec![Arc::new(Avg::new( - cast(col("b", &schema)?, &schema, DataType::Float64)?, - "AVG(b)".to_string(), - DataType::Float64, - ))]; + let aggregates: Vec> = vec![create_aggregate_expr( + &avg_udaf(), + &[col("b", &schema)?], + &[], + &[], + &[], + &schema, + "AVG(b)", + false, + false, + )?]; let agg = AggregateExec::try_new( AggregateMode::Final, diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 3905d7d18424..f196d71d41de 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -37,6 +37,7 @@ use datafusion_sql::{ planner::{ParserOptions, SqlToRel}, }; +use datafusion_functions_aggregate::average::avg_udaf; use datafusion_functions_aggregate::{ approx_median::approx_median_udaf, count::count_udaf, }; @@ -2309,10 +2310,10 @@ fn empty_over_plus() { #[test] fn empty_over_multiple() { - let sql = "SELECT order_id, MAX(qty) OVER (), min(qty) over (), aVg(qty) OVER () from orders"; + let sql = "SELECT order_id, MAX(qty) OVER (), min(qty) over (), avg(qty) OVER () from orders"; let expected = "\ - Projection: orders.order_id, MAX(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MIN(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, AVG(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\ - \n WindowAggr: windowExpr=[[MAX(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MIN(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, AVG(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\ + Projection: orders.order_id, MAX(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MIN(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, avg(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\ + \n WindowAggr: windowExpr=[[MAX(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MIN(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, avg(orders.qty) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\ \n TableScan: orders"; quick_test(sql, expected); } @@ -2627,8 +2628,8 @@ fn select_groupby_orderby() { // expect that this is not an ambiguous reference let expected = "Sort: birth_date ASC NULLS LAST\ - \n Projection: AVG(person.age) AS value, date_trunc(Utf8(\"month\"), person.birth_date) AS birth_date\ - \n Aggregate: groupBy=[[person.birth_date]], aggr=[[AVG(person.age)]]\ + \n Projection: avg(person.age) AS value, date_trunc(Utf8(\"month\"), person.birth_date) AS birth_date\ + \n Aggregate: groupBy=[[person.birth_date]], aggr=[[avg(person.age)]]\ \n TableScan: person"; quick_test(sql, expected); @@ -2705,7 +2706,8 @@ fn logical_plan_with_dialect_and_options( .with_udf(make_udf("sqrt", vec![DataType::Int64], DataType::Int64)) .with_udaf(sum_udaf()) .with_udaf(approx_median_udaf()) - .with_udaf(count_udaf()); + .with_udaf(count_udaf()) + .with_udaf(avg_udaf()); let planner = SqlToRel::new_with_options(&context, options); let result = DFParser::parse_sql_with_dialect(sql, dialect); diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 4090ff8764e6..552ad6a2a756 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -3690,10 +3690,10 @@ X 2 2 2 2 Y 1 1 1 1 # aggregate_timestamps_avg -statement error DataFusion error: Error during planning: No function matches the given name and argument types 'AVG\(Timestamp\(Nanosecond, None\)\)'\. You might need to add explicit type casts\. +query error SELECT avg(nanos), avg(micros), avg(millis), avg(secs) FROM t -statement error DataFusion error: Error during planning: No function matches the given name and argument types 'AVG\(Timestamp\(Nanosecond, None\)\)'\. You might need to add explicit type casts\. +query error SELECT tag, avg(nanos), avg(micros), avg(millis), avg(secs) FROM t GROUP BY tag ORDER BY tag; # aggregate_duration_array_agg @@ -3790,10 +3790,10 @@ Y 2021-01-01 2021-01-01T00:00:00 # aggregate_timestamps_avg -statement error DataFusion error: Error during planning: No function matches the given name and argument types 'AVG\(Date32\)'\. You might need to add explicit type casts\. +query error SELECT avg(date32), avg(date64) FROM t -statement error DataFusion error: Error during planning: No function matches the given name and argument types 'AVG\(Date32\)'\. You might need to add explicit type casts\. +query error SELECT tag, avg(date32), avg(date64) FROM t GROUP BY tag ORDER BY tag; @@ -3888,10 +3888,10 @@ B 21:06:28.247821084 21:06:28.247821 21:06:28.247 21:06:28 # aggregate_times_avg -statement error DataFusion error: Error during planning: No function matches the given name and argument types 'AVG\(Time64\(Nanosecond\)\)'\. You might need to add explicit type casts\. +query error SELECT avg(nanos), avg(micros), avg(millis), avg(secs) FROM t -statement error DataFusion error: Error during planning: No function matches the given name and argument types 'AVG\(Time64\(Nanosecond\)\)'\. You might need to add explicit type casts\. +query error SELECT tag, avg(nanos), avg(micros), avg(millis), avg(secs) FROM t GROUP BY tag ORDER BY tag; statement ok @@ -4325,7 +4325,7 @@ select avg(distinct x_dict) from value_dict; ---- 3 -statement error DataFusion error: This feature is not implemented: AVG\(DISTINCT\) aggregations are not available +query error select avg(x_dict), avg(distinct x_dict) from value_dict; query I diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt index d51c69496d46..fa25f00974a9 100644 --- a/datafusion/sqllogictest/test_files/errors.slt +++ b/datafusion/sqllogictest/test_files/errors.slt @@ -108,7 +108,7 @@ query error select count(); # AggregateFunction with wrong number of arguments -statement error DataFusion error: Error during planning: No function matches the given name and argument types 'AVG\(Utf8, Float64\)'\. You might need to add explicit type casts\.\n\tCandidate functions:\n\tAVG\(Int8/Int16/Int32/Int64/UInt8/UInt16/UInt32/UInt64/Float32/Float64\) +query error select avg(c1, c12) from aggregate_test_100; # AggregateFunction with wrong argument type diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index b08df0e4b35f..f7f251e4b95c 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -1962,9 +1962,9 @@ GROUP BY ALL; 2 0 13 query IIR rowsort -SELECT sub.col1, sub.col0, sub."AVG(tab3.col2)" AS avg_col2 +SELECT sub.col1, sub.col0, sub."avg(tab3.col2)" AS avg_col2 FROM ( - SELECT col1, AVG(col2), col0 FROM tab3 GROUP BY ALL + SELECT col1, avg(col2), col0 FROM tab3 GROUP BY ALL ) AS sub GROUP BY ALL; ---- diff --git a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt index ef77bdca1b3e..de6a153f58d9 100644 --- a/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt +++ b/datafusion/sqllogictest/test_files/optimizer_group_by_constant.slt @@ -60,8 +60,8 @@ FROM test_table t group by 1, 2, 3 ---- logical_plan -01)Projection: Int64(123), Int64(456), Int64(789), count(Int64(1)), AVG(t.c12) -02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)), AVG(t.c12)]] +01)Projection: Int64(123), Int64(456), Int64(789), count(Int64(1)), avg(t.c12) +02)--Aggregate: groupBy=[[]], aggr=[[count(Int64(1)), avg(t.c12)]] 03)----SubqueryAlias: t 04)------TableScan: test_table projection=[c12] diff --git a/datafusion/sqllogictest/test_files/predicates.slt b/datafusion/sqllogictest/test_files/predicates.slt index 4e8f0eb6caaa..ffaae7204eca 100644 --- a/datafusion/sqllogictest/test_files/predicates.slt +++ b/datafusion/sqllogictest/test_files/predicates.slt @@ -748,7 +748,7 @@ OR GROUP BY p_partkey; ---- logical_plan -01)Aggregate: groupBy=[[part.p_partkey]], aggr=[[sum(lineitem.l_extendedprice), AVG(lineitem.l_discount), count(DISTINCT partsupp.ps_suppkey)]] +01)Aggregate: groupBy=[[part.p_partkey]], aggr=[[sum(lineitem.l_extendedprice), avg(lineitem.l_discount), count(DISTINCT partsupp.ps_suppkey)]] 02)--Projection: lineitem.l_extendedprice, lineitem.l_discount, part.p_partkey, partsupp.ps_suppkey 03)----Inner Join: part.p_partkey = partsupp.ps_partkey 04)------Projection: lineitem.l_extendedprice, lineitem.l_discount, part.p_partkey @@ -759,7 +759,7 @@ logical_plan 09)--------------TableScan: part projection=[p_partkey, p_brand], partial_filters=[part.p_brand = Utf8("Brand#12") OR part.p_brand = Utf8("Brand#23")] 10)------TableScan: partsupp projection=[ps_partkey, ps_suppkey] physical_plan -01)AggregateExec: mode=SinglePartitioned, gby=[p_partkey@2 as p_partkey], aggr=[sum(lineitem.l_extendedprice), AVG(lineitem.l_discount), count(DISTINCT partsupp.ps_suppkey)] +01)AggregateExec: mode=SinglePartitioned, gby=[p_partkey@2 as p_partkey], aggr=[sum(lineitem.l_extendedprice), avg(lineitem.l_discount), count(DISTINCT partsupp.ps_suppkey)] 02)--CoalesceBatchesExec: target_batch_size=8192 03)----HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@2, ps_partkey@0)], projection=[l_extendedprice@0, l_discount@1, p_partkey@2, ps_suppkey@4] 04)------CoalesceBatchesExec: target_batch_size=8192 diff --git a/datafusion/sqllogictest/test_files/tpch/q1.slt.part b/datafusion/sqllogictest/test_files/tpch/q1.slt.part index f005081b155f..6c3e7dd3618a 100644 --- a/datafusion/sqllogictest/test_files/tpch/q1.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/q1.slt.part @@ -41,19 +41,19 @@ explain select ---- logical_plan 01)Sort: lineitem.l_returnflag ASC NULLS LAST, lineitem.l_linestatus ASC NULLS LAST -02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus, sum(lineitem.l_quantity) AS sum_qty, sum(lineitem.l_extendedprice) AS sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, AVG(lineitem.l_quantity) AS avg_qty, AVG(lineitem.l_extendedprice) AS avg_price, AVG(lineitem.l_discount) AS avg_disc, count(*) AS count_order -03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(__common_expr_1) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(__common_expr_1 * (Decimal128(Some(1),20,0) + lineitem.l_tax)) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), count(Int64(1)) AS count(*)]] +02)--Projection: lineitem.l_returnflag, lineitem.l_linestatus, sum(lineitem.l_quantity) AS sum_qty, sum(lineitem.l_extendedprice) AS sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax) AS sum_charge, avg(lineitem.l_quantity) AS avg_qty, avg(lineitem.l_extendedprice) AS avg_price, avg(lineitem.l_discount) AS avg_disc, count(*) AS count_order +03)----Aggregate: groupBy=[[lineitem.l_returnflag, lineitem.l_linestatus]], aggr=[[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(__common_expr_1) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(__common_expr_1 * (Decimal128(Some(1),20,0) + lineitem.l_tax)) AS sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1)) AS count(*)]] 04)------Projection: lineitem.l_extendedprice * (Decimal128(Some(1),20,0) - lineitem.l_discount) AS __common_expr_1, lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, lineitem.l_tax, lineitem.l_returnflag, lineitem.l_linestatus 05)--------Filter: lineitem.l_shipdate <= Date32("1998-09-02") 06)----------TableScan: lineitem projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], partial_filters=[lineitem.l_shipdate <= Date32("1998-09-02")] physical_plan 01)SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST] 02)--SortExec: expr=[l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], preserve_partitioning=[true] -03)----ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_discount)@8 as avg_disc, count(*)@9 as count_order] -04)------AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), count(*)] +03)----ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, sum(lineitem.l_quantity)@2 as sum_qty, sum(lineitem.l_extendedprice)@3 as sum_base_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, avg(lineitem.l_quantity)@6 as avg_qty, avg(lineitem.l_extendedprice)@7 as avg_price, avg(lineitem.l_discount)@8 as avg_disc, count(*)@9 as count_order] +04)------AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)] 05)--------CoalesceBatchesExec: target_batch_size=8192 06)----------RepartitionExec: partitioning=Hash([l_returnflag@0, l_linestatus@1], 4), input_partitions=4 -07)------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), count(*)] +07)------------AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(*)] 08)--------------ProjectionExec: expr=[l_extendedprice@1 * (Some(1),20,0 - l_discount@2) as __common_expr_1, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus] 09)----------------CoalesceBatchesExec: target_batch_size=8192 10)------------------FilterExec: l_shipdate@6 <= 1998-09-02 diff --git a/datafusion/sqllogictest/test_files/tpch/q17.slt.part b/datafusion/sqllogictest/test_files/tpch/q17.slt.part index b1562301d9d9..ecb54e97b910 100644 --- a/datafusion/sqllogictest/test_files/tpch/q17.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/q17.slt.part @@ -39,7 +39,7 @@ logical_plan 01)Projection: CAST(sum(lineitem.l_extendedprice) AS Float64) / Float64(7) AS avg_yearly 02)--Aggregate: groupBy=[[]], aggr=[[sum(lineitem.l_extendedprice)]] 03)----Projection: lineitem.l_extendedprice -04)------Inner Join: part.p_partkey = __scalar_sq_1.l_partkey Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < __scalar_sq_1.Float64(0.2) * AVG(lineitem.l_quantity) +04)------Inner Join: part.p_partkey = __scalar_sq_1.l_partkey Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < __scalar_sq_1.Float64(0.2) * avg(lineitem.l_quantity) 05)--------Projection: lineitem.l_quantity, lineitem.l_extendedprice, part.p_partkey 06)----------Inner Join: lineitem.l_partkey = part.p_partkey 07)------------TableScan: lineitem projection=[l_partkey, l_quantity, l_extendedprice] @@ -47,8 +47,8 @@ logical_plan 09)--------------Filter: part.p_brand = Utf8("Brand#23") AND part.p_container = Utf8("MED BOX") 10)----------------TableScan: part projection=[p_partkey, p_brand, p_container], partial_filters=[part.p_brand = Utf8("Brand#23"), part.p_container = Utf8("MED BOX")] 11)--------SubqueryAlias: __scalar_sq_1 -12)----------Projection: CAST(Float64(0.2) * CAST(AVG(lineitem.l_quantity) AS Float64) AS Decimal128(30, 15)), lineitem.l_partkey -13)------------Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[AVG(lineitem.l_quantity)]] +12)----------Projection: CAST(Float64(0.2) * CAST(avg(lineitem.l_quantity) AS Float64) AS Decimal128(30, 15)), lineitem.l_partkey +13)------------Aggregate: groupBy=[[lineitem.l_partkey]], aggr=[[avg(lineitem.l_quantity)]] 14)--------------TableScan: lineitem projection=[l_partkey, l_quantity] physical_plan 01)ProjectionExec: expr=[CAST(sum(lineitem.l_extendedprice)@0 AS Float64) / 7 as avg_yearly] @@ -56,7 +56,7 @@ physical_plan 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[], aggr=[sum(lineitem.l_extendedprice)] 05)--------CoalesceBatchesExec: target_batch_size=8192 -06)----------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@2, l_partkey@1)], filter=CAST(l_quantity@0 AS Decimal128(30, 15)) < Float64(0.2) * AVG(lineitem.l_quantity)@1, projection=[l_extendedprice@1] +06)----------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(p_partkey@2, l_partkey@1)], filter=CAST(l_quantity@0 AS Decimal128(30, 15)) < Float64(0.2) * avg(lineitem.l_quantity)@1, projection=[l_extendedprice@1] 07)------------CoalesceBatchesExec: target_batch_size=8192 08)--------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(l_partkey@0, p_partkey@0)], projection=[l_quantity@1, l_extendedprice@2, p_partkey@3] 09)----------------CoalesceBatchesExec: target_batch_size=8192 @@ -69,11 +69,11 @@ physical_plan 16)------------------------FilterExec: p_brand@1 = Brand#23 AND p_container@2 = MED BOX 17)--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 18)----------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/part.tbl]]}, projection=[p_partkey, p_brand, p_container], has_header=false -19)------------ProjectionExec: expr=[CAST(0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) AS Decimal128(30, 15)) as Float64(0.2) * AVG(lineitem.l_quantity), l_partkey@0 as l_partkey] -20)--------------AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)] +19)------------ProjectionExec: expr=[CAST(0.2 * CAST(avg(lineitem.l_quantity)@1 AS Float64) AS Decimal128(30, 15)) as Float64(0.2) * avg(lineitem.l_quantity), l_partkey@0 as l_partkey] +20)--------------AggregateExec: mode=FinalPartitioned, gby=[l_partkey@0 as l_partkey], aggr=[avg(lineitem.l_quantity)] 21)----------------CoalesceBatchesExec: target_batch_size=8192 22)------------------RepartitionExec: partitioning=Hash([l_partkey@0], 4), input_partitions=4 -23)--------------------AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)] +23)--------------------AggregateExec: mode=Partial, gby=[l_partkey@0 as l_partkey], aggr=[avg(lineitem.l_quantity)] 24)----------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:0..18561749], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:18561749..37123498], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:37123498..55685247], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/lineitem.tbl:55685247..74246996]]}, projection=[l_partkey, l_quantity], has_header=false diff --git a/datafusion/sqllogictest/test_files/tpch/q22.slt.part b/datafusion/sqllogictest/test_files/tpch/q22.slt.part index 5b39c2725a97..b3bfc329642f 100644 --- a/datafusion/sqllogictest/test_files/tpch/q22.slt.part +++ b/datafusion/sqllogictest/test_files/tpch/q22.slt.part @@ -61,7 +61,7 @@ logical_plan 03)----Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[count(Int64(1)) AS count(*), sum(custsale.c_acctbal)]] 04)------SubqueryAlias: custsale 05)--------Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal -06)----------Inner Join: Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_2.AVG(customer.c_acctbal) +06)----------Inner Join: Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __scalar_sq_2.avg(customer.c_acctbal) 07)------------Projection: customer.c_phone, customer.c_acctbal 08)--------------LeftAnti Join: customer.c_custkey = __correlated_sq_1.o_custkey 09)----------------Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) @@ -69,7 +69,7 @@ logical_plan 11)----------------SubqueryAlias: __correlated_sq_1 12)------------------TableScan: orders projection=[o_custkey] 13)------------SubqueryAlias: __scalar_sq_2 -14)--------------Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]] +14)--------------Aggregate: groupBy=[[]], aggr=[[avg(customer.c_acctbal)]] 15)----------------Projection: customer.c_acctbal 16)------------------Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]) 17)--------------------TableScan: customer projection=[c_phone, c_acctbal], partial_filters=[customer.c_acctbal > Decimal128(Some(0),15,2) AS customer.c_acctbal > Decimal128(Some(0),30,15), substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")]), customer.c_acctbal > Decimal128(Some(0),15,2)] @@ -83,7 +83,7 @@ physical_plan 07)------------AggregateExec: mode=Partial, gby=[cntrycode@0 as cntrycode], aggr=[count(*), sum(custsale.c_acctbal)] 08)--------------ProjectionExec: expr=[substr(c_phone@0, 1, 2) as cntrycode, c_acctbal@1 as c_acctbal] 09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)------------------NestedLoopJoinExec: join_type=Inner, filter=CAST(c_acctbal@0 AS Decimal128(19, 6)) > AVG(customer.c_acctbal)@1 +10)------------------NestedLoopJoinExec: join_type=Inner, filter=CAST(c_acctbal@0 AS Decimal128(19, 6)) > avg(customer.c_acctbal)@1 11)--------------------CoalescePartitionsExec 12)----------------------CoalesceBatchesExec: target_batch_size=8192 13)------------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2] @@ -96,9 +96,9 @@ physical_plan 20)--------------------------CoalesceBatchesExec: target_batch_size=8192 21)----------------------------RepartitionExec: partitioning=Hash([o_custkey@0], 4), input_partitions=4 22)------------------------------CsvExec: file_groups={4 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:0..4223281], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:4223281..8446562], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:8446562..12669843], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/tpch/data/orders.tbl:12669843..16893122]]}, projection=[o_custkey], has_header=false -23)--------------------AggregateExec: mode=Final, gby=[], aggr=[AVG(customer.c_acctbal)] +23)--------------------AggregateExec: mode=Final, gby=[], aggr=[avg(customer.c_acctbal)] 24)----------------------CoalescePartitionsExec -25)------------------------AggregateExec: mode=Partial, gby=[], aggr=[AVG(customer.c_acctbal)] +25)------------------------AggregateExec: mode=Partial, gby=[], aggr=[avg(customer.c_acctbal)] 26)--------------------------ProjectionExec: expr=[c_acctbal@1 as c_acctbal] 27)----------------------------CoalesceBatchesExec: target_batch_size=8192 28)------------------------------FilterExec: c_acctbal@1 > Some(0),15,2 AND Use substr(c_phone@0, 1, 2) IN (SET) ([Literal { value: Utf8("13") }, Literal { value: Utf8("31") }, Literal { value: Utf8("23") }, Literal { value: Utf8("29") }, Literal { value: Utf8("30") }, Literal { value: Utf8("18") }, Literal { value: Utf8("17") }]) diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index d63917a78e42..e6f3e70c1ebd 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -2727,8 +2727,8 @@ EXPLAIN SELECT MAX(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as max2, COUNT(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as count1, COUNT(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as count2, - AVG(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as avg1, - AVG(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as avg2 + avg(inc_col) OVER(ORDER BY ts ASC RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING) as avg1, + avg(inc_col) OVER(ORDER BY ts DESC RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING) as avg2 FROM annotated_data_finite ORDER BY inc_col ASC LIMIT 5 @@ -2737,18 +2737,18 @@ logical_plan 01)Projection: sum1, sum2, min1, min2, max1, max2, count1, count2, avg1, avg2 02)--Limit: skip=0, fetch=5 03)----Sort: annotated_data_finite.inc_col ASC NULLS LAST, fetch=5 -04)------Projection: sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS sum1, sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS min2, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS max1, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS max2, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS count1, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS count2, AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS avg1, AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS avg2, annotated_data_finite.inc_col -05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, AVG(__common_expr_2 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING]] -06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, AVG(__common_expr_2 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]] +04)------Projection: sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS sum1, sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS min1, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS min2, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS max1, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS max2, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS count1, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS count2, avg(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING AS avg1, avg(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING AS avg2, annotated_data_finite.inc_col +05)--------WindowAggr: windowExpr=[[sum(__common_expr_1 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING, avg(__common_expr_2 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING]] +06)----------WindowAggr: windowExpr=[[sum(__common_expr_1 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING, avg(__common_expr_2 AS annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING]] 07)------------Projection: CAST(annotated_data_finite.inc_col AS Int64) AS __common_expr_1, CAST(annotated_data_finite.inc_col AS Float64) AS __common_expr_2, annotated_data_finite.ts, annotated_data_finite.inc_col 08)--------------TableScan: annotated_data_finite projection=[ts, inc_col] physical_plan 01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, min1@2 as min1, min2@3 as min2, max1@4 as max1, max2@5 as max2, count1@6 as count1, count2@7 as count2, avg1@8 as avg1, avg2@9 as avg2] 02)--GlobalLimitExec: skip=0, fetch=5 03)----SortExec: TopK(fetch=5), expr=[inc_col@10 ASC NULLS LAST], preserve_partitioning=[false] -04)------ProjectionExec: expr=[sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@9 as sum1, sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@4 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@10 as min1, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@5 as min2, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@11 as max1, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@6 as max2, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@12 as count1, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@7 as count2, AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@13 as avg1, AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@8 as avg2, inc_col@3 as inc_col] -05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)), is_causal: false }, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)), is_causal: false }, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)), is_causal: false }, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING: Ok(Field { name: "count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)), is_causal: false }, AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING: Ok(Field { name: "AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)), is_causal: false }], mode=[Sorted] -06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)), is_causal: false }, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)), is_causal: false }, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)), is_causal: false }, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)), is_causal: false }, AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "AVG(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)), is_causal: false }], mode=[Sorted] +04)------ProjectionExec: expr=[sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@9 as sum1, sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@4 as sum2, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@10 as min1, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@5 as min2, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@11 as max1, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@6 as max2, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@12 as count1, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@7 as count2, avg(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING@13 as avg1, avg(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@8 as avg2, inc_col@3 as inc_col] +05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)), is_causal: false }, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)), is_causal: false }, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)), is_causal: false }, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING: Ok(Field { name: "count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)), is_causal: false }, avg(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING: Ok(Field { name: "avg(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 5 FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(5)), is_causal: false }], mode=[Sorted] +06)----------BoundedWindowAggExec: wdw=[sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)), is_causal: false }, MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "MIN(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)), is_causal: false }, MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "MAX(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)), is_causal: false }, count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "count(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)), is_causal: false }, avg(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "avg(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: Following(Int32(3)), is_causal: false }], mode=[Sorted] 07)------------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as __common_expr_1, CAST(inc_col@1 AS Float64) as __common_expr_2, ts@0 as ts, inc_col@1 as inc_col] 08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true @@ -3630,7 +3630,7 @@ set datafusion.execution.target_partitions = 2; # we should still have the orderings [a ASC, b ASC], [c ASC]. query TT EXPLAIN SELECT *, - AVG(d) OVER sliding_window AS avg_d + avg(d) OVER sliding_window AS avg_d FROM multiple_ordered_table_inf WINDOW sliding_window AS ( PARTITION BY d @@ -3640,13 +3640,13 @@ ORDER BY c ---- logical_plan 01)Sort: multiple_ordered_table_inf.c ASC NULLS LAST -02)--Projection: multiple_ordered_table_inf.a0, multiple_ordered_table_inf.a, multiple_ordered_table_inf.b, multiple_ordered_table_inf.c, multiple_ordered_table_inf.d, AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW AS avg_d -03)----WindowAggr: windowExpr=[[AVG(CAST(multiple_ordered_table_inf.d AS Float64)) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW]] +02)--Projection: multiple_ordered_table_inf.a0, multiple_ordered_table_inf.a, multiple_ordered_table_inf.b, multiple_ordered_table_inf.c, multiple_ordered_table_inf.d, avg(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW AS avg_d +03)----WindowAggr: windowExpr=[[avg(CAST(multiple_ordered_table_inf.d AS Float64)) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW]] 04)------TableScan: multiple_ordered_table_inf projection=[a0, a, b, c, d] physical_plan 01)SortPreservingMergeExec: [c@3 ASC NULLS LAST] -02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW@5 as avg_d] -03)----BoundedWindowAggExec: wdw=[AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW: Ok(Field { name: "AVG(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: CurrentRow, is_causal: false }], mode=[Linear] +02)--ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, avg(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW@5 as avg_d] +03)----BoundedWindowAggExec: wdw=[avg(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW: Ok(Field { name: "avg(multiple_ordered_table_inf.d) PARTITION BY [multiple_ordered_table_inf.d] ORDER BY [multiple_ordered_table_inf.a ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: CurrentRow, is_causal: false }], mode=[Linear] 04)------CoalesceBatchesExec: target_batch_size=4096 05)--------RepartitionExec: partitioning=Hash([d@4], 2), input_partitions=2, preserve_order=true, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST,c@3 ASC NULLS LAST 06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 diff --git a/datafusion/substrait/tests/cases/consumer_integration.rs b/datafusion/substrait/tests/cases/consumer_integration.rs index 975cc150d79c..8ea3a69cab61 100644 --- a/datafusion/substrait/tests/cases/consumer_integration.rs +++ b/datafusion/substrait/tests/cases/consumer_integration.rs @@ -46,9 +46,9 @@ mod tests { let plan_str = format!("{:?}", plan); assert_eq!( plan_str, - "Projection: FILENAME_PLACEHOLDER_0.l_returnflag AS L_RETURNFLAG, FILENAME_PLACEHOLDER_0.l_linestatus AS L_LINESTATUS, sum(FILENAME_PLACEHOLDER_0.l_quantity) AS SUM_QTY, sum(FILENAME_PLACEHOLDER_0.l_extendedprice) AS SUM_BASE_PRICE, sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount) AS SUM_DISC_PRICE, sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount * Int32(1) + FILENAME_PLACEHOLDER_0.l_tax) AS SUM_CHARGE, AVG(FILENAME_PLACEHOLDER_0.l_quantity) AS AVG_QTY, AVG(FILENAME_PLACEHOLDER_0.l_extendedprice) AS AVG_PRICE, AVG(FILENAME_PLACEHOLDER_0.l_discount) AS AVG_DISC, count(Int64(1)) AS COUNT_ORDER\ + "Projection: FILENAME_PLACEHOLDER_0.l_returnflag AS L_RETURNFLAG, FILENAME_PLACEHOLDER_0.l_linestatus AS L_LINESTATUS, sum(FILENAME_PLACEHOLDER_0.l_quantity) AS SUM_QTY, sum(FILENAME_PLACEHOLDER_0.l_extendedprice) AS SUM_BASE_PRICE, sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount) AS SUM_DISC_PRICE, sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount * Int32(1) + FILENAME_PLACEHOLDER_0.l_tax) AS SUM_CHARGE, avg(FILENAME_PLACEHOLDER_0.l_quantity) AS AVG_QTY, avg(FILENAME_PLACEHOLDER_0.l_extendedprice) AS AVG_PRICE, avg(FILENAME_PLACEHOLDER_0.l_discount) AS AVG_DISC, count(Int64(1)) AS COUNT_ORDER\ \n Sort: FILENAME_PLACEHOLDER_0.l_returnflag ASC NULLS LAST, FILENAME_PLACEHOLDER_0.l_linestatus ASC NULLS LAST\ - \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.l_returnflag, FILENAME_PLACEHOLDER_0.l_linestatus]], aggr=[[sum(FILENAME_PLACEHOLDER_0.l_quantity), sum(FILENAME_PLACEHOLDER_0.l_extendedprice), sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount), sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount * Int32(1) + FILENAME_PLACEHOLDER_0.l_tax), AVG(FILENAME_PLACEHOLDER_0.l_quantity), AVG(FILENAME_PLACEHOLDER_0.l_extendedprice), AVG(FILENAME_PLACEHOLDER_0.l_discount), count(Int64(1))]]\ + \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.l_returnflag, FILENAME_PLACEHOLDER_0.l_linestatus]], aggr=[[sum(FILENAME_PLACEHOLDER_0.l_quantity), sum(FILENAME_PLACEHOLDER_0.l_extendedprice), sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount), sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount * Int32(1) + FILENAME_PLACEHOLDER_0.l_tax), avg(FILENAME_PLACEHOLDER_0.l_quantity), avg(FILENAME_PLACEHOLDER_0.l_extendedprice), avg(FILENAME_PLACEHOLDER_0.l_discount), count(Int64(1))]]\ \n Projection: FILENAME_PLACEHOLDER_0.l_returnflag, FILENAME_PLACEHOLDER_0.l_linestatus, FILENAME_PLACEHOLDER_0.l_quantity, FILENAME_PLACEHOLDER_0.l_extendedprice, FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount), FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount) * (CAST(Int32(1) AS Decimal128(19, 0)) + FILENAME_PLACEHOLDER_0.l_tax), FILENAME_PLACEHOLDER_0.l_discount\ \n Filter: FILENAME_PLACEHOLDER_0.l_shipdate <= Date32(\"1998-12-01\") - IntervalDayTime(\"IntervalDayTime { days: 120, milliseconds: 0 }\")\ \n TableScan: FILENAME_PLACEHOLDER_0 projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]" diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index d236d34c9df9..922e3e16c86d 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -239,7 +239,7 @@ async fn aggregate_grouping_sets() -> Result<()> { async fn aggregate_grouping_rollup() -> Result<()> { assert_expected_plan( "SELECT a, c, e, avg(b) FROM data GROUP BY ROLLUP (a, c, e)", - "Aggregate: groupBy=[[GROUPING SETS ((data.a, data.c, data.e), (data.a, data.c), (data.a), ())]], aggr=[[AVG(data.b)]]\ + "Aggregate: groupBy=[[GROUPING SETS ((data.a, data.c, data.e), (data.a, data.c), (data.a), ())]], aggr=[[avg(data.b)]]\ \n TableScan: data projection=[a, b, c, e]", true ).await