From 55bf8e9c2ca6d358e7dc4c37e1439de38008f960 Mon Sep 17 00:00:00 2001 From: Jie Han <11144133+doki23@users.noreply.github.com> Date: Thu, 24 Nov 2022 01:58:33 +0800 Subject: [PATCH] Support types other than String for partition columns on ListingTables (#4221) * partition columns config * solve test compile problems * fix ut * add partition column types config for each file reader options * get correct partition column types from partitioned files * remove DEFAULT_PARTITION_COLUMN_TYPE * update pruned_partition_list * change api * add some tests * create partitioned external table with schema * upd * Update datafusion/core/src/datasource/listing_table_factory.rs Co-authored-by: Andrew Lamb * code clean * code clean Co-authored-by: Andrew Lamb --- .../core/src/datasource/listing/helpers.rs | 77 ++++++++--- .../core/src/datasource/listing/table.rs | 61 ++++++--- .../src/datasource/listing_table_factory.rs | 38 +++++- datafusion/core/src/datasource/memory.rs | 1 + datafusion/core/src/execution/options.rs | 34 +++-- .../src/physical_plan/file_format/avro.rs | 6 +- .../core/src/physical_plan/file_format/csv.rs | 4 +- .../physical_plan/file_format/file_stream.rs | 6 +- .../core/src/physical_plan/file_format/mod.rs | 41 +++--- .../src/physical_plan/file_format/parquet.rs | 7 +- datafusion/core/tests/path_partition.rs | 123 ++++++++++++++++-- datafusion/core/tests/sql/create_drop.rs | 68 ++++++++++ datafusion/proto/src/logical_plan.rs | 22 +++- 13 files changed, 399 insertions(+), 89 deletions(-) diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index f1a34e665a60..3cfe9ec148ed 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -19,6 +19,7 @@ use std::sync::Arc; +use arrow::array::new_empty_array; use arrow::{ array::{ Array, ArrayBuilder, ArrayRef, Date64Array, Date64Builder, StringBuilder, @@ -176,7 +177,7 @@ pub async fn pruned_partition_list<'a>( table_path: &'a ListingTableUrl, filters: &'a [Expr], file_extension: &'a str, - table_partition_cols: &'a [String], + table_partition_cols: &'a [(String, DataType)], ) -> Result>> { let list = table_path.list_all_files(store, file_extension); @@ -187,7 +188,15 @@ pub async fn pruned_partition_list<'a>( let applicable_filters: Vec<_> = filters .iter() - .filter(|f| expr_applicable_for_cols(table_partition_cols, f)) + .filter(|f| { + expr_applicable_for_cols( + &table_partition_cols + .iter() + .map(|x| x.0.clone()) + .collect::>(), + f, + ) + }) .collect(); if applicable_filters.is_empty() { @@ -200,11 +209,26 @@ pub async fn pruned_partition_list<'a>( let parsed_path = parse_partitions_for_path( table_path, &object_meta.location, - table_partition_cols, + &table_partition_cols + .iter() + .map(|x| x.0.clone()) + .collect::>(), ) .map(|p| { p.iter() - .map(|&pn| ScalarValue::Utf8(Some(pn.to_owned()))) + .zip(table_partition_cols) + .map(|(&part_value, part_column)| { + ScalarValue::try_from_string( + part_value.to_string(), + &part_column.1, + ) + .unwrap_or_else(|_| { + panic!( + "Failed to cast str {} to type {}", + part_value, part_column.1 + ) + }) + }) .collect() }); @@ -221,6 +245,7 @@ pub async fn pruned_partition_list<'a>( let metas: Vec<_> = list.try_collect().await?; let batch = paths_to_batch(table_partition_cols, table_path, &metas)?; let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?; + debug!("get mem_table: {:?}", mem_table); // Filter the partitions using a local datafusion context 
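// An illustrative aside on the conversion above, written as a minimal sketch:
// with typed partition columns, a hive-style path segment such as `year=2021`
// declared as Int32 is coerced through `ScalarValue::try_from_string` instead
// of being kept as a Utf8 scalar. The literal values here are assumptions for
// illustration only:
//
//     let v = ScalarValue::try_from_string("2021".to_string(), &DataType::Int32)?;
//     assert_eq!(v, ScalarValue::Int32(Some(2021)));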
// TODO having the external context would allow us to resolve `Volatility::Stable` @@ -245,28 +270,35 @@ pub async fn pruned_partition_list<'a>( /// /// Note: For the last modified date, this looses precisions higher than millisecond. fn paths_to_batch( - table_partition_cols: &[String], + table_partition_cols: &[(String, DataType)], table_path: &ListingTableUrl, metas: &[ObjectMeta], ) -> Result { let mut key_builder = StringBuilder::with_capacity(metas.len(), 1024); let mut length_builder = UInt64Builder::with_capacity(metas.len()); let mut modified_builder = Date64Builder::with_capacity(metas.len()); - let mut partition_builders = table_partition_cols + let mut partition_scalar_values = table_partition_cols .iter() - .map(|_| StringBuilder::with_capacity(metas.len(), 1024)) + .map(|_| Vec::new()) .collect::>(); for file_meta in metas { if let Some(partition_values) = parse_partitions_for_path( table_path, &file_meta.location, - table_partition_cols, + &table_partition_cols + .iter() + .map(|x| x.0.clone()) + .collect::>(), ) { key_builder.append_value(file_meta.location.as_ref()); length_builder.append_value(file_meta.size as u64); modified_builder.append_value(file_meta.last_modified.timestamp_millis()); for (i, part_val) in partition_values.iter().enumerate() { - partition_builders[i].append_value(part_val); + let scalar_val = ScalarValue::try_from_string( + part_val.to_string(), + &table_partition_cols[i].1, + )?; + partition_scalar_values[i].push(scalar_val); } } else { debug!("No partitioning for path {}", file_meta.location); @@ -279,8 +311,13 @@ fn paths_to_batch( ArrayBuilder::finish(&mut length_builder), ArrayBuilder::finish(&mut modified_builder), ]; - for mut partition_builder in partition_builders { - col_arrays.push(ArrayBuilder::finish(&mut partition_builder)); + for (i, part_scalar_val) in partition_scalar_values.into_iter().enumerate() { + if part_scalar_val.is_empty() { + col_arrays.push(new_empty_array(&table_partition_cols[i].1)); + } else { + let partition_val_array = ScalarValue::iter_to_array(part_scalar_val)?; + col_arrays.push(partition_val_array); + } } // put the schema together @@ -289,8 +326,8 @@ fn paths_to_batch( Field::new(FILE_SIZE_COLUMN_NAME, DataType::UInt64, false), Field::new(FILE_MODIFIED_COLUMN_NAME, DataType::Date64, true), ]; - for pn in table_partition_cols { - fields.push(Field::new(pn, DataType::Utf8, false)); + for part_col in table_partition_cols { + fields.push(Field::new(&part_col.0, part_col.1.to_owned(), false)); } let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), col_arrays)?; @@ -366,9 +403,10 @@ fn parse_partitions_for_path<'a>( #[cfg(test)] mod tests { + use futures::StreamExt; + use crate::logical_expr::{case, col, lit}; use crate::test::object_store::make_test_store; - use futures::StreamExt; use super::*; @@ -424,7 +462,7 @@ mod tests { &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter], ".parquet", - &[String::from("mypartition")], + &[(String::from("mypartition"), DataType::Utf8)], ) .await .expect("partition pruning failed") @@ -447,7 +485,7 @@ mod tests { &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter], ".parquet", - &[String::from("mypartition")], + &[(String::from("mypartition"), DataType::Utf8)], ) .await .expect("partition pruning failed") @@ -494,7 +532,10 @@ mod tests { &ListingTableUrl::parse("file:///tablepath/").unwrap(), &[filter1, filter2, filter3], ".parquet", - &[String::from("part1"), String::from("part2")], + &[ + (String::from("part1"), DataType::Utf8), + 
(String::from("part2"), DataType::Utf8), + ], ) .await .expect("partition pruning failed") @@ -645,7 +686,7 @@ mod tests { ]; let batches = paths_to_batch( - &[String::from("part1")], + &[(String::from("part1"), DataType::Utf8)], &ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(), &files, ) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 0fc17422a5f7..e3ee5f384025 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -21,7 +21,7 @@ use std::str::FromStr; use std::{any::Any, sync::Arc}; use arrow::compute::SortOptions; -use arrow::datatypes::{Field, Schema, SchemaRef}; +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use async_trait::async_trait; use dashmap::DashMap; use datafusion_physical_expr::PhysicalSortExpr; @@ -41,14 +41,14 @@ use crate::datasource::{ }; use crate::logical_expr::TableProviderFilterPushDown; use crate::physical_plan; +use crate::physical_plan::file_format::partition_type_wrap; use crate::{ error::{DataFusionError, Result}, execution::context::SessionState, logical_expr::Expr, physical_plan::{ - empty::EmptyExec, - file_format::{FileScanConfig, DEFAULT_PARTITION_COLUMN_DATATYPE}, - project_schema, ExecutionPlan, Statistics, + empty::EmptyExec, file_format::FileScanConfig, project_schema, ExecutionPlan, + Statistics, }, }; @@ -210,9 +210,7 @@ pub struct ListingOptions { /// partitioning expected should be named "a" and "b": /// - If there is a third level of partitioning it will be ignored. /// - Files that don't follow this partitioning will be ignored. - /// Note that only `DEFAULT_PARTITION_COLUMN_DATATYPE` is currently - /// supported for the column type. - pub table_partition_cols: Vec, + pub table_partition_cols: Vec<(String, DataType)>, /// Set true to try to guess statistics from the files. /// This can add a lot of overhead as it will usually require files /// to be opened and at least partially parsed. 
@@ -270,16 +268,19 @@ impl ListingOptions { /// /// ``` /// use std::sync::Arc; + /// use arrow::datatypes::DataType; /// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; /// /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default())) - /// .with_table_partition_cols(vec!["col_a".to_string(), "col_b".to_string()]); + /// .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8), + /// ("col_b".to_string(), DataType::Utf8)]); /// - /// assert_eq!(listing_options.table_partition_cols, vec!["col_a", "col_b"]); + /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8), + /// ("col_b".to_string(), DataType::Utf8)]); /// ``` pub fn with_table_partition_cols( mut self, - table_partition_cols: Vec, + table_partition_cols: Vec<(String, DataType)>, ) -> Self { self.table_partition_cols = table_partition_cols; self @@ -428,10 +429,10 @@ impl ListingTable { // Add the partition columns to the file schema let mut table_fields = file_schema.fields().clone(); - for part in &options.table_partition_cols { + for (part_col_name, part_col_type) in &options.table_partition_cols { table_fields.push(Field::new( - part, - DEFAULT_PARTITION_COLUMN_DATATYPE.clone(), + part_col_name, + partition_type_wrap(part_col_type.clone()), false, )); } @@ -536,6 +537,23 @@ impl TableProvider for ListingTable { return Ok(Arc::new(EmptyExec::new(false, projected_schema))); } + // extract types of partition columns + let table_partition_cols = self + .options + .table_partition_cols + .iter() + .map(|col| { + ( + col.0.to_owned(), + self.table_schema + .field_with_name(&col.0) + .unwrap() + .data_type() + .clone(), + ) + }) + .collect(); + // create the execution plan self.options .format @@ -548,7 +566,7 @@ impl TableProvider for ListingTable { projection: projection.clone(), limit, output_ordering: self.try_create_output_ordering()?, - table_partition_cols: self.options.table_partition_cols.clone(), + table_partition_cols, config_options: ctx.config.config_options(), }, filters, @@ -560,7 +578,15 @@ impl TableProvider for ListingTable { &self, filter: &Expr, ) -> Result { - if expr_applicable_for_cols(&self.options.table_partition_cols, filter) { + if expr_applicable_for_cols( + &self + .options + .table_partition_cols + .iter() + .map(|x| x.0.clone()) + .collect::>(), + filter, + ) { // if filter can be handled by partiton pruning, it is exact Ok(TableProviderFilterPushDown::Exact) } else { @@ -807,7 +833,10 @@ mod tests { let opt = ListingOptions::new(Arc::new(AvroFormat {})) .with_file_extension(FileType::AVRO.get_ext()) - .with_table_partition_cols(vec![String::from("p1")]) + .with_table_partition_cols(vec![( + String::from("p1"), + partition_type_wrap(DataType::Utf8), + )]) .with_target_partitions(4); let table_path = ListingTableUrl::parse("test:///table/").unwrap(); diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 3662ab36199e..dbb8fdc78aa2 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -29,6 +29,7 @@ use crate::datasource::listing::{ }; use crate::datasource::TableProvider; use crate::execution::context::SessionState; +use arrow::datatypes::{DataType, SchemaRef}; use async_trait::async_trait; use datafusion_common::DataFusionError; use datafusion_expr::CreateExternalTable; @@ -88,17 +89,46 @@ impl TableProviderFactory for 
ListingTableFactory { ), }; - let provided_schema = if cmd.schema.fields().is_empty() { - None + let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() { + ( + None, + cmd.table_partition_cols + .iter() + .map(|x| (x.clone(), DataType::Utf8)) + .collect::>(), + ) } else { - Some(Arc::new(cmd.schema.as_ref().to_owned().into())) + let schema: SchemaRef = Arc::new(cmd.schema.as_ref().to_owned().into()); + let table_partition_cols = cmd + .table_partition_cols + .iter() + .map(|col| { + schema.field_with_name(col).map_err(|arrow_err| { + DataFusionError::Execution(arrow_err.to_string()) + }) + }) + .collect::>>()? + .into_iter() + .map(|f| (f.name().to_owned(), f.data_type().to_owned())) + .collect(); + // exclude partition columns to support creating partitioned external table + // with a specified column definition like + // `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...` + let mut project_idx = Vec::new(); + for i in 0..schema.fields().len() { + if !cmd.table_partition_cols.contains(schema.field(i).name()) { + project_idx.push(i); + } + } + let schema = Arc::new(schema.project(&project_idx)?); + (Some(schema), table_partition_cols) }; let options = ListingOptions::new(file_format) .with_collect_stat(state.config.collect_statistics) .with_file_extension(file_extension) .with_target_partitions(state.config.target_partitions) - .with_table_partition_cols(cmd.table_partition_cols.clone()) + .with_table_partition_cols(table_partition_cols) .with_file_sort_order(None); let table_path = ListingTableUrl::parse(&cmd.location)?; diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs index 4e5238ed2ecc..38e16fb03e8b 100644 --- a/datafusion/core/src/datasource/memory.rs +++ b/datafusion/core/src/datasource/memory.rs @@ -37,6 +37,7 @@ use crate::physical_plan::ExecutionPlan; use crate::physical_plan::{repartition::RepartitionExec, Partitioning}; /// In-memory table +#[derive(Debug)] pub struct MemTable { schema: SchemaRef, batches: Vec>, diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs index e130ac84da58..17f59ec7f41d 100644 --- a/datafusion/core/src/execution/options.rs +++ b/datafusion/core/src/execution/options.rs @@ -19,7 +19,7 @@ use std::sync::Arc; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::{DataType, Schema, SchemaRef}; use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION; use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION; @@ -58,8 +58,7 @@ pub struct CsvReadOptions<'a> { /// Defaults to `FileType::CSV.get_ext().as_str()`. pub file_extension: &'a str, /// Partition Columns - pub table_partition_cols: Vec, - + pub table_partition_cols: Vec<(String, DataType)>, /// File compression type pub file_compression_type: FileCompressionType, } @@ -117,7 +116,10 @@ impl<'a> CsvReadOptions<'a> { } /// Specify table_partition_cols for partition pruning - pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + pub fn table_partition_cols( + mut self, + table_partition_cols: Vec<(String, DataType)>, + ) -> Self { self.table_partition_cols = table_partition_cols; self } @@ -164,7 +166,7 @@ pub struct ParquetReadOptions<'a> { /// Defaults to ".parquet". 
pub file_extension: &'a str, /// Partition Columns - pub table_partition_cols: Vec, + pub table_partition_cols: Vec<(String, DataType)>, /// Should DataFusion parquet reader use the predicate to prune data, /// overridden by value on execution::context::SessionConfig // TODO move this into ConfigOptions @@ -205,7 +207,10 @@ impl<'a> ParquetReadOptions<'a> { } /// Specify table_partition_cols for partition pruning - pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + pub fn table_partition_cols( + mut self, + table_partition_cols: Vec<(String, DataType)>, + ) -> Self { self.table_partition_cols = table_partition_cols; self } @@ -238,7 +243,7 @@ pub struct AvroReadOptions<'a> { /// Defaults to `FileType::AVRO.get_ext().as_str()`. pub file_extension: &'a str, /// Partition Columns - pub table_partition_cols: Vec, + pub table_partition_cols: Vec<(String, DataType)>, } impl<'a> Default for AvroReadOptions<'a> { @@ -253,7 +258,10 @@ impl<'a> Default for AvroReadOptions<'a> { impl<'a> AvroReadOptions<'a> { /// Specify table_partition_cols for partition pruning - pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + pub fn table_partition_cols( + mut self, + table_partition_cols: Vec<(String, DataType)>, + ) -> Self { self.table_partition_cols = table_partition_cols; self } @@ -279,16 +287,13 @@ impl<'a> AvroReadOptions<'a> { pub struct NdJsonReadOptions<'a> { /// The data source schema. pub schema: Option, - /// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`. pub schema_infer_max_records: usize, - /// File extension; only files with this extension are selected for data input. /// Defaults to `FileType::JSON.get_ext().as_str()`. pub file_extension: &'a str, /// Partition Columns - pub table_partition_cols: Vec, - + pub table_partition_cols: Vec<(String, DataType)>, /// File compression type pub file_compression_type: FileCompressionType, } @@ -307,7 +312,10 @@ impl<'a> Default for NdJsonReadOptions<'a> { impl<'a> NdJsonReadOptions<'a> { /// Specify table_partition_cols for partition pruning - pub fn table_partition_cols(mut self, table_partition_cols: Vec) -> Self { + pub fn table_partition_cols( + mut self, + table_partition_cols: Vec<(String, DataType)>, + ) -> Self { self.table_partition_cols = table_partition_cols; self } diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index 59faac8fab67..af4582c21fc1 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -208,6 +208,7 @@ mod tests { use crate::datasource::file_format::{avro::AvroFormat, FileFormat}; use crate::datasource::listing::PartitionedFile; use crate::datasource::object_store::ObjectStoreUrl; + use crate::physical_plan::file_format::partition_type_wrap; use crate::prelude::SessionContext; use crate::scalar::ScalarValue; use crate::test::object_store::local_unpartitioned_file; @@ -374,7 +375,10 @@ mod tests { file_schema, statistics: Statistics::default(), limit: None, - table_partition_cols: vec!["date".to_owned()], + table_partition_cols: vec![( + "date".to_owned(), + partition_type_wrap(DataType::Utf8), + )], config_options: ConfigOptions::new().into_shareable(), output_ordering: None, }); diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 8b50755f4b7f..4547703496c3 100644 --- 
a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -287,6 +287,7 @@ mod tests { use super::*; use crate::datasource::file_format::file_type::FileType; use crate::physical_plan::file_format::chunked_store::ChunkedStore; + use crate::physical_plan::file_format::partition_type_wrap; use crate::prelude::*; use crate::test::{partitioned_csv_config, partitioned_file_groups}; use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data}; @@ -488,7 +489,8 @@ mod tests { let mut config = partitioned_csv_config(file_schema, file_groups)?; // Add partition columns - config.table_partition_cols = vec!["date".to_owned()]; + config.table_partition_cols = + vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))]; config.file_groups[0][0].partition_values = vec![ScalarValue::Utf8(Some("2021-10-26".to_owned()))]; diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index 063238a6d471..51a911d5186a 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -182,7 +182,11 @@ impl FileStream { let (projected_schema, _) = config.project(); let pc_projector = PartitionColumnProjector::new( projected_schema.clone(), - &config.table_partition_cols, + &config + .table_partition_cols + .iter() + .map(|x| x.0.clone()) + .collect::>(), ); let files = config.file_groups[partition].clone(); diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index fbeab4cbf4c9..f2ccc16bc683 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -52,7 +52,6 @@ use crate::{ }; use arrow::array::{new_null_array, UInt16BufferBuilder}; use arrow::record_batch::RecordBatchOptions; -use lazy_static::lazy_static; use log::{debug, info}; use object_store::path::Path; use object_store::ObjectMeta; @@ -65,9 +64,9 @@ use std::{ use super::{ColumnStatistics, Statistics}; -lazy_static! { - /// The datatype used for all partitioning columns for now - pub static ref DEFAULT_PARTITION_COLUMN_DATATYPE: DataType = DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)); +/// convert logical type of partition column to physical type: Dictionary(UInt16, val_type) +pub fn partition_type_wrap(val_type: DataType) -> DataType { + DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val_type)) } /// The base configurations to provide when creating a physical plan for @@ -99,8 +98,8 @@ pub struct FileScanConfig { /// The maximum number of records to read from this plan. If None, /// all records after filtering are returned. pub limit: Option, - /// The partitioning column names - pub table_partition_cols: Vec, + /// The partitioning columns + pub table_partition_cols: Vec<(String, DataType)>, /// The order in which the data is sorted, if known. 
pub output_ordering: Option>, /// Configuration options passed to the physical plans @@ -134,8 +133,8 @@ impl FileScanConfig { } else { let partition_idx = idx - self.file_schema.fields().len(); table_fields.push(Field::new( - &self.table_partition_cols[partition_idx], - DEFAULT_PARTITION_COLUMN_DATATYPE.clone(), + &self.table_partition_cols[partition_idx].0, + self.table_partition_cols[partition_idx].1.to_owned(), false, )); // TODO provide accurate stat for partition column (#1186) @@ -406,10 +405,7 @@ fn create_dict_array( }; // create data type - let data_type = - DataType::Dictionary(Box::new(DataType::UInt16), Box::new(val.get_datatype())); - - debug_assert_eq!(data_type, *DEFAULT_PARTITION_COLUMN_DATATYPE); + let data_type = partition_type_wrap(val.get_datatype()); // assemble pieces together let mut builder = ArrayData::builder(data_type) @@ -539,7 +535,7 @@ mod tests { Arc::clone(&file_schema), None, Statistics::default(), - vec!["date".to_owned()], + vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))], ); let (proj_schema, proj_statistics) = conf.project(); @@ -585,7 +581,7 @@ mod tests { ), ..Default::default() }, - vec!["date".to_owned()], + vec![("date".to_owned(), partition_type_wrap(DataType::Utf8))], ); let (proj_schema, proj_statistics) = conf.project(); @@ -615,8 +611,11 @@ mod tests { ("b", &vec![-2, -1, 0]), ("c", &vec![10, 11, 12]), ); - let partition_cols = - vec!["year".to_owned(), "month".to_owned(), "day".to_owned()]; + let partition_cols = vec![ + ("year".to_owned(), partition_type_wrap(DataType::Utf8)), + ("month".to_owned(), partition_type_wrap(DataType::Utf8)), + ("day".to_owned(), partition_type_wrap(DataType::Utf8)), + ]; // create a projected schema let conf = config_for_projection( file_batch.schema(), @@ -633,7 +632,13 @@ mod tests { ); let (proj_schema, _) = conf.project(); // created a projector for that projected schema - let mut proj = PartitionColumnProjector::new(proj_schema, &partition_cols); + let mut proj = PartitionColumnProjector::new( + proj_schema, + &partition_cols + .iter() + .map(|x| x.0.clone()) + .collect::>(), + ); // project first batch let projected_batch = proj @@ -777,7 +782,7 @@ mod tests { file_schema: SchemaRef, projection: Option>, statistics: Statistics, - table_partition_cols: Vec, + table_partition_cols: Vec<(String, DataType)>, ) -> FileScanConfig { FileScanConfig { file_schema, diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index fa68f307257f..cfd1c2194116 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -725,6 +725,7 @@ mod tests { use crate::datasource::object_store::ObjectStoreUrl; use crate::execution::options::CsvReadOptions; use crate::physical_plan::displayable; + use crate::physical_plan::file_format::partition_type_wrap; use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; use crate::test::object_store::local_unpartitioned_file; use crate::{ @@ -1441,9 +1442,9 @@ mod tests { projection: Some(vec![0, 1, 2, 12]), limit: None, table_partition_cols: vec![ - "year".to_owned(), - "month".to_owned(), - "day".to_owned(), + ("year".to_owned(), partition_type_wrap(DataType::Utf8)), + ("month".to_owned(), partition_type_wrap(DataType::Utf8)), + ("day".to_owned(), partition_type_wrap(DataType::Utf8)), ], config_options: ConfigOptions::new().into_shareable(), output_ordering: None, diff --git 
a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs index 028ddf14ec95..41ae758643a9 100644 --- a/datafusion/core/tests/path_partition.rs +++ b/datafusion/core/tests/path_partition.rs @@ -17,6 +17,7 @@ //! Test queries on partitioned datasets +use arrow::datatypes::DataType; use std::fs::File; use std::io::{Read, Seek, SeekFrom}; use std::ops::Range; @@ -56,7 +57,11 @@ async fn parquet_distinct_partition_col() -> Result<()> { "year=2021/month=10/day=09/file.parquet", "year=2021/month=10/day=28/file.parquet", ], - &["year", "month", "day"], + &[ + ("year", DataType::Int32), + ("month", DataType::Utf8), + ("day", DataType::Utf8), + ], "mirror:///", "alltypes_plain.parquet", ) @@ -195,7 +200,7 @@ async fn csv_filter_with_file_col() -> Result<()> { "mytable/date=2021-10-27/file.csv", "mytable/date=2021-10-28/file.csv", ], - &["date"], + &[("date", DataType::Utf8)], "mirror:///mytable/", ); @@ -221,6 +226,42 @@ async fn csv_filter_with_file_col() -> Result<()> { Ok(()) } +#[tokio::test] +async fn csv_filter_with_file_nonstring_col() -> Result<()> { + let ctx = SessionContext::new(); + + register_partitioned_aggregate_csv( + &ctx, + &[ + "mytable/date=2021-10-27/file.csv", + "mytable/date=2021-10-28/file.csv", + ], + &[("date", DataType::Date32)], + "mirror:///mytable/", + ); + + let result = ctx + .sql("SELECT c1, c2, date FROM t WHERE date > '2021-10-27' LIMIT 5") + .await? + .collect() + .await?; + + let expected = vec![ + "+----+----+------------+", + "| c1 | c2 | date |", + "+----+----+------------+", + "| a | 1 | 2021-10-28 |", + "| b | 1 | 2021-10-28 |", + "| b | 5 | 2021-10-28 |", + "| c | 2 | 2021-10-28 |", + "| d | 5 | 2021-10-28 |", + "+----+----+------------+", + ]; + assert_batches_sorted_eq!(expected, &result); + + Ok(()) +} + #[tokio::test] async fn csv_projection_on_partition() -> Result<()> { let ctx = SessionContext::new(); @@ -231,7 +272,7 @@ async fn csv_projection_on_partition() -> Result<()> { "mytable/date=2021-10-27/file.csv", "mytable/date=2021-10-28/file.csv", ], - &["date"], + &[("date", DataType::Date32)], "mirror:///mytable/", ); @@ -268,7 +309,7 @@ async fn csv_grouping_by_partition() -> Result<()> { "mytable/date=2021-10-27/file.csv", "mytable/date=2021-10-28/file.csv", ], - &["date"], + &[("date", DataType::Date32)], "mirror:///mytable/", ); @@ -302,7 +343,11 @@ async fn parquet_multiple_partitions() -> Result<()> { "year=2021/month=10/day=09/file.parquet", "year=2021/month=10/day=28/file.parquet", ], - &["year", "month", "day"], + &[ + ("year", DataType::Utf8), + ("month", DataType::Utf8), + ("day", DataType::Utf8), + ], "mirror:///", "alltypes_plain.parquet", ) @@ -333,6 +378,52 @@ async fn parquet_multiple_partitions() -> Result<()> { Ok(()) } +#[tokio::test] +async fn parquet_multiple_nonstring_partitions() -> Result<()> { + let ctx = SessionContext::new(); + + register_partitioned_alltypes_parquet( + &ctx, + &[ + "year=2021/month=09/day=09/file.parquet", + "year=2021/month=10/day=09/file.parquet", + "year=2021/month=10/day=28/file.parquet", + ], + &[ + ("year", DataType::Int32), + ("month", DataType::Int32), + ("day", DataType::Int32), + ], + "mirror:///", + "alltypes_plain.parquet", + ) + .await; + + let result = ctx + .sql("SELECT id, day FROM t WHERE day=month ORDER BY id") + .await? 
+ .collect() + .await?; + + let expected = vec![ + "+----+-----+", + "| id | day |", + "+----+-----+", + "| 0 | 9 |", + "| 1 | 9 |", + "| 2 | 9 |", + "| 3 | 9 |", + "| 4 | 9 |", + "| 5 | 9 |", + "| 6 | 9 |", + "| 7 | 9 |", + "+----+-----+", + ]; + assert_batches_sorted_eq!(expected, &result); + + Ok(()) +} + #[tokio::test] async fn parquet_statistics() -> Result<()> { let ctx = SessionContext::new(); @@ -344,7 +435,11 @@ async fn parquet_statistics() -> Result<()> { "year=2021/month=10/day=09/file.parquet", "year=2021/month=10/day=28/file.parquet", ], - &["year", "month", "day"], + &[ + ("year", DataType::Int32), + ("month", DataType::Utf8), + ("day", DataType::Utf8), + ], "mirror:///", // This is the only file we found in the test set with // actual stats. It has 1 column / 1 row. @@ -404,7 +499,7 @@ async fn parquet_overlapping_columns() -> Result<()> { "id=2/file.parquet", "id=3/file.parquet", ], - &["id"], + &[("id", DataType::Int64)], "mirror:///", "alltypes_plain.parquet", ) @@ -422,7 +517,7 @@ async fn parquet_overlapping_columns() -> Result<()> { fn register_partitioned_aggregate_csv( ctx: &SessionContext, store_paths: &[&str], - partition_cols: &[&str], + partition_cols: &[(&str, DataType)], table_path: &str, ) { let testdata = arrow_test_data(); @@ -436,7 +531,10 @@ fn register_partitioned_aggregate_csv( let options = ListingOptions::new(Arc::new(CsvFormat::default())) .with_table_partition_cols( - partition_cols.iter().map(|&s| s.to_owned()).collect(), + partition_cols + .iter() + .map(|x| (x.0.to_owned(), x.1.clone())) + .collect::>(), ); let table_path = ListingTableUrl::parse(table_path).unwrap(); @@ -452,7 +550,7 @@ fn register_partitioned_aggregate_csv( async fn register_partitioned_alltypes_parquet( ctx: &SessionContext, store_paths: &[&str], - partition_cols: &[&str], + partition_cols: &[(&str, DataType)], table_path: &str, source_file: &str, ) { @@ -466,7 +564,10 @@ async fn register_partitioned_alltypes_parquet( let options = ListingOptions::new(Arc::new(ParquetFormat::default())) .with_table_partition_cols( - partition_cols.iter().map(|&s| s.to_owned()).collect(), + partition_cols + .iter() + .map(|x| (x.0.to_owned(), x.1.clone())) + .collect::>(), ); let table_path = ListingTableUrl::parse(table_path).unwrap(); diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs index 452f41b7d59e..fa2cbc00ddb8 100644 --- a/datafusion/core/tests/sql/create_drop.rs +++ b/datafusion/core/tests/sql/create_drop.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. 
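The file-level read options updated earlier in this patch follow the same pattern; below is a minimal sketch of registering a partitioned CSV directory with a non-string partition column, where the table name, path, and column are assumptions for illustration:

use arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

async fn register_partitioned_csv(ctx: &SessionContext) -> Result<()> {
    // Declare the partition column's type explicitly; previously partition
    // columns were always treated as Utf8.
    let options = CsvReadOptions::new()
        .table_partition_cols(vec![("date".to_string(), DataType::Date32)]);
    ctx.register_csv("t", "/path/to/mytable", options).await?;
    Ok(())
}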
+use std::fs::create_dir_all; use std::io::Write; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; @@ -341,6 +342,73 @@ async fn create_external_table_with_timestamps() { assert_batches_sorted_eq!(expected, &result); } +#[tokio::test] +async fn create_partitioned_external_table() -> Result<()> { + let ctx = SessionContext::new(); + + let data0 = "Jorge,2018-12-13T12:12:10.011Z"; + let data1 = "Andrew,2018-11-13T17:11:10.011Z"; + + let tmp_dir = TempDir::new().unwrap(); + + // scope to ensure the file is closed and written + { + let part0 = tmp_dir.path().join("c_date=2018-12-13"); + let file_path0 = part0.join("timestamps.csv"); + create_dir_all(part0).expect("creating part dir"); + File::options() + .read(true) + .write(true) + .create(true) + .open(&file_path0) + .expect("creating temp file") + .write_all(data0.as_bytes()) + .expect("writing data"); + + let part1 = tmp_dir.path().join("c_date=2018-11-13"); + let file_path1 = part1.join("timestamps.csv"); + create_dir_all(part1).expect("creating part dir"); + File::options() + .read(true) + .write(true) + .create(true) + .open(&file_path1) + .expect("creating temp file") + .write_all(data1.as_bytes()) + .expect("writing data"); + } + + let sql = format!( + "CREATE EXTERNAL TABLE csv_with_timestamps ( + name VARCHAR, + ts TIMESTAMP, + c_date DATE, + ) + STORED AS CSV + PARTITIONED BY (c_date) + LOCATION '{}' + ", + tmp_dir.path().to_str().expect("path is utf8") + ); + + plan_and_collect(&ctx, &sql) + .await + .expect("Executing CREATE EXTERNAL TABLE"); + + let sql = "SELECT * from csv_with_timestamps where c_date='2018-11-13'"; + let result = plan_and_collect(&ctx, sql).await.unwrap(); + let expected = vec![ + "+--------+-------------------------+------------+", + "| name | ts | c_date |", + "+--------+-------------------------+------------+", + "| Andrew | 2018-11-13T17:11:10.011 | 2018-11-13 |", + "+--------+-------------------------+------------+", + ]; + assert_batches_sorted_eq!(expected, &result); + + Ok(()) +} + #[tokio::test] #[should_panic(expected = "already exists")] async fn sql_create_duplicate_table() { diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 499e92911e24..42d3401eff5d 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -461,7 +461,21 @@ impl AsLogicalPlan for LogicalPlanNode { let options = ListingOptions::new(file_format) .with_file_extension(scan.file_extension.clone()) - .with_table_partition_cols(scan.table_partition_cols.clone()) + .with_table_partition_cols( + scan.table_partition_cols + .iter() + .map(|col| { + ( + col.clone(), + schema + .field_with_name(col) + .unwrap() + .data_type() + .clone(), + ) + }) + .collect(), + ) .with_collect_stat(scan.collect_stat) .with_target_partitions(scan.target_partitions as usize) .with_file_sort_order(file_sort_order); @@ -876,7 +890,7 @@ impl AsLogicalPlan for LogicalPlanNode { FileFormatType::Avro(protobuf::AvroFormat {}) } else { return Err(proto_error(format!( - "Error converting file format, {:?} is invalid as a datafusion foramt.", + "Error converting file format, {:?} is invalid as a datafusion format.", listing_table.options().format ))); }; @@ -901,7 +915,9 @@ impl AsLogicalPlan for LogicalPlanNode { file_extension: options.file_extension.clone(), table_partition_cols: options .table_partition_cols - .clone(), + .iter() + .map(|x| x.0.clone()) + .collect::>(), paths: listing_table .table_paths() .iter()
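The proto round-trip above serializes only the partition column names and recovers their types from the table schema on deserialization. A minimal sketch of that lookup, with `schema` and `col_names` as assumed inputs:

use arrow::datatypes::{DataType, Schema};
use datafusion::error::Result;

fn partition_cols_from_schema(
    schema: &Schema,
    col_names: &[String],
) -> Result<Vec<(String, DataType)>> {
    col_names
        .iter()
        .map(|name| {
            // Look the column up in the full table schema to recover its type.
            let field = schema.field_with_name(name)?;
            Ok((name.clone(), field.data_type().clone()))
        })
        .collect()
}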