Support Copy To Partitioned Files (apache#9240)
* support copy to partitioned files

* remove print statements

* fmt

* fix tests and use err macro

* cargo doc fix

* add partition directory specific test

* try to support columns with single quotes in name
devinjdangelo authored Feb 19, 2024
1 parent 60ee91e commit b2a0451
Showing 12 changed files with 163 additions and 13 deletions.
25 changes: 25 additions & 0 deletions datafusion/common/src/file_options/mod.rs
@@ -97,6 +97,31 @@ impl StatementOptions {
maybe_option.map(|(_, v)| v)
}

/// Finds the partition_by option if it exists and parses it into a `Vec<String>`.
/// If the option doesn't exist, returns an empty `vec![]`.
/// E.g. (partition_by 'colA, colB, colC') -> `vec!['colA','colB','colC']`
pub fn take_partition_by(&mut self) -> Vec<String> {
let partition_by = self.take_str_option("partition_by");
match partition_by {
Some(part_cols) => {
let dequoted = part_cols
.chars()
.enumerate()
.filter(|(idx, c)| {
!((*idx == 0 || *idx == part_cols.len() - 1)
&& (*c == '\'' || *c == '"'))
})
.map(|(_idx, c)| c)
.collect::<String>();
dequoted
.split(',')
.map(|s| s.trim().replace("''", "'"))
.collect::<Vec<_>>()
}
None => vec![],
}
}

/// Infers the file_type given a target and arbitrary options.
/// If the options contain an explicit "format" option, that will be used.
/// Otherwise, attempt to infer file_type from the extension of target.
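For context, a minimal sketch (not part of the diff) of how `take_partition_by` is expected to behave. It assumes the `From<&HashMap<String, String>>` conversion for `StatementOptions` that the proto roundtrip tests below also use, and that the type lives at `datafusion_common::file_options::StatementOptions`:

```rust
use std::collections::HashMap;

use datafusion_common::file_options::StatementOptions;

fn main() {
    // Build options as the SQL layer would after parsing a COPY statement.
    let options = HashMap::from([(
        "partition_by".to_string(),
        "'colA, colB, colC'".to_string(),
    )]);
    let mut statement_options = StatementOptions::from(&options);

    // Outer single (or double) quotes are stripped, the list is split on
    // commas, each entry is trimmed, and a doubled '' unescapes to '.
    assert_eq!(
        statement_options.take_partition_by(),
        vec!["colA".to_string(), "colB".to_string(), "colC".to_string()]
    );
}
```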
12 changes: 12 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
@@ -73,6 +73,9 @@ pub struct DataFrameWriteOptions {
/// Allows compression of CSV and JSON.
/// Not supported for parquet.
compression: CompressionTypeVariant,
/// Sets which columns should be used for hive-style partitioned writes by name.
/// Can be set to empty vec![] for non-partitioned writes.
partition_by: Vec<String>,
}

impl DataFrameWriteOptions {
@@ -82,6 +85,7 @@ impl DataFrameWriteOptions {
overwrite: false,
single_file_output: false,
compression: CompressionTypeVariant::UNCOMPRESSED,
partition_by: vec![],
}
}
/// Set the overwrite option to true or false
@@ -101,6 +105,12 @@
self.compression = compression;
self
}

/// Sets the partition_by columns for output partitioning
pub fn with_partition_by(mut self, partition_by: Vec<String>) -> Self {
self.partition_by = partition_by;
self
}
}

impl Default for DataFrameWriteOptions {
@@ -1176,6 +1186,7 @@ impl DataFrame {
self.plan,
path.into(),
FileType::CSV,
options.partition_by,
copy_options,
)?
.build()?;
@@ -1219,6 +1230,7 @@
self.plan,
path.into(),
FileType::JSON,
options.partition_by,
copy_options,
)?
.build()?;
1 change: 1 addition & 0 deletions datafusion/core/src/dataframe/parquet.rs
@@ -73,6 +73,7 @@ impl DataFrame {
self.plan,
path.into(),
FileType::PARQUET,
options.partition_by,
copy_options,
)?
.build()?;
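Taken together, the DataFrame changes above enable partitioned writes from Rust as well as SQL. A hypothetical usage sketch, assuming a registered table `data` with a `col2` column and the existing `write_parquet(path, options, writer_properties)` shape:

```rust
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

async fn write_partitioned(ctx: &SessionContext) -> Result<()> {
    let df = ctx.table("data").await?;
    df.write_parquet(
        "/tmp/partitioned_out/",
        // Files land under /tmp/partitioned_out/col2=<value>/...
        DataFrameWriteOptions::new().with_partition_by(vec!["col2".to_string()]),
        None, // default writer properties
    )
    .await?;
    Ok(())
}
```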
28 changes: 17 additions & 11 deletions datafusion/core/src/datasource/file_format/write/demux.rs
@@ -32,7 +32,7 @@
use arrow_array::cast::AsArray;
use arrow_array::{downcast_dictionary_array, RecordBatch, StringArray, StructArray};
use arrow_schema::{DataType, Schema};
use datafusion_common::cast::as_string_array;
use datafusion_common::DataFusionError;
use datafusion_common::{exec_datafusion_err, DataFusionError};

use datafusion_execution::TaskContext;

@@ -319,14 +319,20 @@ fn compute_partition_keys_by_row<'a>(
) -> Result<Vec<Vec<&'a str>>> {
let mut all_partition_values = vec![];

for (col, dtype) in partition_by.iter() {
// For the purposes of writing partitioned data, we can rely on schema inference
// to determine the type of the partition cols in order to provide a more ergonomic
// UI which does not require specifying DataTypes manually. So, we ignore the
// DataType within the partition_by array and infer the correct type from the
// batch schema instead.
let schema = rb.schema();
for (col, _) in partition_by.iter() {
let mut partition_values = vec![];
let col_array =
rb.column_by_name(col)
.ok_or(DataFusionError::Execution(format!(
"PartitionBy Column {} does not exist in source data!",
col
)))?;

let dtype = schema.field_with_name(col)?.data_type();
let col_array = rb.column_by_name(col).ok_or(exec_datafusion_err!(
"PartitionBy Column {} does not exist in source data! Got schema {schema}.",
col
))?;

match dtype {
DataType::Utf8 => {
@@ -339,12 +345,12 @@ fn compute_partition_keys_by_row<'a>(
downcast_dictionary_array!(
col_array => {
let array = col_array.downcast_dict::<StringArray>()
.ok_or(DataFusionError::Execution(format!("it is not yet supported to write to hive partitions with datatype {}",
dtype)))?;
.ok_or(exec_datafusion_err!("it is not yet supported to write to hive partitions with datatype {}",
dtype))?;

for val in array.values() {
partition_values.push(
val.ok_or(DataFusionError::Execution(format!("Cannot partition by null value for column {}", col)))?
val.ok_or(exec_datafusion_err!("Cannot partition by null value for column {}", col))?
);
}
},
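The demultiplexer uses these per-row keys to route rows into hive-style directories, as the sqllogictests below verify (e.g. `partitioned_table2/column2=a/column3=x`). A toy illustration of the path scheme in plain Rust, independent of any DataFusion API:

```rust
// Toy illustration of the hive-style layout produced for each distinct
// combination of partition values, e.g. col2=Bar/<file>.
fn hive_path(partition_by: &[(&str, &str)], file: &str) -> String {
    let mut parts: Vec<String> = partition_by
        .iter()
        .map(|(col, val)| format!("{col}={val}"))
        .collect();
    parts.push(file.to_string());
    parts.join("/")
}

fn main() {
    assert_eq!(
        hive_path(&[("column2", "a"), ("column3", "x")], "part-0.parquet"),
        "column2=a/column3=x/part-0.parquet"
    );
}
```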
10 changes: 9 additions & 1 deletion datafusion/core/src/physical_planner.rs
@@ -568,6 +568,7 @@ impl DefaultPhysicalPlanner {
output_url,
file_format,
copy_options,
partition_by,
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;
let parsed_url = ListingTableUrl::parse(output_url)?;
@@ -585,13 +586,20 @@
CopyOptions::WriterOptions(writer_options) => *writer_options.clone()
};

// Note: the DataType passed here is ignored for the purposes of writing and inferred instead
// from the schema of the RecordBatch being written. This allows COPY statements to specify only
// the column name rather than column name + explicit data type.
let table_partition_cols = partition_by.iter()
.map(|s| (s.to_string(), arrow_schema::DataType::Null))
.collect::<Vec<_>>();

// Set file sink related options
let config = FileSinkConfig {
object_store_url,
table_paths: vec![parsed_url],
file_groups: vec![],
output_schema: Arc::new(schema),
table_partition_cols: vec![],
table_partition_cols,
overwrite: false,
file_type_writer_options
};
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
@@ -263,12 +263,14 @@ impl LogicalPlanBuilder {
input: LogicalPlan,
output_url: String,
file_format: FileType,
partition_by: Vec<String>,
copy_options: CopyOptions,
) -> Result<Self> {
Ok(Self::from(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url,
file_format,
partition_by,
copy_options,
})))
}
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/dml.rs
@@ -36,6 +36,8 @@ pub struct CopyTo {
pub output_url: String,
/// The file format to output (explicitly defined or inferred from file extension)
pub file_format: FileType,
/// Determines which, if any, columns should be used for hive-style partitioned writes
pub partition_by: Vec<String>,
/// Arbitrary options as tuples
pub copy_options: CopyOptions,
}
4 changes: 3 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
@@ -615,12 +615,13 @@ impl LogicalPlan {
input: _,
output_url,
file_format,
partition_by,
copy_options,
}) => Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(inputs.swap_remove(0)),
output_url: output_url.clone(),
file_format: file_format.clone(),

partition_by: partition_by.clone(),
copy_options: copy_options.clone(),
})),
LogicalPlan::Values(Values { schema, .. }) => {
@@ -1551,6 +1552,7 @@ impl LogicalPlan {
input: _,
output_url,
file_format,
partition_by: _,
copy_options,
}) => {
let op_str = match copy_options {
2 changes: 2 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
@@ -918,6 +918,7 @@ impl AsLogicalPlan for LogicalPlanNode {
input: Arc::new(input),
output_url: copy.output_url.clone(),
file_format: FileType::from_str(&copy.file_type)?,
partition_by: vec![],
copy_options,
},
))
@@ -1641,6 +1642,7 @@ impl AsLogicalPlan for LogicalPlanNode {
output_url,
file_format,
copy_options,
partition_by: _,
}) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
input,
4 changes: 4 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -324,6 +324,7 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> {
input: Arc::new(input),
output_url: "test.csv".to_string(),
file_format: FileType::CSV,
partition_by: vec![],
copy_options: CopyOptions::SQLOptions(StatementOptions::from(&options)),
});

@@ -354,6 +355,7 @@ async fn roundtrip_logical_plan_copy_to_writer_options() -> Result<()> {
input: Arc::new(input),
output_url: "test.parquet".to_string(),
file_format: FileType::PARQUET,
partition_by: vec![],
copy_options: CopyOptions::WriterOptions(Box::new(
FileTypeWriterOptions::Parquet(ParquetWriterOptions::new(writer_properties)),
)),
@@ -402,6 +404,7 @@ async fn roundtrip_logical_plan_copy_to_arrow() -> Result<()> {
input: Arc::new(input),
output_url: "test.arrow".to_string(),
file_format: FileType::ARROW,
partition_by: vec![],
copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::Arrow(
ArrowWriterOptions::new(),
))),
@@ -447,6 +450,7 @@ async fn roundtrip_logical_plan_copy_to_csv() -> Result<()> {
input: Arc::new(input),
output_url: "test.csv".to_string(),
file_format: FileType::CSV,
partition_by: vec![],
copy_options: CopyOptions::WriterOptions(Box::new(FileTypeWriterOptions::CSV(
CsvWriterOptions::new(
writer_properties,
2 changes: 2 additions & 0 deletions datafusion/sql/src/statement.rs
@@ -718,13 +718,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

let mut statement_options = StatementOptions::new(options);
let file_format = statement_options.try_infer_file_type(&statement.target)?;
let partition_by = statement_options.take_partition_by();

let copy_options = CopyOptions::SQLOptions(statement_options);

Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url: statement.target,
file_format,
partition_by,
copy_options,
}))
}
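End to end, the SQL path wired up above can also be exercised programmatically. A sketch, assuming a registered table `t` with columns `a` and `b`, and that `SessionContext::sql` plans COPY like any other statement:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

async fn copy_partitioned(ctx: &SessionContext) -> Result<()> {
    // take_partition_by turns the option below into vec!["a", "b"] before
    // planning, so output is hive-partitioned as out/a=<v>/b=<v>/...
    let df = ctx
        .sql("COPY t TO 'out/' (format parquet, partition_by 'a, b')")
        .await?;
    df.collect().await?; // executing the plan performs the copy
    Ok(())
}
```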
84 changes: 84 additions & 0 deletions datafusion/sqllogictest/test_files/copy.slt
@@ -25,6 +25,90 @@ COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compressi
----
2

# Copy to directory as partitioned files
query IT
COPY source_table TO 'test_files/scratch/copy/partitioned_table1/' (format parquet, compression 'zstd(10)', partition_by 'col2');
----
2

# validate multiple partitioned parquet file output
statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet STORED AS PARQUET
LOCATION 'test_files/scratch/copy/partitioned_table1/' PARTITIONED BY (col2);

query I?
select * from validate_partitioned_parquet order by col1, col2;
----
1 Foo
2 Bar

# validate partition paths were actually generated
statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet_bar STORED AS PARQUET
LOCATION 'test_files/scratch/copy/partitioned_table1/col2=Bar';

query I
select * from validate_partitioned_parquet_bar order by col1;
----
2

# Copy to directory as partitioned files
query ITT
COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table2/'
(format parquet, compression 'zstd(10)', partition_by 'column2, column3');
----
3

# validate multiple partitioned parquet file output
statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet2 STORED AS PARQUET
LOCATION 'test_files/scratch/copy/partitioned_table2/' PARTITIONED BY (column2, column3);

query I??
select * from validate_partitioned_parquet2 order by column1,column2,column3;
----
1 a x
2 b y
3 c z

statement ok
CREATE EXTERNAL TABLE validate_partitioned_parquet_a_x STORED AS PARQUET
LOCATION 'test_files/scratch/copy/partitioned_table2/column2=a/column3=x';

query I
select * from validate_partitioned_parquet_a_x order by column1;
----
1

statement ok
create table test ("'test'" varchar, "'test2'" varchar, "'test3'" varchar);

query TTT
insert into test VALUES ('a', 'x', 'aa'), ('b','y', 'bb'), ('c', 'z', 'cc')
----
3

query T
select "'test'" from test
----
a
b
c

# Note: to place a single ' inside a literal string, escape it by doubling it as ''
query TTT
copy test to 'test_files/scratch/copy/escape_quote' (format csv, partition_by '''test2'',''test3''')
----
3

statement ok
CREATE EXTERNAL TABLE validate_partitioned_escape_quote STORED AS CSV
LOCATION 'test_files/scratch/copy/escape_quote/' PARTITIONED BY ("'test2'", "'test3'");

# This triggers a panic (index out of bounds)
#query
#select * from validate_partitioned_escape_quote;

query TT
EXPLAIN COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, compression 'zstd(10)');
----
