From 0ad1c4d1adfe63d96b01d42d2f58bfdb40fd4516 Mon Sep 17 00:00:00 2001 From: elijah Date: Tue, 16 May 2023 18:29:23 +0800 Subject: [PATCH 1/5] feat: port tests in avro.rs to sqllogictest --- datafusion/core/tests/sql/avro.rs | 157 ------------------ datafusion/core/tests/sql/mod.rs | 1 - .../tests/sqllogictests/test_files/avro.slt | 76 +++++++++ 3 files changed, 76 insertions(+), 158 deletions(-) delete mode 100644 datafusion/core/tests/sql/avro.rs create mode 100644 datafusion/core/tests/sqllogictests/test_files/avro.slt diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs deleted file mode 100644 index 85ed30044c17..000000000000 --- a/datafusion/core/tests/sql/avro.rs +++ /dev/null @@ -1,157 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use super::*; - -async fn register_alltypes_avro(ctx: &SessionContext) { - let testdata = datafusion::test_util::arrow_test_data(); - ctx.register_avro( - "alltypes_plain", - &format!("{testdata}/avro/alltypes_plain.avro"), - AvroReadOptions::default(), - ) - .await - .unwrap(); -} - -#[tokio::test] -async fn avro_query() { - let ctx = SessionContext::new(); - register_alltypes_avro(&ctx).await; - // NOTE that string_col is actually a binary column and does not have the UTF8 logical type - // so we need an explicit cast - let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----+---------------------------+", - "| id | alltypes_plain.string_col |", - "+----+---------------------------+", - "| 4 | 0 |", - "| 5 | 1 |", - "| 6 | 0 |", - "| 7 | 1 |", - "| 2 | 0 |", - "| 3 | 1 |", - "| 0 | 0 |", - "| 1 | 1 |", - "+----+---------------------------+", - ]; - - assert_batches_eq!(expected, &actual); -} - -#[tokio::test] -async fn avro_query_multiple_files() { - let tempdir = tempfile::tempdir().unwrap(); - let table_path = tempdir.path(); - let testdata = datafusion::test_util::arrow_test_data(); - let alltypes_plain_file = format!("{testdata}/avro/alltypes_plain.avro"); - std::fs::copy( - &alltypes_plain_file, - format!("{}/alltypes_plain1.avro", table_path.display()), - ) - .unwrap(); - std::fs::copy( - &alltypes_plain_file, - format!("{}/alltypes_plain2.avro", table_path.display()), - ) - .unwrap(); - - let ctx = SessionContext::new(); - ctx.register_avro( - "alltypes_plain", - table_path.display().to_string().as_str(), - AvroReadOptions::default(), - ) - .await - .unwrap(); - // NOTE that string_col is actually a binary column and does not have the UTF8 logical type - // so we need an explicit cast - let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----+---------------------------+", - "| id | alltypes_plain.string_col |", - "+----+---------------------------+", - "| 4 | 0 |", - "| 5 | 1 |", - "| 6 | 0 |", - "| 7 | 1 |", - "| 2 | 0 |", - "| 3 | 1 |", - "| 0 | 0 |", - "| 1 | 1 |", - "| 4 | 0 |", - "| 5 | 1 |", - "| 6 | 0 |", - "| 7 | 1 |", - "| 2 | 0 |", - "| 3 | 1 |", - "| 0 | 0 |", - "| 1 | 1 |", - "+----+---------------------------+", - ]; - - assert_batches_eq!(expected, &actual); -} - -#[tokio::test] -async fn avro_single_nan_schema() { - let ctx = SessionContext::new(); - let testdata = datafusion::test_util::arrow_test_data(); - ctx.register_avro( - "single_nan", - &format!("{testdata}/avro/single_nan.avro"), - AvroReadOptions::default(), - ) - .await - .unwrap(); - let sql = "SELECT mycol FROM single_nan"; - let dataframe = ctx.sql(sql).await.unwrap(); - let results = dataframe.collect().await.unwrap(); - for batch in results { - assert_eq!(1, batch.num_rows()); - assert_eq!(1, batch.num_columns()); - } -} - -#[tokio::test] -async fn avro_explain() { - let ctx = SessionContext::new(); - register_alltypes_avro(&ctx).await; - - let sql = "EXPLAIN SELECT count(*) from alltypes_plain"; - let actual = execute(&ctx, sql).await; - let actual = normalize_vec_for_explain(actual); - let expected = vec![ - vec![ - "logical_plan", - "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\ - \n TableScan: alltypes_plain projection=[id]", - ], - vec![ - "physical_plan", - "AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\ - \n CoalescePartitionsExec\ - \n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ - \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\ - \n AvroExec: file_groups={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, projection=[id]\ - \n", - ], - ]; - assert_eq!(expected, actual); -} diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index be7b66adb514..83a10cdf82ff 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -80,7 +80,6 @@ macro_rules! test_expression { pub mod aggregates; #[cfg(feature = "avro")] -pub mod avro; pub mod create_drop; pub mod explain_analyze; pub mod expr; diff --git a/datafusion/core/tests/sqllogictests/test_files/avro.slt b/datafusion/core/tests/sqllogictests/test_files/avro.slt new file mode 100644 index 000000000000..42c0692cc0f0 --- /dev/null +++ b/datafusion/core/tests/sqllogictests/test_files/avro.slt @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +statement ok +CREATE EXTERNAL TABLE alltypes_plain ( + id INT NOT NULL, + bool_col BOOLEAN NOT NULL, + tinyint_col TINYINT NOT NULL, + smallint_col SMALLINT NOT NULL, + int_col INT NOT NULL, + bigint_col BIGINT NOT NULL, + float_col FLOAT NOT NULL, + double_col DOUBLE NOT NULL, + date_string_col BYTEA NOT NULL, + string_col VARCHAR NOT NULL, + timestamp_col TIMESTAMP NOT NULL, +) +STORED AS AVRO +WITH HEADER ROW +LOCATION '../../testing/data/avro/alltypes_plain.avro' + +statement ok +CREATE EXTERNAL TABLE single_nan ( + mycol FLOAT +) +STORED AS AVRO +WITH HEADER ROW +LOCATION '../../testing/data/avro/single_nan.avro' + +# test avro_query +query IT +SELECT id, CAST(string_col AS varchar) FROM alltypes_plain +---- +4 0 +5 1 +6 0 +7 1 +2 0 +3 1 +0 0 +1 1 + +# test avro_single_nan_schema +query R +SELECT mycol FROM single_nan +---- +NULL + +# test avro_explain +query TT +EXPLAIN SELECT count(*) from alltypes_plain +---- +logical_plan +Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] +--TableScan: alltypes_plain projection=[id] +physical_plan +AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))] +--CoalescePartitionsExec +----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))] +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------AvroExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[id] From 82de6f8513674888d2d3c232dde1864c57ff7630 Mon Sep 17 00:00:00 2001 From: elijah Date: Thu, 18 May 2023 11:41:20 +0800 Subject: [PATCH 2/5] fix: add test setup for avro_query_multiple_files --- .../core/tests/sqllogictests/src/main.rs | 58 ++++++++++++++++--- .../core/tests/sqllogictests/src/setup.rs | 35 ++++++++++- .../tests/sqllogictests/test_files/avro.slt | 27 ++++++++- 3 files changed, 109 insertions(+), 11 deletions(-) diff --git a/datafusion/core/tests/sqllogictests/src/main.rs b/datafusion/core/tests/sqllogictests/src/main.rs index 841511decf0f..863279e62c2f 100644 --- a/datafusion/core/tests/sqllogictests/src/main.rs +++ b/datafusion/core/tests/sqllogictests/src/main.rs @@ -22,6 +22,7 @@ use std::thread; use log::info; use sqllogictest::strict_column_validator; +use tempfile::TempDir; use datafusion::prelude::{SessionConfig, SessionContext}; @@ -83,7 +84,8 @@ async fn run_test_file( relative_path: PathBuf, ) -> Result<(), Box> { info!("Running with DataFusion runner: {}", path.display()); - let ctx = context_for_test_file(&relative_path).await; + let test_ctx = context_for_test_file(&relative_path).await; + let ctx = test_ctx.session_ctx().clone(); let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path)); runner.with_column_validator(strict_column_validator); runner.run_file_async(path).await?; @@ -110,7 +112,8 @@ async fn run_complete_file( info!("Using complete mode to complete: {}", path.display()); - let ctx = context_for_test_file(&relative_path).await; + let test_ctx = context_for_test_file(&relative_path).await; + let ctx = test_ctx.session_ctx().clone(); let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path)); let col_separator = " "; runner @@ -160,29 +163,70 @@ fn read_dir_recursive>(path: P) -> Box SessionContext { +async fn context_for_test_file(relative_path: &Path) -> TestContext { let config = SessionConfig::new() // hardcode target partitions so plans are deterministic .with_target_partitions(4); - let ctx = SessionContext::with_config(config); + let mut test_ctx = TestContext::new(SessionContext::with_config(config)); match relative_path.file_name().unwrap().to_str().unwrap() { "aggregate.slt" => { info!("Registering aggregate tables"); - setup::register_aggregate_tables(&ctx).await; + setup::register_aggregate_tables(&test_ctx.session_ctx()).await; } "scalar.slt" => { info!("Registering scalar tables"); - setup::register_scalar_tables(&ctx).await; + setup::register_scalar_tables(&test_ctx.session_ctx()).await; + } + "avro.slt" => { + info!("Registering avro tables"); + setup::register_avro_tables(&mut test_ctx).await; } _ => { info!("Using default SessionContext"); } }; - ctx + test_ctx +} + +/// Context for running tests +pub struct TestContext { + /// Context for running queries + ctx: SessionContext, + /// Temporary directory created and cleared at the end of the test + test_dir: Option, +} + +impl TestContext { + fn new(ctx: SessionContext) -> Self { + Self { ctx, test_dir: None } + } + + /// Enables the test directory feature. If not enabled, + /// calling `testdir_path` will result in a panic. + fn enable_testdir(&mut self) { + if self.test_dir.is_none() { + self.test_dir = Some(TempDir::new().expect("failed to create testdir")); + } + } + + /// Returns the path to the test directory. Panics if the test + /// directory feature is not enabled via `enable_testdir`. + fn testdir_path(&self) -> &Path { + self.test_dir + .as_ref() + .expect("testdir not enabled") + .path() + } + + /// Returns a reference to the internal SessionContext + fn session_ctx(&self) -> &SessionContext { + &self.ctx + } } + /// Parsed command line options struct Options { // regex like diff --git a/datafusion/core/tests/sqllogictests/src/setup.rs b/datafusion/core/tests/sqllogictests/src/setup.rs index 9e3f154f5948..7549c2369c2f 100644 --- a/datafusion/core/tests/sqllogictests/src/setup.rs +++ b/datafusion/core/tests/sqllogictests/src/setup.rs @@ -29,8 +29,41 @@ use datafusion::{ test_util, }; use std::sync::Arc; +use datafusion::prelude::AvroReadOptions; -use crate::utils; +use crate::{TestContext, utils}; + +pub async fn register_avro_tables(ctx: &mut TestContext) { + register_avro_test_data(ctx).await; +} + +async fn register_avro_test_data(ctx: &mut TestContext) { + ctx.enable_testdir(); + + let table_path = ctx.testdir_path().join("avro"); + std::fs::create_dir(&table_path).expect("failed to create avro table path"); + + let testdata = datafusion::test_util::arrow_test_data(); + let alltypes_plain_file = format!("{testdata}/avro/alltypes_plain.avro"); + std::fs::copy( + &alltypes_plain_file, + format!("{}/alltypes_plain1.avro", table_path.display()), + ) + .unwrap(); + std::fs::copy( + &alltypes_plain_file, + format!("{}/alltypes_plain2.avro", table_path.display()), + ) + .unwrap(); + + ctx.session_ctx().register_avro( + "alltypes_plain_multi_files", + table_path.display().to_string().as_str(), + AvroReadOptions::default(), + ) + .await + .unwrap(); +} pub async fn register_aggregate_tables(ctx: &SessionContext) { register_aggregate_test_100(ctx).await; diff --git a/datafusion/core/tests/sqllogictests/test_files/avro.slt b/datafusion/core/tests/sqllogictests/test_files/avro.slt index 42c0692cc0f0..5a01ae72cb30 100644 --- a/datafusion/core/tests/sqllogictests/test_files/avro.slt +++ b/datafusion/core/tests/sqllogictests/test_files/avro.slt @@ -42,7 +42,7 @@ STORED AS AVRO WITH HEADER ROW LOCATION '../../testing/data/avro/single_nan.avro' -# test avro_query +# test avro query query IT SELECT id, CAST(string_col AS varchar) FROM alltypes_plain ---- @@ -55,13 +55,34 @@ SELECT id, CAST(string_col AS varchar) FROM alltypes_plain 0 0 1 1 -# test avro_single_nan_schema +# test avro single nan schema query R SELECT mycol FROM single_nan ---- NULL -# test avro_explain +# test avro query multi files +query IT +SELECT id, CAST(string_col AS varchar) FROM alltypes_plain_multi_files +---- +4 0 +5 1 +6 0 +7 1 +2 0 +3 1 +0 0 +1 1 +4 0 +5 1 +6 0 +7 1 +2 0 +3 1 +0 0 +1 1 + +# test avro explain query TT EXPLAIN SELECT count(*) from alltypes_plain ---- From 27addcbd303f2056383658c60eccb0f5eb9f0f06 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 18 May 2023 14:48:56 -0400 Subject: [PATCH 3/5] run cargo fmt --- .../core/tests/sqllogictests/src/main.rs | 11 +++++------ .../core/tests/sqllogictests/src/setup.rs | 19 ++++++++++--------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/datafusion/core/tests/sqllogictests/src/main.rs b/datafusion/core/tests/sqllogictests/src/main.rs index 863279e62c2f..3f6d26d732e9 100644 --- a/datafusion/core/tests/sqllogictests/src/main.rs +++ b/datafusion/core/tests/sqllogictests/src/main.rs @@ -200,7 +200,10 @@ pub struct TestContext { impl TestContext { fn new(ctx: SessionContext) -> Self { - Self { ctx, test_dir: None } + Self { + ctx, + test_dir: None, + } } /// Enables the test directory feature. If not enabled, @@ -214,10 +217,7 @@ impl TestContext { /// Returns the path to the test directory. Panics if the test /// directory feature is not enabled via `enable_testdir`. fn testdir_path(&self) -> &Path { - self.test_dir - .as_ref() - .expect("testdir not enabled") - .path() + self.test_dir.as_ref().expect("testdir not enabled").path() } /// Returns a reference to the internal SessionContext @@ -226,7 +226,6 @@ impl TestContext { } } - /// Parsed command line options struct Options { // regex like diff --git a/datafusion/core/tests/sqllogictests/src/setup.rs b/datafusion/core/tests/sqllogictests/src/setup.rs index 7549c2369c2f..91ce03c5a0b7 100644 --- a/datafusion/core/tests/sqllogictests/src/setup.rs +++ b/datafusion/core/tests/sqllogictests/src/setup.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use datafusion::prelude::AvroReadOptions; use datafusion::{ arrow::{ array::{ @@ -29,9 +30,8 @@ use datafusion::{ test_util, }; use std::sync::Arc; -use datafusion::prelude::AvroReadOptions; -use crate::{TestContext, utils}; +use crate::{utils, TestContext}; pub async fn register_avro_tables(ctx: &mut TestContext) { register_avro_test_data(ctx).await; @@ -49,18 +49,19 @@ async fn register_avro_test_data(ctx: &mut TestContext) { &alltypes_plain_file, format!("{}/alltypes_plain1.avro", table_path.display()), ) - .unwrap(); + .unwrap(); std::fs::copy( &alltypes_plain_file, format!("{}/alltypes_plain2.avro", table_path.display()), ) - .unwrap(); + .unwrap(); - ctx.session_ctx().register_avro( - "alltypes_plain_multi_files", - table_path.display().to_string().as_str(), - AvroReadOptions::default(), - ) + ctx.session_ctx() + .register_avro( + "alltypes_plain_multi_files", + table_path.display().to_string().as_str(), + AvroReadOptions::default(), + ) .await .unwrap(); } From 0a33708aef897c9a2770b798052bc1fed653a943 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 18 May 2023 14:50:27 -0400 Subject: [PATCH 4/5] Run hash collisions test with avro support too --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 654c0d564913..92c397461af7 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -461,7 +461,7 @@ jobs: - name: Run tests run: | cd datafusion - cargo test --lib --tests --features=force_hash_collisions + cargo test --lib --tests --features=force_hash_collisions,avro cargo-toml-formatting-checks: name: check Cargo.toml formatting From 9e10ded231ffb352d3514b3595778cf9b3578c87 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 18 May 2023 15:35:28 -0400 Subject: [PATCH 5/5] fix clippy --- datafusion/core/tests/sqllogictests/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/sqllogictests/src/main.rs b/datafusion/core/tests/sqllogictests/src/main.rs index 3f6d26d732e9..df43544867aa 100644 --- a/datafusion/core/tests/sqllogictests/src/main.rs +++ b/datafusion/core/tests/sqllogictests/src/main.rs @@ -173,11 +173,11 @@ async fn context_for_test_file(relative_path: &Path) -> TestContext { match relative_path.file_name().unwrap().to_str().unwrap() { "aggregate.slt" => { info!("Registering aggregate tables"); - setup::register_aggregate_tables(&test_ctx.session_ctx()).await; + setup::register_aggregate_tables(test_ctx.session_ctx()).await; } "scalar.slt" => { info!("Registering scalar tables"); - setup::register_scalar_tables(&test_ctx.session_ctx()).await; + setup::register_scalar_tables(test_ctx.session_ctx()).await; } "avro.slt" => { info!("Registering avro tables");