Skip to content

Commit

Permalink
This commit contains:
Browse files Browse the repository at this point in the history
 - separate options by prefix 'hive.'
 - add hive_options to CopyTo struct
 - add more documentation
 - add session execution flag to enable feature, false by default
  • Loading branch information
Héctor Veiga Ortiz committed Jun 25, 2024
1 parent 556d1d1 commit 3ec3ecc
Show file tree
Hide file tree
Showing 14 changed files with 56 additions and 19 deletions.
3 changes: 3 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,9 @@ config_namespace! {
/// statistics into the same file groups.
/// Currently experimental
pub split_file_groups_by_statistics: bool, default = false

/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false
}
}

Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,7 @@ impl DataFrame {
FormatOptions::CSV(props),
HashMap::new(),
options.partition_by,
Default::default(),
)?
.build()?;

Expand Down Expand Up @@ -1393,6 +1394,7 @@ impl DataFrame {
FormatOptions::JSON(props),
Default::default(),
options.partition_by,
Default::default(),
)?
.build()?;

Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl DataFrame {
FormatOptions::PARQUET(props),
Default::default(),
options.partition_by,
Default::default(),
)?
.build()?;
DataFrame {
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,7 @@ impl TableProvider for ListingTable {
.await?;

let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
let keep_partition_by_columns = state.config().options().execution.keep_partition_by_columns;

// Sink related option, apart from format
let config = FileSinkConfig {
Expand All @@ -926,7 +927,7 @@ impl TableProvider for ListingTable {
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
overwrite,
keep_partition_by_columns: false,
keep_partition_by_columns,
};

let unsorted: Vec<Vec<Expr>> = vec![];
Expand Down
20 changes: 7 additions & 13 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@ impl DefaultPhysicalPlanner {
format_options,
partition_by,
options: source_option_tuples,
hive_options
}) => {
let input_exec = children.one()?;
let parsed_url = ListingTableUrl::parse(output_url)?;
Expand All @@ -777,17 +778,10 @@ impl DefaultPhysicalPlanner {
.map(|s| (s.to_string(), arrow_schema::DataType::Null))
.collect::<Vec<_>>();

let keep_partition_by_columns = source_option_tuples
.get("format.keep_partition_by_columns")
let keep_partition_by_columns = hive_options
.get("hive.keep_partition_by_columns")
.map(|v| v.trim() == "true")
.unwrap_or(false);

let mut updated_source_options_tuples = HashMap::new();
for (k, v) in source_option_tuples {
if k != "format.keep_partition_by_columns" {
updated_source_options_tuples.insert(k.clone(), v.clone());
}
}
.unwrap_or(false) || session_state.config().options().execution.keep_partition_by_columns;

// Set file sink related options
let config = FileSinkConfig {
Expand All @@ -804,20 +798,20 @@ impl DefaultPhysicalPlanner {
FormatOptions::CSV(options) => {
table_options.csv = options.clone();
table_options.set_file_format(FileType::CSV);
table_options.alter_with_string_hash_map(&updated_source_options_tuples)?;
table_options.alter_with_string_hash_map(source_option_tuples)?;
Arc::new(CsvFormat::default().with_options(table_options.csv))
}
FormatOptions::JSON(options) => {
table_options.json = options.clone();
table_options.set_file_format(FileType::JSON);
table_options.alter_with_string_hash_map(&updated_source_options_tuples)?;
table_options.alter_with_string_hash_map(source_option_tuples)?;
Arc::new(JsonFormat::default().with_options(table_options.json))
}
#[cfg(feature = "parquet")]
FormatOptions::PARQUET(options) => {
table_options.parquet = options.clone();
table_options.set_file_format(FileType::PARQUET);
table_options.alter_with_string_hash_map(&updated_source_options_tuples)?;
table_options.alter_with_string_hash_map(source_option_tuples)?;
Arc::new(
ParquetFormat::default().with_options(table_options.parquet),
)
Expand Down
4 changes: 3 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,15 @@ impl LogicalPlanBuilder {
format_options: FormatOptions,
options: HashMap<String, String>,
partition_by: Vec<String>,
hive_options: HashMap<String, String>,
) -> Result<Self> {
Ok(Self::from(LogicalPlan::Copy(CopyTo {
input: Arc::new(input),
output_url,
partition_by,
format_options,
options,
partition_by,
hive_options,
})))
}

Expand Down
9 changes: 8 additions & 1 deletion datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,17 +428,24 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
format_options,
partition_by: _,
options,
hive_options,
}) => {
let op_str = options
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join(", ");
let hive_op_str = hive_options
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join(", ");
json!({
"Node Type": "CopyTo",
"Output URL": output_url,
"Format Options": format!("{}", format_options),
"Options": op_str
"Options": op_str,
"Hive Options": hive_op_str,
})
}
LogicalPlan::Ddl(ddl) => {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub struct CopyTo {
pub format_options: FormatOptions,
/// SQL Options that can affect the formats
pub options: HashMap<String, String>,
/// Hive Options that can affect hive-style partitioning
pub hive_options: HashMap<String, String>,
}

// Implement PartialEq manually
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -860,12 +860,14 @@ impl LogicalPlan {
format_options,
options,
partition_by,
hive_options,
}) => Ok(LogicalPlan::Copy(CopyTo {
input: Arc::new(inputs.swap_remove(0)),
output_url: output_url.clone(),
format_options: format_options.clone(),
options: options.clone(),
partition_by: partition_by.clone(),
hive_options: hive_options.clone(),
})),
LogicalPlan::Values(Values { schema, .. }) => {
Ok(LogicalPlan::Values(Values {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,15 @@ impl TreeNode for LogicalPlan {
partition_by,
format_options,
options,
hive_options,
}) => rewrite_arc(input, f)?.update_data(|input| {
LogicalPlan::Copy(CopyTo {
input,
output_url,
partition_by,
format_options,
options,
hive_options,
})
}),
LogicalPlan::Ddl(ddl) => {
Expand Down
12 changes: 11 additions & 1 deletion datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,14 +881,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
};

let mut options = HashMap::new();
let mut hive_options = HashMap::new();
for (key, value) in statement.options {
let value_string = match value_to_string(&value) {
None => {
return plan_err!("Unsupported Value in COPY statement {}", value);
}
Some(v) => v,
};
if !(&key.contains('.')) {

if key.to_lowercase().contains("keep_partition_by_columns") {
let renamed_key = if !&key.starts_with("hive.") {
format!("hive.{}", key)
} else {
key
};
hive_options.insert(renamed_key.to_lowercase(), value_string.to_lowercase());
} else if !(&key.contains('.')) {
// If config does not belong to any namespace, assume it is
// a format option and apply the format prefix for backwards
// compatibility.
Expand Down Expand Up @@ -941,6 +950,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
format_options: file_type.into(),
partition_by,
options,
hive_options,
}))
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/copy.slt
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ physical_plan
# Copy to directory as partitioned files with keep_partition_by_columns enabled
query TT
COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1)
OPTIONS ('keep_partition_by_columns' true);
OPTIONS (KEEP_PARTITION_BY_COLUMNS true);
----
3

Expand Down
3 changes: 2 additions & 1 deletion docs/source/user-guide/sql/dml.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ clause is not specified, it will be inferred from the file extension if possible
`PARTITIONED BY` specifies the columns to use for partitioning the output files into
separate hive-style directories. By default, columns used in `PARTITIONED BY` will be removed
from the output format. If you want to keep the columns, you should provide the option
`KEEP_PARTITION_BY_COLUMNS true`. The `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled
through `ExecutionOptions` within `SessionConfig`.

The output format is determined by the first match of the following rules:

Expand Down
10 changes: 10 additions & 0 deletions docs/source/user-guide/sql/write_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ In this example, we write the entirety of `source_table` out to a folder of parq

## Available Options

### Hive Specific Options

The following options are available when writing hive-style partitioned data.

| Option | Description | Default Value |
|---------------------------|------------------------------------------------------------------------------------|---------------|
| KEEP_PARTITION_BY_COLUMNS | Flag to retain the columns in the output data when using `PARTITIONED BY` queries. | false |

Note: the `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled through `ExecutionOptions` within `SessionConfig`.

### JSON Format Specific Options

The following options are available when writing JSON files. Note: If any unsupported option is specified, an error will be raised and the query will fail.
Expand Down

0 comments on commit 3ec3ecc

Please sign in to comment.