diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 47da14574c5d..d251f568fc27 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -303,6 +303,9 @@ config_namespace! { /// statistics into the same file groups. /// Currently experimental pub split_file_groups_by_statistics: bool, default = false + + /// Should DataFusion keep the columns used for partition_by in the output RecordBatches + pub keep_partition_by_columns: bool, default = false } } diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index e1fc8273e6ff..88fd0cedf483 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1338,6 +1338,7 @@ impl DataFrame { FormatOptions::CSV(props), HashMap::new(), options.partition_by, + Default::default(), )? .build()?; @@ -1393,6 +1394,7 @@ impl DataFrame { FormatOptions::JSON(props), Default::default(), options.partition_by, + Default::default(), )? .build()?; diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 0ec46df0ae5d..0299ddecfd4c 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -66,6 +66,7 @@ impl DataFrame { FormatOptions::PARQUET(props), Default::default(), options.partition_by, + Default::default(), )? 
.build()?; DataFrame { diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 0c8238a615c2..26bb7740e9de 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -917,6 +917,7 @@ impl TableProvider for ListingTable { .await?; let file_groups = file_list_stream.try_collect::>().await?; + let keep_partition_by_columns = state.config().options().execution.keep_partition_by_columns; // Sink related option, apart from format let config = FileSinkConfig { @@ -926,7 +927,7 @@ impl TableProvider for ListingTable { output_schema: self.schema(), table_partition_cols: self.options.table_partition_cols.clone(), overwrite, - keep_partition_by_columns: false, + keep_partition_by_columns, }; let unsorted: Vec> = vec![]; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index d1005f2e8433..ff99e31d0b85 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -762,6 +762,7 @@ impl DefaultPhysicalPlanner { format_options, partition_by, options: source_option_tuples, + hive_options }) => { let input_exec = children.one()?; let parsed_url = ListingTableUrl::parse(output_url)?; @@ -777,17 +778,10 @@ impl DefaultPhysicalPlanner { .map(|s| (s.to_string(), arrow_schema::DataType::Null)) .collect::>(); - let keep_partition_by_columns = source_option_tuples - .get("format.keep_partition_by_columns") + let keep_partition_by_columns = hive_options + .get("hive.keep_partition_by_columns") .map(|v| v.trim() == "true") - .unwrap_or(false); - - let mut updated_source_options_tuples = HashMap::new(); - for (k, v) in source_option_tuples { - if k != "format.keep_partition_by_columns" { - updated_source_options_tuples.insert(k.clone(), v.clone()); - } - } + .unwrap_or(false) || session_state.config().options().execution.keep_partition_by_columns; // Set file sink related options let config 
= FileSinkConfig { @@ -804,20 +798,20 @@ impl DefaultPhysicalPlanner { FormatOptions::CSV(options) => { table_options.csv = options.clone(); table_options.set_file_format(FileType::CSV); - table_options.alter_with_string_hash_map(&updated_source_options_tuples)?; + table_options.alter_with_string_hash_map(source_option_tuples)?; Arc::new(CsvFormat::default().with_options(table_options.csv)) } FormatOptions::JSON(options) => { table_options.json = options.clone(); table_options.set_file_format(FileType::JSON); - table_options.alter_with_string_hash_map(&updated_source_options_tuples)?; + table_options.alter_with_string_hash_map(source_option_tuples)?; Arc::new(JsonFormat::default().with_options(table_options.json)) } #[cfg(feature = "parquet")] FormatOptions::PARQUET(options) => { table_options.parquet = options.clone(); table_options.set_file_format(FileType::PARQUET); - table_options.alter_with_string_hash_map(&updated_source_options_tuples)?; + table_options.alter_with_string_hash_map(source_option_tuples)?; Arc::new( ParquetFormat::default().with_options(table_options.parquet), ) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 434f4dace1de..d94b071a3127 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -274,13 +274,15 @@ impl LogicalPlanBuilder { format_options: FormatOptions, options: HashMap, partition_by: Vec, + hive_options: HashMap, ) -> Result { Ok(Self::from(LogicalPlan::Copy(CopyTo { input: Arc::new(input), output_url, + partition_by, format_options, options, - partition_by, + hive_options, }))) } diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs index 707cff8ab5f1..308ca1e0da20 100644 --- a/datafusion/expr/src/logical_plan/display.rs +++ b/datafusion/expr/src/logical_plan/display.rs @@ -428,17 +428,24 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> { format_options, partition_by: _, options, 
+ hive_options, }) => { let op_str = options .iter() .map(|(k, v)| format!("{}={}", k, v)) .collect::>() .join(", "); + let hive_op_str = hive_options + .iter() + .map(|(k, v)| format!("{}={}", k, v)) + .collect::>() + .join(", "); json!({ "Node Type": "CopyTo", "Output URL": output_url, "Format Options": format!("{}", format_options), - "Options": op_str + "Options": op_str, + "Hive Options": hive_op_str, }) } LogicalPlan::Ddl(ddl) => { diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index 13f3759ab8c0..2eb085087484 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -39,6 +39,8 @@ pub struct CopyTo { pub format_options: FormatOptions, /// SQL Options that can affect the formats pub options: HashMap, + /// Hive Options that can affect hive-style partitioning + pub hive_options: HashMap, } // Implement PartialEq manually diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 6e7efaf39e3e..fd445bedabe3 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -860,12 +860,14 @@ impl LogicalPlan { format_options, options, partition_by, + hive_options, }) => Ok(LogicalPlan::Copy(CopyTo { input: Arc::new(inputs.swap_remove(0)), output_url: output_url.clone(), format_options: format_options.clone(), options: options.clone(), partition_by: partition_by.clone(), + hive_options: hive_options.clone(), })), LogicalPlan::Values(Values { schema, .. 
}) => { Ok(LogicalPlan::Values(Values { diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 86c0cffd80a1..1d6d0fe07760 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -258,6 +258,7 @@ impl TreeNode for LogicalPlan { partition_by, format_options, options, + hive_options, }) => rewrite_arc(input, f)?.update_data(|input| { LogicalPlan::Copy(CopyTo { input, @@ -265,6 +266,7 @@ impl TreeNode for LogicalPlan { partition_by, format_options, options, + hive_options, }) }), LogicalPlan::Ddl(ddl) => { diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index cb492b390c76..f062f313ad10 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -881,6 +881,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { }; let mut options = HashMap::new(); + let mut hive_options = HashMap::new(); for (key, value) in statement.options { let value_string = match value_to_string(&value) { None => { @@ -888,7 +889,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } Some(v) => v, }; - if !(&key.contains('.')) { + + if key.to_lowercase().contains("keep_partition_by_columns") { + let renamed_key = if !&key.starts_with("hive.") { + format!("hive.{}", key) + } else { + key + }; + hive_options.insert(renamed_key.to_lowercase(), value_string.to_lowercase()); + } else if !(&key.contains('.')) { // If config does not belong to any namespace, assume it is // a format option and apply the format prefix for backwards // compatibility. 
@@ -941,6 +950,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { format_options: file_type.into(), partition_by, options, + hive_options, })) } diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 1f012d6e8850..8ed3b88016cf 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -169,7 +169,7 @@ physical_plan # Copy to directory as partitioned files with keep_partition_by_columns enabled query TT COPY (values ('1', 'a'), ('2', 'b'), ('3', 'c')) TO 'test_files/scratch/copy/partitioned_table4/' STORED AS parquet PARTITIONED BY (column1) -OPTIONS ('keep_partition_by_columns' true); +OPTIONS (KEEP_PARTITION_BY_COLUMNS true); ---- 3 diff --git a/docs/source/user-guide/sql/dml.md b/docs/source/user-guide/sql/dml.md index c1c609261898..61d63bf20e80 100644 --- a/docs/source/user-guide/sql/dml.md +++ b/docs/source/user-guide/sql/dml.md @@ -41,7 +41,8 @@ clause is not specified, it will be inferred from the file extension if possible `PARTITIONED BY` specifies the columns to use for partitioning the output files into separate hive-style directories. By default, columns used in `PARTITIONED BY` will be removed from the output format. If you want to keep the columns, you should provide the option -`keep_partition_by_columns true`. +`KEEP_PARTITION_BY_COLUMNS true`. `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled +through `ExecutionOptions` within `SessionConfig`. 
The output format is determined by the first match of the following rules: diff --git a/docs/source/user-guide/sql/write_options.md b/docs/source/user-guide/sql/write_options.md index 3c4790dd0255..89e6d08efee1 100644 --- a/docs/source/user-guide/sql/write_options.md +++ b/docs/source/user-guide/sql/write_options.md @@ -70,6 +70,16 @@ In this example, we write the entirety of `source_table` out to a folder of parq ## Available Options +### Hive Specific Options + +The following options are available when writing hive-style partitioned data. + +| Option | Description | Default Value | +|---------------------------|------------------------------------------------------------------------------------|---------------| +| KEEP_PARTITION_BY_COLUMNS | Flag to retain the columns in the output data when using `PARTITIONED BY` queries. | false | + +Note: `KEEP_PARTITION_BY_COLUMNS` flag can also be enabled through `ExecutionOptions` within `SessionConfig`. + ### JSON Format Specific Options The following options are available when writing JSON files. Note: If any unsupported option is specified, an error will be raised and the query will fail.