diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs index 96753c8c5260..fd1c485eec39 100644 --- a/datafusion-examples/examples/csv_opener.rs +++ b/datafusion-examples/examples/csv_opener.rs @@ -49,6 +49,7 @@ async fn main() -> Result<()> { b',', b'"', object_store, + Some(b'#'), ); let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED); diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 18cd83a47097..d2ef500d03fc 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -1570,12 +1570,13 @@ config_namespace! { pub escape: Option, default = None pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED pub schema_infer_max_rec: usize, default = 100 - pub date_format: Option, default = None - pub datetime_format: Option, default = None - pub timestamp_format: Option, default = None - pub timestamp_tz_format: Option, default = None - pub time_format: Option, default = None - pub null_value: Option, default = None + pub date_format: Option, default = None + pub datetime_format: Option, default = None + pub timestamp_format: Option, default = None + pub timestamp_tz_format: Option, default = None + pub time_format: Option, default = None + pub null_value: Option, default = None + pub comment: Option, default = None } } diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 645f98cd3fb0..9b132f4b524a 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -140,6 +140,12 @@ impl CsvFormat { self } + /// Lines beginning with this byte are ignored. + pub fn with_comment(mut self, comment: Option) -> Self { + self.options.comment = comment; + self + } + /// True if the first line is a header. pub fn has_header(&self) -> bool { self.options.has_header @@ -246,6 +252,7 @@ impl FileFormat for CsvFormat { self.options.delimiter, self.options.quote, self.options.escape, + self.options.comment, self.options.compression.into(), ); Ok(Arc::new(exec)) @@ -297,10 +304,14 @@ impl CsvFormat { pin_mut!(stream); while let Some(chunk) = stream.next().await.transpose()? { - let format = arrow::csv::reader::Format::default() + let mut format = arrow::csv::reader::Format::default() .with_header(self.options.has_header && first_chunk) .with_delimiter(self.options.delimiter); + if let Some(comment) = self.options.comment { + format = format.with_comment(comment); + } + let (Schema { fields, .. }, records_read) = format.infer_schema(chunk.reader(), Some(records_to_read))?; diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index f5bd72495d66..46c5a7eb985c 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -61,6 +61,8 @@ pub struct CsvReadOptions<'a> { pub quote: u8, /// An optional escape character. Defaults to None. pub escape: Option, + /// If enabled, lines beginning with this byte are ignored. + pub comment: Option, /// An optional schema representing the CSV files. If None, CSV reader will try to infer it /// based on data in file. pub schema: Option<&'a Schema>, @@ -97,6 +99,7 @@ impl<'a> CsvReadOptions<'a> { table_partition_cols: vec![], file_compression_type: FileCompressionType::UNCOMPRESSED, file_sort_order: vec![], + comment: None, } } @@ -106,6 +109,12 @@ impl<'a> CsvReadOptions<'a> { self } + /// Specify comment char to use for CSV read + pub fn comment(mut self, comment: Option) -> Self { + self.comment = comment; + self + } + /// Specify delimiter to use for CSV read pub fn delimiter(mut self, delimiter: u8) -> Self { self.delimiter = delimiter; @@ -477,6 +486,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> { let file_format = CsvFormat::default() .with_options(table_options.csv) .with_has_header(self.has_header) + .with_comment(self.comment) .with_delimiter(self.delimiter) .with_quote(self.quote) .with_escape(self.escape) diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs index 879461c2eb1e..2ac671015339 100644 --- a/datafusion/core/src/datasource/physical_plan/csv.rs +++ b/datafusion/core/src/datasource/physical_plan/csv.rs @@ -58,6 +58,7 @@ pub struct CsvExec { delimiter: u8, quote: u8, escape: Option, + comment: Option, /// Execution metrics metrics: ExecutionPlanMetricsSet, /// Compression type of the file associated with CsvExec @@ -73,6 +74,7 @@ impl CsvExec { delimiter: u8, quote: u8, escape: Option, + comment: Option, file_compression_type: FileCompressionType, ) -> Self { let (projected_schema, projected_statistics, projected_output_ordering) = @@ -92,6 +94,7 @@ impl CsvExec { metrics: ExecutionPlanMetricsSet::new(), file_compression_type, cache, + comment, } } @@ -113,6 +116,11 @@ impl CsvExec { self.quote } + /// Lines beginning with this byte are ignored. + pub fn comment(&self) -> Option { + self.comment + } + /// The escape character pub fn escape(&self) -> Option { self.escape @@ -234,6 +242,7 @@ impl ExecutionPlan for CsvExec { quote: self.quote, escape: self.escape, object_store, + comment: self.comment, }); let opener = CsvOpener { @@ -265,6 +274,7 @@ pub struct CsvConfig { quote: u8, escape: Option, object_store: Arc, + comment: Option, } impl CsvConfig { @@ -277,6 +287,7 @@ impl CsvConfig { delimiter: u8, quote: u8, object_store: Arc, + comment: Option, ) -> Self { Self { batch_size, @@ -287,6 +298,7 @@ impl CsvConfig { quote, escape: None, object_store, + comment, } } } @@ -309,6 +321,9 @@ impl CsvConfig { if let Some(escape) = self.escape { builder = builder.with_escape(escape) } + if let Some(comment) = self.comment { + builder = builder.with_comment(comment); + } builder } @@ -570,6 +585,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -635,6 +651,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -700,6 +717,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -763,6 +781,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); assert_eq!(14, csv.base_config.file_schema.fields().len()); @@ -825,6 +844,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); assert_eq!(13, csv.base_config.file_schema.fields().len()); @@ -919,6 +939,7 @@ mod tests { b',', b'"', None, + None, file_compression_type.to_owned(), ); diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index c07f2c5dcf24..a673d755c7eb 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1496,6 +1496,7 @@ pub(crate) mod tests { b',', b'"', None, + None, FileCompressionType::UNCOMPRESSED, )) } @@ -1526,6 +1527,7 @@ pub(crate) mod tests { b',', b'"', None, + None, FileCompressionType::UNCOMPRESSED, )) } @@ -3803,6 +3805,7 @@ pub(crate) mod tests { b',', b'"', None, + None, compression_type, )), vec![("a".to_string(), "a".to_string())], diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs index 0190f35cc97b..df010e8cb32e 100644 --- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs @@ -185,6 +185,7 @@ fn try_swapping_with_csv( csv.delimiter(), csv.quote(), csv.escape(), + csv.comment(), csv.file_compression_type, )) as _ }) @@ -1694,6 +1695,7 @@ mod tests { 0, 0, None, + None, FileCompressionType::UNCOMPRESSED, )) } @@ -1720,6 +1722,7 @@ mod tests { 0, 0, None, + None, FileCompressionType::UNCOMPRESSED, )) } diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index f69c0df32e8a..757d48ed09ee 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -1508,6 +1508,7 @@ mod tests { 0, b'"', None, + None, FileCompressionType::UNCOMPRESSED, )) } diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs index 1152c70d4391..1857ac8e1810 100644 --- a/datafusion/core/src/test/mod.rs +++ b/datafusion/core/src/test/mod.rs @@ -98,6 +98,7 @@ pub fn scan_partitioned_csv(partitions: usize, work_dir: &Path) -> Result serde::Deserialize<'de> for CsvOptions { "timeFormat", "null_value", "nullValue", + "comment", ]; #[allow(clippy::enum_variant_names)] @@ -5581,6 +5589,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { TimestampTzFormat, TimeFormat, NullValue, + Comment, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -5614,6 +5623,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { "timestampTzFormat" | "timestamp_tz_format" => Ok(GeneratedField::TimestampTzFormat), "timeFormat" | "time_format" => Ok(GeneratedField::TimeFormat), "nullValue" | "null_value" => Ok(GeneratedField::NullValue), + "comment" => Ok(GeneratedField::Comment), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -5645,6 +5655,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { let mut timestamp_tz_format__ = None; let mut time_format__ = None; let mut null_value__ = None; + let mut comment__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::HasHeader => { @@ -5727,6 +5738,14 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { } null_value__ = Some(map_.next_value()?); } + GeneratedField::Comment => { + if comment__.is_some() { + return Err(serde::de::Error::duplicate_field("comment")); + } + comment__ = + Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0) + ; + } } } Ok(CsvOptions { @@ -5742,6 +5761,7 @@ impl<'de> serde::Deserialize<'de> for CsvOptions { timestamp_tz_format: timestamp_tz_format__.unwrap_or_default(), time_format: time_format__.unwrap_or_default(), null_value: null_value__.unwrap_or_default(), + comment: comment__.unwrap_or_default(), }) } } @@ -5771,6 +5791,9 @@ impl serde::Serialize for CsvScanExecNode { if self.optional_escape.is_some() { len += 1; } + if self.optional_comment.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.CsvScanExecNode", len)?; if let Some(v) = self.base_conf.as_ref() { struct_ser.serialize_field("baseConf", v)?; @@ -5791,6 +5814,13 @@ impl serde::Serialize for CsvScanExecNode { } } } + if let Some(v) = self.optional_comment.as_ref() { + match v { + csv_scan_exec_node::OptionalComment::Comment(v) => { + struct_ser.serialize_field("comment", v)?; + } + } + } struct_ser.end() } } @@ -5808,6 +5838,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "delimiter", "quote", "escape", + "comment", ]; #[allow(clippy::enum_variant_names)] @@ -5817,6 +5848,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { Delimiter, Quote, Escape, + Comment, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -5843,6 +5875,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { "delimiter" => Ok(GeneratedField::Delimiter), "quote" => Ok(GeneratedField::Quote), "escape" => Ok(GeneratedField::Escape), + "comment" => Ok(GeneratedField::Comment), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -5867,6 +5900,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { let mut delimiter__ = None; let mut quote__ = None; let mut optional_escape__ = None; + let mut optional_comment__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::BaseConf => { @@ -5899,6 +5933,12 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { } optional_escape__ = map_.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalEscape::Escape); } + GeneratedField::Comment => { + if optional_comment__.is_some() { + return Err(serde::de::Error::duplicate_field("comment")); + } + optional_comment__ = map_.next_value::<::std::option::Option<_>>()?.map(csv_scan_exec_node::OptionalComment::Comment); + } } } Ok(CsvScanExecNode { @@ -5907,6 +5947,7 @@ impl<'de> serde::Deserialize<'de> for CsvScanExecNode { delimiter: delimiter__.unwrap_or_default(), quote: quote__.unwrap_or_default(), optional_escape: optional_escape__, + optional_comment: optional_comment__, }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 706794e38070..3ce20432e43b 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -165,9 +165,7 @@ pub struct ListingTableScanNode { #[prost(message, repeated, tag = "13")] pub file_sort_order: ::prost::alloc::vec::Vec, #[prost(oneof = "listing_table_scan_node::FileFormatType", tags = "10, 11, 12")] - pub file_format_type: ::core::option::Option< - listing_table_scan_node::FileFormatType, - >, + pub file_format_type: ::core::option::Option, } /// Nested message and enum types in `ListingTableScanNode`. pub mod listing_table_scan_node { @@ -352,10 +350,8 @@ pub struct CreateExternalTableNode { #[prost(message, optional, tag = "15")] pub constraints: ::core::option::Option, #[prost(map = "string, message", tag = "16")] - pub column_defaults: ::std::collections::HashMap< - ::prost::alloc::string::String, - LogicalExprNode, - >, + pub column_defaults: + ::std::collections::HashMap<::prost::alloc::string::String, LogicalExprNode>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -1479,7 +1475,10 @@ pub struct OptimizedPhysicalPlanType { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct PlanType { - #[prost(oneof = "plan_type::PlanTypeEnum", tags = "1, 7, 8, 2, 3, 4, 9, 5, 6, 10")] + #[prost( + oneof = "plan_type::PlanTypeEnum", + tags = "1, 7, 8, 2, 3, 4, 9, 5, 6, 10" + )] pub plan_type_enum: ::core::option::Option, } /// Nested message and enum types in `PlanType`. @@ -1545,9 +1544,7 @@ pub struct FullTableReference { #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableReference { #[prost(oneof = "table_reference::TableReferenceEnum", tags = "1, 2, 3")] - pub table_reference_enum: ::core::option::Option< - table_reference::TableReferenceEnum, - >, + pub table_reference_enum: ::core::option::Option, } /// Nested message and enum types in `TableReference`. pub mod table_reference { @@ -1719,6 +1716,9 @@ pub struct CsvOptions { /// Optional representation of null value #[prost(string, tag = "12")] pub null_value: ::prost::alloc::string::String, + /// Optional comment character as a byte + #[prost(bytes = "vec", tag = "13")] + pub comment: ::prost::alloc::vec::Vec, } /// Options controlling CSV format #[allow(clippy::derive_partial_eq_without_eq)] @@ -1807,29 +1807,25 @@ pub struct ColumnSpecificOptions { #[derive(Clone, PartialEq, ::prost::Message)] pub struct ColumnOptions { #[prost(oneof = "column_options::BloomFilterEnabledOpt", tags = "1")] - pub bloom_filter_enabled_opt: ::core::option::Option< - column_options::BloomFilterEnabledOpt, - >, + pub bloom_filter_enabled_opt: + ::core::option::Option, #[prost(oneof = "column_options::EncodingOpt", tags = "2")] pub encoding_opt: ::core::option::Option, #[prost(oneof = "column_options::DictionaryEnabledOpt", tags = "3")] - pub dictionary_enabled_opt: ::core::option::Option< - column_options::DictionaryEnabledOpt, - >, + pub dictionary_enabled_opt: + ::core::option::Option, #[prost(oneof = "column_options::CompressionOpt", tags = "4")] pub compression_opt: ::core::option::Option, #[prost(oneof = "column_options::StatisticsEnabledOpt", tags = "5")] - pub statistics_enabled_opt: ::core::option::Option< - column_options::StatisticsEnabledOpt, - >, + pub statistics_enabled_opt: + ::core::option::Option, #[prost(oneof = "column_options::BloomFilterFppOpt", tags = "6")] pub bloom_filter_fpp_opt: ::core::option::Option, #[prost(oneof = "column_options::BloomFilterNdvOpt", tags = "7")] pub bloom_filter_ndv_opt: ::core::option::Option, #[prost(oneof = "column_options::MaxStatisticsSizeOpt", tags = "8")] - pub max_statistics_size_opt: ::core::option::Option< - column_options::MaxStatisticsSizeOpt, - >, + pub max_statistics_size_opt: + ::core::option::Option, } /// Nested message and enum types in `ColumnOptions`. pub mod column_options { @@ -1937,27 +1933,22 @@ pub struct ParquetOptions { #[prost(string, tag = "16")] pub created_by: ::prost::alloc::string::String, #[prost(oneof = "parquet_options::MetadataSizeHintOpt", tags = "4")] - pub metadata_size_hint_opt: ::core::option::Option< - parquet_options::MetadataSizeHintOpt, - >, + pub metadata_size_hint_opt: + ::core::option::Option, #[prost(oneof = "parquet_options::CompressionOpt", tags = "10")] pub compression_opt: ::core::option::Option, #[prost(oneof = "parquet_options::DictionaryEnabledOpt", tags = "11")] - pub dictionary_enabled_opt: ::core::option::Option< - parquet_options::DictionaryEnabledOpt, - >, + pub dictionary_enabled_opt: + ::core::option::Option, #[prost(oneof = "parquet_options::StatisticsEnabledOpt", tags = "13")] - pub statistics_enabled_opt: ::core::option::Option< - parquet_options::StatisticsEnabledOpt, - >, + pub statistics_enabled_opt: + ::core::option::Option, #[prost(oneof = "parquet_options::MaxStatisticsSizeOpt", tags = "14")] - pub max_statistics_size_opt: ::core::option::Option< - parquet_options::MaxStatisticsSizeOpt, - >, + pub max_statistics_size_opt: + ::core::option::Option, #[prost(oneof = "parquet_options::ColumnIndexTruncateLengthOpt", tags = "17")] - pub column_index_truncate_length_opt: ::core::option::Option< - parquet_options::ColumnIndexTruncateLengthOpt, - >, + pub column_index_truncate_length_opt: + ::core::option::Option, #[prost(oneof = "parquet_options::EncodingOpt", tags = "19")] pub encoding_opt: ::core::option::Option, #[prost(oneof = "parquet_options::BloomFilterFppOpt", tags = "21")] @@ -2127,10 +2118,12 @@ pub struct PhysicalAggregateExprNode { pub ordering_req: ::prost::alloc::vec::Vec, #[prost(bool, tag = "3")] pub distinct: bool, - #[prost(oneof = "physical_aggregate_expr_node::AggregateFunction", tags = "1, 4")] - pub aggregate_function: ::core::option::Option< - physical_aggregate_expr_node::AggregateFunction, - >, + #[prost( + oneof = "physical_aggregate_expr_node::AggregateFunction", + tags = "1, 4" + )] + pub aggregate_function: + ::core::option::Option, } /// Nested message and enum types in `PhysicalAggregateExprNode`. pub mod physical_aggregate_expr_node { @@ -2157,9 +2150,8 @@ pub struct PhysicalWindowExprNode { #[prost(string, tag = "8")] pub name: ::prost::alloc::string::String, #[prost(oneof = "physical_window_expr_node::WindowFunction", tags = "1, 2")] - pub window_function: ::core::option::Option< - physical_window_expr_node::WindowFunction, - >, + pub window_function: + ::core::option::Option, } /// Nested message and enum types in `PhysicalWindowExprNode`. pub mod physical_window_expr_node { @@ -2361,6 +2353,8 @@ pub struct CsvScanExecNode { pub quote: ::prost::alloc::string::String, #[prost(oneof = "csv_scan_exec_node::OptionalEscape", tags = "5")] pub optional_escape: ::core::option::Option, + #[prost(oneof = "csv_scan_exec_node::OptionalComment", tags = "6")] + pub optional_comment: ::core::option::Option, } /// Nested message and enum types in `CsvScanExecNode`. pub mod csv_scan_exec_node { @@ -2370,6 +2364,12 @@ pub mod csv_scan_exec_node { #[prost(string, tag = "5")] Escape(::prost::alloc::string::String), } + #[allow(clippy::derive_partial_eq_without_eq)] + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum OptionalComment { + #[prost(string, tag = "6")] + Comment(::prost::alloc::string::String), + } } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -2752,7 +2752,9 @@ pub struct ColumnStats { #[prost(message, optional, tag = "4")] pub distinct_count: ::core::option::Option, } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum JoinType { Inner = 0, @@ -2796,7 +2798,9 @@ impl JoinType { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum JoinConstraint { On = 0, @@ -2822,7 +2826,9 @@ impl JoinConstraint { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum AggregateFunction { Min = 0, @@ -2955,7 +2961,9 @@ impl AggregateFunction { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum BuiltInWindowFunction { RowNumber = 0, @@ -3008,7 +3016,9 @@ impl BuiltInWindowFunction { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum WindowFrameUnits { Rows = 0, @@ -3037,7 +3047,9 @@ impl WindowFrameUnits { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum WindowFrameBoundType { CurrentRow = 0, @@ -3066,7 +3078,9 @@ impl WindowFrameBoundType { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum DateUnit { Day = 0, @@ -3092,7 +3106,9 @@ impl DateUnit { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum TimeUnit { Second = 0, @@ -3124,7 +3140,9 @@ impl TimeUnit { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum IntervalUnit { YearMonth = 0, @@ -3153,7 +3171,9 @@ impl IntervalUnit { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum UnionMode { Sparse = 0, @@ -3179,7 +3199,9 @@ impl UnionMode { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum CompressionTypeVariant { Gzip = 0, @@ -3214,7 +3236,9 @@ impl CompressionTypeVariant { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum PartitionMode { CollectLeft = 0, @@ -3243,7 +3267,9 @@ impl PartitionMode { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum StreamPartitionMode { SinglePartition = 0, @@ -3269,7 +3295,9 @@ impl StreamPartitionMode { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum AggregateMode { Partial = 0, @@ -3304,7 +3332,9 @@ impl AggregateMode { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum JoinSide { LeftSide = 0, @@ -3330,7 +3360,9 @@ impl JoinSide { } } } -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[derive( + Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, +)] #[repr(i32)] pub enum PrecisionInfo { Exact = 0, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 4bd07fae497f..4a987090868d 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -807,6 +807,7 @@ impl TryFrom<&protobuf::CsvOptions> for CsvOptions { Ok(CsvOptions { has_header: proto_opts.has_header, delimiter: proto_opts.delimiter[0], + comment: proto_opts.comment.first().copied(), quote: proto_opts.quote[0], escape: proto_opts.escape.first().copied(), compression: proto_opts.compression().into(), diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 1c5ba861d297..926991d544d5 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -204,6 +204,14 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { } else { None }, + if let Some(protobuf::csv_scan_exec_node::OptionalComment::Comment( + comment, + )) = &scan.optional_comment + { + Some(str_to_byte(comment, "comment")?) + } else { + None + }, FileCompressionType::UNCOMPRESSED, ))), #[cfg(feature = "parquet")] @@ -1577,6 +1585,13 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { } else { None }, + optional_comment: if let Some(comment) = exec.comment() { + Some(protobuf::csv_scan_exec_node::OptionalComment::Comment( + byte_to_string(comment, "comment")?, + )) + } else { + None + }, }, )), }); diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 162a2f28e16b..d685805addd0 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -985,6 +985,7 @@ impl TryFrom<&CsvOptions> for protobuf::CsvOptions { timestamp_tz_format: opts.timestamp_tz_format.clone().unwrap_or_default(), time_format: opts.time_format.clone().unwrap_or_default(), null_value: opts.null_value.clone().unwrap_or_default(), + comment: opts.comment.map_or_else(Vec::new, |c| vec![c]), }) } }