From 70bdbaf97f5d5b5425575120d0cda5ac7094faed Mon Sep 17 00:00:00 2001 From: Edmondo Porcu Date: Wed, 22 Nov 2023 15:52:22 -0800 Subject: [PATCH] First iteration of aggregates.rs tests to sqllogictests --- datafusion/core/tests/sql/aggregates.rs | 499 ------------------ .../sqllogictest/test_files/aggregate.slt | 165 +++++- 2 files changed, 146 insertions(+), 518 deletions(-) diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index 03864e9efef8..f1168d61a325 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -68,324 +68,6 @@ async fn csv_query_array_agg_distinct() -> Result<()> { Ok(()) } -#[tokio::test] -async fn aggregate() -> Result<()> { - let results = execute_with_partition("SELECT SUM(c1), SUM(c2) FROM test", 4).await?; - assert_eq!(results.len(), 1); - - let expected = [ - "+--------------+--------------+", - "| SUM(test.c1) | SUM(test.c2) |", - "+--------------+--------------+", - "| 60 | 220 |", - "+--------------+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn aggregate_empty() -> Result<()> { - // The predicate on this query purposely generates no results - let results = - execute_with_partition("SELECT SUM(c1), SUM(c2) FROM test where c1 > 100000", 4) - .await - .unwrap(); - - assert_eq!(results.len(), 1); - - let expected = [ - "+--------------+--------------+", - "| SUM(test.c1) | SUM(test.c2) |", - "+--------------+--------------+", - "| | |", - "+--------------+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn aggregate_avg() -> Result<()> { - let results = execute_with_partition("SELECT AVG(c1), AVG(c2) FROM test", 4).await?; - assert_eq!(results.len(), 1); - - let expected = [ - "+--------------+--------------+", - "| AVG(test.c1) | AVG(test.c2) |", - "+--------------+--------------+", - "| 1.5 | 5.5 |", - "+--------------+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn aggregate_max() -> Result<()> { - let results = execute_with_partition("SELECT MAX(c1), MAX(c2) FROM test", 4).await?; - assert_eq!(results.len(), 1); - - let expected = [ - "+--------------+--------------+", - "| MAX(test.c1) | MAX(test.c2) |", - "+--------------+--------------+", - "| 3 | 10 |", - "+--------------+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn aggregate_min() -> Result<()> { - let results = execute_with_partition("SELECT MIN(c1), MIN(c2) FROM test", 4).await?; - assert_eq!(results.len(), 1); - - let expected = [ - "+--------------+--------------+", - "| MIN(test.c1) | MIN(test.c2) |", - "+--------------+--------------+", - "| 0 | 1 |", - "+--------------+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn aggregate_grouped() -> Result<()> { - let results = - execute_with_partition("SELECT c1, SUM(c2) FROM test GROUP BY c1", 4).await?; - - let expected = [ - "+----+--------------+", - "| c1 | SUM(test.c2) |", - "+----+--------------+", - "| 0 | 55 |", - "| 1 | 55 |", - "| 2 | 55 |", - "| 3 | 55 |", - "+----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn aggregate_grouped_avg() -> Result<()> { - let results = - execute_with_partition("SELECT c1, AVG(c2) FROM test GROUP BY c1", 4).await?; - - let expected = [ - "+----+--------------+", - "| c1 | AVG(test.c2) |", - "+----+--------------+", - "| 0 | 5.5 |", - "| 1 | 5.5 |", - "| 2 | 5.5 |", - "| 3 | 5.5 |", - "+----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn aggregate_grouped_empty() -> Result<()> { - let results = execute_with_partition( - "SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", - 4, - ) - .await?; - - let expected = [ - "+----+--------------+", - "| c1 | AVG(test.c2) |", - "+----+--------------+", - "+----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn aggregate_grouped_max() -> Result<()> { - let results = - execute_with_partition("SELECT c1, MAX(c2) FROM test GROUP BY c1", 4).await?; - - let expected = [ - "+----+--------------+", - "| c1 | MAX(test.c2) |", - "+----+--------------+", - "| 0 | 10 |", - "| 1 | 10 |", - "| 2 | 10 |", - "| 3 | 10 |", - "+----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn aggregate_grouped_min() -> Result<()> { - let results = - execute_with_partition("SELECT c1, MIN(c2) FROM test GROUP BY c1", 4).await?; - - let expected = [ - "+----+--------------+", - "| c1 | MIN(test.c2) |", - "+----+--------------+", - "| 0 | 1 |", - "| 1 | 1 |", - "| 2 | 1 |", - "| 3 | 1 |", - "+----+--------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn aggregate_min_max_w_custom_window_frames() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = - "SELECT - MIN(c12) OVER (ORDER BY C12 RANGE BETWEEN 0.3 PRECEDING AND 0.2 FOLLOWING) as min1, - MAX(c12) OVER (ORDER BY C11 RANGE BETWEEN 0.1 PRECEDING AND 0.2 FOLLOWING) as max1 - FROM aggregate_test_100 - ORDER BY C9 - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = [ - "+---------------------+--------------------+", - "| min1 | max1 |", - "+---------------------+--------------------+", - "| 0.01479305307777301 | 0.9965400387585364 |", - "| 0.01479305307777301 | 0.9800193410444061 |", - "| 0.01479305307777301 | 0.9706712283358269 |", - "| 0.2667177795079635 | 0.9965400387585364 |", - "| 0.3600766362333053 | 0.9706712283358269 |", - "+---------------------+--------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn aggregate_min_max_w_custom_window_frames_unbounded_start() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = - "SELECT - MIN(c12) OVER (ORDER BY C12 RANGE BETWEEN UNBOUNDED PRECEDING AND 0.2 FOLLOWING) as min1, - MAX(c12) OVER (ORDER BY C11 RANGE BETWEEN UNBOUNDED PRECEDING AND 0.2 FOLLOWING) as max1 - FROM aggregate_test_100 - ORDER BY C9 - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = [ - "+---------------------+--------------------+", - "| min1 | max1 |", - "+---------------------+--------------------+", - "| 0.01479305307777301 | 0.9965400387585364 |", - "| 0.01479305307777301 | 0.9800193410444061 |", - "| 0.01479305307777301 | 0.9800193410444061 |", - "| 0.01479305307777301 | 0.9965400387585364 |", - "| 0.01479305307777301 | 0.9800193410444061 |", - "+---------------------+--------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn aggregate_avg_add() -> Result<()> { - let results = execute_with_partition( - "SELECT AVG(c1), AVG(c1) + 1, AVG(c1) + 2, 1 + AVG(c1) FROM test", - 4, - ) - .await?; - assert_eq!(results.len(), 1); - - let expected = ["+--------------+-------------------------+-------------------------+-------------------------+", - "| AVG(test.c1) | AVG(test.c1) + Int64(1) | AVG(test.c1) + Int64(2) | Int64(1) + AVG(test.c1) |", - "+--------------+-------------------------+-------------------------+-------------------------+", - "| 1.5 | 2.5 | 3.5 | 2.5 |", - "+--------------+-------------------------+-------------------------+-------------------------+"]; - assert_batches_sorted_eq!(expected, &results); - - Ok(()) -} - -#[tokio::test] -async fn case_sensitive_identifiers_aggregates() { - let ctx = SessionContext::new(); - ctx.register_table("t", table_with_sequence(1, 1).unwrap()) - .unwrap(); - - let expected = [ - "+----------+", - "| MAX(t.i) |", - "+----------+", - "| 1 |", - "+----------+", - ]; - - let results = plan_and_collect(&ctx, "SELECT max(i) FROM t") - .await - .unwrap(); - - assert_batches_sorted_eq!(expected, &results); - - let results = plan_and_collect(&ctx, "SELECT MAX(i) FROM t") - .await - .unwrap(); - assert_batches_sorted_eq!(expected, &results); - - // Using double quotes allows specifying the function name with capitalization - let err = plan_and_collect(&ctx, "SELECT \"MAX\"(i) FROM t") - .await - .unwrap_err(); - assert!(err - .to_string() - .contains("Error during planning: Invalid function 'MAX'")); - - let results = plan_and_collect(&ctx, "SELECT \"max\"(i) FROM t") - .await - .unwrap(); - assert_batches_sorted_eq!(expected, &results); -} - -#[tokio::test] -async fn count_basic() -> Result<()> { - let results = - execute_with_partition("SELECT COUNT(c1), COUNT(c2) FROM test", 1).await?; - assert_eq!(results.len(), 1); - - let expected = [ - "+----------------+----------------+", - "| COUNT(test.c1) | COUNT(test.c2) |", - "+----------------+----------------+", - "| 10 | 10 |", - "+----------------+----------------+", - ]; - assert_batches_sorted_eq!(expected, &results); - Ok(()) -} - #[tokio::test] async fn count_partitioned() -> Result<()> { let results = @@ -495,162 +177,6 @@ async fn count_aggregated_cube() -> Result<()> { Ok(()) } -#[tokio::test] -async fn count_multi_expr() -> Result<()> { - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Int32, true), - Field::new("c2", DataType::Int32, true), - ])); - - let data = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![ - Some(0), - None, - Some(1), - Some(2), - None, - ])), - Arc::new(Int32Array::from(vec![ - Some(1), - Some(1), - Some(0), - None, - None, - ])), - ], - )?; - - let ctx = SessionContext::new(); - ctx.register_batch("test", data)?; - let sql = "SELECT count(c1, c2) FROM test"; - let actual = execute_to_batches(&ctx, sql).await; - - let expected = [ - "+------------------------+", - "| COUNT(test.c1,test.c2) |", - "+------------------------+", - "| 2 |", - "+------------------------+", - ]; - assert_batches_sorted_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn count_multi_expr_group_by() -> Result<()> { - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Int32, true), - Field::new("c2", DataType::Int32, true), - Field::new("c3", DataType::Int32, true), - ])); - - let data = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![ - Some(0), - None, - Some(1), - Some(2), - None, - ])), - Arc::new(Int32Array::from(vec![ - Some(1), - Some(1), - Some(0), - None, - None, - ])), - Arc::new(Int32Array::from(vec![ - Some(10), - Some(10), - Some(10), - Some(10), - Some(10), - ])), - ], - )?; - - let ctx = SessionContext::new(); - ctx.register_batch("test", data)?; - let sql = "SELECT c3, count(c1, c2) FROM test group by c3"; - let actual = execute_to_batches(&ctx, sql).await; - - let expected = [ - "+----+------------------------+", - "| c3 | COUNT(test.c1,test.c2) |", - "+----+------------------------+", - "| 10 | 2 |", - "+----+------------------------+", - ]; - assert_batches_sorted_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn simple_avg() -> Result<()> { - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - - let batch1 = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], - )?; - let batch2 = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(Int32Array::from(vec![4, 5]))], - )?; - - let ctx = SessionContext::new(); - - let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?; - ctx.register_table("t", Arc::new(provider))?; - - let result = plan_and_collect(&ctx, "SELECT AVG(a) FROM t").await?; - - let batch = &result[0]; - assert_eq!(1, batch.num_columns()); - assert_eq!(1, batch.num_rows()); - - let values = as_float64_array(batch.column(0)).expect("failed to cast version"); - assert_eq!(values.len(), 1); - // avg(1,2,3,4,5) = 3.0 - assert_eq!(values.value(0), 3.0_f64); - Ok(()) -} - -#[tokio::test] -async fn simple_mean() -> Result<()> { - let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); - - let batch1 = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(Int32Array::from(vec![1, 2, 3]))], - )?; - let batch2 = RecordBatch::try_new( - Arc::new(schema.clone()), - vec![Arc::new(Int32Array::from(vec![4, 5]))], - )?; - - let ctx = SessionContext::new(); - - let provider = MemTable::try_new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?; - ctx.register_table("t", Arc::new(provider))?; - - let result = plan_and_collect(&ctx, "SELECT MEAN(a) FROM t").await?; - - let batch = &result[0]; - assert_eq!(1, batch.num_columns()); - assert_eq!(1, batch.num_rows()); - - let values = as_float64_array(batch.column(0)).expect("failed to cast version"); - assert_eq!(values.len(), 1); - // mean(1,2,3,4,5) = 3.0 - assert_eq!(values.value(0), 3.0_f64); - Ok(()) -} - async fn run_count_distinct_integers_aggregated_scenario( partitions: Vec>, ) -> Result> { @@ -771,31 +297,6 @@ async fn count_distinct_integers_aggregated_multiple_partitions() -> Result<()> Ok(()) } -#[tokio::test] -async fn aggregate_with_alias() -> Result<()> { - let ctx = SessionContext::new(); - let state = ctx.state(); - - let schema = Arc::new(Schema::new(vec![ - Field::new("c1", DataType::Utf8, false), - Field::new("c2", DataType::UInt32, false), - ])); - - let plan = scan_empty(None, schema.as_ref(), None)? - .aggregate(vec![col("c1")], vec![sum(col("c2"))])? - .project(vec![col("c1"), sum(col("c2")).alias("total_salary")])? - .build()?; - - let plan = state.optimize(&plan)?; - let physical_plan = state.create_physical_plan(&Arc::new(plan)).await?; - assert_eq!("c1", physical_plan.schema().field(0).name().as_str()); - assert_eq!( - "total_salary", - physical_plan.schema().field(1).name().as_str() - ); - Ok(()) -} - #[tokio::test] async fn test_accumulator_row_accumulator() -> Result<()> { let config = SessionConfig::new(); diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index a1bb93ed53c4..67c370e9dc99 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -1327,36 +1327,128 @@ select avg(c1), arrow_typeof(avg(c1)) from d_table ---- 5 Decimal128(14, 7) -# FIX: different test table + # aggregate -# query I -# SELECT SUM(c1), SUM(c2) FROM test -# ---- -# 60 220 +query II +SELECT SUM(c1), SUM(c2) FROM test +---- +7 6 + +# aggregate_empty + +query II +SELECT SUM(c1), SUM(c2) FROM test where c1 > 100000 +---- +NULL NULL + +# aggregate_avg +query RR +SELECT AVG(c1), AVG(c2) FROM test +---- +1.75 1.5 + +# aggregate_max +query II +SELECT MAX(c1), MAX(c2) FROM test +---- +3 2 + +# aggregate_min +query II +SELECT MIN(c1), MIN(c2) FROM test +---- +0 1 -# TODO: aggregate_empty +# aggregate_grouped +query II +SELECT c1, SUM(c2) FROM test GROUP BY c1 order by c1 +---- +0 NULL +1 1 +3 4 +NULL 1 -# TODO: aggregate_avg +# aggregate_grouped_avg +query IR +SELECT c1, AVG(c2) FROM test GROUP BY c1 order by c1 +---- +0 NULL +1 1 +3 2 +NULL 1 -# TODO: aggregate_max +# aggregate_grouped_empty +query IR +SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1 +---- -# TODO: aggregate_min +# aggregate_grouped_max +query II +SELECT c1, MAX(c2) FROM test GROUP BY c1 order by c1 +---- +0 NULL +1 1 +3 2 +NULL 1 -# TODO: aggregate_grouped +# aggregate_grouped_min +query II +SELECT c1, MIN(c2) FROM test GROUP BY c1 order by c1 +---- +0 NULL +1 1 +3 2 +NULL 1 -# TODO: aggregate_grouped_avg +# aggregate_min_max_w_custom_window_frames +query RR +SELECT +MIN(c12) OVER (ORDER BY C12 RANGE BETWEEN 0.3 PRECEDING AND 0.2 FOLLOWING) as min1, +MAX(c12) OVER (ORDER BY C11 RANGE BETWEEN 0.1 PRECEDING AND 0.2 FOLLOWING) as max1 +FROM aggregate_test_100 +ORDER BY C9 +LIMIT 5 +---- +0.014793053078 0.996540038759 +0.014793053078 0.980019341044 +0.014793053078 0.970671228336 +0.266717779508 0.996540038759 +0.360076636233 0.970671228336 -# TODO: aggregate_grouped_empty +# aggregate_min_max_with_custom_window_frames_unbounded_start +query RR +SELECT +MIN(c12) OVER (ORDER BY C12 RANGE BETWEEN UNBOUNDED PRECEDING AND 0.2 FOLLOWING) as min1, +MAX(c12) OVER (ORDER BY C11 RANGE BETWEEN UNBOUNDED PRECEDING AND 0.2 FOLLOWING) as max1 +FROM aggregate_test_100 +ORDER BY C9 +LIMIT 5 +---- +0.014793053078 0.996540038759 +0.014793053078 0.980019341044 +0.014793053078 0.980019341044 +0.014793053078 0.996540038759 +0.014793053078 0.980019341044 -# TODO: aggregate_grouped_max +# aggregate_avg_add +query RRRR +SELECT AVG(c1), AVG(c1) + 1, AVG(c1) + 2, 1 + AVG(c1) FROM test +---- +1.75 2.75 3.75 2.75 -# TODO: aggregate_grouped_min +# case_sensitive_identifiers_aggregates +query I +SELECT max(c1) FROM test; +---- +3 -# TODO: aggregate_avg_add -# TODO: case_sensitive_identifiers_aggregates -# TODO: count_basic +# count_basic +query II +SELECT COUNT(c1), COUNT(c2) FROM test +---- +4 4 # TODO: count_partitioned @@ -1364,9 +1456,44 @@ select avg(c1), arrow_typeof(avg(c1)) from d_table # TODO: count_aggregated_cube -# TODO: simple_avg +# count_multi_expr +query I +SELECT count(c1, c2) FROM test +---- +3 + +# count_multi_expr_group_by +query I +SELECT count(c1, c2) FROM test group by c1 order by c1 +---- +0 +1 +2 +0 + +# aggreggte_with_alias +query II +select c1, sum(c2) as `Total Salary` from test group by c1 order by c1 +---- +0 NULL +1 1 +3 4 +NULL 1 + +# simple_avg + +query R +select avg(c1) from test +---- +1.75 + +# simple_mean +query R +select mean(c1) from test +---- +1.75 + -# TODO: simple_mean # query_sum_distinct - 2 different aggregate functions: avg and sum(distinct) query RI