From ac2287b70087d1832b1e2cf153b2c169113b8dfd Mon Sep 17 00:00:00 2001
From: Benjamin Bannier
Date: Sun, 12 May 2024 12:21:41 +0200
Subject: [PATCH] Add support for reading CSV files with comments

This patch adds support for parsing CSV files containing comment lines.

Closes #10262.
---
 datafusion-examples/examples/csv_opener.rs     |  1 +
 datafusion/common/src/config.rs                |  1 +
 .../core/src/datasource/file_format/csv.rs     | 13 +++++++++-
 .../src/datasource/file_format/options.rs      | 10 ++++++++
 .../core/src/datasource/physical_plan/csv.rs   | 22 +++++++++++++++++
 .../enforce_distribution.rs                    |  3 +++
 .../physical_optimizer/projection_pushdown.rs  |  3 +++
 .../replace_with_order_preserving_variants.rs  |  1 +
 datafusion/core/src/test/mod.rs                |  3 +++
 .../proto/datafusion_common.proto              |  1 +
 datafusion/proto-common/src/from_proto/mod.rs  |  1 +
 .../proto-common/src/generated/pbjson.rs       | 20 ++++++++++++++++
 .../proto-common/src/generated/prost.rs        |  3 +++
 datafusion/proto-common/src/to_proto/mod.rs    |  1 +
 datafusion/proto/proto/datafusion.proto        |  3 +++
 .../src/generated/datafusion_proto_common.rs   |  3 +++
 datafusion/proto/src/generated/pbjson.rs       | 21 ++++++++++++++++
 datafusion/proto/src/generated/prost.rs        |  8 +++++++
 datafusion/proto/src/physical_plan/mod.rs      | 15 ++++++++++++
 .../sqllogictest/test_files/csv_files.slt      | 24 +++++++++++++++++++
 20 files changed, 156 insertions(+), 1 deletion(-)

diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index d02aa9b3088c..1f45026a214d 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -48,6 +48,7 @@ async fn main() -> Result<()> {
         b',',
         b'"',
         object_store,
+        Some(b'#'),
     );
 
     let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED);
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index a4f937b6e2a3..1c431d04cd35 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -1567,6 +1567,7 @@ config_namespace! {
         pub timestamp_tz_format: Option<String>, default = None
         pub time_format: Option<String>, default = None
         pub null_value: Option<String>, default = None
+        pub comment: Option<u8>, default = None
     }
 }
 
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 60192b7f771d..2139b35621f2 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -147,6 +147,12 @@ impl CsvFormat {
         self.options.has_header
     }
 
+    /// Lines beginning with this byte are ignored.
+    pub fn with_comment(mut self, comment: Option<u8>) -> Self {
+        self.options.comment = comment;
+        self
+    }
+
     /// The character separating values within a row.
     /// - default to ','
     pub fn with_delimiter(mut self, delimiter: u8) -> Self {
@@ -252,6 +258,7 @@ impl FileFormat for CsvFormat {
             self.options.delimiter,
             self.options.quote,
             self.options.escape,
+            self.options.comment,
             self.options.compression.into(),
         );
         Ok(Arc::new(exec))
@@ -300,7 +307,7 @@ impl CsvFormat {
         pin_mut!(stream);
 
         while let Some(chunk) = stream.next().await.transpose()? {
-            let format = arrow::csv::reader::Format::default()
+            let mut format = arrow::csv::reader::Format::default()
                 .with_header(
                     first_chunk
                         && self
@@ -310,6 +317,10 @@
                 )
                 .with_delimiter(self.options.delimiter);
 
+            if let Some(comment) = self.options.comment {
+                format = format.with_comment(comment);
+            }
+
             let (Schema { fields, .. }, records_read) =
                 format.infer_schema(chunk.reader(), Some(records_to_read))?;
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index f5bd72495d66..c6d143ed6749 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -61,6 +61,8 @@ pub struct CsvReadOptions<'a> {
     pub quote: u8,
     /// An optional escape character. Defaults to None.
     pub escape: Option<u8>,
+    /// If enabled, lines beginning with this byte are ignored.
+    pub comment: Option<u8>,
     /// An optional schema representing the CSV files. If None, CSV reader will try to infer it
     /// based on data in file.
     pub schema: Option<&'a Schema>,
@@ -97,6 +99,7 @@ impl<'a> CsvReadOptions<'a> {
             table_partition_cols: vec![],
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             file_sort_order: vec![],
+            comment: None,
         }
     }
 
@@ -106,6 +109,12 @@ impl<'a> CsvReadOptions<'a> {
         self
     }
 
+    /// Specify comment char to use for CSV read
+    pub fn comment(mut self, comment: u8) -> Self {
+        self.comment = Some(comment);
+        self
+    }
+
     /// Specify delimiter to use for CSV read
     pub fn delimiter(mut self, delimiter: u8) -> Self {
         self.delimiter = delimiter;
@@ -477,6 +486,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
         let file_format = CsvFormat::default()
             .with_options(table_options.csv)
             .with_has_header(self.has_header)
+            .with_comment(self.comment)
             .with_delimiter(self.delimiter)
             .with_quote(self.quote)
             .with_escape(self.escape)
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 8203e414de97..c06c630c45d1 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -58,6 +58,7 @@ pub struct CsvExec {
     delimiter: u8,
     quote: u8,
     escape: Option<u8>,
+    comment: Option<u8>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     /// Compression type of the file associated with CsvExec
@@ -73,6 +74,7 @@ impl CsvExec {
         delimiter: u8,
         quote: u8,
         escape: Option<u8>,
+        comment: Option<u8>,
         file_compression_type: FileCompressionType,
     ) -> Self {
         let (projected_schema, projected_statistics, projected_output_ordering) =
@@ -92,6 +94,7 @@ impl CsvExec {
             metrics: ExecutionPlanMetricsSet::new(),
             file_compression_type,
             cache,
+            comment,
         }
     }
 
@@ -113,6 +116,11 @@ impl CsvExec {
         self.quote
     }
 
+    /// Lines beginning with this byte are ignored.
+    pub fn comment(&self) -> Option<u8> {
+        self.comment
+    }
+
     /// The escape character
     pub fn escape(&self) -> Option<u8> {
         self.escape
     }
@@ -234,6 +242,7 @@ impl ExecutionPlan for CsvExec {
             quote: self.quote,
             escape: self.escape,
             object_store,
+            comment: self.comment,
         });
 
         let opener = CsvOpener {
@@ -265,9 +274,11 @@ pub struct CsvConfig {
     quote: u8,
     escape: Option<u8>,
     object_store: Arc<dyn ObjectStore>,
+    comment: Option<u8>,
 }
 
 impl CsvConfig {
+    #[allow(clippy::too_many_arguments)]
     /// Returns a [`CsvConfig`]
     pub fn new(
         batch_size: usize,
@@ -277,6 +288,7 @@ impl CsvConfig {
         delimiter: u8,
         quote: u8,
         object_store: Arc<dyn ObjectStore>,
+        comment: Option<u8>,
     ) -> Self {
         Self {
             batch_size,
@@ -287,6 +299,7 @@ impl CsvConfig {
             quote,
             escape: None,
             object_store,
+            comment,
         }
     }
 }
@@ -309,6 +322,9 @@ impl CsvConfig {
         if let Some(escape) = self.escape {
             builder = builder.with_escape(escape)
         }
+        if let Some(comment) = self.comment {
+            builder = builder.with_comment(comment);
+        }
 
         builder
     }
@@ -570,6 +586,7 @@ mod tests {
            b',',
            b'"',
            None,
+            None,
            file_compression_type.to_owned(),
        );
        assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -636,6 +653,7 @@ mod tests {
            b',',
            b'"',
            None,
+            None,
            file_compression_type.to_owned(),
        );
        assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -702,6 +720,7 @@ mod tests {
            b',',
            b'"',
            None,
+            None,
            file_compression_type.to_owned(),
        );
        assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -765,6 +784,7 @@ mod tests {
            b',',
            b'"',
            None,
+            None,
            file_compression_type.to_owned(),
        );
        assert_eq!(14, csv.base_config.file_schema.fields().len());
@@ -827,6 +847,7 @@ mod tests {
            b',',
            b'"',
            None,
+            None,
            file_compression_type.to_owned(),
        );
        assert_eq!(13, csv.base_config.file_schema.fields().len());
@@ -921,6 +942,7 @@ mod tests {
            b',',
            b'"',
            None,
+            None,
            file_compression_type.to_owned(),
        );
 
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 88fa3a978af7..f7c2aee578ba 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1471,6 +1471,7 @@ pub(crate) mod tests {
        b',',
        b'"',
        None,
+        None,
        FileCompressionType::UNCOMPRESSED,
    ))
 }
@@ -1494,6 +1495,7 @@ pub(crate) mod tests {
        b',',
        b'"',
        None,
+        None,
        FileCompressionType::UNCOMPRESSED,
    ))
 }
@@ -3767,6 +3769,7 @@ pub(crate) mod tests {
                b',',
                b'"',
                None,
+                None,
                compression_type,
            )),
            vec![("a".to_string(), "a".to_string())],
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 335632ab6efa..70524dfcea7d 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -185,6 +185,7 @@ fn try_swapping_with_csv(
            csv.delimiter(),
            csv.quote(),
            csv.escape(),
+            csv.comment(),
            csv.file_compression_type,
        )) as _
    })
@@ -1686,6 +1687,7 @@ mod tests {
            0,
            0,
            None,
+            None,
            FileCompressionType::UNCOMPRESSED,
        ))
    }
@@ -1708,6 +1710,7 @@ mod tests {
            0,
            0,
            None,
+            None,
            FileCompressionType::UNCOMPRESSED,
        ))
    }
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index e3ef3b95aa06..013155b8400a 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -1502,6 +1502,7 @@ mod tests {
        0,
        b'"',
        None,
+        None,
        FileCompressionType::UNCOMPRESSED,
    ))
 }
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index f76e1bb60b53..e91f83f1199b 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -98,6 +98,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result
 for CsvOptions {
                .then(|| proto_opts.time_format.clone()),
            null_value: (!proto_opts.null_value.is_empty())
                .then(|| proto_opts.null_value.clone()),
+            comment: proto_opts.comment.first().copied(),
        })
    }
 }
diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs
index 6b2372433684..6f8409b82afe 100644
--- a/datafusion/proto-common/src/generated/pbjson.rs
+++ b/datafusion/proto-common/src/generated/pbjson.rs
@@ -1850,6 +1850,9 @@ impl serde::Serialize for CsvOptions {
         if !self.null_value.is_empty() {
             len += 1;
         }
+        if !self.comment.is_empty() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion_common.CsvOptions", len)?;
         if !self.has_header.is_empty() {
             #[allow(clippy::needless_borrow)]
@@ -1894,6 +1897,10 @@ impl serde::Serialize for CsvOptions {
         if !self.null_value.is_empty() {
             struct_ser.serialize_field("nullValue", &self.null_value)?;
         }
+        if !self.comment.is_empty() {
+            #[allow(clippy::needless_borrow)]
+            struct_ser.serialize_field("comment", pbjson::private::base64::encode(&self.comment).as_str())?;
+        }
         struct_ser.end()
     }
 }
@@ -1924,6 +1931,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
             "timeFormat",
             "null_value",
             "nullValue",
+            "comment",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -1940,6 +1948,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
             TimestampTzFormat,
             TimeFormat,
             NullValue,
+            Comment,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -1973,6 +1982,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
                             "timestampTzFormat" | "timestamp_tz_format" => Ok(GeneratedField::TimestampTzFormat),
                             "timeFormat" | "time_format" => Ok(GeneratedField::TimeFormat),
                             "nullValue" | "null_value" => Ok(GeneratedField::NullValue),
+                            "comment" => Ok(GeneratedField::Comment),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -2004,6 +2014,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
                let mut timestamp_tz_format__ = None;
                let mut time_format__ = None;
                let mut null_value__ = None;
+                let mut comment__ = None;
                while let Some(k) = map_.next_key()? {
                    match k {
                        GeneratedField::HasHeader => {
@@ -2088,6 +2099,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
                            }
                            null_value__ = Some(map_.next_value()?);
                        }
+                        GeneratedField::Comment => {
+                            if comment__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("comment"));
+                            }
+                            comment__ =
+                                Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
+                            ;
+                        }
                    }
                }
                Ok(CsvOptions {
@@ -2103,6 +2122,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions {
                    timestamp_tz_format: timestamp_tz_format__.unwrap_or_default(),
                    time_format: time_format__.unwrap_or_default(),
                    null_value: null_value__.unwrap_or_default(),
+                    comment: comment__.unwrap_or_default(),
                })
            }
        }
diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs
index 48da143bc7ed..ff17a40738b5 100644
--- a/datafusion/proto-common/src/generated/prost.rs
+++ b/datafusion/proto-common/src/generated/prost.rs
@@ -608,6 +608,9 @@ pub struct CsvOptions {
     /// Optional representation of null value
     #[prost(string, tag = "12")]
     pub null_value: ::prost::alloc::string::String,
+    /// Optional comment character as a byte
+    #[prost(bytes = "vec", tag = "13")]
+    pub comment: ::prost::alloc::vec::Vec<u8>,
 }
 /// Options controlling CSV format
 #[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs
index 28f6952aac44..8e7ee9a7d6fa 100644
--- a/datafusion/proto-common/src/to_proto/mod.rs
+++ b/datafusion/proto-common/src/to_proto/mod.rs
@@ -894,6 +894,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions {
             timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(),
             time_format: opts.time_format.clone().unwrap_or_default(),
             null_value: opts.null_value.clone().unwrap_or_default(),
+            comment: opts.comment.map_or_else(Vec::new, |h| vec![h]),
         })
     }
 }
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 5aaae07f4d68..8e1fce319323 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -994,6 +994,9 @@ message CsvScanExecNode {
   oneof optional_escape {
     string escape = 5;
   }
+  oneof optional_comment {
+    string comment = 6;
+  }
 }
 
 message AvroScanExecNode {
diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs
index 48da143bc7ed..ff17a40738b5 100644
--- a/datafusion/proto/src/generated/datafusion_proto_common.rs
+++ b/datafusion/proto/src/generated/datafusion_proto_common.rs
@@ -608,6 +608,9 @@ pub struct CsvOptions {
     /// Optional representation of null value
     #[prost(string, tag = "12")]
     pub null_value: ::prost::alloc::string::String,
+    /// Optional comment character as a byte
+    #[prost(bytes = "vec", tag = "13")]
+    pub comment: ::prost::alloc::vec::Vec<u8>,
 }
 /// Options controlling CSV format
 #[allow(clippy::derive_partial_eq_without_eq)]
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index cd754e4d9f27..7d4b083e5bd6 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -3698,6 +3698,9 @@ impl serde::Serialize for CsvScanExecNode {
         if self.optional_escape.is_some() {
             len += 1;
         }
+        if self.optional_comment.is_some() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.CsvScanExecNode", len)?;
         if let Some(v) = self.base_conf.as_ref() {
             struct_ser.serialize_field("baseConf", v)?;
@@ -3718,6 +3721,13 @@ impl serde::Serialize for CsvScanExecNode {
                }
            }
        }
+        if let Some(v) = self.optional_comment.as_ref() {
+            match v {
+                csv_scan_exec_node::OptionalComment::Comment(v) => {
+                    struct_ser.serialize_field("comment", v)?;
+                }
+            }
+        }
         struct_ser.end()
     }
 }
@@ -3735,6 +3745,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
             "delimiter",
             "quote",
             "escape",
+            "comment",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -3744,6 +3755,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
             Delimiter,
             Quote,
             Escape,
+            Comment,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -3770,6 +3782,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                            "delimiter" => Ok(GeneratedField::Delimiter),
                            "quote" => Ok(GeneratedField::Quote),
                            "escape" => Ok(GeneratedField::Escape),
+                            "comment" => Ok(GeneratedField::Comment),
                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                        }
                    }
@@ -3794,6 +3807,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                let mut delimiter__ = None;
                let mut quote__ = None;
                let mut optional_escape__ = None;
+                let mut optional_comment__ = None;
                while let Some(k) = map_.next_key()? {
                    match k {
                        GeneratedField::BaseConf => {
@@ -3826,6 +3840,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                            }
                            optional_escape__ = map_.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalEscape::Escape);
                        }
+                        GeneratedField::Comment => {
+                            if optional_comment__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("comment"));
+                            }
+                            optional_comment__ = map_.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalComment::Comment);
+                        }
                    }
                }
                Ok(CsvScanExecNode {
@@ -3834,6 +3854,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode {
                    delimiter: delimiter__.unwrap_or_default(),
                    quote: quote__.unwrap_or_default(),
                    optional_escape: optional_escape__,
+                    optional_comment: optional_comment__,
                })
            }
        }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 1b38168ba1d2..4874b6b9393f 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1530,6 +1530,8 @@ pub struct CsvScanExecNode {
     pub quote: ::prost::alloc::string::String,
     #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")]
     pub optional_escape: ::core::option::Option<csv_scan_exec_node::OptionalEscape>,
+    #[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")]
+    pub optional_comment: ::core::option::Option<csv_scan_exec_node::OptionalComment>,
 }
 /// Nested message and enum types in `CsvScanExecNode`.
 pub mod csv_scan_exec_node {
@@ -1539,6 +1541,12 @@ pub mod csv_scan_exec_node {
         #[prost(string, tag = "5")]
         Escape(::prost::alloc::string::String),
     }
+    #[allow(clippy::derive_partial_eq_without_eq)]
+    #[derive(Clone, PartialEq, ::prost::Oneof)]
+    pub enum OptionalComment {
+        #[prost(string, tag = "6")]
+        Comment(::prost::alloc::string::String),
+    }
 }
 #[allow(clippy::derive_partial_eq_without_eq)]
 #[derive(Clone, PartialEq, ::prost::Message)]
diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 550176a42e66..d0011e4917bf 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -203,6 +203,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                    } else {
                        None
                    },
+                    if let Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
+                        comment,
+                    )) = &scan.optional_comment
+                    {
+                        Some(str_to_byte(comment, "comment")?)
+                    } else {
+                        None
+                    },
                    FileCompressionType::UNCOMPRESSED,
                ))),
                #[cfg(feature = "parquet")]
@@ -1558,6 +1566,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                        } else {
                            None
                        },
+                        optional_comment: if let Some(comment) = exec.comment() {
+                            Some(protobuf::csv_scan_exec_node::OptionalComment::Comment(
+                                byte_to_string(comment, "comment")?,
+                            ))
+                        } else {
+                            None
+                        },
                    },
                )),
            });
diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt
index f581fa9abcbb..8902b3eebf24 100644
--- a/datafusion/sqllogictest/test_files/csv_files.slt
+++ b/datafusion/sqllogictest/test_files/csv_files.slt
@@ -202,3 +202,27 @@ select * from stored_table_with_necessary_quoting;
 2 f|f|f
 3 g|g|g
 4 h|h|h
+
+# Read CSV file with comments
+statement ok
+COPY (VALUES
+('column1,column2'),
+('#second line is a comment'),
+('2,3'))
+TO 'test_files/scratch/csv_files/file_with_comments.csv'
+OPTIONS ('format.delimiter' '|');
+
+statement ok
+CREATE EXTERNAL TABLE stored_table_with_comments (
+c1 VARCHAR,
+c2 VARCHAR
+) STORED AS CSV
+LOCATION 'test_files/scratch/csv_files/file_with_comments.csv'
+OPTIONS ('format.comment' '#',
+        'format.delimiter' ',');
+
+query TT
+SELECT * from stored_table_with_comments;
+----
+column1 column2
+2 3
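
Usage sketch (illustrative, not part of the diff): besides the SQL route exercised by the
sqllogictest above via the `format.comment` table option, the new `CsvReadOptions::comment`
setter can be used from the DataFrame API roughly as below. The file path
`data/with_comments.csv`, the `#` comment byte, and the tokio entry point are assumptions
made only for this example.

    use datafusion::error::Result;
    use datafusion::prelude::{CsvReadOptions, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Lines whose first byte is the configured comment byte (here `#`) are
        // skipped by the CSV reader, both during schema inference and scanning.
        let df = ctx
            .read_csv(
                "data/with_comments.csv", // hypothetical input file
                CsvReadOptions::new().comment(b'#'),
            )
            .await?;
        df.show().await?;
        Ok(())
    }

When building a listing table by hand, the lower-level `CsvFormat::with_comment(Some(b'#'))`
builder added in this patch provides the same control.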