diff --git a/Cargo.lock b/Cargo.lock
index 90e4745ba8..2ef41c2d6c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2029,6 +2029,7 @@ dependencies = [
  "itertools 0.11.0",
  "log",
  "parquet2",
+ "path_macro",
  "pyo3",
  "rayon",
  "serde",
@@ -3920,6 +3921,12 @@ version = "1.0.15"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
 
+[[package]]
+name = "path_macro"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a6e819bbd49d5939f682638fa54826bf1650abddcd65d000923de8ad63cc7d15"
+
 [[package]]
 name = "pem"
 version = "3.0.4"
diff --git a/Cargo.toml b/Cargo.toml
index 55403d40bb..c71052df70 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -161,6 +161,7 @@ jaq-std = "1.2.0"
 num-derive = "0.3.3"
 num-traits = "0.2"
 once_cell = "1.19.0"
+path_macro = "1.0.0"
 pretty_assertions = "1.4.0"
 rand = "^0.8"
 rayon = "1.10.0"
diff --git a/daft/daft/__init__.pyi b/daft/daft/__init__.pyi
index 98f91fadb7..219cce3d23 100644
--- a/daft/daft/__init__.pyi
+++ b/daft/daft/__init__.pyi
@@ -1,7 +1,7 @@
 import builtins
 import datetime
 from enum import Enum
-from typing import TYPE_CHECKING, Any, Callable, Iterator
+from typing import TYPE_CHECKING, Any, Callable, Iterator, Literal
 
 from daft.dataframe.display import MermaidOptions
 from daft.execution import physical_plan
@@ -870,6 +870,7 @@ def read_parquet_into_pyarrow(
     io_config: IOConfig | None = None,
     multithreaded_io: bool | None = None,
     coerce_int96_timestamp_unit: PyTimeUnit | None = None,
+    string_encoding: Literal["utf-8"] | Literal["raw"] = "utf-8",
     file_timeout_ms: int | None = None,
 ): ...
 def read_parquet_into_pyarrow_bulk(
diff --git a/daft/table/table.py b/daft/table/table.py
index 707ea6ec98..b6dfe2f6d2 100644
--- a/daft/table/table.py
+++ b/daft/table/table.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 import logging
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Literal
 
 from daft.arrow_utils import ensure_table
 from daft.daft import (
@@ -524,6 +524,7 @@ def read_parquet_into_pyarrow(
     io_config: IOConfig | None = None,
     multithreaded_io: bool | None = None,
     coerce_int96_timestamp_unit: TimeUnit = TimeUnit.ns(),
+    string_encoding: Literal["utf-8"] | Literal["raw"] = "utf-8",
     file_timeout_ms: int | None = 900_000,  # 15 minutes
 ) -> pa.Table:
     fields, metadata, columns, num_rows_read = _read_parquet_into_pyarrow(
@@ -535,6 +536,7 @@ def read_parquet_into_pyarrow(
         io_config=io_config,
         multithreaded_io=multithreaded_io,
         coerce_int96_timestamp_unit=coerce_int96_timestamp_unit._timeunit,
+        string_encoding=string_encoding,
         file_timeout_ms=file_timeout_ms,
     )
     schema = pa.schema(fields, metadata=metadata)
diff --git a/src/arrow2/src/io/parquet/read/schema/convert.rs b/src/arrow2/src/io/parquet/read/schema/convert.rs
index cf9838a2a2..1fdeccc7a1 100644
--- a/src/arrow2/src/io/parquet/read/schema/convert.rs
+++ b/src/arrow2/src/io/parquet/read/schema/convert.rs
@@ -7,24 +7,28 @@ use parquet2::schema::{
     Repetition,
 };
 
-use crate::datatypes::{DataType, Field, IntervalUnit, TimeUnit};
-use crate::io::parquet::read::schema::SchemaInferenceOptions;
+use super::StringEncoding;
+use crate::{
+    datatypes::{DataType, Field, IntervalUnit, TimeUnit},
+    io::parquet::read::schema::SchemaInferenceOptions,
+};
 
 /// Converts [`ParquetType`]s to a [`Field`], ignoring parquet fields that do not contain
 /// any physical column.
 #[allow(dead_code)]
 pub fn parquet_to_arrow_schema(fields: &[ParquetType]) -> Vec<Field> {
-    parquet_to_arrow_schema_with_options(fields, &None)
+    parquet_to_arrow_schema_with_options(fields, None)
 }
 
 /// Like [`parquet_to_arrow_schema`] but with configurable options which affect the behavior of schema inference
 pub fn parquet_to_arrow_schema_with_options(
     fields: &[ParquetType],
-    options: &Option<SchemaInferenceOptions>,
+    options: Option<SchemaInferenceOptions>,
 ) -> Vec<Field> {
+    let options = options.unwrap_or_default();
     fields
         .iter()
-        .filter_map(|f| to_field(f, options.as_ref().unwrap_or(&Default::default())))
+        .filter_map(|f| to_field(f, &options))
         .collect::<Vec<_>>()
 }
 
@@ -145,9 +149,13 @@ fn from_int64(
 fn from_byte_array(
     logical_type: &Option<PrimitiveLogicalType>,
     converted_type: &Option<PrimitiveConvertedType>,
+    options: &SchemaInferenceOptions,
 ) -> DataType {
     match (logical_type, converted_type) {
-        (Some(PrimitiveLogicalType::String), _) => DataType::Utf8,
+        (Some(PrimitiveLogicalType::String), _) => match options.string_encoding {
+            StringEncoding::Utf8 => DataType::Utf8,
+            StringEncoding::Raw => DataType::Binary,
+        },
         (Some(PrimitiveLogicalType::Json), _) => DataType::Binary,
         (Some(PrimitiveLogicalType::Bson), _) => DataType::Binary,
         (Some(PrimitiveLogicalType::Enum), _) => DataType::Binary,
@@ -219,9 +227,11 @@ fn to_primitive_type_inner(
         PhysicalType::Int96 => DataType::Timestamp(options.int96_coerce_to_timeunit, None),
         PhysicalType::Float => DataType::Float32,
         PhysicalType::Double => DataType::Float64,
-        PhysicalType::ByteArray => {
-            from_byte_array(&primitive_type.logical_type, &primitive_type.converted_type)
-        }
+        PhysicalType::ByteArray => from_byte_array(
+            &primitive_type.logical_type,
+            &primitive_type.converted_type,
+            options,
+        ),
         PhysicalType::FixedLenByteArray(length) => from_fixed_len_byte_array(
             length,
             primitive_type.logical_type,
@@ -440,7 +450,6 @@ mod tests {
     use parquet2::metadata::SchemaDescriptor;
 
     use super::*;
-    use crate::error::Result;
 
     #[test]
@@ -1123,8 +1132,9 @@ mod tests {
         let parquet_schema = SchemaDescriptor::try_from_message(message_type)?;
         let fields = parquet_to_arrow_schema_with_options(
             parquet_schema.fields(),
-            &Some(SchemaInferenceOptions {
+            Some(SchemaInferenceOptions {
                 int96_coerce_to_timeunit: tu,
+                ..Default::default()
             }),
         );
         assert_eq!(arrow_fields, fields);
diff --git a/src/arrow2/src/io/parquet/read/schema/mod.rs b/src/arrow2/src/io/parquet/read/schema/mod.rs
index 293473c233..adb27b2fd9 100644
--- a/src/arrow2/src/io/parquet/read/schema/mod.rs
+++ b/src/arrow2/src/io/parquet/read/schema/mod.rs
@@ -1,21 +1,55 @@
 //! APIs to handle Parquet <-> Arrow schemas.
 
-use crate::datatypes::{Schema, TimeUnit};
-use crate::error::Result;
+use std::str::FromStr;
+
+use crate::{
+    datatypes::{Schema, TimeUnit},
+    error::{Error, Result},
+};
 
 mod convert;
 mod metadata;
 
 pub use convert::parquet_to_arrow_schema_with_options;
-pub use metadata::{apply_schema_to_fields, read_schema_from_metadata};
-pub use parquet2::metadata::{FileMetaData, KeyValue, SchemaDescriptor};
-pub use parquet2::schema::types::ParquetType;
-
 pub(crate) use convert::*;
+pub use metadata::{apply_schema_to_fields, read_schema_from_metadata};
+pub use parquet2::{
+    metadata::{FileMetaData, KeyValue, SchemaDescriptor},
+    schema::types::ParquetType,
+};
+use serde::{Deserialize, Serialize};
 
 use self::metadata::parse_key_value_metadata;
 
+/// String encoding options.
+///
+/// Each variant of this enum maps to a different interpretation of the underlying binary data:
+/// 1. `StringEncoding::Utf8` assumes the underlying binary data is UTF-8 encoded.
+/// 2. `StringEncoding::Raw` makes no assumptions about the encoding of the underlying binary data. This variant will change the logical type of the column to `DataType::Binary` in the final schema.
+#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+pub enum StringEncoding {
+    Raw,
+    #[default]
+    Utf8,
+}
+
+impl FromStr for StringEncoding {
+    type Err = Error;
+
+    fn from_str(value: &str) -> Result<Self, Self::Err> {
+        match value {
+            "utf-8" => Ok(Self::Utf8),
+            "raw" => Ok(Self::Raw),
+            encoding => Err(Error::InvalidArgumentError(format!(
+                "Unrecognized encoding: {}",
+                encoding,
+            ))),
+        }
+    }
+}
+
 /// Options when inferring schemas from Parquet
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub struct SchemaInferenceOptions {
     /// When inferring schemas from the Parquet INT96 timestamp type, this is the corresponding TimeUnit
     /// in the inferred Arrow Timestamp type.
@@ -25,12 +59,16 @@ pub struct SchemaInferenceOptions {
     /// (e.g. TimeUnit::Milliseconds) will result in loss of precision, but support a larger range of dates
     /// without overflowing when parsing the data.
     pub int96_coerce_to_timeunit: TimeUnit,
+
+    /// The string encoding to assume when inferring the schema from Parquet data.
+    pub string_encoding: StringEncoding,
 }
 
 impl Default for SchemaInferenceOptions {
     fn default() -> Self {
         SchemaInferenceOptions {
             int96_coerce_to_timeunit: TimeUnit::Nanosecond,
+            string_encoding: StringEncoding::default(),
         }
     }
 }
@@ -42,13 +80,13 @@ impl Default for SchemaInferenceOptions {
 /// This function errors iff the key `"ARROW:schema"` exists but is not correctly encoded,
 /// indicating that that the file's arrow metadata was incorrectly written.
 pub fn infer_schema(file_metadata: &FileMetaData) -> Result<Schema> {
-    infer_schema_with_options(file_metadata, &None)
+    infer_schema_with_options(file_metadata, None)
 }
 
 /// Like [`infer_schema`] but with configurable options which affects the behavior of inference
 pub fn infer_schema_with_options(
     file_metadata: &FileMetaData,
-    options: &Option<SchemaInferenceOptions>,
+    options: Option<SchemaInferenceOptions>,
 ) -> Result<Schema> {
     let mut metadata = parse_key_value_metadata(file_metadata.key_value_metadata());
     let fields = parquet_to_arrow_schema_with_options(file_metadata.schema().fields(), options);
diff --git a/src/daft-micropartition/src/micropartition.rs b/src/daft-micropartition/src/micropartition.rs
index cc8439583c..c3059626fa 100644
--- a/src/daft-micropartition/src/micropartition.rs
+++ b/src/daft-micropartition/src/micropartition.rs
@@ -596,9 +596,9 @@ impl MicroPartition {
             (
                 _,
                 _,
-                FileFormatConfig::Parquet(ParquetSourceConfig {
+                &FileFormatConfig::Parquet(ParquetSourceConfig {
                     coerce_int96_timestamp_unit,
-                    field_id_mapping,
+                    ref field_id_mapping,
                     chunk_size,
                     ..
                 }),
@@ -646,12 +646,13 @@ impl MicroPartition {
                     if scan_task.sources.len() == 1 { 1 } else { 128 }, // Hardcoded for to 128 bulk reads
                     cfg.multithreaded_io,
                     &ParquetSchemaInferenceOptions {
-                        coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit,
+                        coerce_int96_timestamp_unit,
+                        ..Default::default()
                     },
                     Some(schema.clone()),
                     field_id_mapping.clone(),
                     parquet_metadata,
-                    *chunk_size,
+                    chunk_size,
                 )
                 .context(DaftCoreComputeSnafu)
             }
@@ -1162,7 +1163,7 @@ pub(crate) fn read_parquet_into_micropartition<T: AsRef<str>>(
     let schemas = metadata
         .iter()
         .map(|m| {
-            let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?;
+            let schema = infer_schema_with_options(m, Some((*schema_infer_options).into()))?;
             let daft_schema = daft_core::prelude::Schema::try_from(&schema)?;
             DaftResult::Ok(Arc::new(daft_schema))
         })
@@ -1186,7 +1187,7 @@ pub(crate) fn read_parquet_into_micropartition<T: AsRef<str>>(
     let schemas = metadata
         .iter()
         .map(|m| {
-            let schema = infer_schema_with_options(m, &Some((*schema_infer_options).into()))?;
+            let schema = infer_schema_with_options(m, Some((*schema_infer_options).into()))?;
             let daft_schema = daft_core::prelude::Schema::try_from(&schema)?;
             DaftResult::Ok(Arc::new(daft_schema))
         })
diff --git a/src/daft-parquet/Cargo.toml b/src/daft-parquet/Cargo.toml
index 3e1a4876b8..1ec75b3cb1 100644
--- a/src/daft-parquet/Cargo.toml
+++ b/src/daft-parquet/Cargo.toml
@@ -26,6 +26,7 @@ tokio-util = {workspace = true}
 
 [dev-dependencies]
 bincode = {workspace = true}
+path_macro = {workspace = true}
 
 [features]
 python = ["dep:pyo3", "common-error/python", "daft-core/python", "daft-io/python", "daft-table/python", "daft-stats/python", "daft-dsl/python", "common-arrow-ffi/python"]
diff --git a/src/daft-parquet/src/file.rs b/src/daft-parquet/src/file.rs
index 3b84579b6d..8ba9f4ed25 100644
--- a/src/daft-parquet/src/file.rs
+++ b/src/daft-parquet/src/file.rs
@@ -259,11 +259,12 @@ impl ParquetReaderBuilder {
     }
 
     pub fn build(self) -> super::Result<ParquetFileReader> {
-        let mut arrow_schema =
-            infer_schema_with_options(&self.metadata, &Some(self.schema_inference_options.into()))
-                .context(UnableToParseSchemaFromMetadataSnafu::<String> {
-                    path: self.uri.clone(),
-                })?;
+        let options = self.schema_inference_options.into();
+        let mut arrow_schema = infer_schema_with_options(&self.metadata, Some(options)).context(
+            UnableToParseSchemaFromMetadataSnafu {
+                path: self.uri.clone(),
+            },
+        )?;
 
         if let Some(names_to_keep) = self.selected_columns {
             arrow_schema
diff --git a/src/daft-parquet/src/lib.rs b/src/daft-parquet/src/lib.rs
index d1057e95f7..e907fec22e 100644
--- a/src/daft-parquet/src/lib.rs
+++ b/src/daft-parquet/src/lib.rs
@@ -19,6 +19,9 @@ pub use python::register_modules;
 
 #[derive(Debug, Snafu)]
 pub enum Error {
+    #[snafu(display("{source}"))]
+    Arrow2Error { source: arrow2::error::Error },
+
     #[snafu(display("{source}"))]
     DaftIOError { source: daft_io::Error },
 
diff --git a/src/daft-parquet/src/python.rs b/src/daft-parquet/src/python.rs
index 930eb7e91b..2d965053c2 100644
--- a/src/daft-parquet/src/python.rs
+++ b/src/daft-parquet/src/python.rs
@@ -10,7 +10,9 @@ pub mod pylib {
     use daft_table::python::PyTable;
     use pyo3::{pyfunction, types::PyModule, Bound, PyResult, Python};
 
-    use crate::read::{ArrowChunk, ParquetSchemaInferenceOptions};
+    use crate::read::{
+        ArrowChunk, ParquetSchemaInferenceOptions, ParquetSchemaInferenceOptionsBuilder,
+    };
 
     #[allow(clippy::too_many_arguments)]
     #[pyfunction]
     pub fn read_parquet(
@@ -90,6 +92,7 @@ pub mod pylib {
     pub fn read_parquet_into_pyarrow(
         py: Python,
         uri: &str,
+        string_encoding: String,
         columns: Option<Vec<String>>,
         start_offset: Option<usize>,
         num_rows: Option<usize>,
@@ -99,14 +102,16 @@ pub mod pylib {
         coerce_int96_timestamp_unit: Option<PyTimeUnit>,
         file_timeout_ms: Option<i64>,
     ) -> PyResult<PyObject> {
-        let read_parquet_result = py.allow_threads(|| {
+        let (schema, all_arrays, num_rows) = py.allow_threads(|| {
             let io_client = get_io_client(
                 multithreaded_io.unwrap_or(true),
                 io_config.unwrap_or_default().config.into(),
             )?;
 
-            let schema_infer_options = ParquetSchemaInferenceOptions::new(
-                coerce_int96_timestamp_unit.map(|tu| tu.timeunit),
-            );
+            let schema_infer_options = ParquetSchemaInferenceOptionsBuilder {
+                coerce_int96_timestamp_unit,
+                string_encoding,
+            }
+            .build()?;
 
             crate::read::read_parquet_into_pyarrow(
                 uri,
@@ -121,7 +126,6 @@ pub mod pylib {
                 file_timeout_ms,
             )
         })?;
-        let (schema, all_arrays, num_rows) = read_parquet_result;
         let pyarrow = py.import_bound(pyo3::intern!(py, "pyarrow"))?;
         convert_pyarrow_parquet_read_result_into_py(py, schema, all_arrays, num_rows, &pyarrow)
     }
diff --git a/src/daft-parquet/src/read.rs b/src/daft-parquet/src/read.rs
index 38974e2b01..eed16ae5b9 100644
--- a/src/daft-parquet/src/read.rs
+++ b/src/daft-parquet/src/read.rs
@@ -4,9 +4,16 @@ use std::{
     time::Duration,
 };
 
-use arrow2::{bitmap::Bitmap, io::parquet::read::schema::infer_schema_with_options};
+use arrow2::{
+    bitmap::Bitmap,
+    io::parquet::read::schema::{
+        infer_schema_with_options, SchemaInferenceOptions, StringEncoding,
+    },
+};
 use common_error::DaftResult;
 use daft_core::prelude::*;
+#[cfg(feature = "python")]
+use daft_core::python::PyTimeUnit;
 use daft_dsl::{optimization::get_required_columns, ExprRef};
 use daft_io::{get_runtime, parse_url, IOClient, IOStatsRef, SourceType};
 use daft_table::Table;
@@ -22,18 +29,57 @@ use snafu::ResultExt;
 
 use crate::{file::ParquetReaderBuilder, JoinSnafu};
 
-#[derive(Clone, Copy, Serialize, Deserialize)]
+#[cfg(feature = "python")]
+#[derive(Clone)]
+pub struct ParquetSchemaInferenceOptionsBuilder {
+    pub coerce_int96_timestamp_unit: Option<PyTimeUnit>,
+    pub string_encoding: String,
+}
+
+#[cfg(feature = "python")]
+impl ParquetSchemaInferenceOptionsBuilder {
+    pub fn build(self) -> crate::Result<ParquetSchemaInferenceOptions> {
+        self.try_into()
+    }
+}
+
+#[cfg(feature = "python")]
+impl TryFrom<ParquetSchemaInferenceOptionsBuilder> for ParquetSchemaInferenceOptions {
+    type Error = crate::Error;
+
+    fn try_from(value: ParquetSchemaInferenceOptionsBuilder) -> crate::Result<Self> {
+        Ok(ParquetSchemaInferenceOptions {
+            coerce_int96_timestamp_unit: value
+                .coerce_int96_timestamp_unit
+                .map_or(TimeUnit::Nanoseconds, From::from),
+            string_encoding: value.string_encoding.parse().context(crate::Arrow2Snafu)?,
+        })
+    }
+}
+
+#[cfg(feature = "python")]
+impl Default for ParquetSchemaInferenceOptionsBuilder {
+    fn default() -> Self {
+        Self {
+            coerce_int96_timestamp_unit: Some(PyTimeUnit::nanoseconds().unwrap()),
+            string_encoding: "utf-8".into(),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
 pub struct ParquetSchemaInferenceOptions {
     pub coerce_int96_timestamp_unit: TimeUnit,
+    pub string_encoding: StringEncoding,
 }
 
 impl ParquetSchemaInferenceOptions {
     pub fn new(coerce_int96_timestamp_unit: Option<TimeUnit>) -> Self {
-        let default: ParquetSchemaInferenceOptions = Default::default();
         let coerce_int96_timestamp_unit =
-            coerce_int96_timestamp_unit.unwrap_or(default.coerce_int96_timestamp_unit);
+            coerce_int96_timestamp_unit.unwrap_or(TimeUnit::Nanoseconds);
         ParquetSchemaInferenceOptions {
             coerce_int96_timestamp_unit,
+            ..Default::default()
         }
     }
 }
@@ -42,16 +88,16 @@ impl Default for ParquetSchemaInferenceOptions {
     fn default() -> Self {
         ParquetSchemaInferenceOptions {
             coerce_int96_timestamp_unit: TimeUnit::Nanoseconds,
+            string_encoding: StringEncoding::Utf8,
         }
     }
 }
 
-impl From<ParquetSchemaInferenceOptions>
-    for arrow2::io::parquet::read::schema::SchemaInferenceOptions
-{
+impl From<ParquetSchemaInferenceOptions> for SchemaInferenceOptions {
     fn from(value: ParquetSchemaInferenceOptions) -> Self {
-        arrow2::io::parquet::read::schema::SchemaInferenceOptions {
+        SchemaInferenceOptions {
             int96_coerce_to_timeunit: value.coerce_int96_timestamp_unit.to_arrow(),
+            string_encoding: value.string_encoding,
         }
     }
 }
@@ -470,7 +516,7 @@ async fn read_parquet_single_into_arrow(
     schema_infer_options: ParquetSchemaInferenceOptions,
     field_id_mapping: Option<Arc<BTreeMap<i32, Field>>>,
     metadata: Option<Arc<FileMetaData>>,
-) -> DaftResult<(arrow2::datatypes::SchemaRef, Vec<ArrowChunk>, usize)> {
+) -> DaftResult {
     let field_id_mapping_provided = field_id_mapping.is_some();
     let (source_type, fixed_uri) = parse_url(uri)?;
     let (metadata, schema, all_arrays, num_rows_read) = if matches!(source_type, SourceType::File) {
@@ -889,8 +935,7 @@ pub fn read_parquet_schema(
     let builder = builder.set_infer_schema_options(schema_inference_options);
 
     let metadata = builder.metadata;
-    let arrow_schema =
-        infer_schema_with_options(&metadata, &Some(schema_inference_options.into()))?;
+    let arrow_schema = infer_schema_with_options(&metadata, Some(schema_inference_options.into()))?;
     let schema = Schema::try_from(&arrow_schema)?;
     Ok((schema, metadata))
 }
@@ -1019,20 +1064,24 @@ pub fn read_parquet_statistics(
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
+    use std::{path::PathBuf, sync::Arc};
 
+    use arrow2::{datatypes::DataType, io::parquet::read::schema::StringEncoding};
     use common_error::DaftResult;
     use daft_io::{IOClient, IOConfig};
     use futures::StreamExt;
-    use parquet2::metadata::FileMetaData;
+    use parquet2::{
+        metadata::FileMetaData,
+        schema::types::{ParquetType, PrimitiveConvertedType, PrimitiveLogicalType},
+    };
 
-    use super::{read_parquet, read_parquet_metadata, stream_parquet};
+    use super::*;
 
     const PARQUET_FILE: &str = "s3://daft-public-data/test_fixtures/parquet-dev/mvp.parquet";
     const PARQUET_FILE_LOCAL: &str = "tests/assets/parquet-data/mvp.parquet";
 
     fn get_local_parquet_path() -> String {
-        let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+        let mut d = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
         d.push("../../"); // CARGO_MANIFEST_DIR is at src/daft-parquet
         d.push(PARQUET_FILE_LOCAL);
         d.to_str().unwrap().to_string()
     }
@@ -1116,4 +1165,68 @@ mod tests {
             Ok(())
         })?
     }
+
+    #[test]
+    fn test_invalid_utf8_parquet_reading() {
+        let parquet: Arc<str> = path_macro::path!(
+            env!("CARGO_MANIFEST_DIR")
+                / ".."
+                / ".."
+                / "tests"
+                / "assets"
+                / "parquet-data"
+                / "invalid_utf8.parquet"
+        )
+        .to_str()
+        .unwrap()
+        .into();
+        let io_config = IOConfig::default();
+        let io_client = Arc::new(IOClient::new(io_config.into()).unwrap());
+        let runtime_handle = daft_io::get_runtime(true).unwrap();
+        let file_metadata = runtime_handle
+            .block_on_io_pool({
+                let parquet = parquet.clone();
+                let io_client = io_client.clone();
+                async move { read_parquet_metadata(&parquet, io_client, None, None).await }
+            })
+            .flatten()
+            .unwrap();
+        let primitive_type = match file_metadata.schema_descr.fields() {
+            [parquet_type] => match parquet_type {
+                ParquetType::PrimitiveType(primitive_type) => primitive_type,
+                ParquetType::GroupType { .. } => {
+                    panic!("Parquet type should be primitive type, not group type")
+                }
+            },
+            _ => panic!("This test parquet file should have only 1 field"),
+        };
+        assert_eq!(
+            primitive_type.logical_type,
+            Some(PrimitiveLogicalType::String)
+        );
+        assert_eq!(
+            primitive_type.converted_type,
+            Some(PrimitiveConvertedType::Utf8)
+        );
+        let (schema, _, _) = read_parquet_into_pyarrow(
+            &parquet,
+            None,
+            None,
+            None,
+            None,
+            io_client,
+            None,
+            true,
+            ParquetSchemaInferenceOptions {
+                string_encoding: StringEncoding::Raw,
+                ..Default::default()
+            },
+            None,
+        )
+        .unwrap();
+        match schema.fields.as_slice() {
+            [field] => assert_eq!(field.data_type, DataType::Binary),
+            _ => panic!("There should only be one field in the schema"),
+        };
+    }
 }
diff --git a/src/daft-parquet/src/stream_reader.rs b/src/daft-parquet/src/stream_reader.rs
index dd88834aaa..178141c64d 100644
--- a/src/daft-parquet/src/stream_reader.rs
+++ b/src/daft-parquet/src/stream_reader.rs
@@ -229,7 +229,7 @@ pub(crate) fn local_parquet_read_into_column_iters(
         })?,
     };
 
-    let schema = infer_schema_with_options(&metadata, &Some(schema_infer_options.into()))
+    let schema = infer_schema_with_options(&metadata, Some(schema_infer_options.into()))
         .with_context(|_| super::UnableToParseSchemaFromMetadataSnafu {
             path: uri.to_string(),
         })?;
@@ -325,7 +325,7 @@ pub(crate) fn local_parquet_read_into_arrow(
     };
 
     // and infer a [`Schema`] from the `metadata`.
-    let schema = infer_schema_with_options(&metadata, &Some(schema_infer_options.into()))
+    let schema = infer_schema_with_options(&metadata, Some(schema_infer_options.into()))
         .with_context(|_| super::UnableToParseSchemaFromMetadataSnafu {
             path: uri.to_string(),
         })?;
diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs
index 5383235c67..90621e510d 100644
--- a/src/daft-scan/src/glob.rs
+++ b/src/daft-scan/src/glob.rs
@@ -166,9 +166,9 @@ impl GlobScanOperator {
         let schema = match infer_schema {
             true => {
                 let inferred_schema = match file_format_config.as_ref() {
-                    FileFormatConfig::Parquet(ParquetSourceConfig {
+                    &FileFormatConfig::Parquet(ParquetSourceConfig {
                         coerce_int96_timestamp_unit,
-                        field_id_mapping,
+                        ref field_id_mapping,
                        ..
                    }) => {
                        let io_stats = IOStatsContext::new(format!(
@@ -180,7 +180,8 @@ impl GlobScanOperator {
                            io_client.clone(),
                            Some(io_stats),
                            ParquetSchemaInferenceOptions {
-                                coerce_int96_timestamp_unit: *coerce_int96_timestamp_unit,
+                                coerce_int96_timestamp_unit,
+                                ..Default::default()
                            },
                            field_id_mapping.clone(),
                        )?;
diff --git a/tests/assets/parquet-data/invalid_utf8.parquet b/tests/assets/parquet-data/invalid_utf8.parquet
new file mode 100644
index 0000000000..56e9a9595e
Binary files /dev/null and b/tests/assets/parquet-data/invalid_utf8.parquet differ
diff --git a/tests/table/table_io/test_parquet.py b/tests/table/table_io/test_parquet.py
index e568a043f6..3a17edd4ec 100644
--- a/tests/table/table_io/test_parquet.py
+++ b/tests/table/table_io/test_parquet.py
@@ -5,6 +5,7 @@
 import os
 import pathlib
 import tempfile
+from pathlib import Path
 
 import pyarrow as pa
 import pyarrow.parquet as papq
@@ -13,6 +14,7 @@
 import daft
 from daft.daft import NativeStorageConfig, PythonStorageConfig, StorageConfig
 from daft.datatype import DataType, TimeUnit
+from daft.exceptions import DaftCoreException
 from daft.logical.schema import Schema
 from daft.runners.partitioning import TableParseParquetOptions, TableReadOptions
 from daft.table import (
@@ -397,3 +399,22 @@ def test_read_parquet_file_missing_column_partial_read_with_pyarrow_bulk(tmpdir)
     read_back = read_parquet_into_pyarrow_bulk([file_path.as_posix()], columns=["x", "MISSING"])
     assert len(read_back) == 1
     assert tab.drop("y") == read_back[0]  # only read "x"
+
+
+@pytest.mark.parametrize(
+    "parquet_path", [Path(__file__).parents[2] / "assets" / "parquet-data" / "invalid_utf8.parquet"]
+)
+def test_parquet_read_string_utf8_into_binary(parquet_path: Path):
+    import pyarrow as pa
+
+    assert parquet_path.exists()
+
+    with pytest.raises(DaftCoreException, match="invalid utf-8 sequence"):
+        read_parquet_into_pyarrow(path=parquet_path.as_posix())
+
+    read_back = read_parquet_into_pyarrow(path=parquet_path.as_posix(), string_encoding="raw")
+    schema = read_back.schema
+    assert len(schema) == 1
+    assert schema[0].name == "invalid_string"
+    assert schema[0].type == pa.binary()
+    assert read_back["invalid_string"][0].as_py() == b"\x80\x80\x80"
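
Usage note (not part of the patch): a minimal sketch of the `string_encoding` option this diff introduces, mirroring the added test in tests/table/table_io/test_parquet.py. It assumes `read_parquet_into_pyarrow` is importable from `daft.table` (as in the existing tests) and reuses the bundled invalid_utf8.parquet fixture and its `invalid_string` column; adjust the path for other files.

    import pyarrow as pa

    from daft.table import read_parquet_into_pyarrow

    path = "tests/assets/parquet-data/invalid_utf8.parquet"

    # Default ("utf-8"): string columns are decoded as UTF-8, so reading this file
    # raises DaftCoreException with an "invalid utf-8 sequence" message.
    # read_parquet_into_pyarrow(path=path)

    # "raw": the string column is inferred as binary and the bytes pass through untouched.
    table = read_parquet_into_pyarrow(path=path, string_encoding="raw")
    assert table.schema[0].type == pa.binary()
    assert table["invalid_string"][0].as_py() == b"\x80\x80\x80"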