diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 654c0d564913..92c397461af7 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -461,7 +461,7 @@ jobs: - name: Run tests run: | cd datafusion - cargo test --lib --tests --features=force_hash_collisions + cargo test --lib --tests --features=force_hash_collisions,avro cargo-toml-formatting-checks: name: check Cargo.toml formatting diff --git a/datafusion/core/tests/sql/avro.rs b/datafusion/core/tests/sql/avro.rs deleted file mode 100644 index 85ed30044c17..000000000000 --- a/datafusion/core/tests/sql/avro.rs +++ /dev/null @@ -1,157 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use super::*; - -async fn register_alltypes_avro(ctx: &SessionContext) { - let testdata = datafusion::test_util::arrow_test_data(); - ctx.register_avro( - "alltypes_plain", - &format!("{testdata}/avro/alltypes_plain.avro"), - AvroReadOptions::default(), - ) - .await - .unwrap(); -} - -#[tokio::test] -async fn avro_query() { - let ctx = SessionContext::new(); - register_alltypes_avro(&ctx).await; - // NOTE that string_col is actually a binary column and does not have the UTF8 logical type - // so we need an explicit cast - let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----+---------------------------+", - "| id | alltypes_plain.string_col |", - "+----+---------------------------+", - "| 4 | 0 |", - "| 5 | 1 |", - "| 6 | 0 |", - "| 7 | 1 |", - "| 2 | 0 |", - "| 3 | 1 |", - "| 0 | 0 |", - "| 1 | 1 |", - "+----+---------------------------+", - ]; - - assert_batches_eq!(expected, &actual); -} - -#[tokio::test] -async fn avro_query_multiple_files() { - let tempdir = tempfile::tempdir().unwrap(); - let table_path = tempdir.path(); - let testdata = datafusion::test_util::arrow_test_data(); - let alltypes_plain_file = format!("{testdata}/avro/alltypes_plain.avro"); - std::fs::copy( - &alltypes_plain_file, - format!("{}/alltypes_plain1.avro", table_path.display()), - ) - .unwrap(); - std::fs::copy( - &alltypes_plain_file, - format!("{}/alltypes_plain2.avro", table_path.display()), - ) - .unwrap(); - - let ctx = SessionContext::new(); - ctx.register_avro( - "alltypes_plain", - table_path.display().to_string().as_str(), - AvroReadOptions::default(), - ) - .await - .unwrap(); - // NOTE that string_col is actually a binary column and does not have the UTF8 logical type - // so we need an explicit cast - let sql = "SELECT id, CAST(string_col AS varchar) FROM alltypes_plain"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - 
"+----+---------------------------+", - "| id | alltypes_plain.string_col |", - "+----+---------------------------+", - "| 4 | 0 |", - "| 5 | 1 |", - "| 6 | 0 |", - "| 7 | 1 |", - "| 2 | 0 |", - "| 3 | 1 |", - "| 0 | 0 |", - "| 1 | 1 |", - "| 4 | 0 |", - "| 5 | 1 |", - "| 6 | 0 |", - "| 7 | 1 |", - "| 2 | 0 |", - "| 3 | 1 |", - "| 0 | 0 |", - "| 1 | 1 |", - "+----+---------------------------+", - ]; - - assert_batches_eq!(expected, &actual); -} - -#[tokio::test] -async fn avro_single_nan_schema() { - let ctx = SessionContext::new(); - let testdata = datafusion::test_util::arrow_test_data(); - ctx.register_avro( - "single_nan", - &format!("{testdata}/avro/single_nan.avro"), - AvroReadOptions::default(), - ) - .await - .unwrap(); - let sql = "SELECT mycol FROM single_nan"; - let dataframe = ctx.sql(sql).await.unwrap(); - let results = dataframe.collect().await.unwrap(); - for batch in results { - assert_eq!(1, batch.num_rows()); - assert_eq!(1, batch.num_columns()); - } -} - -#[tokio::test] -async fn avro_explain() { - let ctx = SessionContext::new(); - register_alltypes_avro(&ctx).await; - - let sql = "EXPLAIN SELECT count(*) from alltypes_plain"; - let actual = execute(&ctx, sql).await; - let actual = normalize_vec_for_explain(actual); - let expected = vec![ - vec![ - "logical_plan", - "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\ - \n TableScan: alltypes_plain projection=[id]", - ], - vec![ - "physical_plan", - "AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\ - \n CoalescePartitionsExec\ - \n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\ - \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\ - \n AvroExec: file_groups={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, projection=[id]\ - \n", - ], - ]; - assert_eq!(expected, actual); -} diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 3c5845fe228c..af79a10104d6 100644 --- a/datafusion/core/tests/sql/mod.rs +++ 
b/datafusion/core/tests/sql/mod.rs @@ -81,7 +81,5 @@ macro_rules! test_expression { pub mod aggregates; pub mod arrow_files; -#[cfg(feature = "avro")] -pub mod avro; pub mod create_drop; pub mod explain_analyze; pub mod expr; diff --git a/datafusion/core/tests/sqllogictests/src/main.rs b/datafusion/core/tests/sqllogictests/src/main.rs index 841511decf0f..df43544867aa 100644 --- a/datafusion/core/tests/sqllogictests/src/main.rs +++ b/datafusion/core/tests/sqllogictests/src/main.rs @@ -22,6 +22,7 @@ use std::thread; use log::info; use sqllogictest::strict_column_validator; +use tempfile::TempDir; use datafusion::prelude::{SessionConfig, SessionContext}; @@ -83,7 +84,8 @@ async fn run_test_file( relative_path: PathBuf, ) -> Result<(), Box<dyn Error>> { info!("Running with DataFusion runner: {}", path.display()); - let ctx = context_for_test_file(&relative_path).await; + let test_ctx = context_for_test_file(&relative_path).await; + let ctx = test_ctx.session_ctx().clone(); let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path)); runner.with_column_validator(strict_column_validator); runner.run_file_async(path).await?; @@ -110,7 +112,8 @@ async fn run_complete_file( info!("Using complete mode to complete: {}", path.display()); - let ctx = context_for_test_file(&relative_path).await; + let test_ctx = context_for_test_file(&relative_path).await; + let ctx = test_ctx.session_ctx().clone(); let mut runner = sqllogictest::Runner::new(DataFusion::new(ctx, relative_path)); let col_separator = " "; runner @@ -160,27 +163,67 @@ fn read_dir_recursive<P: AsRef<Path>>(path: P) -> Box<dyn Iterator<Item = PathBuf>> -async fn context_for_test_file(relative_path: &Path) -> SessionContext { +async fn context_for_test_file(relative_path: &Path) -> TestContext { let config = SessionConfig::new() // hardcode target partitions so plans are deterministic .with_target_partitions(4); - let ctx = SessionContext::with_config(config); + let mut test_ctx = TestContext::new(SessionContext::with_config(config)); match relative_path.file_name().unwrap().to_str().unwrap() { "aggregate.slt"
=> { info!("Registering aggregate tables"); - setup::register_aggregate_tables(&ctx).await; + setup::register_aggregate_tables(test_ctx.session_ctx()).await; } "scalar.slt" => { info!("Registering scalar tables"); - setup::register_scalar_tables(&ctx).await; + setup::register_scalar_tables(test_ctx.session_ctx()).await; + } + "avro.slt" => { + info!("Registering avro tables"); + setup::register_avro_tables(&mut test_ctx).await; } _ => { info!("Using default SessionContext"); } }; - ctx + test_ctx +} + +/// Context for running tests +pub struct TestContext { + /// Context for running queries + ctx: SessionContext, + /// Temporary directory created and cleared at the end of the test + test_dir: Option<TempDir>, +} + +impl TestContext { + fn new(ctx: SessionContext) -> Self { + Self { + ctx, + test_dir: None, + } + } + + /// Enables the test directory feature. If not enabled, + /// calling `testdir_path` will result in a panic. + fn enable_testdir(&mut self) { + if self.test_dir.is_none() { + self.test_dir = Some(TempDir::new().expect("failed to create testdir")); + } + } + + /// Returns the path to the test directory. Panics if the test + /// directory feature is not enabled via `enable_testdir`. + fn testdir_path(&self) -> &Path { + self.test_dir.as_ref().expect("testdir not enabled").path() + } + + /// Returns a reference to the internal SessionContext + fn session_ctx(&self) -> &SessionContext { + &self.ctx + } } /// Parsed command line options diff --git a/datafusion/core/tests/sqllogictests/src/setup.rs b/datafusion/core/tests/sqllogictests/src/setup.rs index 9e3f154f5948..91ce03c5a0b7 100644 --- a/datafusion/core/tests/sqllogictests/src/setup.rs +++ b/datafusion/core/tests/sqllogictests/src/setup.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License.
+use datafusion::prelude::AvroReadOptions; use datafusion::{ arrow::{ array::{ @@ -30,7 +31,40 @@ use datafusion::{ }; use std::sync::Arc; -use crate::utils; +use crate::{utils, TestContext}; + +pub async fn register_avro_tables(ctx: &mut TestContext) { + register_avro_test_data(ctx).await; +} + +async fn register_avro_test_data(ctx: &mut TestContext) { + ctx.enable_testdir(); + + let table_path = ctx.testdir_path().join("avro"); + std::fs::create_dir(&table_path).expect("failed to create avro table path"); + + let testdata = datafusion::test_util::arrow_test_data(); + let alltypes_plain_file = format!("{testdata}/avro/alltypes_plain.avro"); + std::fs::copy( + &alltypes_plain_file, + format!("{}/alltypes_plain1.avro", table_path.display()), + ) + .unwrap(); + std::fs::copy( + &alltypes_plain_file, + format!("{}/alltypes_plain2.avro", table_path.display()), + ) + .unwrap(); + + ctx.session_ctx() + .register_avro( + "alltypes_plain_multi_files", + table_path.display().to_string().as_str(), + AvroReadOptions::default(), + ) + .await + .unwrap(); +} pub async fn register_aggregate_tables(ctx: &SessionContext) { register_aggregate_test_100(ctx).await; diff --git a/datafusion/core/tests/sqllogictests/test_files/avro.slt b/datafusion/core/tests/sqllogictests/test_files/avro.slt new file mode 100644 index 000000000000..5a01ae72cb30 --- /dev/null +++ b/datafusion/core/tests/sqllogictests/test_files/avro.slt @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +statement ok +CREATE EXTERNAL TABLE alltypes_plain ( + id INT NOT NULL, + bool_col BOOLEAN NOT NULL, + tinyint_col TINYINT NOT NULL, + smallint_col SMALLINT NOT NULL, + int_col INT NOT NULL, + bigint_col BIGINT NOT NULL, + float_col FLOAT NOT NULL, + double_col DOUBLE NOT NULL, + date_string_col BYTEA NOT NULL, + string_col VARCHAR NOT NULL, + timestamp_col TIMESTAMP NOT NULL, +) +STORED AS AVRO +WITH HEADER ROW +LOCATION '../../testing/data/avro/alltypes_plain.avro' + +statement ok +CREATE EXTERNAL TABLE single_nan ( + mycol FLOAT +) +STORED AS AVRO +WITH HEADER ROW +LOCATION '../../testing/data/avro/single_nan.avro' + +# test avro query +query IT +SELECT id, CAST(string_col AS varchar) FROM alltypes_plain +---- +4 0 +5 1 +6 0 +7 1 +2 0 +3 1 +0 0 +1 1 + +# test avro single nan schema +query R +SELECT mycol FROM single_nan +---- +NULL + +# test avro query multi files +query IT +SELECT id, CAST(string_col AS varchar) FROM alltypes_plain_multi_files +---- +4 0 +5 1 +6 0 +7 1 +2 0 +3 1 +0 0 +1 1 +4 0 +5 1 +6 0 +7 1 +2 0 +3 1 +0 0 +1 1 + +# test avro explain +query TT +EXPLAIN SELECT count(*) from alltypes_plain +---- +logical_plan +Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]] +--TableScan: alltypes_plain projection=[id] +physical_plan +AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))] +--CoalescePartitionsExec +----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))] +------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------AvroExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/avro/alltypes_plain.avro]]}, projection=[id]