Skip to content

Commit

Permalink
Fix 3 failing roundtrip tests by casting the statistics Series when p…
Browse files Browse the repository at this point in the history
…arsing from Parquet
  • Loading branch information
Jay Chia committed Nov 18, 2023
1 parent bceea55 commit 43032c1
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 45 deletions.
93 changes: 53 additions & 40 deletions src/daft-parquet/src/statistics/column_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use daft_core::{
logical::{DateArray, Decimal128Array, TimestampArray},
BinaryArray, BooleanArray, Int128Array, Int32Array, Int64Array, Utf8Array,
},
IntoSeries, Series,
DataType, IntoSeries, Series,
};
use daft_stats::ColumnRangeStatistics;
use parquet2::{
Expand All @@ -15,7 +15,7 @@ use parquet2::{
};
use snafu::{OptionExt, ResultExt};

use super::{MissingParquetColumnStatisticsSnafu, Wrap};
use super::{DaftStatsSnafu, MissingParquetColumnStatisticsSnafu, Wrap};

use super::utils::*;
use super::UnableToParseUtf8FromBinarySnafu;
Expand Down Expand Up @@ -392,43 +392,56 @@ fn convert_int96_column_range_statistics(
Ok(ColumnRangeStatistics::Missing)
}

impl TryFrom<&dyn Statistics> for Wrap<ColumnRangeStatistics> {
type Error = super::Error;
/// Converts Parquet column statistics into [`ColumnRangeStatistics`] cast to `daft_dtype`.
///
/// The concrete statistics type is selected from `pq_stats.physical_type()` and recovered by
/// downcasting `pq_stats.as_any()`. Each concrete type is converted through its
/// `TryInto<Wrap<ColumnRangeStatistics>>` implementation and then unwrapped, except `Int96`,
/// which goes through `convert_int96_column_range_statistics`. The resulting statistics are
/// finally cast to `daft_dtype`.
///
/// # Errors
///
/// Returns an error if the per-type conversion fails, or if the final cast fails (that failure
/// is given `DaftStatsSnafu` context).
///
/// # Panics
///
/// Panics if the downcast to the concrete statistics type selected for the physical type
/// returns `None` (every branch calls `.unwrap()` on `downcast_ref`).
pub(crate) fn parquet_statistics_to_column_range_statistics(
pq_stats: &dyn Statistics,
daft_dtype: &DataType,
) -> Result<ColumnRangeStatistics, super::Error> {
// Create ColumnRangeStatistics containing Series objects that are the **physical** types parsed from Parquet
let ptype = pq_stats.physical_type();
let stats = pq_stats.as_any();
// Every branch yields a `Result<ColumnRangeStatistics, _>`; the `Wrap` newtype is stripped via `.0`.
let daft_stats = match ptype {
PhysicalType::Boolean => stats
.downcast_ref::<BooleanStatistics>()
.unwrap()
.try_into()
.map(|wrap: Wrap<ColumnRangeStatistics>| wrap.0),
PhysicalType::Int32 => stats
.downcast_ref::<PrimitiveStatistics<i32>>()
.unwrap()
.try_into()
.map(|wrap: Wrap<ColumnRangeStatistics>| wrap.0),
PhysicalType::Int64 => stats
.downcast_ref::<PrimitiveStatistics<i64>>()
.unwrap()
.try_into()
.map(|wrap: Wrap<ColumnRangeStatistics>| wrap.0),
// Int96 statistics are stored as `[u32; 3]` primitives and use a dedicated converter.
PhysicalType::Int96 => Ok(convert_int96_column_range_statistics(
stats
.downcast_ref::<PrimitiveStatistics<[u32; 3]>>()
.unwrap(),
)?),
PhysicalType::Float => stats
.downcast_ref::<PrimitiveStatistics<f32>>()
.unwrap()
.try_into()
.map(|wrap: Wrap<ColumnRangeStatistics>| wrap.0),
PhysicalType::Double => stats
.downcast_ref::<PrimitiveStatistics<f64>>()
.unwrap()
.try_into()
.map(|wrap: Wrap<ColumnRangeStatistics>| wrap.0),
PhysicalType::ByteArray => stats
.downcast_ref::<BinaryStatistics>()
.unwrap()
.try_into()
.map(|wrap: Wrap<ColumnRangeStatistics>| wrap.0),
PhysicalType::FixedLenByteArray(_) => stats
.downcast_ref::<FixedLenStatistics>()
.unwrap()
.try_into()
.map(|wrap: Wrap<ColumnRangeStatistics>| wrap.0),
};

// NOTE(review): the `try_from` item below looks like the pre-change `TryFrom` implementation
// left in place by the diff rendering (it returns `Wrap`-ped values and performs no cast);
// presumably it is not part of the new function — confirm against the actual file.
fn try_from(value: &dyn Statistics) -> Result<Self, Self::Error> {
let ptype = value.physical_type();
let stats = value.as_any();
match ptype {
PhysicalType::Boolean => stats
.downcast_ref::<BooleanStatistics>()
.unwrap()
.try_into(),
PhysicalType::Int32 => stats
.downcast_ref::<PrimitiveStatistics<i32>>()
.unwrap()
.try_into(),
PhysicalType::Int64 => stats
.downcast_ref::<PrimitiveStatistics<i64>>()
.unwrap()
.try_into(),
PhysicalType::Int96 => Ok(Wrap(convert_int96_column_range_statistics(
stats
.downcast_ref::<PrimitiveStatistics<[u32; 3]>>()
.unwrap(),
)?)),
PhysicalType::Float => stats
.downcast_ref::<PrimitiveStatistics<f32>>()
.unwrap()
.try_into(),
PhysicalType::Double => stats
.downcast_ref::<PrimitiveStatistics<f64>>()
.unwrap()
.try_into(),
PhysicalType::ByteArray => stats.downcast_ref::<BinaryStatistics>().unwrap().try_into(),
PhysicalType::FixedLenByteArray(_) => stats
.downcast_ref::<FixedLenStatistics>()
.unwrap()
.try_into(),
}
}
// Cast to ensure that the ColumnRangeStatistics now contain the targeted Daft **logical** type
daft_stats.and_then(|s| s.cast(daft_dtype).context(DaftStatsSnafu))
}
12 changes: 7 additions & 5 deletions src/daft-parquet/src/statistics/table_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use daft_core::schema::Schema;
use daft_stats::{ColumnRangeStatistics, TableStatistics};
use snafu::ResultExt;

use super::Wrap;
use super::column_range::parquet_statistics_to_column_range_statistics;

use indexmap::IndexMap;

Expand Down Expand Up @@ -31,14 +31,16 @@ pub fn row_group_metadata_to_table_stats(
.iter()
.map(|(field_name, field)| {
if ColumnRangeStatistics::supports_dtype(&field.dtype) {
let stats: Wrap<ColumnRangeStatistics> = parquet_column_metadata
let stats: ColumnRangeStatistics = parquet_column_metadata
.remove(field_name)
.expect("Cannot find parsed Daft field in Parquet rowgroup metadata")
.transpose()
.context(super::UnableToParseParquetColumnStatisticsSnafu)?
.and_then(|v| v.as_ref().try_into().ok())
.unwrap_or(ColumnRangeStatistics::Missing.into());
Ok((field_name.clone(), stats.0))
.and_then(|v| {
parquet_statistics_to_column_range_statistics(v.as_ref(), &field.dtype).ok()
})
.unwrap_or(ColumnRangeStatistics::Missing);
Ok((field_name.clone(), stats))
} else {
Ok((field_name.clone(), ColumnRangeStatistics::Missing))
}
Expand Down
11 changes: 11 additions & 0 deletions src/daft-stats/src/column_stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,17 @@ impl ColumnRangeStatistics {
let _num_bytes = series.size_bytes().unwrap();
Self::Loaded(lower, upper)
}

/// Casts the internal [`Series`] objects to the specified DataType
pub fn cast(&self, dtype: &DataType) -> crate::Result<Self> {
match self {
ColumnRangeStatistics::Missing => Ok(ColumnRangeStatistics::Missing),
ColumnRangeStatistics::Loaded(l, r) => Ok(ColumnRangeStatistics::Loaded(
l.cast(dtype).context(DaftCoreComputeSnafu)?,
r.cast(dtype).context(DaftCoreComputeSnafu)?,
)),
}
}
}

impl std::fmt::Display for ColumnRangeStatistics {
Expand Down

0 comments on commit 43032c1

Please sign in to comment.