From 7af77cec58c3e95980d7a4d4a181c8e5f9dbb938 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Wed, 13 Mar 2024 17:33:27 +0300 Subject: [PATCH 1/7] COPY TO allign with CREATE EXTERNAL TABLE --- datafusion/common/src/config.rs | 83 +++----- datafusion/common/src/file_options/mod.rs | 85 ++++---- .../src/datasource/listing_table_factory.rs | 2 +- datafusion/core/src/execution/context/mod.rs | 6 +- datafusion/core/tests/sql/sql_api.rs | 5 +- .../tests/cases/roundtrip_logical_plan.rs | 9 +- datafusion/sql/src/parser.rs | 199 ++++++++++++++---- datafusion/sql/src/statement.rs | 112 +++------- datafusion/sql/tests/sql_integration.rs | 6 +- .../sqllogictest/test_files/clickbench.slt | 2 +- datafusion/sqllogictest/test_files/copy.slt | 163 +++++++------- .../test_files/create_external_table.slt | 4 +- .../sqllogictest/test_files/csv_files.slt | 10 +- .../sqllogictest/test_files/group_by.slt | 8 +- .../sqllogictest/test_files/parquet.slt | 8 +- .../sqllogictest/test_files/repartition.slt | 2 +- .../test_files/repartition_scan.slt | 8 +- .../test_files/schema_evolution.slt | 8 +- docs/source/user-guide/sql/dml.md | 2 +- 19 files changed, 377 insertions(+), 345 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 72d51cb15a88..153514e738d9 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1131,10 +1131,21 @@ impl ConfigField for TableOptions { fn set(&mut self, key: &str, value: &str) -> Result<()> { // Extensions are handled in the public `ConfigOptions::set` let (key, rem) = key.split_once('.').unwrap_or((key, "")); + let format = if let Some(format) = &self.current_format { + format + } else { + return _config_err!("Specify a format for TableOptions"); + }; match key { - "csv" => self.csv.set(rem, value), - "parquet" => self.parquet.set(rem, value), - "json" => self.json.set(rem, value), + "format" => match format { + #[cfg(feature = "parquet")] + FileType::PARQUET => self.parquet.set(rem, value), + FileType::CSV => self.csv.set(rem, value), + FileType::JSON => self.json.set(rem, value), + _ => { + _config_err!("Config value \"{key}\" is not supported on {}", format) + } + }, _ => _config_err!("Config value \"{key}\" not found on TableOptions"), } } @@ -1170,28 +1181,7 @@ impl TableOptions { )) })?; - if prefix == "csv" || prefix == "json" || prefix == "parquet" { - if let Some(format) = &self.current_format { - match format { - FileType::CSV if prefix != "csv" => { - return Err(DataFusionError::Configuration(format!( - "Key \"{key}\" is not applicable for CSV format" - ))) - } - #[cfg(feature = "parquet")] - FileType::PARQUET if prefix != "parquet" => { - return Err(DataFusionError::Configuration(format!( - "Key \"{key}\" is not applicable for PARQUET format" - ))) - } - FileType::JSON if prefix != "json" => { - return Err(DataFusionError::Configuration(format!( - "Key \"{key}\" is not applicable for JSON format" - ))) - } - _ => {} - } - } + if prefix == "format" { return ConfigField::set(self, key, value); } @@ -1251,9 +1241,7 @@ impl TableOptions { } let mut v = Visitor(vec![]); - self.visit(&mut v, "csv", ""); - self.visit(&mut v, "json", ""); - self.visit(&mut v, "parquet", ""); + self.visit(&mut v, "format", ""); v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries())); v.0 @@ -1558,6 +1546,7 @@ mod tests { use crate::config::{ ConfigEntry, ConfigExtension, ExtensionOptions, Extensions, TableOptions, }; + use crate::FileType; #[derive(Default, Debug, 
Clone)] pub struct TestExtensionConfig { @@ -1611,12 +1600,13 @@ mod tests { } #[test] - fn alter_kafka_config() { + fn alter_test_extension_config() { let mut extension = Extensions::new(); extension.insert(TestExtensionConfig::default()); let mut table_config = TableOptions::new().with_extensions(extension); - table_config.set("parquet.write_batch_size", "10").unwrap(); - assert_eq!(table_config.parquet.global.write_batch_size, 10); + table_config.set_file_format(FileType::CSV); + table_config.set("format.delimiter", ";").unwrap(); + assert_eq!(table_config.csv.delimiter, b';'); table_config.set("test.bootstrap.servers", "asd").unwrap(); let kafka_config = table_config .extensions @@ -1628,38 +1618,15 @@ mod tests { ); } - #[test] - fn parquet_table_options() { - let mut table_config = TableOptions::new(); - table_config - .set("parquet.bloom_filter_enabled::col1", "true") - .unwrap(); - assert_eq!( - table_config.parquet.column_specific_options["col1"].bloom_filter_enabled, - Some(true) - ); - } - #[test] fn csv_u8_table_options() { let mut table_config = TableOptions::new(); - table_config.set("csv.delimiter", ";").unwrap(); + table_config.set_file_format(FileType::CSV); + table_config.set("format.delimiter", ";").unwrap(); assert_eq!(table_config.csv.delimiter as char, ';'); - table_config.set("csv.escape", "\"").unwrap(); + table_config.set("format.escape", "\"").unwrap(); assert_eq!(table_config.csv.escape.unwrap() as char, '"'); - table_config.set("csv.escape", "\'").unwrap(); + table_config.set("format.escape", "\'").unwrap(); assert_eq!(table_config.csv.escape.unwrap() as char, '\''); } - - #[test] - fn parquet_table_options_config_entry() { - let mut table_config = TableOptions::new(); - table_config - .set("parquet.bloom_filter_enabled::col1", "true") - .unwrap(); - let entries = table_config.entries(); - assert!(entries - .iter() - .any(|item| item.key == "parquet.bloom_filter_enabled::col1")) - } } diff --git a/datafusion/common/src/file_options/mod.rs b/datafusion/common/src/file_options/mod.rs index a72b812adc8d..eb1ce1b364fd 100644 --- a/datafusion/common/src/file_options/mod.rs +++ b/datafusion/common/src/file_options/mod.rs @@ -35,7 +35,7 @@ mod tests { config::TableOptions, file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions}, parsers::CompressionTypeVariant, - Result, + FileType, Result, }; use parquet::{ @@ -47,35 +47,36 @@ mod tests { #[test] fn test_writeroptions_parquet_from_statement_options() -> Result<()> { let mut option_map: HashMap = HashMap::new(); - option_map.insert("parquet.max_row_group_size".to_owned(), "123".to_owned()); - option_map.insert("parquet.data_pagesize_limit".to_owned(), "123".to_owned()); - option_map.insert("parquet.write_batch_size".to_owned(), "123".to_owned()); - option_map.insert("parquet.writer_version".to_owned(), "2.0".to_owned()); + option_map.insert("format.max_row_group_size".to_owned(), "123".to_owned()); + option_map.insert("format.data_pagesize_limit".to_owned(), "123".to_owned()); + option_map.insert("format.write_batch_size".to_owned(), "123".to_owned()); + option_map.insert("format.writer_version".to_owned(), "2.0".to_owned()); option_map.insert( - "parquet.dictionary_page_size_limit".to_owned(), + "format.dictionary_page_size_limit".to_owned(), "123".to_owned(), ); option_map.insert( - "parquet.created_by".to_owned(), + "format.created_by".to_owned(), "df write unit test".to_owned(), ); option_map.insert( - "parquet.column_index_truncate_length".to_owned(), + 
"format.column_index_truncate_length".to_owned(), "123".to_owned(), ); option_map.insert( - "parquet.data_page_row_count_limit".to_owned(), + "format.data_page_row_count_limit".to_owned(), "123".to_owned(), ); - option_map.insert("parquet.bloom_filter_enabled".to_owned(), "true".to_owned()); - option_map.insert("parquet.encoding".to_owned(), "plain".to_owned()); - option_map.insert("parquet.dictionary_enabled".to_owned(), "true".to_owned()); - option_map.insert("parquet.compression".to_owned(), "zstd(4)".to_owned()); - option_map.insert("parquet.statistics_enabled".to_owned(), "page".to_owned()); - option_map.insert("parquet.bloom_filter_fpp".to_owned(), "0.123".to_owned()); - option_map.insert("parquet.bloom_filter_ndv".to_owned(), "123".to_owned()); + option_map.insert("format.bloom_filter_enabled".to_owned(), "true".to_owned()); + option_map.insert("format.encoding".to_owned(), "plain".to_owned()); + option_map.insert("format.dictionary_enabled".to_owned(), "true".to_owned()); + option_map.insert("format.compression".to_owned(), "zstd(4)".to_owned()); + option_map.insert("format.statistics_enabled".to_owned(), "page".to_owned()); + option_map.insert("format.bloom_filter_fpp".to_owned(), "0.123".to_owned()); + option_map.insert("format.bloom_filter_ndv".to_owned(), "123".to_owned()); let mut table_config = TableOptions::new(); + table_config.set_file_format(FileType::PARQUET); table_config.alter_with_string_hash_map(&option_map)?; let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?; @@ -131,54 +132,52 @@ mod tests { let mut option_map: HashMap = HashMap::new(); option_map.insert( - "parquet.bloom_filter_enabled::col1".to_owned(), + "format.bloom_filter_enabled::col1".to_owned(), "true".to_owned(), ); option_map.insert( - "parquet.bloom_filter_enabled::col2.nested".to_owned(), + "format.bloom_filter_enabled::col2.nested".to_owned(), "true".to_owned(), ); - option_map.insert("parquet.encoding::col1".to_owned(), "plain".to_owned()); - option_map.insert("parquet.encoding::col2.nested".to_owned(), "rle".to_owned()); + option_map.insert("format.encoding::col1".to_owned(), "plain".to_owned()); + option_map.insert("format.encoding::col2.nested".to_owned(), "rle".to_owned()); option_map.insert( - "parquet.dictionary_enabled::col1".to_owned(), + "format.dictionary_enabled::col1".to_owned(), "true".to_owned(), ); option_map.insert( - "parquet.dictionary_enabled::col2.nested".to_owned(), + "format.dictionary_enabled::col2.nested".to_owned(), "true".to_owned(), ); - option_map.insert("parquet.compression::col1".to_owned(), "zstd(4)".to_owned()); + option_map.insert("format.compression::col1".to_owned(), "zstd(4)".to_owned()); option_map.insert( - "parquet.compression::col2.nested".to_owned(), + "format.compression::col2.nested".to_owned(), "zstd(10)".to_owned(), ); option_map.insert( - "parquet.statistics_enabled::col1".to_owned(), + "format.statistics_enabled::col1".to_owned(), "page".to_owned(), ); option_map.insert( - "parquet.statistics_enabled::col2.nested".to_owned(), + "format.statistics_enabled::col2.nested".to_owned(), "none".to_owned(), ); option_map.insert( - "parquet.bloom_filter_fpp::col1".to_owned(), + "format.bloom_filter_fpp::col1".to_owned(), "0.123".to_owned(), ); option_map.insert( - "parquet.bloom_filter_fpp::col2.nested".to_owned(), + "format.bloom_filter_fpp::col2.nested".to_owned(), "0.456".to_owned(), ); + option_map.insert("format.bloom_filter_ndv::col1".to_owned(), "123".to_owned()); option_map.insert( - "parquet.bloom_filter_ndv::col1".to_owned(), - 
"123".to_owned(), - ); - option_map.insert( - "parquet.bloom_filter_ndv::col2.nested".to_owned(), + "format.bloom_filter_ndv::col2.nested".to_owned(), "456".to_owned(), ); let mut table_config = TableOptions::new(); + table_config.set_file_format(FileType::PARQUET); table_config.alter_with_string_hash_map(&option_map)?; let parquet_options = ParquetWriterOptions::try_from(&table_config.parquet)?; @@ -271,16 +270,17 @@ mod tests { // for StatementOptions fn test_writeroptions_csv_from_statement_options() -> Result<()> { let mut option_map: HashMap = HashMap::new(); - option_map.insert("csv.has_header".to_owned(), "true".to_owned()); - option_map.insert("csv.date_format".to_owned(), "123".to_owned()); - option_map.insert("csv.datetime_format".to_owned(), "123".to_owned()); - option_map.insert("csv.timestamp_format".to_owned(), "2.0".to_owned()); - option_map.insert("csv.time_format".to_owned(), "123".to_owned()); - option_map.insert("csv.null_value".to_owned(), "123".to_owned()); - option_map.insert("csv.compression".to_owned(), "gzip".to_owned()); - option_map.insert("csv.delimiter".to_owned(), ";".to_owned()); + option_map.insert("format.has_header".to_owned(), "true".to_owned()); + option_map.insert("format.date_format".to_owned(), "123".to_owned()); + option_map.insert("format.datetime_format".to_owned(), "123".to_owned()); + option_map.insert("format.timestamp_format".to_owned(), "2.0".to_owned()); + option_map.insert("format.time_format".to_owned(), "123".to_owned()); + option_map.insert("format.null_value".to_owned(), "123".to_owned()); + option_map.insert("format.compression".to_owned(), "gzip".to_owned()); + option_map.insert("format.delimiter".to_owned(), ";".to_owned()); let mut table_config = TableOptions::new(); + table_config.set_file_format(FileType::CSV); table_config.alter_with_string_hash_map(&option_map)?; let csv_options = CsvWriterOptions::try_from(&table_config.csv)?; @@ -299,9 +299,10 @@ mod tests { // for StatementOptions fn test_writeroptions_json_from_statement_options() -> Result<()> { let mut option_map: HashMap = HashMap::new(); - option_map.insert("json.compression".to_owned(), "gzip".to_owned()); + option_map.insert("format.compression".to_owned(), "gzip".to_owned()); let mut table_config = TableOptions::new(); + table_config.set_file_format(FileType::JSON); table_config.alter_with_string_hash_map(&option_map)?; let json_options = JsonWriterOptions::try_from(&table_config.json)?; diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 4e126bbba9f9..ef8ee708ebf1 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -227,7 +227,7 @@ mod tests { let name = OwnedTableReference::bare("foo".to_string()); let mut options = HashMap::new(); - options.insert("csv.schema_infer_max_rec".to_owned(), "1000".to_owned()); + options.insert("format.schema_infer_max_rec".to_owned(), "1000".to_owned()); let cmd = CreateExternalTable { name, location: csv_file.path().to_str().unwrap().to_string(), diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 8bc65a0ca2cc..84aa79eb7da5 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1785,11 +1785,7 @@ impl SessionState { .0 .insert(ObjectName(vec![Ident::from(table.name.as_str())])); } - DFStatement::CopyTo(CopyToStatement { - source, - target: _, - options: _, - }) 
=> match source { + DFStatement::CopyTo(CopyToStatement { source, .. }) => match source { CopyToSource::Relation(table_name) => { visitor.insert(table_name); } diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index d7adc9611b2f..f452be64bfa9 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -67,7 +67,10 @@ async fn unsupported_copy_returns_error() { let options = SQLOptions::new().with_allow_dml(false); - let sql = format!("copy (values(1)) to '{}'", tmpfile.to_string_lossy()); + let sql = format!( + "copy (values(1)) to '{}' STORED AS parquet", + tmpfile.to_string_lossy() + ); let df = ctx.sql_with_options(&sql, options).await; assert_eq!( df.unwrap_err().strip_backtrace(), diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index 2c8cf07e9eff..60a5f28ae666 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -35,7 +35,7 @@ use datafusion_common::config::{FormatOptions, TableOptions}; use datafusion_common::scalar::ScalarStructBuilder; use datafusion_common::{ internal_err, not_impl_err, plan_err, DFField, DFSchema, DFSchemaRef, - DataFusionError, Result, ScalarValue, + DataFusionError, FileType, Result, ScalarValue, }; use datafusion_expr::dml::CopyTo; use datafusion_expr::expr::{ @@ -314,10 +314,9 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> { let ctx = SessionContext::new(); let input = create_csv_scan(&ctx).await?; - - let mut table_options = - TableOptions::default_from_session_config(ctx.state().config_options()); - table_options.set("csv.delimiter", ";")?; + let mut table_options = ctx.state().default_table_options().clone(); + table_options.set_file_format(FileType::CSV); + table_options.set("format.delimiter", ";")?; let plan = LogicalPlan::Copy(CopyTo { input: Arc::new(input), diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index effc1d096cfd..d5b08d385544 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -102,6 +102,12 @@ pub struct CopyToStatement { pub source: CopyToSource, /// The URL to where the data is heading pub target: String, + /// Partitioned BY + pub partitioned_by: Vec, + /// CSV Header row? + pub has_header: bool, + /// File type (Parquet, NDJSON, CSV, etc) + pub file_type: String, /// Target specific options pub options: Vec<(String, Value)>, } @@ -111,15 +117,25 @@ impl fmt::Display for CopyToStatement { let Self { source, target, + partitioned_by, + file_type, options, + .. 
} = self; - write!(f, "COPY {source} TO {target}")?; + write!(f, "COPY {source} TO {target} STORED AS {}", file_type)?; + if !partitioned_by.is_empty() { + write!(f, " PARTITIONED BY ({})", partitioned_by.join(", "))?; + } + + if self.has_header { + write!(f, " WITH HEADER ROW")?; + } if !options.is_empty() { let opts: Vec<_> = options.iter().map(|(k, v)| format!("{k} {v}")).collect(); // print them in sorted order - write!(f, " ({})", opts.join(", "))?; + write!(f, " OPTIONS ({})", opts.join(", "))?; } Ok(()) @@ -243,6 +259,15 @@ impl fmt::Display for Statement { } } +fn ensure_not_set(field: &Option, name: &str) -> Result<(), ParserError> { + if field.is_some() { + return Err(ParserError::ParserError(format!( + "{name} specified more than once", + ))); + } + Ok(()) +} + /// Datafusion SQL Parser based on [`sqlparser`] /// /// Parses DataFusion's SQL dialect, often delegating to [`sqlparser`]'s [`Parser`]. @@ -370,21 +395,85 @@ impl<'a> DFParser<'a> { CopyToSource::Relation(table_name) }; - self.parser.expect_keyword(Keyword::TO)?; + #[derive(Default)] + struct Builder { + file_type: Option, + target: Option, + partitioned_by: Option>, + has_header: Option, + options: Option>, + } - let target = self.parser.parse_literal_string()?; + let mut builder = Builder::default(); - // check for options in parens - let options = if self.parser.peek_token().token == Token::LParen { - self.parse_value_options()? - } else { - vec![] - }; + loop { + if let Some(keyword) = self.parser.parse_one_of_keywords(&[ + Keyword::STORED, + Keyword::TO, + Keyword::PARTITIONED, + Keyword::OPTIONS, + Keyword::WITH, + ]) { + match keyword { + Keyword::STORED => { + self.parser.expect_keyword(Keyword::AS)?; + ensure_not_set(&builder.file_type, "STORED AS")?; + builder.file_type = Some(self.parse_file_format()?); + } + Keyword::TO => { + ensure_not_set(&builder.target, "TO")?; + builder.target = Some(self.parser.parse_literal_string()?); + } + Keyword::WITH => { + self.parser.expect_keyword(Keyword::HEADER)?; + self.parser.expect_keyword(Keyword::ROW)?; + ensure_not_set(&builder.has_header, "WITH HEADER ROW")?; + builder.has_header = Some(true); + } + Keyword::PARTITIONED => { + self.parser.expect_keyword(Keyword::BY)?; + ensure_not_set(&builder.partitioned_by, "PARTITIONED BY")?; + builder.partitioned_by = Some(self.parse_partitions()?); + } + Keyword::OPTIONS => { + ensure_not_set(&builder.options, "OPTIONS")?; + builder.options = Some(self.parse_value_options()?); + } + _ => { + unreachable!() + } + } + } else { + let token = self.parser.next_token(); + if token == Token::EOF || token == Token::SemiColon { + break; + } else { + return Err(ParserError::ParserError(format!( + "Unexpected token {token}" + ))); + } + } + } + + // Validations: location and file_type are required + if builder.file_type.is_none() { + return Err(ParserError::ParserError( + "Missing STORED AS clause in COPY statement".into(), + )); + } + if builder.target.is_none() { + return Err(ParserError::ParserError( + "Missing TO clause in COPY statement".into(), + )); + } Ok(Statement::CopyTo(CopyToStatement { source, - target, - options, + target: builder.target.unwrap(), + partitioned_by: builder.partitioned_by.unwrap_or(vec![]), + has_header: builder.has_header.unwrap_or(false), + file_type: builder.file_type.unwrap(), + options: builder.options.unwrap_or(vec![]), })) } @@ -624,15 +713,6 @@ impl<'a> DFParser<'a> { } let mut builder = Builder::default(); - fn ensure_not_set(field: &Option, name: &str) -> Result<(), ParserError> { - if 
field.is_some() { - return Err(ParserError::ParserError(format!( - "{name} specified more than once", - ))); - } - Ok(()) - } - loop { if let Some(keyword) = self.parser.parse_one_of_keywords(&[ Keyword::STORED, @@ -1321,10 +1401,13 @@ mod tests { #[test] fn copy_to_table_to_table() -> Result<(), ParserError> { // positive case - let sql = "COPY foo TO bar"; + let sql = "COPY foo TO bar STORED AS CSV"; let expected = Statement::CopyTo(CopyToStatement { source: object_name("foo"), target: "bar".to_string(), + partitioned_by: vec![], + has_header: false, + file_type: "CSV".to_string(), options: vec![], }); @@ -1335,10 +1418,22 @@ mod tests { #[test] fn explain_copy_to_table_to_table() -> Result<(), ParserError> { let cases = vec![ - ("EXPLAIN COPY foo TO bar", false, false), - ("EXPLAIN ANALYZE COPY foo TO bar", true, false), - ("EXPLAIN VERBOSE COPY foo TO bar", false, true), - ("EXPLAIN ANALYZE VERBOSE COPY foo TO bar", true, true), + ("EXPLAIN COPY foo TO bar STORED AS PARQUET", false, false), + ( + "EXPLAIN ANALYZE COPY foo TO bar STORED AS PARQUET", + true, + false, + ), + ( + "EXPLAIN VERBOSE COPY foo TO bar STORED AS PARQUET", + false, + true, + ), + ( + "EXPLAIN ANALYZE VERBOSE COPY foo TO bar STORED AS PARQUET", + true, + true, + ), ]; for (sql, analyze, verbose) in cases { println!("sql: {sql}, analyze: {analyze}, verbose: {verbose}"); @@ -1346,6 +1441,9 @@ mod tests { let expected_copy = Statement::CopyTo(CopyToStatement { source: object_name("foo"), target: "bar".to_string(), + partitioned_by: vec![], + has_header: false, + file_type: "PARQUET".to_string(), options: vec![], }); let expected = Statement::Explain(ExplainStatement { @@ -1375,10 +1473,13 @@ mod tests { panic!("Expected query, got {statement:?}"); }; - let sql = "COPY (SELECT 1) TO bar"; + let sql = "COPY (SELECT 1) TO bar STORED AS CSV WITH HEADER ROW"; let expected = Statement::CopyTo(CopyToStatement { source: CopyToSource::Query(query), target: "bar".to_string(), + partitioned_by: vec![], + has_header: true, + file_type: "CSV".to_string(), options: vec![], }); assert_eq!(verified_stmt(sql), expected); @@ -1387,10 +1488,31 @@ mod tests { #[test] fn copy_to_options() -> Result<(), ParserError> { - let sql = "COPY foo TO bar (row_group_size 55)"; + let sql = "COPY foo TO bar STORED AS CSV OPTIONS (row_group_size 55)"; let expected = Statement::CopyTo(CopyToStatement { source: object_name("foo"), target: "bar".to_string(), + partitioned_by: vec![], + has_header: false, + file_type: "CSV".to_string(), + options: vec![( + "row_group_size".to_string(), + Value::Number("55".to_string(), false), + )], + }); + assert_eq!(verified_stmt(sql), expected); + Ok(()) + } + + #[test] + fn copy_to_partitioned_by() -> Result<(), ParserError> { + let sql = "COPY foo TO bar STORED AS CSV PARTITIONED BY (a) OPTIONS (row_group_size 55)"; + let expected = Statement::CopyTo(CopyToStatement { + source: object_name("foo"), + target: "bar".to_string(), + partitioned_by: vec!["a".to_string()], + has_header: false, + file_type: "CSV".to_string(), options: vec![( "row_group_size".to_string(), Value::Number("55".to_string(), false), @@ -1404,24 +1526,24 @@ mod tests { fn copy_to_multi_options() -> Result<(), ParserError> { // order of options is preserved let sql = - "COPY foo TO bar (format parquet, row_group_size 55, compression snappy)"; + "COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy)"; let expected_options = vec![ ( - "format".to_string(), - Value::UnQuotedString("parquet".to_string()), - 
), - ( - "row_group_size".to_string(), + "format.row_group_size".to_string(), Value::Number("55".to_string(), false), ), ( - "compression".to_string(), + "format.compression".to_string(), Value::UnQuotedString("snappy".to_string()), ), ]; - let options = if let Statement::CopyTo(copy_to) = verified_stmt(sql) { + let mut statements = DFParser::parse_sql(sql).unwrap(); + assert_eq!(statements.len(), 1); + let only_statement = statements.pop_front().unwrap(); + + let options = if let Statement::CopyTo(copy_to) = only_statement { copy_to.options } else { panic!("Expected copy"); @@ -1460,7 +1582,10 @@ mod tests { } let only_statement = statements.pop_front().unwrap(); - assert_eq!(canonical, only_statement.to_string()); + assert_eq!( + canonical.to_uppercase(), + only_statement.to_string().to_uppercase() + ); only_statement } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 412c3b753ed5..3df84d3d786f 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -16,7 +16,6 @@ // under the License. use std::collections::{BTreeMap, HashMap, HashSet}; -use std::path::Path; use std::str::FromStr; use std::sync::Arc; @@ -813,20 +812,25 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { fn copy_to_plan(&self, statement: CopyToStatement) -> Result { // determine if source is table or query and handle accordingly let copy_source = statement.source; - let input = match copy_source { + let (input, input_schema, table_ref) = match copy_source { CopyToSource::Relation(object_name) => { let table_ref = self.object_name_to_table_reference(object_name.clone())?; - let table_source = self.context_provider.get_table_source(table_ref)?; - LogicalPlanBuilder::scan( + let table_source = + self.context_provider.get_table_source(table_ref.clone())?; + let plan = LogicalPlanBuilder::scan( object_name_to_string(&object_name), table_source, None, )? - .build()? + .build()?; + let input_schema = plan.schema().clone(); + (plan, input_schema, Some(table_ref)) } CopyToSource::Query(query) => { - self.query_to_plan(query, &mut PlannerContext::new())? + let plan = self.query_to_plan(query, &mut PlannerContext::new())?; + let input_schema = plan.schema().clone(); + (plan, input_schema, None) } }; @@ -852,8 +856,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { options.insert(key.to_lowercase(), value_string.to_lowercase()); } - let file_type = try_infer_file_type(&mut options, &statement.target)?; - let partition_by = take_partition_by(&mut options); + let file_type = + FileType::from_str(statement.file_type.as_str()).map_err(|_| { + DataFusionError::Execution(format!( + "Unknown FileType {}", + statement.file_type + )) + })?; + let partition_by = statement + .partitioned_by + .iter() + .map(|col| input_schema.field_with_name(table_ref.as_ref(), col)) + .collect::>>()? + .into_iter() + .map(|f| f.name().to_owned()) + .collect(); Ok(LogicalPlan::Copy(CopyTo { input: Arc::new(input), @@ -1469,82 +1486,3 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .is_ok() } } - -/// Infers the file type for a given target based on provided options or file extension. -/// -/// This function tries to determine the file type based on the 'format' option present -/// in the provided options hashmap. If 'format' is not explicitly set, the function attempts -/// to infer the file type from the file extension of the target. It returns an error if neither -/// the format option is set nor the file extension can be determined or parsed. 
-/// -/// # Arguments -/// -/// * `options` - A mutable reference to a HashMap containing options where the file format -/// might be specified under the 'format' key. -/// * `target` - A string slice representing the path to the file for which the file type needs to be inferred. -/// -/// # Returns -/// -/// Returns `Result` which is Ok if the file type could be successfully inferred, -/// otherwise returns an error in case of failure to determine or parse the file format or extension. -/// -/// # Errors -/// -/// This function returns an error in two cases: -/// - If the 'format' option is not set and the file extension cannot be retrieved from `target`. -/// - If the file extension is found but cannot be converted into a valid string. -/// -pub fn try_infer_file_type( - options: &mut HashMap, - target: &str, -) -> Result { - let explicit_format = options.remove("format"); - let format = match explicit_format { - Some(s) => FileType::from_str(&s), - None => { - // try to infer file format from file extension - let extension: &str = &Path::new(target) - .extension() - .ok_or(DataFusionError::Configuration( - "Format not explicitly set and unable to get file extension!" - .to_string(), - ))? - .to_str() - .ok_or(DataFusionError::Configuration( - "Format not explicitly set and failed to parse file extension!" - .to_string(), - ))? - .to_lowercase(); - - FileType::from_str(extension) - } - }?; - - Ok(format) -} - -/// Extracts and parses the 'partition_by' option from a provided options hashmap. -/// -/// This function looks for a 'partition_by' key in the options hashmap. If found, -/// it splits the value by commas, trims each resulting string, and replaces double -/// single quotes with a single quote. It returns a vector of partition column names. -/// -/// # Arguments -/// -/// * `options` - A mutable reference to a HashMap containing options where 'partition_by' -/// might be specified. -/// -/// # Returns -/// -/// Returns a `Vec` containing partition column names. If the 'partition_by' option -/// is not present, returns an empty vector. 
-pub fn take_partition_by(options: &mut HashMap) -> Vec { - let partition_by = options.remove("partition_by"); - match partition_by { - Some(part_cols) => part_cols - .split(',') - .map(|s| s.trim().replace("''", "'")) - .collect::>(), - None => vec![], - } -} diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 6681c3d02564..5a26f47cb05f 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -386,7 +386,7 @@ fn plan_rollback_transaction_chained() { #[test] fn plan_copy_to() { - let sql = "COPY test_decimal to 'output.csv'"; + let sql = "COPY test_decimal to 'output.csv' STORED AS CSV"; let plan = r#" CopyTo: format=csv output_url=output.csv options: () TableScan: test_decimal @@ -397,7 +397,7 @@ CopyTo: format=csv output_url=output.csv options: () #[test] fn plan_explain_copy_to() { - let sql = "EXPLAIN COPY test_decimal to 'output.csv'"; + let sql = "EXPLAIN COPY test_decimal to 'output.csv' STORED AS CSV"; let plan = r#" Explain CopyTo: format=csv output_url=output.csv options: () @@ -409,7 +409,7 @@ Explain #[test] fn plan_copy_to_query() { - let sql = "COPY (select * from test_decimal limit 10) to 'output.csv'"; + let sql = "COPY (select * from test_decimal limit 10) to 'output.csv' STORED AS CSV"; let plan = r#" CopyTo: format=csv output_url=output.csv options: () Limit: skip=0, fetch=10 diff --git a/datafusion/sqllogictest/test_files/clickbench.slt b/datafusion/sqllogictest/test_files/clickbench.slt index c2dba435263d..28bbe4af5874 100644 --- a/datafusion/sqllogictest/test_files/clickbench.slt +++ b/datafusion/sqllogictest/test_files/clickbench.slt @@ -23,7 +23,7 @@ # create.sql came from # https://github.com/ClickHouse/ClickBench/blob/8b9e3aa05ea18afa427f14909ddc678b8ef0d5e6/datafusion/create.sql # Data file made with DuckDB: -# COPY (SELECT * FROM 'hits.parquet' LIMIT 10) TO 'clickbench_hits_10.parquet' (FORMAT PARQUET); +# COPY (SELECT * FROM 'hits.parquet' LIMIT 10) TO 'clickbench_hits_10.parquet' STORED AS PARQUET; statement ok CREATE EXTERNAL TABLE hits diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index df23a993ebce..f83ecfbe527b 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -21,13 +21,13 @@ create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), (2, # Copy to directory as multiple files query IT -COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, 'parquet.compression' 'zstd(10)'); +COPY source_table TO 'test_files/scratch/copy/table/' STORED AS parquet OPTIONS ('format.compression' 'zstd(10)'); ---- 2 # Copy to directory as partitioned files query IT -COPY source_table TO 'test_files/scratch/copy/partitioned_table1/' (format parquet, 'parquet.compression' 'zstd(10)', partition_by 'col2'); +COPY source_table TO 'test_files/scratch/copy/partitioned_table1/' STORED AS parquet PARTITIONED BY (col2) OPTIONS ('format.compression' 'zstd(10)'); ---- 2 @@ -54,8 +54,8 @@ select * from validate_partitioned_parquet_bar order by col1; # Copy to directory as partitioned files query ITT -COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table2/' -(format parquet, partition_by 'column2, column3', 'parquet.compression' 'zstd(10)'); +COPY (values (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table2/' STORED AS parquet PARTITIONED BY (column2, column3) 
+OPTIONS ('format.compression' 'zstd(10)'); ---- 3 @@ -82,8 +82,8 @@ select * from validate_partitioned_parquet_a_x order by column1; # Copy to directory as partitioned files query TTT -COPY (values ('1', 'a', 'x'), ('2', 'b', 'y'), ('3', 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table3/' -(format parquet, 'parquet.compression' 'zstd(10)', partition_by 'column1, column3'); +COPY (values ('1', 'a', 'x'), ('2', 'b', 'y'), ('3', 'c', 'z')) TO 'test_files/scratch/copy/partitioned_table3/' STORED AS parquet PARTITIONED BY (column1, column3) +OPTIONS ('format.compression' 'zstd(10)'); ---- 3 @@ -111,49 +111,52 @@ a statement ok create table test ("'test'" varchar, "'test2'" varchar, "'test3'" varchar); -query TTT -insert into test VALUES ('a', 'x', 'aa'), ('b','y', 'bb'), ('c', 'z', 'cc') ----- -3 - -query T -select "'test'" from test ----- -a -b -c - -# Note to place a single ' inside of a literal string escape by putting two '' -query TTT -copy test to 'test_files/scratch/copy/escape_quote' (format csv, partition_by '''test2'',''test3''') ----- -3 - -statement ok -CREATE EXTERNAL TABLE validate_partitioned_escape_quote STORED AS CSV -LOCATION 'test_files/scratch/copy/escape_quote/' PARTITIONED BY ("'test2'", "'test3'"); - +## Until the partition by parsing uses ColumnDef, this test is meaningless since it becomes an overfit. Even in +## CREATE EXTERNAL TABLE, there is a schema mismatch, this should be an issue. +# +#query TTT +#insert into test VALUES ('a', 'x', 'aa'), ('b','y', 'bb'), ('c', 'z', 'cc') +#---- +#3 +# +#query T +#select "'test'" from test +#---- +#a +#b +#c +# +# # Note to place a single ' inside of a literal string escape by putting two '' +#query TTT +#copy test to 'test_files/scratch/copy/escape_quote' STORED AS CSV; +#---- +#3 +# +#statement ok +#CREATE EXTERNAL TABLE validate_partitioned_escape_quote STORED AS CSV +#LOCATION 'test_files/scratch/copy/escape_quote/' PARTITIONED BY ("'test2'", "'test3'"); +# # This triggers a panic (index out of bounds) # https://github.com/apache/arrow-datafusion/issues/9269 #query #select * from validate_partitioned_escape_quote; query TT -EXPLAIN COPY source_table TO 'test_files/scratch/copy/table/' (format parquet, 'parquet.compression' 'zstd(10)'); +EXPLAIN COPY source_table TO 'test_files/scratch/copy/table/' STORED AS PARQUET OPTIONS ('format.compression' 'zstd(10)'); ---- logical_plan -CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: (parquet.compression zstd(10)) +CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: (format.compression zstd(10)) --TableScan: source_table projection=[col1, col2] physical_plan FileSinkExec: sink=ParquetSink(file_groups=[]) --MemoryExec: partitions=1, partition_sizes=[1] # Error case -query error DataFusion error: Invalid or Unsupported Configuration: Format not explicitly set and unable to get file extension! 
+query error DataFusion error: SQL error: ParserError\("Missing STORED AS clause in COPY statement"\) EXPLAIN COPY source_table to 'test_files/scratch/copy/table/' query TT -EXPLAIN COPY source_table to 'test_files/scratch/copy/table/' (format parquet) +EXPLAIN COPY source_table to 'test_files/scratch/copy/table/' STORED AS PARQUET ---- logical_plan CopyTo: format=parquet output_url=test_files/scratch/copy/table/ options: () @@ -164,7 +167,7 @@ FileSinkExec: sink=ParquetSink(file_groups=[]) # Copy more files to directory via query query IT -COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/copy/table/' (format parquet); +COPY (select * from source_table UNION ALL select * from source_table) to 'test_files/scratch/copy/table/' STORED AS PARQUET; ---- 4 @@ -185,7 +188,7 @@ select * from validate_parquet; query ? copy (values (struct(timestamp '2021-01-01 01:00:01', 1)), (struct(timestamp '2022-01-01 01:00:01', 2)), (struct(timestamp '2023-01-03 01:00:01', 3)), (struct(timestamp '2024-01-01 01:00:01', 4))) -to 'test_files/scratch/copy/table_nested2/' (format parquet); +to 'test_files/scratch/copy/table_nested2/' STORED AS PARQUET; ---- 4 @@ -204,7 +207,7 @@ query ?? COPY (values (struct ('foo', (struct ('foo', make_array(struct('a',1), struct('b',2))))), make_array(timestamp '2023-01-01 01:00:01',timestamp '2023-01-01 01:00:01')), (struct('bar', (struct ('foo', make_array(struct('aa',10), struct('bb',20))))), make_array(timestamp '2024-01-01 01:00:01', timestamp '2024-01-01 01:00:01'))) -to 'test_files/scratch/copy/table_nested/' (format parquet); +to 'test_files/scratch/copy/table_nested/' STORED AS PARQUET; ---- 2 @@ -221,7 +224,7 @@ select * from validate_parquet_nested; query ? copy (values ([struct('foo', 1), struct('bar', 2)])) to 'test_files/scratch/copy/array_of_struct/' -(format parquet); +STORED AS PARQUET; ---- 1 @@ -236,8 +239,7 @@ select * from validate_array_of_struct; query ? 
copy (values (struct('foo', [1,2,3], struct('bar', [2,3,4])))) -to 'test_files/scratch/copy/struct_with_array/' -(format parquet); +to 'test_files/scratch/copy/struct_with_array/' STORED AS PARQUET; ---- 1 @@ -255,31 +257,32 @@ select * from validate_struct_with_array; query IT COPY source_table TO 'test_files/scratch/copy/table_with_options/' -(format parquet, -'parquet.compression' snappy, -'parquet.compression::col1' 'zstd(5)', -'parquet.compression::col2' snappy, -'parquet.max_row_group_size' 12345, -'parquet.data_pagesize_limit' 1234, -'parquet.write_batch_size' 1234, -'parquet.writer_version' 2.0, -'parquet.dictionary_page_size_limit' 123, -'parquet.created_by' 'DF copy.slt', -'parquet.column_index_truncate_length' 123, -'parquet.data_page_row_count_limit' 1234, -'parquet.bloom_filter_enabled' true, -'parquet.bloom_filter_enabled::col1' false, -'parquet.bloom_filter_fpp::col2' 0.456, -'parquet.bloom_filter_ndv::col2' 456, -'parquet.encoding' plain, -'parquet.encoding::col1' DELTA_BINARY_PACKED, -'parquet.dictionary_enabled::col2' true, -'parquet.dictionary_enabled' false, -'parquet.statistics_enabled' page, -'parquet.statistics_enabled::col2' none, -'parquet.max_statistics_size' 123, -'parquet.bloom_filter_fpp' 0.001, -'parquet.bloom_filter_ndv' 100 +STORED AS PARQUET +OPTIONS ( +'format.compression' snappy, +'format.compression::col1' 'zstd(5)', +'format.compression::col2' snappy, +'format.max_row_group_size' 12345, +'format.data_pagesize_limit' 1234, +'format.write_batch_size' 1234, +'format.writer_version' 2.0, +'format.dictionary_page_size_limit' 123, +'format.created_by' 'DF copy.slt', +'format.column_index_truncate_length' 123, +'format.data_page_row_count_limit' 1234, +'format.bloom_filter_enabled' true, +'format.bloom_filter_enabled::col1' false, +'format.bloom_filter_fpp::col2' 0.456, +'format.bloom_filter_ndv::col2' 456, +'format.encoding' plain, +'format.encoding::col1' DELTA_BINARY_PACKED, +'format.dictionary_enabled::col2' true, +'format.dictionary_enabled' false, +'format.statistics_enabled' page, +'format.statistics_enabled::col2' none, +'format.max_statistics_size' 123, +'format.bloom_filter_fpp' 0.001, +'format.bloom_filter_ndv' 100 ) ---- 2 @@ -296,7 +299,7 @@ select * from validate_parquet_with_options; # Copy from table to single file query IT -COPY source_table to 'test_files/scratch/copy/table.parquet'; +COPY source_table to 'test_files/scratch/copy/table.parquet' STORED AS PARQUET; ---- 2 @@ -312,7 +315,7 @@ select * from validate_parquet_single; # copy from table to folder of compressed json files query IT -COPY source_table to 'test_files/scratch/copy/table_json_gz' (format json, 'json.compression' gzip); +COPY source_table to 'test_files/scratch/copy/table_json_gz' STORED AS JSON OPTIONS ('format.compression' gzip); ---- 2 @@ -328,7 +331,7 @@ select * from validate_json_gz; # copy from table to folder of compressed csv files query IT -COPY source_table to 'test_files/scratch/copy/table_csv' (format csv, 'csv.has_header' false, 'csv.compression' gzip); +COPY source_table to 'test_files/scratch/copy/table_csv' STORED AS CSV OPTIONS ('format.has_header' false, 'format.compression' gzip); ---- 2 @@ -344,7 +347,7 @@ select * from validate_csv; # Copy from table to single csv query IT -COPY source_table to 'test_files/scratch/copy/table.csv'; +COPY source_table to 'test_files/scratch/copy/table.csv' STORED AS CSV ; ---- 2 @@ -360,7 +363,7 @@ select * from validate_single_csv; # Copy from table to folder of json query IT -COPY source_table to 
'test_files/scratch/copy/table_json' (format json); +COPY source_table to 'test_files/scratch/copy/table_json' STORED AS JSON; ---- 2 @@ -376,7 +379,7 @@ select * from validate_json; # Copy from table to single json file query IT -COPY source_table to 'test_files/scratch/copy/table.json'; +COPY source_table to 'test_files/scratch/copy/table.json' STORED AS JSON ; ---- 2 @@ -394,12 +397,12 @@ select * from validate_single_json; query IT COPY source_table to 'test_files/scratch/copy/table_csv_with_options' -(format csv, -'csv.has_header' false, -'csv.compression' uncompressed, -'csv.datetime_format' '%FT%H:%M:%S.%9f', -'csv.delimiter' ';', -'csv.null_value' 'NULLVAL'); +STORED AS CSV OPTIONS ( +'format.has_header' false, +'format.compression' uncompressed, +'format.datetime_format' '%FT%H:%M:%S.%9f', +'format.delimiter' ';', +'format.null_value' 'NULLVAL'); ---- 2 @@ -417,7 +420,7 @@ select * from validate_csv_with_options; # Copy from table to single arrow file query IT -COPY source_table to 'test_files/scratch/copy/table.arrow'; +COPY source_table to 'test_files/scratch/copy/table.arrow' STORED AS ARROW; ---- 2 @@ -437,7 +440,7 @@ select * from validate_arrow_file; query T? COPY (values ('c', arrow_cast('foo', 'Dictionary(Int32, Utf8)')), ('d', arrow_cast('bar', 'Dictionary(Int32, Utf8)'))) -to 'test_files/scratch/copy/table_dict.arrow'; +to 'test_files/scratch/copy/table_dict.arrow' STORED AS ARROW; ---- 2 @@ -456,7 +459,7 @@ d bar # Copy from table to folder of json query IT -COPY source_table to 'test_files/scratch/copy/table_arrow' (format arrow); +COPY source_table to 'test_files/scratch/copy/table_arrow' STORED AS ARROW; ---- 2 @@ -475,12 +478,12 @@ select * from validate_arrow; # Copy from table with options query error DataFusion error: Invalid or Unsupported Configuration: Config value "row_group_size" not found on JsonOptions -COPY source_table to 'test_files/scratch/copy/table.json' ('json.row_group_size' 55); +COPY source_table to 'test_files/scratch/copy/table.json' STORED AS JSON OPTIONS ('format.row_group_size' 55); # Incomplete statement query error DataFusion error: SQL error: ParserError\("Expected \), found: EOF"\) COPY (select col2, sum(col1) from source_table # Copy from table with non literal -query error DataFusion error: SQL error: ParserError\("Expected ',' or '\)' after option definition, found: \+"\) +query error DataFusion error: SQL error: ParserError\("Unexpected token \("\) COPY source_table to '/tmp/table.parquet' (row_group_size 55 + 102); diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 3b85dd9e986f..c4a26a5e227d 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -101,8 +101,8 @@ statement error DataFusion error: SQL error: ParserError\("Unexpected token FOOB CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV FOOBAR BARBAR BARFOO LOCATION 'foo.csv'; # Conflicting options -statement error DataFusion error: Invalid or Unsupported Configuration: Key "parquet.column_index_truncate_length" is not applicable for CSV format +statement error DataFusion error: Invalid or Unsupported Configuration: Config value "column_index_truncate_length" not found on CsvOptions CREATE EXTERNAL TABLE csv_table (column1 int) STORED AS CSV LOCATION 'foo.csv' -OPTIONS ('csv.delimiter' ';', 'parquet.column_index_truncate_length' '123') +OPTIONS ('format.delimiter' ';', 
'format.column_index_truncate_length' '123') diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index 7b299c0cf143..ab6847afb6a5 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -23,7 +23,7 @@ c2 VARCHAR ) STORED AS CSV WITH HEADER ROW DELIMITER ',' -OPTIONS ('csv.quote' '~') +OPTIONS ('format.quote' '~') LOCATION '../core/tests/data/quote.csv'; statement ok @@ -33,7 +33,7 @@ c2 VARCHAR ) STORED AS CSV WITH HEADER ROW DELIMITER ',' -OPTIONS ('csv.escape' '\') +OPTIONS ('format.escape' '\') LOCATION '../core/tests/data/escape.csv'; query TT @@ -71,7 +71,7 @@ c2 VARCHAR ) STORED AS CSV WITH HEADER ROW DELIMITER ',' -OPTIONS ('csv.escape' '"') +OPTIONS ('format.escape' '"') LOCATION '../core/tests/data/escape.csv'; # TODO: Validate this with better data. @@ -117,14 +117,14 @@ CREATE TABLE src_table_2 ( query ITII COPY src_table_1 TO 'test_files/scratch/csv_files/csv_partitions/1.csv' -(FORMAT CSV); +STORED AS CSV; ---- 4 query ITII COPY src_table_2 TO 'test_files/scratch/csv_files/csv_partitions/2.csv' -(FORMAT CSV); +STORED AS CSV; ---- 4 diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index 9004566fcdb2..156a716871f0 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -4506,28 +4506,28 @@ CREATE TABLE src_table ( query PI COPY (SELECT * FROM src_table) TO 'test_files/scratch/group_by/timestamp_table/0.csv' -(FORMAT CSV); +STORED AS CSV; ---- 10 query PI COPY (SELECT * FROM src_table) TO 'test_files/scratch/group_by/timestamp_table/1.csv' -(FORMAT CSV); +STORED AS CSV; ---- 10 query PI COPY (SELECT * FROM src_table) TO 'test_files/scratch/group_by/timestamp_table/2.csv' -(FORMAT CSV); +STORED AS CSV; ---- 10 query PI COPY (SELECT * FROM src_table) TO 'test_files/scratch/group_by/timestamp_table/3.csv' -(FORMAT CSV); +STORED AS CSV; ---- 10 diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index b7cd1243cb0f..3cc52666d533 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -45,7 +45,7 @@ CREATE TABLE src_table ( query ITID COPY (SELECT * FROM src_table LIMIT 3) TO 'test_files/scratch/parquet/test_table/0.parquet' -(FORMAT PARQUET); +STORED AS PARQUET; ---- 3 @@ -53,7 +53,7 @@ TO 'test_files/scratch/parquet/test_table/0.parquet' query ITID COPY (SELECT * FROM src_table WHERE int_col > 3 LIMIT 3) TO 'test_files/scratch/parquet/test_table/1.parquet' -(FORMAT PARQUET); +STORED AS PARQUET; ---- 3 @@ -128,7 +128,7 @@ SortPreservingMergeExec: [string_col@1 ASC NULLS LAST,int_col@0 ASC NULLS LAST] query ITID COPY (SELECT * FROM src_table WHERE int_col > 6 LIMIT 3) TO 'test_files/scratch/parquet/test_table/2.parquet' -(FORMAT PARQUET); +STORED AS PARQUET; ---- 3 @@ -281,7 +281,7 @@ LIMIT 10; query ITID COPY (SELECT * FROM src_table WHERE int_col > 6 LIMIT 3) TO 'test_files/scratch/parquet/test_table/subdir/3.parquet' -(FORMAT PARQUET); +STORED AS PARQUET; ---- 3 diff --git a/datafusion/sqllogictest/test_files/repartition.slt b/datafusion/sqllogictest/test_files/repartition.slt index 391a6739b060..594c52f12d75 100644 --- a/datafusion/sqllogictest/test_files/repartition.slt +++ b/datafusion/sqllogictest/test_files/repartition.slt @@ -25,7 +25,7 @@ set datafusion.execution.target_partitions = 4; statement ok COPY (VALUES (1, 
2), (2, 5), (3, 2), (4, 5), (5, 0)) TO 'test_files/scratch/repartition/parquet_table/2.parquet' -(FORMAT PARQUET); +STORED AS PARQUET; statement ok CREATE EXTERNAL TABLE parquet_table(column1 int, column2 int) diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index 15fe670a454c..fe0f6c1e8139 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -35,7 +35,7 @@ set datafusion.optimizer.repartition_file_min_size = 1; # Note filename 2.parquet to test sorting (on local file systems it is often listed before 1.parquet) statement ok COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/parquet_table/2.parquet' -(FORMAT PARQUET); +STORED AS PARQUET; statement ok CREATE EXTERNAL TABLE parquet_table(column1 int) @@ -86,7 +86,7 @@ set datafusion.optimizer.enable_round_robin_repartition = true; # create a second parquet file statement ok COPY (VALUES (100), (200)) TO 'test_files/scratch/repartition_scan/parquet_table/1.parquet' -(FORMAT PARQUET); +STORED AS PARQUET; ## Still expect to see the scan read the file as "4" groups with even sizes. One group should read ## parts of both files. @@ -158,7 +158,7 @@ DROP TABLE parquet_table_with_order; # create a single csv file statement ok COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/csv_table/1.csv' -(FORMAT csv, 'csv.has_header' true); +STORED AS CSV WITH HEADER ROW; statement ok CREATE EXTERNAL TABLE csv_table(column1 int) @@ -202,7 +202,7 @@ DROP TABLE csv_table; # create a single json file statement ok COPY (VALUES (1), (2), (3), (4), (5)) TO 'test_files/scratch/repartition_scan/json_table/1.json' -(FORMAT json); +STORED AS JSON; statement ok CREATE EXTERNAL TABLE json_table (column1 int) diff --git a/datafusion/sqllogictest/test_files/schema_evolution.slt b/datafusion/sqllogictest/test_files/schema_evolution.slt index aee0e97edc1e..5572c4a5ffef 100644 --- a/datafusion/sqllogictest/test_files/schema_evolution.slt +++ b/datafusion/sqllogictest/test_files/schema_evolution.slt @@ -31,7 +31,7 @@ COPY ( SELECT column1 as a, column2 as b FROM ( VALUES ('foo', 1), ('foo', 2), ('foo', 3) ) ) TO 'test_files/scratch/schema_evolution/parquet_table/1.parquet' -(FORMAT PARQUET); +STORED AS PARQUET; # File2 has only b @@ -40,7 +40,7 @@ COPY ( SELECT column1 as b FROM ( VALUES (10) ) ) TO 'test_files/scratch/schema_evolution/parquet_table/2.parquet' -(FORMAT PARQUET); +STORED AS PARQUET; # File3 has a column from 'z' which does not appear in the table # but also values from a which do appear in the table @@ -49,7 +49,7 @@ COPY ( SELECT column1 as z, column2 as a FROM ( VALUES ('bar', 'foo'), ('blarg', 'foo') ) ) TO 'test_files/scratch/schema_evolution/parquet_table/3.parquet' -(FORMAT PARQUET); +STORED AS PARQUET; # File4 has data for b and a (reversed) and d statement ok @@ -57,7 +57,7 @@ COPY ( SELECT column1 as b, column2 as a, column3 as c FROM ( VALUES (100, 'foo', 10.5), (200, 'foo', 12.6), (300, 'bzz', 13.7) ) ) TO 'test_files/scratch/schema_evolution/parquet_table/4.parquet' -(FORMAT PARQUET); +STORED AS PARQUET; # The logical distribution of `a`, `b` and `c` in the files is like this: # diff --git a/docs/source/user-guide/sql/dml.md b/docs/source/user-guide/sql/dml.md index 405e77a21b26..b9614bb8f929 100644 --- a/docs/source/user-guide/sql/dml.md +++ b/docs/source/user-guide/sql/dml.md @@ -49,7 +49,7 @@ Copy the contents of `source_table` to one or 
more Parquet formatted files in the `dir_name` directory: ```sql -> COPY source_table TO 'dir_name' (FORMAT parquet); +> COPY source_table TO 'dir_name' STORED AS PARQUET; +-------+ | count | +-------+ From 536e291cc290556f781163b97b10b1119dd8531c Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Thu, 14 Mar 2024 09:55:11 +0300 Subject: [PATCH 2/7] Resolve datafusion-cli error --- datafusion-cli/src/exec.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index b11f1c202284..ea765ee8eceb 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -412,7 +412,7 @@ mod tests { ) })?; for location in locations { - let sql = format!("copy (values (1,2)) to '{}';", location); + let sql = format!("copy (values (1,2)) to '{}' STORED AS PARQUET;", location); let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?; for statement in statements { //Should not fail @@ -438,8 +438,8 @@ mod tests { let location = "s3://bucket/path/file.parquet"; // Missing region, use object_store defaults - let sql = format!("COPY (values (1,2)) TO '{location}' - (format parquet, 'aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}')"); + let sql = format!("COPY (values (1,2)) TO '{location}' STORED AS PARQUET + OPTIONS ('aws.access_key_id' '{access_key_id}', 'aws.secret_access_key' '{secret_access_key}')"); copy_to_table_test(location, &sql).await?; Ok(()) From 68105cfd69bca02f2db95900a9775b3f27a51b90 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Fri, 15 Mar 2024 14:44:12 +0300 Subject: [PATCH 3/7] Make STORED AS optional --- datafusion/sql/src/parser.rs | 37 +++++++++---------- datafusion/sql/src/statement.rs | 33 ++++++++++++++--- datafusion/sql/tests/sql_integration.rs | 16 +++++++- .../sqllogictest/test_files/clickbench.slt | 2 +- datafusion/sqllogictest/test_files/copy.slt | 6 +-- 5 files changed, 63 insertions(+), 31 deletions(-) diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index d5b08d385544..c3b967af903d 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -107,7 +107,7 @@ pub struct CopyToStatement { /// CSV Header row? pub has_header: bool, /// File type (Parquet, NDJSON, CSV, etc) - pub file_type: String, + pub stored_as: Option, /// Target specific options pub options: Vec<(String, Value)>, } @@ -118,12 +118,15 @@ impl fmt::Display for CopyToStatement { source, target, partitioned_by, - file_type, + stored_as, options, .. 
} = self; - write!(f, "COPY {source} TO {target} STORED AS {}", file_type)?; + write!(f, "COPY {source} TO {target}")?; + if let Some(file_type) = stored_as { + write!(f, " STORED AS {}", file_type)?; + } if !partitioned_by.is_empty() { write!(f, " PARTITIONED BY ({})", partitioned_by.join(", "))?; } @@ -397,7 +400,7 @@ impl<'a> DFParser<'a> { #[derive(Default)] struct Builder { - file_type: Option, + stored_as: Option, target: Option, partitioned_by: Option>, has_header: Option, @@ -417,8 +420,8 @@ impl<'a> DFParser<'a> { match keyword { Keyword::STORED => { self.parser.expect_keyword(Keyword::AS)?; - ensure_not_set(&builder.file_type, "STORED AS")?; - builder.file_type = Some(self.parse_file_format()?); + ensure_not_set(&builder.stored_as, "STORED AS")?; + builder.stored_as = Some(self.parse_file_format()?); } Keyword::TO => { ensure_not_set(&builder.target, "TO")?; @@ -455,24 +458,20 @@ impl<'a> DFParser<'a> { } } - // Validations: location and file_type are required - if builder.file_type.is_none() { - return Err(ParserError::ParserError( - "Missing STORED AS clause in COPY statement".into(), - )); - } if builder.target.is_none() { return Err(ParserError::ParserError( "Missing TO clause in COPY statement".into(), )); } + let target = builder.target.unwrap(); + Ok(Statement::CopyTo(CopyToStatement { source, - target: builder.target.unwrap(), + target, partitioned_by: builder.partitioned_by.unwrap_or(vec![]), has_header: builder.has_header.unwrap_or(false), - file_type: builder.file_type.unwrap(), + stored_as: builder.stored_as, options: builder.options.unwrap_or(vec![]), })) } @@ -1407,7 +1406,7 @@ mod tests { target: "bar".to_string(), partitioned_by: vec![], has_header: false, - file_type: "CSV".to_string(), + stored_as: Some("CSV".to_owned()), options: vec![], }); @@ -1443,7 +1442,7 @@ mod tests { target: "bar".to_string(), partitioned_by: vec![], has_header: false, - file_type: "PARQUET".to_string(), + stored_as: Some("PARQUET".to_owned()), options: vec![], }); let expected = Statement::Explain(ExplainStatement { @@ -1479,7 +1478,7 @@ mod tests { target: "bar".to_string(), partitioned_by: vec![], has_header: true, - file_type: "CSV".to_string(), + stored_as: Some("CSV".to_owned()), options: vec![], }); assert_eq!(verified_stmt(sql), expected); @@ -1494,7 +1493,7 @@ mod tests { target: "bar".to_string(), partitioned_by: vec![], has_header: false, - file_type: "CSV".to_string(), + stored_as: Some("CSV".to_owned()), options: vec![( "row_group_size".to_string(), Value::Number("55".to_string(), false), @@ -1512,7 +1511,7 @@ mod tests { target: "bar".to_string(), partitioned_by: vec!["a".to_string()], has_header: false, - file_type: "CSV".to_string(), + stored_as: Some("CSV".to_owned()), options: vec![( "row_group_size".to_string(), Value::Number("55".to_string(), false), diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 3df84d3d786f..3e56b06e97be 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -16,6 +16,7 @@ // under the License. 
use std::collections::{BTreeMap, HashMap, HashSet}; +use std::path::Path; use std::str::FromStr; use std::sync::Arc; @@ -856,13 +857,33 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { options.insert(key.to_lowercase(), value_string.to_lowercase()); } - let file_type = - FileType::from_str(statement.file_type.as_str()).map_err(|_| { - DataFusionError::Execution(format!( - "Unknown FileType {}", - statement.file_type + let file_type = if let Some(file_type) = statement.stored_as { + FileType::from_str(&file_type).map_err(|_| { + DataFusionError::Configuration(format!("Unknown FileType {}", file_type)) + })? + } else { + let e = || { + DataFusionError::Configuration( + "Format not explicitly set and unable to get file extension! Use STORED AS to define file format." + .to_string(), + ) + }; + // try to infer file format from file extension + let extension: &str = &Path::new(&statement.target) + .extension() + .ok_or_else(e)? + .to_str() + .ok_or_else(e)? + .to_lowercase(); + + FileType::from_str(extension).map_err(|e| { + DataFusionError::Configuration(format!( + "{}. Use STORED AS to define file format.", + e )) - })?; + })? + }; + let partition_by = statement .partitioned_by .iter() diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 5a26f47cb05f..a1c001ffe730 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -397,7 +397,7 @@ CopyTo: format=csv output_url=output.csv options: () #[test] fn plan_explain_copy_to() { - let sql = "EXPLAIN COPY test_decimal to 'output.csv' STORED AS CSV"; + let sql = "EXPLAIN COPY test_decimal to 'output.csv'"; let plan = r#" Explain CopyTo: format=csv output_url=output.csv options: () @@ -407,9 +407,21 @@ Explain quick_test(sql, plan); } +#[test] +fn plan_explain_copy_to_format() { + let sql = "EXPLAIN COPY test_decimal to 'output.tbl' STORED AS CSV"; + let plan = r#" +Explain + CopyTo: format=csv output_url=output.tbl options: () + TableScan: test_decimal + "# + .trim(); + quick_test(sql, plan); +} + #[test] fn plan_copy_to_query() { - let sql = "COPY (select * from test_decimal limit 10) to 'output.csv' STORED AS CSV"; + let sql = "COPY (select * from test_decimal limit 10) to 'output.csv'"; let plan = r#" CopyTo: format=csv output_url=output.csv options: () Limit: skip=0, fetch=10 diff --git a/datafusion/sqllogictest/test_files/clickbench.slt b/datafusion/sqllogictest/test_files/clickbench.slt index 28bbe4af5874..9057e9c69b99 100644 --- a/datafusion/sqllogictest/test_files/clickbench.slt +++ b/datafusion/sqllogictest/test_files/clickbench.slt @@ -23,7 +23,7 @@ # create.sql came from # https://github.com/ClickHouse/ClickBench/blob/8b9e3aa05ea18afa427f14909ddc678b8ef0d5e6/datafusion/create.sql # Data file made with DuckDB: -# COPY (SELECT * FROM 'hits.parquet' LIMIT 10) TO 'clickbench_hits_10.parquet' STORED AS PARQUET; +# COPY (SELECT * FROM 'hits.parquet' LIMIT 10) TO 'clickbench_hits_10.parquet'; statement ok CREATE EXTERNAL TABLE hits diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index f83ecfbe527b..4d4f596d0c60 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -152,7 +152,7 @@ FileSinkExec: sink=ParquetSink(file_groups=[]) --MemoryExec: partitions=1, partition_sizes=[1] # Error case -query error DataFusion error: SQL error: ParserError\("Missing STORED AS clause in COPY statement"\) +query error DataFusion error: Invalid or Unsupported 
Configuration: Format not explicitly set and unable to get file extension! Use STORED AS to define file format. EXPLAIN COPY source_table to 'test_files/scratch/copy/table/' query TT @@ -299,7 +299,7 @@ select * from validate_parquet_with_options; # Copy from table to single file query IT -COPY source_table to 'test_files/scratch/copy/table.parquet' STORED AS PARQUET; +COPY source_table to 'test_files/scratch/copy/table.parquet'; ---- 2 @@ -347,7 +347,7 @@ select * from validate_csv; # Copy from table to single csv query IT -COPY source_table to 'test_files/scratch/copy/table.csv' STORED AS CSV ; +COPY source_table to 'test_files/scratch/copy/table.csv'; ---- 2 From 8df4b0e0938f81a1b094489cf7b48103efa35cb1 Mon Sep 17 00:00:00 2001 From: Mehmet Ozan Kabak Date: Sat, 16 Mar 2024 11:25:00 -0700 Subject: [PATCH 4/7] Review --- datafusion/common/src/config.rs | 4 +-- datafusion/core/tests/sql/sql_api.rs | 9 ++++--- datafusion/sql/src/parser.rs | 27 +++++++++---------- datafusion/sql/src/statement.rs | 12 +++------ datafusion/sql/tests/sql_integration.rs | 7 +++-- .../sqllogictest/test_files/clickbench.slt | 2 +- 6 files changed, 26 insertions(+), 35 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 153514e738d9..a52d39df1396 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1131,9 +1131,7 @@ impl ConfigField for TableOptions { fn set(&mut self, key: &str, value: &str) -> Result<()> { // Extensions are handled in the public `ConfigOptions::set` let (key, rem) = key.split_once('.').unwrap_or((key, "")); - let format = if let Some(format) = &self.current_format { - format - } else { + let Some(format) = &self.current_format else { return _config_err!("Specify a format for TableOptions"); }; match key { diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs index f452be64bfa9..b3a819fbc331 100644 --- a/datafusion/core/tests/sql/sql_api.rs +++ b/datafusion/core/tests/sql/sql_api.rs @@ -16,6 +16,7 @@ // under the License. 
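The statement.rs change above infers the file type from the target path when STORED AS is omitted, which is what the copy.slt cases exercise. The sketch below restates that logic outside the planner; `infer_copy_format` is a hypothetical helper, not part of the patch, and it assumes `datafusion_common::FileType` and `DataFusionError` as used elsewhere in these diffs.

```rust
// Simplified restatement of the inference added in statement.rs: with no
// STORED AS clause, derive the FileType from the target's file extension.
use std::path::Path;
use std::str::FromStr;

use datafusion_common::{DataFusionError, FileType};

fn infer_copy_format(
    stored_as: Option<&str>,
    target: &str,
) -> Result<FileType, DataFusionError> {
    if let Some(file_type) = stored_as {
        // An explicit STORED AS always wins over the extension.
        return FileType::from_str(file_type).map_err(|_| {
            DataFusionError::Configuration(format!("Unknown FileType {file_type}"))
        });
    }
    let err = || {
        DataFusionError::Configuration(
            "Format not explicitly set and unable to get file extension! \
             Use STORED AS to define file format."
                .to_string(),
        )
    };
    let extension = Path::new(target)
        .extension()
        .and_then(|e| e.to_str())
        .ok_or_else(err)?
        .to_lowercase();
    FileType::from_str(&extension).map_err(|e| {
        DataFusionError::Configuration(format!("{e}. Use STORED AS to define file format."))
    })
}

fn main() {
    // 'table.csv' resolves to FileType::CSV; a bare directory path has no
    // extension and therefore errors, matching the copy.slt error case above.
    assert!(matches!(
        infer_copy_format(None, "scratch/copy/table.csv"),
        Ok(FileType::CSV)
    ));
    assert!(infer_copy_format(None, "test_files/scratch/copy/table/").is_err());
    assert!(matches!(
        infer_copy_format(Some("CSV"), "output.tbl"),
        Ok(FileType::CSV)
    ));
}
```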
use datafusion::prelude::*; + use tempfile::TempDir; #[tokio::test] @@ -27,7 +28,7 @@ async fn unsupported_ddl_returns_error() { // disallow ddl let options = SQLOptions::new().with_allow_ddl(false); - let sql = "create view test_view as select * from test"; + let sql = "CREATE VIEW test_view AS SELECT * FROM test"; let df = ctx.sql_with_options(sql, options).await; assert_eq!( df.unwrap_err().strip_backtrace(), @@ -46,7 +47,7 @@ async fn unsupported_dml_returns_error() { let options = SQLOptions::new().with_allow_dml(false); - let sql = "insert into test values (1)"; + let sql = "INSERT INTO test VALUES (1)"; let df = ctx.sql_with_options(sql, options).await; assert_eq!( df.unwrap_err().strip_backtrace(), @@ -68,7 +69,7 @@ async fn unsupported_copy_returns_error() { let options = SQLOptions::new().with_allow_dml(false); let sql = format!( - "copy (values(1)) to '{}' STORED AS parquet", + "COPY (values(1)) TO '{}' STORED AS parquet", tmpfile.to_string_lossy() ); let df = ctx.sql_with_options(&sql, options).await; @@ -109,7 +110,7 @@ async fn ddl_can_not_be_planned_by_session_state() { let state = ctx.state(); // can not create a logical plan for catalog DDL - let sql = "drop table test"; + let sql = "DROP TABLE test"; let plan = state.create_logical_plan(sql).await.unwrap(); let physical_plan = state.create_physical_plan(&plan).await; assert_eq!( diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index c3b967af903d..0ed7d08ef378 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -17,21 +17,20 @@ //! [`DFParser`]: DataFusion SQL Parser based on [`sqlparser`] +use std::collections::{HashMap, VecDeque}; +use std::fmt; +use std::str::FromStr; + use datafusion_common::parsers::CompressionTypeVariant; -use sqlparser::ast::{OrderByExpr, Query, Value}; -use sqlparser::tokenizer::Word; use sqlparser::{ ast::{ - ColumnDef, ColumnOptionDef, ObjectName, Statement as SQLStatement, - TableConstraint, + ColumnDef, ColumnOptionDef, ObjectName, OrderByExpr, Query, + Statement as SQLStatement, TableConstraint, Value, }, dialect::{keywords::Keyword, Dialect, GenericDialect}, parser::{Parser, ParserError}, - tokenizer::{Token, TokenWithLocation, Tokenizer}, + tokenizer::{Token, TokenWithLocation, Tokenizer, Word}, }; -use std::collections::VecDeque; -use std::fmt; -use std::{collections::HashMap, str::FromStr}; // Use `Parser::expected` instead, if possible macro_rules! parser_err { @@ -102,11 +101,11 @@ pub struct CopyToStatement { pub source: CopyToSource, /// The URL to where the data is heading pub target: String, - /// Partitioned BY + /// Partition keys pub partitioned_by: Vec, - /// CSV Header row? + /// Indicates whether there is a header row (e.g. CSV) pub has_header: bool, - /// File type (Parquet, NDJSON, CSV, etc) + /// File type (Parquet, NDJSON, CSV etc.) 
pub stored_as: Option, /// Target specific options pub options: Vec<(String, Value)>, @@ -458,13 +457,11 @@ impl<'a> DFParser<'a> { } } - if builder.target.is_none() { + let Some(target) = builder.target else { return Err(ParserError::ParserError( "Missing TO clause in COPY statement".into(), )); - } - - let target = builder.target.unwrap(); + }; Ok(Statement::CopyTo(CopyToStatement { source, diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 3e56b06e97be..e50aceb757df 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -815,16 +815,12 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let copy_source = statement.source; let (input, input_schema, table_ref) = match copy_source { CopyToSource::Relation(object_name) => { - let table_ref = - self.object_name_to_table_reference(object_name.clone())?; + let table_name = object_name_to_string(&object_name); + let table_ref = self.object_name_to_table_reference(object_name)?; let table_source = self.context_provider.get_table_source(table_ref.clone())?; - let plan = LogicalPlanBuilder::scan( - object_name_to_string(&object_name), - table_source, - None, - )? - .build()?; + let plan = + LogicalPlanBuilder::scan(table_name, table_source, None)?.build()?; let input_schema = plan.schema().clone(); (plan, input_schema, Some(table_ref)) } diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index a1c001ffe730..d104cce7b013 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -22,12 +22,10 @@ use std::{sync::Arc, vec}; use arrow_schema::TimeUnit::Nanosecond; use arrow_schema::*; -use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect}; - +use datafusion_common::config::ConfigOptions; use datafusion_common::{ - config::ConfigOptions, DataFusionError, Result, ScalarValue, TableReference, + plan_err, DataFusionError, ParamValues, Result, ScalarValue, TableReference, }; -use datafusion_common::{plan_err, ParamValues}; use datafusion_expr::{ logical_plan::{LogicalPlan, Prepare}, AggregateUDF, ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, TableSource, @@ -37,6 +35,7 @@ use datafusion_sql::{ parser::DFParser, planner::{ContextProvider, ParserOptions, SqlToRel}, }; +use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect}; use rstest::rstest; diff --git a/datafusion/sqllogictest/test_files/clickbench.slt b/datafusion/sqllogictest/test_files/clickbench.slt index 9057e9c69b99..c2dba435263d 100644 --- a/datafusion/sqllogictest/test_files/clickbench.slt +++ b/datafusion/sqllogictest/test_files/clickbench.slt @@ -23,7 +23,7 @@ # create.sql came from # https://github.com/ClickHouse/ClickBench/blob/8b9e3aa05ea18afa427f14909ddc678b8ef0d5e6/datafusion/create.sql # Data file made with DuckDB: -# COPY (SELECT * FROM 'hits.parquet' LIMIT 10) TO 'clickbench_hits_10.parquet'; +# COPY (SELECT * FROM 'hits.parquet' LIMIT 10) TO 'clickbench_hits_10.parquet' (FORMAT PARQUET); statement ok CREATE EXTERNAL TABLE hits From 59e51b543d92c367c0a131d50f8f85a981b22d6e Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Mon, 18 Mar 2024 09:56:11 +0300 Subject: [PATCH 5/7] Review resolved --- datafusion/common/src/config.rs | 27 +++++++++++++++++++++++++++ datafusion/sql/src/parser.rs | 1 - 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 
a52d39df1396..0cc7c6ed776c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1627,4 +1627,31 @@ mod tests { table_config.set("format.escape", "\'").unwrap(); assert_eq!(table_config.csv.escape.unwrap() as char, '\''); } + + #[cfg(feature = "parquet")] + #[test] + fn parquet_table_options() { + let mut table_config = TableOptions::new(); + table_config.set_file_format(FileType::PARQUET); + table_config + .set("format.bloom_filter_enabled::col1", "true") + .unwrap(); + assert_eq!( + table_config.parquet.column_specific_options["col1"].bloom_filter_enabled, + Some(true) + ); + } + + #[cfg(feature = "parquet")] + #[test] + fn parquet_table_options_config_entry() { + let mut table_config = TableOptions::new(); + table_config + .set("parquet.bloom_filter_enabled::col1", "true") + .unwrap(); + let entries = table_config.entries(); + assert!(entries + .iter() + .any(|item| item.key == "parquet.bloom_filter_enabled::col1")) + } } diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index 0ed7d08ef378..a5d7970495c5 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -136,7 +136,6 @@ impl fmt::Display for CopyToStatement { if !options.is_empty() { let opts: Vec<_> = options.iter().map(|(k, v)| format!("{k} {v}")).collect(); - // print them in sorted order write!(f, " OPTIONS ({})", opts.join(", "))?; } From 8c82a9416dd4143ac9b1a7fddc174c64e978ac05 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Mon, 18 Mar 2024 10:01:25 +0300 Subject: [PATCH 6/7] Merge resolve --- datafusion/sql/tests/sql_integration.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 96a6a5d131d9..b9b981bee517 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -24,20 +24,21 @@ use arrow_schema::TimeUnit::Nanosecond; use arrow_schema::*; use datafusion_common::config::ConfigOptions; use datafusion_common::{ - plan_err, DataFusionError, ParamValues, Result, ScalarValue, TableReference, + plan_err, DFSchema, DataFusionError, ParamValues, Result, ScalarValue, TableReference, }; use datafusion_expr::{ logical_plan::{LogicalPlan, Prepare}, AggregateUDF, ColumnarValue, ScalarUDF, ScalarUDFImpl, Signature, TableSource, Volatility, WindowUDF, }; +use datafusion_sql::unparser::{expr_to_sql, plan_to_sql}; use datafusion_sql::{ parser::DFParser, - planner::{ContextProvider, ParserOptions, SqlToRel}, + planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel}, }; -use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect}; use rstest::rstest; +use sqlparser::dialect::{Dialect, GenericDialect, HiveDialect, MySqlDialect}; use sqlparser::parser::Parser; #[test] From 065295f69f7498743f61e3015b2a6e82b2c71f1b Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Mon, 18 Mar 2024 11:17:13 +0300 Subject: [PATCH 7/7] Enhancing comments, solving some bugs --- datafusion-cli/src/catalog.rs | 2 +- datafusion/common/src/config.rs | 143 ++++++++++++++++-- datafusion/core/src/dataframe/mod.rs | 9 +- datafusion/core/src/dataframe/parquet.rs | 5 +- .../src/datasource/file_format/options.rs | 2 +- .../core/src/datasource/listing/table.rs | 40 ++--- .../src/datasource/listing_table_factory.rs | 4 +- datafusion/core/src/execution/context/mod.rs | 9 +- datafusion/core/src/physical_planner.rs | 2 +- 
datafusion/core/src/test_util/parquet.rs | 2 +- .../tests/cases/roundtrip_logical_plan.rs | 2 +- 11 files changed, 170 insertions(+), 50 deletions(-) diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index a8ecb98637cb..46dd8bb00f06 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -189,7 +189,7 @@ impl SchemaProvider for DynamicFileSchemaProvider { &state, table_url.scheme(), url, - state.default_table_options(), + &state.default_table_options(), ) .await?; state.runtime_env().register_object_store(url, store); diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index f83247aec928..968d8215ca4d 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1109,23 +1109,69 @@ macro_rules! extensions_options { } } +/// Represents the configuration options available for handling different table formats within a data processing application. +/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration +/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions. #[derive(Debug, Clone, Default)] pub struct TableOptions { + /// Configuration options for CSV file handling. This includes settings like the delimiter, + /// quote character, and whether the first row is considered as headers. pub csv: CsvOptions, + + /// Configuration options for Parquet file handling. This includes settings for compression, + /// encoding, and other Parquet-specific file characteristics. pub parquet: TableParquetOptions, + + /// Configuration options for JSON file handling. pub json: JsonOptions, + + /// The current file format that the table operations should assume. This option allows + /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON). pub current_format: Option, - /// Optional extensions registered using [`Extensions::insert`] + + /// Optional extensions that can be used to extend or customize the behavior of the table + /// options. Extensions can be registered using `Extensions::insert` and might include + /// custom file handling logic, additional configuration parameters, or other enhancements. pub extensions: Extensions, } impl ConfigField for TableOptions { + /// Visits configuration settings for the current file format, or all formats if none is selected. + /// + /// This method adapts the behavior based on whether a file format is currently selected in `current_format`. + /// If a format is selected, it visits only the settings relevant to that format. Otherwise, + /// it visits all available format settings. fn visit(&self, v: &mut V, _key_prefix: &str, _description: &'static str) { - self.csv.visit(v, "csv", ""); - self.parquet.visit(v, "parquet", ""); - self.json.visit(v, "json", ""); + if let Some(file_type) = &self.current_format { + match file_type { + #[cfg(feature = "parquet")] + FileType::PARQUET => self.parquet.visit(v, "format", ""), + FileType::CSV => self.csv.visit(v, "format", ""), + FileType::JSON => self.json.visit(v, "format", ""), + _ => {} + } + } else { + self.csv.visit(v, "csv", ""); + self.parquet.visit(v, "parquet", ""); + self.json.visit(v, "json", ""); + } } + /// Sets a configuration value for a specific key within `TableOptions`. + /// + /// This method delegates setting configuration values to the specific file format configurations, + /// based on the current format selected. 
If no format is selected, it returns an error. + /// + /// # Parameters + /// + /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter") + /// for CSV format. + /// * `value`: The value to set for the specified configuration key. + /// + /// # Returns + /// + /// A result indicating success or an error if the key is not recognized, if a format is not specified, + /// or if setting the configuration value fails for the specific format. fn set(&mut self, key: &str, value: &str) -> Result<()> { // Extensions are handled in the public `ConfigOptions::set` let (key, rem) = key.split_once('.').unwrap_or((key, "")); @@ -1148,28 +1194,78 @@ impl ConfigField for TableOptions { } impl TableOptions { - /// Creates a new [`ConfigOptions`] with default values + /// Constructs a new instance of `TableOptions` with default settings. + /// + /// # Returns + /// + /// A new `TableOptions` instance with default configuration values. pub fn new() -> Self { Self::default() } + /// Sets the file format for the table. + /// + /// # Parameters + /// + /// * `format`: The file format to use (e.g., CSV, Parquet). pub fn set_file_format(&mut self, format: FileType) { self.current_format = Some(format); } + /// Creates a new `TableOptions` instance initialized with settings from a given session config. + /// + /// # Parameters + /// + /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings. + /// + /// # Returns + /// + /// A new `TableOptions` instance with settings applied from the session config. pub fn default_from_session_config(config: &ConfigOptions) -> Self { - let mut initial = TableOptions::default(); - initial.parquet.global = config.execution.parquet.clone(); + let initial = TableOptions::default(); + initial.combine_with_session_config(config); initial } - /// Set extensions to provided value + /// Updates the current `TableOptions` with settings from a given session config. + /// + /// # Parameters + /// + /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied. + /// + /// # Returns + /// + /// A new `TableOptions` instance with updated settings from the session config. + pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self { + let mut clone = self.clone(); + clone.parquet.global = config.execution.parquet.clone(); + clone + } + + /// Sets the extensions for this `TableOptions` instance. + /// + /// # Parameters + /// + /// * `extensions`: The `Extensions` instance to set. + /// + /// # Returns + /// + /// A new `TableOptions` instance with the specified extensions applied. pub fn with_extensions(mut self, extensions: Extensions) -> Self { self.extensions = extensions; self } - /// Set a configuration option + /// Sets a specific configuration option. + /// + /// # Parameters + /// + /// * `key`: The configuration key (e.g., "format.delimiter"). + /// * `value`: The value to set for the specified key. + /// + /// # Returns + /// + /// A result indicating success or failure in setting the configuration option. pub fn set(&mut self, key: &str, value: &str) -> Result<()> { let (prefix, _) = key.split_once('.').ok_or_else(|| { DataFusionError::Configuration(format!( @@ -1190,6 +1286,15 @@ impl TableOptions { e.0.set(key, value) } + /// Initializes a new `TableOptions` from a hash map of string settings. + /// + /// # Parameters + /// + /// * `settings`: A hash map where each key-value pair represents a configuration setting. 
+ /// + /// # Returns + /// + /// A result containing the new `TableOptions` instance or an error if any setting could not be applied. pub fn from_string_hash_map(settings: &HashMap) -> Result { let mut ret = Self::default(); for (k, v) in settings { @@ -1199,6 +1304,15 @@ impl TableOptions { Ok(ret) } + /// Modifies the current `TableOptions` instance with settings from a hash map. + /// + /// # Parameters + /// + /// * `settings`: A hash map where each key-value pair represents a configuration setting. + /// + /// # Returns + /// + /// A result indicating success or failure in applying the settings. pub fn alter_with_string_hash_map( &mut self, settings: &HashMap, @@ -1209,7 +1323,11 @@ impl TableOptions { Ok(()) } - /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`] + /// Retrieves all configuration entries from this `TableOptions`. + /// + /// # Returns + /// + /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`. pub fn entries(&self) -> Vec { struct Visitor(Vec); @@ -1644,12 +1762,13 @@ mod tests { #[test] fn parquet_table_options_config_entry() { let mut table_config = TableOptions::new(); + table_config.set_file_format(FileType::PARQUET); table_config - .set("parquet.bloom_filter_enabled::col1", "true") + .set("format.bloom_filter_enabled::col1", "true") .unwrap(); let entries = table_config.entries(); assert!(entries .iter() - .any(|item| item.key == "parquet.bloom_filter_enabled::col1")) + .any(|item| item.key == "format.bloom_filter_enabled::col1")) } } diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 5f192b83fdd9..eaec90b8dfd4 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1161,8 +1161,8 @@ impl DataFrame { "Overwrites are not implemented for DataFrame::write_csv.".to_owned(), )); } - let table_options = self.session_state.default_table_options(); - let props = writer_options.unwrap_or_else(|| table_options.csv.clone()); + let props = writer_options + .unwrap_or_else(|| self.session_state.default_table_options().csv); let plan = LogicalPlanBuilder::copy_to( self.plan, @@ -1210,9 +1210,8 @@ impl DataFrame { )); } - let table_options = self.session_state.default_table_options(); - - let props = writer_options.unwrap_or_else(|| table_options.json.clone()); + let props = writer_options + .unwrap_or_else(|| self.session_state.default_table_options().json); let plan = LogicalPlanBuilder::copy_to( self.plan, diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index f4e8c9dfcd6f..e3f606e322fe 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -57,9 +57,8 @@ impl DataFrame { )); } - let table_options = self.session_state.default_table_options(); - - let props = writer_options.unwrap_or_else(|| table_options.parquet.clone()); + let props = writer_options + .unwrap_or_else(|| self.session_state.default_table_options().parquet); let plan = LogicalPlanBuilder::copy_to( self.plan, diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index f66683c311c1..f5bd72495d66 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -461,7 +461,7 @@ pub trait ReadOptions<'a> { return Ok(Arc::new(s.to_owned())); } - self.to_listing_options(config, state.default_table_options().clone()) + 
self.to_listing_options(config, state.default_table_options()) .infer_schema(&state, &table_path) .await } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 2a2551236e1b..c1e337b5c44a 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -118,7 +118,7 @@ impl ListingTableConfig { } } - fn infer_format(path: &str) -> Result<(Arc, String)> { + fn infer_file_type(path: &str) -> Result<(FileType, String)> { let err_msg = format!("Unable to infer file type from path: {path}"); let mut exts = path.rsplit('.'); @@ -139,20 +139,7 @@ impl ListingTableConfig { .get_ext_with_compression(file_compression_type.to_owned()) .map_err(|_| DataFusionError::Internal(err_msg))?; - let file_format: Arc = match file_type { - FileType::ARROW => Arc::new(ArrowFormat), - FileType::AVRO => Arc::new(AvroFormat), - FileType::CSV => Arc::new( - CsvFormat::default().with_file_compression_type(file_compression_type), - ), - FileType::JSON => Arc::new( - JsonFormat::default().with_file_compression_type(file_compression_type), - ), - #[cfg(feature = "parquet")] - FileType::PARQUET => Arc::new(ParquetFormat::default()), - }; - - Ok((file_format, ext)) + Ok((file_type, ext)) } /// Infer `ListingOptions` based on `table_path` suffix. @@ -173,10 +160,27 @@ impl ListingTableConfig { .await .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??; - let (format, file_extension) = - ListingTableConfig::infer_format(file.location.as_ref())?; + let (file_type, file_extension) = + ListingTableConfig::infer_file_type(file.location.as_ref())?; + + let mut table_options = state.default_table_options(); + table_options.set_file_format(file_type.clone()); + let file_format: Arc = match file_type { + FileType::CSV => { + Arc::new(CsvFormat::default().with_options(table_options.csv)) + } + #[cfg(feature = "parquet")] + FileType::PARQUET => { + Arc::new(ParquetFormat::default().with_options(table_options.parquet)) + } + FileType::AVRO => Arc::new(AvroFormat), + FileType::JSON => { + Arc::new(JsonFormat::default().with_options(table_options.json)) + } + FileType::ARROW => Arc::new(ArrowFormat), + }; - let listing_options = ListingOptions::new(format) + let listing_options = ListingOptions::new(file_format) .with_file_extension(file_extension) .with_target_partitions(state.config().target_partitions()); diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index ef8ee708ebf1..b616e0181cfc 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -34,7 +34,6 @@ use crate::datasource::TableProvider; use crate::execution::context::SessionState; use arrow::datatypes::{DataType, SchemaRef}; -use datafusion_common::config::TableOptions; use datafusion_common::{arrow_datafusion_err, DataFusionError, FileType}; use datafusion_expr::CreateExternalTable; @@ -58,8 +57,7 @@ impl TableProviderFactory for ListingTableFactory { state: &SessionState, cmd: &CreateExternalTable, ) -> datafusion_common::Result> { - let mut table_options = - TableOptions::default_from_session_config(state.config_options()); + let mut table_options = state.default_table_options(); let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| { DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type)) })?; diff --git a/datafusion/core/src/execution/context/mod.rs 
b/datafusion/core/src/execution/context/mod.rs index 37c279b2aa15..aac42af5f373 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -384,9 +384,9 @@ impl SessionContext { self.state.read().config.clone() } - /// Return a copied version of config for this Session + /// Return a copied version of table options for this Session pub fn copied_table_options(&self) -> TableOptions { - self.state.read().default_table_options().clone() + self.state.read().default_table_options() } /// Creates a [`DataFrame`] from SQL query text. @@ -2002,8 +2002,9 @@ impl SessionState { } /// return the TableOptions options with its extensions - pub fn default_table_options(&self) -> &TableOptions { - &self.table_option_namespace + pub fn default_table_options(&self) -> TableOptions { + self.table_option_namespace + .combine_with_session_config(self.config_options()) } /// Get a new TaskContext to run in this session diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 96f5e1c3ffd3..ee581ca64214 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -595,7 +595,7 @@ impl DefaultPhysicalPlanner { table_partition_cols, overwrite: false, }; - let mut table_options = session_state.default_table_options().clone(); + let mut table_options = session_state.default_table_options(); let sink_format: Arc = match format_options { FormatOptions::CSV(options) => { table_options.csv = options.clone(); diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs index 7a466a666d8d..8113d799a184 100644 --- a/datafusion/core/src/test_util/parquet.rs +++ b/datafusion/core/src/test_util/parquet.rs @@ -165,7 +165,7 @@ impl TestParquetFile { // run coercion on the filters to coerce types etc. let props = ExecutionProps::new(); let context = SimplifyContext::new(&props).with_schema(df_schema.clone()); - let parquet_options = ctx.state().default_table_options().parquet.clone(); + let parquet_options = ctx.copied_table_options().parquet; if let Some(filter) = maybe_filter { let simplifier = ExprSimplifier::new(context); let filter = simplifier.coerce(filter, df_schema.clone()).unwrap(); diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index a448578f280a..f53904926f55 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -314,7 +314,7 @@ async fn roundtrip_logical_plan_copy_to_sql_options() -> Result<()> { let ctx = SessionContext::new(); let input = create_csv_scan(&ctx).await?; - let mut table_options = ctx.state().default_table_options().clone(); + let mut table_options = ctx.copied_table_options(); table_options.set_file_format(FileType::CSV); table_options.set("format.delimiter", ";")?;
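The roundtrip test above relies on the new `format.`-prefixed option keys. A rough usage sketch follows, assuming the `TableOptions` API as modified in this patch: a file format must be selected with `set_file_format` before `format.*` keys resolve to a concrete options namespace.

```rust
// Illustrative use of the `format.`-prefixed keys introduced in this patch.
use datafusion_common::config::TableOptions;
use datafusion_common::{FileType, Result};

fn main() -> Result<()> {
    let mut opts = TableOptions::new();

    // Without a current format, `format.*` keys are rejected.
    assert!(opts.set("format.delimiter", ";").is_err());

    // After selecting CSV, the same key is routed to the CSV options.
    opts.set_file_format(FileType::CSV);
    opts.set("format.delimiter", ";")?;
    assert_eq!(opts.csv.delimiter, b';');

    Ok(())
}
```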