Support types other than String for partition columns on ListingTables (#4221)

* partition columns config

* solve test compile problems

* fix ut

* add partition column types config for each file reader options

* get correct partition column types from partitioned files

* remove DEFAULT_PARTITION_COLUMN_TYPE

* update pruned_partition_list

* change api

* add some tests

* create partitioned external table with schema

* upd

* Update datafusion/core/src/datasource/listing_table_factory.rs

Co-authored-by: Andrew Lamb <[email protected]>

* code clean

* code clean

Co-authored-by: Andrew Lamb <[email protected]>
doki23 and alamb authored Nov 23, 2022
1 parent 502b7e3 commit 55bf8e9
Showing 13 changed files with 399 additions and 89 deletions.
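
For orientation before the per-file hunks, here is a minimal sketch (not part of this commit's diff) of the changed API; it mirrors the updated doc example in table.rs below, with illustrative column names:

use std::sync::Arc;

use arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

fn main() {
    // Partition columns are now declared as (name, DataType) pairs instead
    // of bare names that were always read back as Utf8.
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_table_partition_cols(vec![
            ("year".to_string(), DataType::Int32),
            ("month".to_string(), DataType::Utf8),
        ]);
    assert_eq!(options.table_partition_cols.len(), 2);
}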
77 changes: 59 additions & 18 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -19,6 +19,7 @@

use std::sync::Arc;

use arrow::array::new_empty_array;
use arrow::{
array::{
Array, ArrayBuilder, ArrayRef, Date64Array, Date64Builder, StringBuilder,
@@ -176,7 +177,7 @@ pub async fn pruned_partition_list<'a>(
table_path: &'a ListingTableUrl,
filters: &'a [Expr],
file_extension: &'a str,
table_partition_cols: &'a [String],
table_partition_cols: &'a [(String, DataType)],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
let list = table_path.list_all_files(store, file_extension);

@@ -187,7 +188,15 @@

let applicable_filters: Vec<_> = filters
.iter()
.filter(|f| expr_applicable_for_cols(table_partition_cols, f))
.filter(|f| {
expr_applicable_for_cols(
&table_partition_cols
.iter()
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
f,
)
})
.collect();

if applicable_filters.is_empty() {
@@ -200,11 +209,26 @@
let parsed_path = parse_partitions_for_path(
table_path,
&object_meta.location,
table_partition_cols,
&table_partition_cols
.iter()
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
)
.map(|p| {
p.iter()
.map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
.zip(table_partition_cols)
.map(|(&part_value, part_column)| {
ScalarValue::try_from_string(
part_value.to_string(),
&part_column.1,
)
.unwrap_or_else(|_| {
panic!(
"Failed to cast str {} to type {}",
part_value, part_column.1
)
})
})
.collect()
});

@@ -221,6 +245,7 @@ pub async fn pruned_partition_list<'a>(
let metas: Vec<_> = list.try_collect().await?;
let batch = paths_to_batch(table_partition_cols, table_path, &metas)?;
let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
debug!("get mem_table: {:?}", mem_table);

// Filter the partitions using a local datafusion context
// TODO having the external context would allow us to resolve `Volatility::Stable`
@@ -245,28 +270,35 @@ pub async fn pruned_partition_list<'a>(
///
/// Note: For the last modified date, this loses precision finer than a millisecond.
fn paths_to_batch(
table_partition_cols: &[String],
table_partition_cols: &[(String, DataType)],
table_path: &ListingTableUrl,
metas: &[ObjectMeta],
) -> Result<RecordBatch> {
let mut key_builder = StringBuilder::with_capacity(metas.len(), 1024);
let mut length_builder = UInt64Builder::with_capacity(metas.len());
let mut modified_builder = Date64Builder::with_capacity(metas.len());
let mut partition_builders = table_partition_cols
let mut partition_scalar_values = table_partition_cols
.iter()
.map(|_| StringBuilder::with_capacity(metas.len(), 1024))
.map(|_| Vec::new())
.collect::<Vec<_>>();
for file_meta in metas {
if let Some(partition_values) = parse_partitions_for_path(
table_path,
&file_meta.location,
table_partition_cols,
&table_partition_cols
.iter()
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
) {
key_builder.append_value(file_meta.location.as_ref());
length_builder.append_value(file_meta.size as u64);
modified_builder.append_value(file_meta.last_modified.timestamp_millis());
for (i, part_val) in partition_values.iter().enumerate() {
partition_builders[i].append_value(part_val);
let scalar_val = ScalarValue::try_from_string(
part_val.to_string(),
&table_partition_cols[i].1,
)?;
partition_scalar_values[i].push(scalar_val);
}
} else {
debug!("No partitioning for path {}", file_meta.location);
@@ -279,8 +311,13 @@ fn paths_to_batch(
ArrayBuilder::finish(&mut length_builder),
ArrayBuilder::finish(&mut modified_builder),
];
for mut partition_builder in partition_builders {
col_arrays.push(ArrayBuilder::finish(&mut partition_builder));
for (i, part_scalar_val) in partition_scalar_values.into_iter().enumerate() {
if part_scalar_val.is_empty() {
col_arrays.push(new_empty_array(&table_partition_cols[i].1));
} else {
let partition_val_array = ScalarValue::iter_to_array(part_scalar_val)?;
col_arrays.push(partition_val_array);
}
}

// put the schema together
@@ -289,8 +326,8 @@ fn paths_to_batch(
Field::new(FILE_SIZE_COLUMN_NAME, DataType::UInt64, false),
Field::new(FILE_MODIFIED_COLUMN_NAME, DataType::Date64, true),
];
for pn in table_partition_cols {
fields.push(Field::new(pn, DataType::Utf8, false));
for part_col in table_partition_cols {
fields.push(Field::new(&part_col.0, part_col.1.to_owned(), false));
}

let batch = RecordBatch::try_new(Arc::new(Schema::new(fields)), col_arrays)?;
@@ -366,9 +403,10 @@ fn parse_partitions_for_path<'a>(

#[cfg(test)]
mod tests {
use futures::StreamExt;

use crate::logical_expr::{case, col, lit};
use crate::test::object_store::make_test_store;
use futures::StreamExt;

use super::*;

@@ -424,7 +462,7 @@ mod tests {
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
".parquet",
&[String::from("mypartition")],
&[(String::from("mypartition"), DataType::Utf8)],
)
.await
.expect("partition pruning failed")
@@ -447,7 +485,7 @@ mod tests {
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter],
".parquet",
&[String::from("mypartition")],
&[(String::from("mypartition"), DataType::Utf8)],
)
.await
.expect("partition pruning failed")
@@ -494,7 +532,10 @@ mod tests {
&ListingTableUrl::parse("file:///tablepath/").unwrap(),
&[filter1, filter2, filter3],
".parquet",
&[String::from("part1"), String::from("part2")],
&[
(String::from("part1"), DataType::Utf8),
(String::from("part2"), DataType::Utf8),
],
)
.await
.expect("partition pruning failed")
@@ -645,7 +686,7 @@ mod tests {
];

let batches = paths_to_batch(
&[String::from("part1")],
&[(String::from("part1"), DataType::Utf8)],
&ListingTableUrl::parse("file:///mybucket/tablepath").unwrap(),
&files,
)
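
To make the new value handling concrete, a small sketch of the parsing step that pruned_partition_list now performs per path segment; it assumes only that ScalarValue::try_from_string casts a raw string to the target type, which is how the hunks above use it:

use arrow::datatypes::DataType;
use datafusion_common::{Result, ScalarValue};

fn main() -> Result<()> {
    // A Hive-style path segment such as `id=5` yields the raw string "5".
    // With a declared partition type of Int64 it is parsed into a typed
    // scalar instead of being kept as ScalarValue::Utf8.
    let typed = ScalarValue::try_from_string("5".to_string(), &DataType::Int64)?;
    assert_eq!(typed, ScalarValue::Int64(Some(5)));
    Ok(())
}

When the conversion fails, pruned_partition_list surfaces it as the "Failed to cast str" panic shown above.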
61 changes: 45 additions & 16 deletions datafusion/core/src/datasource/listing/table.rs
@@ -21,7 +21,7 @@ use std::str::FromStr;
use std::{any::Any, sync::Arc};

use arrow::compute::SortOptions;
use arrow::datatypes::{Field, Schema, SchemaRef};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion_physical_expr::PhysicalSortExpr;
@@ -41,14 +41,14 @@ use crate::datasource::{
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::physical_plan;
use crate::physical_plan::file_format::partition_type_wrap;
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
logical_expr::Expr,
physical_plan::{
empty::EmptyExec,
file_format::{FileScanConfig, DEFAULT_PARTITION_COLUMN_DATATYPE},
project_schema, ExecutionPlan, Statistics,
empty::EmptyExec, file_format::FileScanConfig, project_schema, ExecutionPlan,
Statistics,
},
};

@@ -210,9 +210,7 @@ pub struct ListingOptions {
/// partitioning expected should be named "a" and "b":
/// - If there is a third level of partitioning it will be ignored.
/// - Files that don't follow this partitioning will be ignored.
/// Note that only `DEFAULT_PARTITION_COLUMN_DATATYPE` is currently
/// supported for the column type.
pub table_partition_cols: Vec<String>,
pub table_partition_cols: Vec<(String, DataType)>,
/// Set true to try to guess statistics from the files.
/// This can add a lot of overhead as it will usually require files
/// to be opened and at least partially parsed.
@@ -270,16 +268,19 @@ impl ListingOptions {
///
/// ```
/// use std::sync::Arc;
/// use arrow::datatypes::DataType;
/// use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat};
///
/// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
/// .with_table_partition_cols(vec!["col_a".to_string(), "col_b".to_string()]);
/// .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
/// ("col_b".to_string(), DataType::Utf8)]);
///
/// assert_eq!(listing_options.table_partition_cols, vec!["col_a", "col_b"]);
/// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
/// ("col_b".to_string(), DataType::Utf8)]);
/// ```
pub fn with_table_partition_cols(
mut self,
table_partition_cols: Vec<String>,
table_partition_cols: Vec<(String, DataType)>,
) -> Self {
self.table_partition_cols = table_partition_cols;
self
@@ -428,10 +429,10 @@ impl ListingTable {

// Add the partition columns to the file schema
let mut table_fields = file_schema.fields().clone();
for part in &options.table_partition_cols {
for (part_col_name, part_col_type) in &options.table_partition_cols {
table_fields.push(Field::new(
part,
DEFAULT_PARTITION_COLUMN_DATATYPE.clone(),
part_col_name,
partition_type_wrap(part_col_type.clone()),
false,
));
}
@@ -536,6 +537,23 @@ impl TableProvider for ListingTable {
return Ok(Arc::new(EmptyExec::new(false, projected_schema)));
}

// extract types of partition columns
let table_partition_cols = self
.options
.table_partition_cols
.iter()
.map(|col| {
(
col.0.to_owned(),
self.table_schema
.field_with_name(&col.0)
.unwrap()
.data_type()
.clone(),
)
})
.collect();

// create the execution plan
self.options
.format
@@ -548,7 +566,7 @@
projection: projection.clone(),
limit,
output_ordering: self.try_create_output_ordering()?,
table_partition_cols: self.options.table_partition_cols.clone(),
table_partition_cols,
config_options: ctx.config.config_options(),
},
filters,
@@ -560,7 +578,7 @@
&self,
filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
if expr_applicable_for_cols(&self.options.table_partition_cols, filter) {
if expr_applicable_for_cols(
&self
.options
.table_partition_cols
.iter()
.map(|x| x.0.clone())
.collect::<Vec<_>>(),
filter,
) {
// if filter can be handled by partition pruning, it is exact
Ok(TableProviderFilterPushDown::Exact)
} else {
@@ -807,7 +833,10 @@ mod tests {

let opt = ListingOptions::new(Arc::new(AvroFormat {}))
.with_file_extension(FileType::AVRO.get_ext())
.with_table_partition_cols(vec![String::from("p1")])
.with_table_partition_cols(vec![(
String::from("p1"),
partition_type_wrap(DataType::Utf8),
)])
.with_target_partitions(4);

let table_path = ListingTableUrl::parse("test:///table/").unwrap();
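
The table schema no longer uses the fixed DEFAULT_PARTITION_COLUMN_DATATYPE; each declared partition type is wrapped via partition_type_wrap instead. A sketch of what that implies, assuming (per the DataFusion source at the time of this commit) that the wrapper dictionary-encodes the value type with UInt16 keys:

use arrow::datatypes::DataType;
use datafusion::physical_plan::file_format::partition_type_wrap;

fn main() {
    // A partition value is constant within a file, so the table schema
    // stores it dictionary-encoded rather than materialized once per row.
    let wrapped = partition_type_wrap(DataType::Int32);
    assert_eq!(
        wrapped,
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Int32))
    );
}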
38 changes: 34 additions & 4 deletions datafusion/core/src/datasource/listing_table_factory.rs
@@ -29,6 +29,7 @@ use crate::datasource::listing::{
};
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
use arrow::datatypes::{DataType, SchemaRef};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
use datafusion_expr::CreateExternalTable;
@@ -88,17 +89,46 @@ impl TableProviderFactory for ListingTableFactory {
),
};

let provided_schema = if cmd.schema.fields().is_empty() {
None
let (provided_schema, table_partition_cols) = if cmd.schema.fields().is_empty() {
(
None,
cmd.table_partition_cols
.iter()
.map(|x| (x.clone(), DataType::Utf8))
.collect::<Vec<_>>(),
)
} else {
Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
let schema: SchemaRef = Arc::new(cmd.schema.as_ref().to_owned().into());
let table_partition_cols = cmd
.table_partition_cols
.iter()
.map(|col| {
schema.field_with_name(col).map_err(|arrow_err| {
DataFusionError::Execution(arrow_err.to_string())
})
})
.collect::<datafusion_common::Result<Vec<_>>>()?
.into_iter()
.map(|f| (f.name().to_owned(), f.data_type().to_owned()))
.collect();
// exclude partition columns to support creating partitioned external table
// with a specified column definition like
// `create external table a(c0 int, c1 int) stored as csv partitioned by (c1)...`
let mut project_idx = Vec::new();
for i in 0..schema.fields().len() {
if !cmd.table_partition_cols.contains(schema.field(i).name()) {
project_idx.push(i);
}
}
let schema = Arc::new(schema.project(&project_idx)?);
(Some(schema), table_partition_cols)
};

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config.collect_statistics)
.with_file_extension(file_extension)
.with_target_partitions(state.config.target_partitions)
.with_table_partition_cols(cmd.table_partition_cols.clone())
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(None);

let table_path = ListingTableUrl::parse(&cmd.location)?;
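
End to end, the factory change makes typed partition columns reachable from SQL. A hedged sketch of the usage (the table name, location path, and tokio setup are illustrative, not from this commit):

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // With an explicit column list, the factory resolves the type of each
    // PARTITIONED BY column from the declared schema and projects those
    // columns out of the file schema, since their values come from the
    // directory path rather than the CSV data. Without a column list,
    // partition columns still default to Utf8.
    ctx.sql(
        "CREATE EXTERNAL TABLE t (c0 INT, c1 INT) \
         STORED AS CSV PARTITIONED BY (c1) \
         LOCATION '/tmp/t/'",
    )
    .await?;
    Ok(())
}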
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/memory.rs
@@ -37,6 +37,7 @@ use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{repartition::RepartitionExec, Partitioning};

/// In-memory table
#[derive(Debug)]
pub struct MemTable {
schema: SchemaRef,
batches: Vec<Vec<RecordBatch>>,
(9 more changed files not shown)
