diff --git a/src/daft-parquet/src/statistics/column_range.rs b/src/daft-parquet/src/statistics/column_range.rs index a58daa725c..4b386c9cef 100644 --- a/src/daft-parquet/src/statistics/column_range.rs +++ b/src/daft-parquet/src/statistics/column_range.rs @@ -4,7 +4,7 @@ use daft_core::{ logical::{DateArray, Decimal128Array, TimestampArray}, BinaryArray, BooleanArray, Int128Array, Int32Array, Int64Array, Utf8Array, }, - IntoSeries, Series, + DataType, IntoSeries, Series, }; use daft_stats::ColumnRangeStatistics; use parquet2::{ @@ -15,7 +15,7 @@ use parquet2::{ }; use snafu::{OptionExt, ResultExt}; -use super::{MissingParquetColumnStatisticsSnafu, Wrap}; +use super::{DaftStatsSnafu, MissingParquetColumnStatisticsSnafu, Wrap}; use super::utils::*; use super::UnableToParseUtf8FromBinarySnafu; @@ -392,43 +392,56 @@ fn convert_int96_column_range_statistics( Ok(ColumnRangeStatistics::Missing) } -impl TryFrom<&dyn Statistics> for Wrap { - type Error = super::Error; +pub(crate) fn parquet_statistics_to_column_range_statistics( + pq_stats: &dyn Statistics, + daft_dtype: &DataType, +) -> Result { + // Create ColumnRangeStatistics containing Series objects that are the **physical** types parsed from Parquet + let ptype = pq_stats.physical_type(); + let stats = pq_stats.as_any(); + let daft_stats = match ptype { + PhysicalType::Boolean => stats + .downcast_ref::() + .unwrap() + .try_into() + .map(|wrap: Wrap| wrap.0), + PhysicalType::Int32 => stats + .downcast_ref::>() + .unwrap() + .try_into() + .map(|wrap: Wrap| wrap.0), + PhysicalType::Int64 => stats + .downcast_ref::>() + .unwrap() + .try_into() + .map(|wrap: Wrap| wrap.0), + PhysicalType::Int96 => Ok(convert_int96_column_range_statistics( + stats + .downcast_ref::>() + .unwrap(), + )?), + PhysicalType::Float => stats + .downcast_ref::>() + .unwrap() + .try_into() + .map(|wrap: Wrap| wrap.0), + PhysicalType::Double => stats + .downcast_ref::>() + .unwrap() + .try_into() + .map(|wrap: Wrap| wrap.0), + PhysicalType::ByteArray => stats + .downcast_ref::() + .unwrap() + .try_into() + .map(|wrap: Wrap| wrap.0), + PhysicalType::FixedLenByteArray(_) => stats + .downcast_ref::() + .unwrap() + .try_into() + .map(|wrap: Wrap| wrap.0), + }; - fn try_from(value: &dyn Statistics) -> Result { - let ptype = value.physical_type(); - let stats = value.as_any(); - match ptype { - PhysicalType::Boolean => stats - .downcast_ref::() - .unwrap() - .try_into(), - PhysicalType::Int32 => stats - .downcast_ref::>() - .unwrap() - .try_into(), - PhysicalType::Int64 => stats - .downcast_ref::>() - .unwrap() - .try_into(), - PhysicalType::Int96 => Ok(Wrap(convert_int96_column_range_statistics( - stats - .downcast_ref::>() - .unwrap(), - )?)), - PhysicalType::Float => stats - .downcast_ref::>() - .unwrap() - .try_into(), - PhysicalType::Double => stats - .downcast_ref::>() - .unwrap() - .try_into(), - PhysicalType::ByteArray => stats.downcast_ref::().unwrap().try_into(), - PhysicalType::FixedLenByteArray(_) => stats - .downcast_ref::() - .unwrap() - .try_into(), - } - } + // Cast to ensure that the ColumnRangeStatistics now contain the targeted Daft **logical** type + daft_stats.and_then(|s| s.cast(daft_dtype).context(DaftStatsSnafu)) } diff --git a/src/daft-parquet/src/statistics/table_stats.rs b/src/daft-parquet/src/statistics/table_stats.rs index f457229624..25200fdfed 100644 --- a/src/daft-parquet/src/statistics/table_stats.rs +++ b/src/daft-parquet/src/statistics/table_stats.rs @@ -3,7 +3,7 @@ use daft_core::schema::Schema; use daft_stats::{ColumnRangeStatistics, TableStatistics}; use snafu::ResultExt; -use super::Wrap; +use super::column_range::parquet_statistics_to_column_range_statistics; use indexmap::IndexMap; @@ -31,14 +31,16 @@ pub fn row_group_metadata_to_table_stats( .iter() .map(|(field_name, field)| { if ColumnRangeStatistics::supports_dtype(&field.dtype) { - let stats: Wrap = parquet_column_metadata + let stats: ColumnRangeStatistics = parquet_column_metadata .remove(field_name) .expect("Cannot find parsed Daft field in Parquet rowgroup metadata") .transpose() .context(super::UnableToParseParquetColumnStatisticsSnafu)? - .and_then(|v| v.as_ref().try_into().ok()) - .unwrap_or(ColumnRangeStatistics::Missing.into()); - Ok((field_name.clone(), stats.0)) + .and_then(|v| { + parquet_statistics_to_column_range_statistics(v.as_ref(), &field.dtype).ok() + }) + .unwrap_or(ColumnRangeStatistics::Missing); + Ok((field_name.clone(), stats)) } else { Ok((field_name.clone(), ColumnRangeStatistics::Missing)) } diff --git a/src/daft-stats/src/column_stats/mod.rs b/src/daft-stats/src/column_stats/mod.rs index f9f5957ca6..0df5bdce19 100644 --- a/src/daft-stats/src/column_stats/mod.rs +++ b/src/daft-stats/src/column_stats/mod.rs @@ -142,6 +142,17 @@ impl ColumnRangeStatistics { let _num_bytes = series.size_bytes().unwrap(); Self::Loaded(lower, upper) } + + /// Casts the internal [`Series`] objects to the specified DataType + pub fn cast(&self, dtype: &DataType) -> crate::Result { + match self { + ColumnRangeStatistics::Missing => Ok(ColumnRangeStatistics::Missing), + ColumnRangeStatistics::Loaded(l, r) => Ok(ColumnRangeStatistics::Loaded( + l.cast(dtype).context(DaftCoreComputeSnafu)?, + r.cast(dtype).context(DaftCoreComputeSnafu)?, + )), + } + } } impl std::fmt::Display for ColumnRangeStatistics {