Skip to content

Commit

Permalink
refactor: consolidate parquet stat min/max parsing in one place
Browse files Browse the repository at this point in the history
  • Loading branch information
sdd committed Aug 28, 2024
1 parent 6e78d70 commit a3b3c8a
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 227 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ once_cell = "1"
opendal = "0.49"
ordered-float = "4"
parquet = "52"
paste = "1"
pilota = "0.11.2"
pretty_assertions = "1.4"
port_scanner = "0.1.5"
rand = "0.8"
regex = "1.10.5"
reqwest = { version = "0.12", default-features = false, features = ["json"] }
rust_decimal = "1.31"
Expand Down
3 changes: 2 additions & 1 deletion crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ once_cell = { workspace = true }
opendal = { workspace = true }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
paste = { workspace = true }
reqwest = { workspace = true }
rust_decimal = { workspace = true }
serde = { workspace = true }
Expand All @@ -84,6 +85,6 @@ ctor = { workspace = true }
iceberg-catalog-memory = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
rand = "0.8"
rand = { workspace = true }
tempfile = { workspace = true }
tera = { workspace = true }
103 changes: 103 additions & 0 deletions crates/iceberg/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use arrow_array::{
use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
use bitvec::macros::internal::funty::Fundamental;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use parquet::file::statistics::Statistics;
use rust_decimal::prelude::ToPrimitive;
use uuid::Uuid;

use crate::error::Result;
use crate::spec::{
Expand Down Expand Up @@ -652,6 +654,107 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result<Box<dyn ArrowDatum + Send
}
}

macro_rules! get_parquet_stat_as_datum {
($limit_type:ident) => {
paste::paste! {
/// Gets the $limit_type value from a parquet Statistics struct, as a Datum
pub(crate) fn [<get_parquet_stat_ $limit_type _as_datum>](
primitive_type: &PrimitiveType, stats: &Statistics
) -> Result<Option<Datum>> {
Ok(Some(match (primitive_type, stats) {
(PrimitiveType::Boolean, Statistics::Boolean(stats)) => Datum::bool(*stats.$limit_type()),
(PrimitiveType::Int, Statistics::Int32(stats)) => Datum::int(*stats.$limit_type()),
(PrimitiveType::Date, Statistics::Int32(stats)) => Datum::date(*stats.$limit_type()),
(PrimitiveType::Long, Statistics::Int64(stats)) => Datum::long(*stats.$limit_type()),
(PrimitiveType::Time, Statistics::Int64(stats)) => Datum::time_micros(*stats.$limit_type())?,
(PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
Datum::timestamp_micros(*stats.$limit_type())
}
(PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
Datum::timestamptz_micros(*stats.$limit_type())
}
(PrimitiveType::TimestampNs, Statistics::Int64(stats)) => {
Datum::timestamp_nanos(*stats.$limit_type())
}
(PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => {
Datum::timestamptz_nanos(*stats.$limit_type())
}
(PrimitiveType::Float, Statistics::Float(stats)) => Datum::float(*stats.$limit_type()),
(PrimitiveType::Double, Statistics::Double(stats)) => Datum::double(*stats.$limit_type()),
(PrimitiveType::String, Statistics::ByteArray(stats)) => {
Datum::string(stats.$limit_type().as_utf8()?)
}
(PrimitiveType::Decimal {
precision: _,
scale: _,
}, Statistics::ByteArray(stats)) => {
Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from_le_bytes(stats.[<$limit_type _bytes>]().try_into()?)),
)
}
(
PrimitiveType::Decimal {
precision: _,
scale: _,
},
Statistics::Int32(stats)) => {
Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())),
)
}

(
PrimitiveType::Decimal {
precision: _,
scale: _,
},
Statistics::Int64(stats),
) => {
Datum::new(
primitive_type.clone(),
PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())),
)
}
(PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => {
let raw = stats.[<$limit_type _bytes>]();
if raw.len() != 16 {
return Err(Error::new(
ErrorKind::Unexpected,
"Invalid length of uuid bytes.",
));
}
Datum::uuid(Uuid::from_bytes(
raw[..16].try_into().unwrap(),
))
}
(PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => {
let raw = stat.[<$limit_type _bytes>]();
if raw.len() != *len as usize {
return Err(Error::new(
ErrorKind::Unexpected,
"Invalid length of fixed bytes.",
));
}
Datum::fixed(raw.to_vec())
}
(PrimitiveType::Binary, Statistics::ByteArray(stat)) => {
Datum::binary(stat.[<$limit_type _bytes>]().to_vec())
}
_ => {
return Ok(None);
}
}))
}
}
}
}

get_parquet_stat_as_datum!(min);

get_parquet_stat_as_datum!(max);

impl TryFrom<&ArrowSchema> for crate::spec::Schema {
type Error = Error;

Expand Down
61 changes: 3 additions & 58 deletions crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use fnv::FnvHashSet;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::statistics::Statistics;

use crate::arrow::{get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum};
use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::{BoundPredicate, BoundReference};
use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema};
Expand Down Expand Up @@ -144,35 +145,7 @@ impl<'a> RowGroupMetricsEvaluator<'a> {
return Ok(None);
}

Ok(Some(match (primitive_type, stats) {
(PrimitiveType::Boolean, Statistics::Boolean(stats)) => Datum::bool(*stats.min()),
(PrimitiveType::Int, Statistics::Int32(stats)) => Datum::int(*stats.min()),
(PrimitiveType::Date, Statistics::Int32(stats)) => Datum::date(*stats.min()),
(PrimitiveType::Long, Statistics::Int64(stats)) => Datum::long(*stats.min()),
(PrimitiveType::Time, Statistics::Int64(stats)) => Datum::time_micros(*stats.min())?,
(PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
Datum::timestamp_micros(*stats.min())
}
(PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
Datum::timestamptz_micros(*stats.min())
}
(PrimitiveType::Float, Statistics::Float(stats)) => Datum::float(*stats.min()),
(PrimitiveType::Double, Statistics::Double(stats)) => Datum::double(*stats.min()),
(PrimitiveType::String, Statistics::ByteArray(stats)) => {
Datum::string(stats.min().as_utf8()?)
}
// TODO:
// * Decimal
// * Uuid
// * Fixed
// * Binary
(primitive_type, _) => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Conversion of min value for column of type {} to iceberg type {} is not yet supported", stats.physical_type(), primitive_type)
));
}
}))
get_parquet_stat_min_as_datum(&primitive_type, stats)
}

fn max_value(&self, field_id: i32) -> Result<Option<Datum>> {
Expand All @@ -184,35 +157,7 @@ impl<'a> RowGroupMetricsEvaluator<'a> {
return Ok(None);
}

Ok(Some(match (primitive_type, stats) {
(PrimitiveType::Boolean, Statistics::Boolean(stats)) => Datum::bool(*stats.max()),
(PrimitiveType::Int, Statistics::Int32(stats)) => Datum::int(*stats.max()),
(PrimitiveType::Date, Statistics::Int32(stats)) => Datum::date(*stats.max()),
(PrimitiveType::Long, Statistics::Int64(stats)) => Datum::long(*stats.max()),
(PrimitiveType::Time, Statistics::Int64(stats)) => Datum::time_micros(*stats.max())?,
(PrimitiveType::Timestamp, Statistics::Int64(stats)) => {
Datum::timestamp_micros(*stats.max())
}
(PrimitiveType::Timestamptz, Statistics::Int64(stats)) => {
Datum::timestamptz_micros(*stats.max())
}
(PrimitiveType::Float, Statistics::Float(stats)) => Datum::float(*stats.max()),
(PrimitiveType::Double, Statistics::Double(stats)) => Datum::double(*stats.max()),
(PrimitiveType::String, Statistics::ByteArray(stats)) => {
Datum::string(stats.max().as_utf8()?)
}
// TODO:
// * Decimal
// * Uuid
// * Fixed
// * Binary
(primitive_type, _) => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Conversion of max value for column of type {} to iceberg type {} is not yet supported", stats.physical_type(), primitive_type)
));
}
}))
get_parquet_stat_max_as_datum(&primitive_type, stats)
}

fn visit_inequality(
Expand Down
Loading

0 comments on commit a3b3c8a

Please sign in to comment.