diff --git a/Cargo.toml b/Cargo.toml index b59d4326e..8d04f6799 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,9 +72,11 @@ once_cell = "1" opendal = "0.49" ordered-float = "4" parquet = "52" +paste = "1" pilota = "0.11.2" pretty_assertions = "1.4" port_scanner = "0.1.5" +rand = "0.8" regex = "1.10.5" reqwest = { version = "0.12", default-features = false, features = ["json"] } rust_decimal = "1.31" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 95d7f9274..6166d360d 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -66,6 +66,7 @@ once_cell = { workspace = true } opendal = { workspace = true } ordered-float = { workspace = true } parquet = { workspace = true, features = ["async"] } +paste = { workspace = true } reqwest = { workspace = true } rust_decimal = { workspace = true } serde = { workspace = true } @@ -84,6 +85,6 @@ ctor = { workspace = true } iceberg-catalog-memory = { workspace = true } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } pretty_assertions = { workspace = true } -rand = "0.8" +rand = { workspace = true } tempfile = { workspace = true } tera = { workspace = true } diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index a41243756..2ff43e0f0 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -30,7 +30,9 @@ use arrow_array::{ use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit}; use bitvec::macros::internal::funty::Fundamental; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; +use parquet::file::statistics::Statistics; use rust_decimal::prelude::ToPrimitive; +use uuid::Uuid; use crate::error::Result; use crate::spec::{ @@ -652,6 +654,107 @@ pub(crate) fn get_arrow_datum(datum: &Datum) -> Result { + paste::paste! 
{ + /// Gets the $limit_type value from a parquet Statistics struct, as a Datum + pub(crate) fn [<get_parquet_stat_ $limit_type _as_datum>]( + primitive_type: &PrimitiveType, stats: &Statistics + ) -> Result<Option<Datum>> { + Ok(Some(match (primitive_type, stats) { + (PrimitiveType::Boolean, Statistics::Boolean(stats)) => Datum::bool(*stats.$limit_type()), + (PrimitiveType::Int, Statistics::Int32(stats)) => Datum::int(*stats.$limit_type()), + (PrimitiveType::Date, Statistics::Int32(stats)) => Datum::date(*stats.$limit_type()), + (PrimitiveType::Long, Statistics::Int64(stats)) => Datum::long(*stats.$limit_type()), + (PrimitiveType::Time, Statistics::Int64(stats)) => Datum::time_micros(*stats.$limit_type())?, + (PrimitiveType::Timestamp, Statistics::Int64(stats)) => { + Datum::timestamp_micros(*stats.$limit_type()) + } + (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => { + Datum::timestamptz_micros(*stats.$limit_type()) + } + (PrimitiveType::TimestampNs, Statistics::Int64(stats)) => { + Datum::timestamp_nanos(*stats.$limit_type()) + } + (PrimitiveType::TimestamptzNs, Statistics::Int64(stats)) => { + Datum::timestamptz_nanos(*stats.$limit_type()) + } + (PrimitiveType::Float, Statistics::Float(stats)) => Datum::float(*stats.$limit_type()), + (PrimitiveType::Double, Statistics::Double(stats)) => Datum::double(*stats.$limit_type()), + (PrimitiveType::String, Statistics::ByteArray(stats)) => { + Datum::string(stats.$limit_type().as_utf8()?) 
+ } + (PrimitiveType::Decimal { + precision: _, + scale: _, + }, Statistics::ByteArray(stats)) => { + Datum::new( + primitive_type.clone(), + PrimitiveLiteral::Int128(i128::from_le_bytes(stats.[<$limit_type _bytes>]().try_into()?)), + ) + } + ( + PrimitiveType::Decimal { + precision: _, + scale: _, + }, + Statistics::Int32(stats)) => { + Datum::new( + primitive_type.clone(), + PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())), + ) + } + + ( + PrimitiveType::Decimal { + precision: _, + scale: _, + }, + Statistics::Int64(stats), + ) => { + Datum::new( + primitive_type.clone(), + PrimitiveLiteral::Int128(i128::from(*stats.$limit_type())), + ) + } + (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stats)) => { + let raw = stats.[<$limit_type _bytes>](); + if raw.len() != 16 { + return Err(Error::new( + ErrorKind::Unexpected, + "Invalid length of uuid bytes.", + )); + } + Datum::uuid(Uuid::from_bytes( + raw[..16].try_into().unwrap(), + )) + } + (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => { + let raw = stat.[<$limit_type _bytes>](); + if raw.len() != *len as usize { + return Err(Error::new( + ErrorKind::Unexpected, + "Invalid length of fixed bytes.", + )); + } + Datum::fixed(raw.to_vec()) + } + (PrimitiveType::Binary, Statistics::ByteArray(stat)) => { + Datum::binary(stat.[<$limit_type _bytes>]().to_vec()) + } + _ => { + return Ok(None); + } + })) + } + } + } +} + +get_parquet_stat_as_datum!(min); + +get_parquet_stat_as_datum!(max); + impl TryFrom<&ArrowSchema> for crate::spec::Schema { type Error = Error; diff --git a/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs index 2c0a2244c..4bf53d6ee 100644 --- a/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/row_group_metrics_evaluator.rs @@ -23,6 +23,7 @@ use fnv::FnvHashSet; use parquet::file::metadata::RowGroupMetaData; use 
parquet::file::statistics::Statistics; +use crate::arrow::{get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum}; use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{BoundPredicate, BoundReference}; use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema}; @@ -144,35 +145,7 @@ impl<'a> RowGroupMetricsEvaluator<'a> { return Ok(None); } - Ok(Some(match (primitive_type, stats) { - (PrimitiveType::Boolean, Statistics::Boolean(stats)) => Datum::bool(*stats.min()), - (PrimitiveType::Int, Statistics::Int32(stats)) => Datum::int(*stats.min()), - (PrimitiveType::Date, Statistics::Int32(stats)) => Datum::date(*stats.min()), - (PrimitiveType::Long, Statistics::Int64(stats)) => Datum::long(*stats.min()), - (PrimitiveType::Time, Statistics::Int64(stats)) => Datum::time_micros(*stats.min())?, - (PrimitiveType::Timestamp, Statistics::Int64(stats)) => { - Datum::timestamp_micros(*stats.min()) - } - (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => { - Datum::timestamptz_micros(*stats.min()) - } - (PrimitiveType::Float, Statistics::Float(stats)) => Datum::float(*stats.min()), - (PrimitiveType::Double, Statistics::Double(stats)) => Datum::double(*stats.min()), - (PrimitiveType::String, Statistics::ByteArray(stats)) => { - Datum::string(stats.min().as_utf8()?) 
- } - // TODO: - // * Decimal - // * Uuid - // * Fixed - // * Binary - (primitive_type, _) => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - format!("Conversion of min value for column of type {} to iceberg type {} is not yet supported", stats.physical_type(), primitive_type) - )); - } - })) + get_parquet_stat_min_as_datum(&primitive_type, stats) } fn max_value(&self, field_id: i32) -> Result> { @@ -184,35 +157,7 @@ impl<'a> RowGroupMetricsEvaluator<'a> { return Ok(None); } - Ok(Some(match (primitive_type, stats) { - (PrimitiveType::Boolean, Statistics::Boolean(stats)) => Datum::bool(*stats.max()), - (PrimitiveType::Int, Statistics::Int32(stats)) => Datum::int(*stats.max()), - (PrimitiveType::Date, Statistics::Int32(stats)) => Datum::date(*stats.max()), - (PrimitiveType::Long, Statistics::Int64(stats)) => Datum::long(*stats.max()), - (PrimitiveType::Time, Statistics::Int64(stats)) => Datum::time_micros(*stats.max())?, - (PrimitiveType::Timestamp, Statistics::Int64(stats)) => { - Datum::timestamp_micros(*stats.max()) - } - (PrimitiveType::Timestamptz, Statistics::Int64(stats)) => { - Datum::timestamptz_micros(*stats.max()) - } - (PrimitiveType::Float, Statistics::Float(stats)) => Datum::float(*stats.max()), - (PrimitiveType::Double, Statistics::Double(stats)) => Datum::double(*stats.max()), - (PrimitiveType::String, Statistics::ByteArray(stats)) => { - Datum::string(stats.max().as_utf8()?) 
- } - // TODO: - // * Decimal - // * Uuid - // * Fixed - // * Binary - (primitive_type, _) => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - format!("Conversion of max value for column of type {} to iceberg type {} is not yet supported", stats.physical_type(), primitive_type) - )); - } - })) + get_parquet_stat_max_as_datum(&primitive_type, stats) } fn visit_inequality( diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 11ba04f6a..3e2db5855 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -27,23 +27,20 @@ use futures::future::BoxFuture; use itertools::Itertools; use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter; use parquet::arrow::AsyncArrowWriter; -use parquet::data_type::{ - BoolType, ByteArray, ByteArrayType, DataType as ParquetDataType, DoubleType, FixedLenByteArray, - FixedLenByteArrayType, FloatType, Int32Type, Int64Type, -}; use parquet::file::properties::WriterProperties; -use parquet::file::statistics::{from_thrift, Statistics, TypedStatistics}; +use parquet::file::statistics::{from_thrift, Statistics}; use parquet::format::FileMetaData; -use uuid::Uuid; use super::location_generator::{FileNameGenerator, LocationGenerator}; use super::track_writer::TrackWriter; use super::{FileWriter, FileWriterBuilder}; -use crate::arrow::DEFAULT_MAP_FIELD_NAME; +use crate::arrow::{ + get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, DEFAULT_MAP_FIELD_NAME, +}; use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ visit_schema, DataFileBuilder, DataFileFormat, Datum, ListType, MapType, NestedFieldRef, - PrimitiveLiteral, PrimitiveType, Schema, SchemaRef, SchemaVisitor, StructType, Type, + PrimitiveType, Schema, SchemaRef, SchemaVisitor, StructType, Type, }; use crate::writer::CurrentFileStatus; use crate::{Error, ErrorKind, Result}; @@ -237,34 
+234,26 @@ impl MinMaxColAggregator { } } - fn update_state( - &mut self, - field_id: i32, - state: &TypedStatistics, - convert_func: impl Fn(::T) -> Result, - ) { - if state.min_is_exact() { - let val = convert_func(state.min().clone()).unwrap(); - self.lower_bounds - .entry(field_id) - .and_modify(|e| { - if *e > val { - *e = val.clone() - } - }) - .or_insert(val); - } - if state.max_is_exact() { - let val = convert_func(state.max().clone()).unwrap(); - self.upper_bounds - .entry(field_id) - .and_modify(|e| { - if *e < val { - *e = val.clone() - } - }) - .or_insert(val); - } + fn update_state_min(&mut self, field_id: i32, datum: Datum) { + self.lower_bounds + .entry(field_id) + .and_modify(|e| { + if *e > datum { + *e = datum.clone() + } + }) + .or_insert(datum); + } + + fn update_state_max(&mut self, field_id: i32, datum: Datum) { + self.upper_bounds + .entry(field_id) + .and_modify(|e| { + if *e < datum { + *e = datum.clone() + } + }) + .or_insert(datum); + } fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> { @@ -287,142 +276,28 @@ impl MinMaxColAggregator { )); }; - match (&ty, value) { - (PrimitiveType::Boolean, Statistics::Boolean(stat)) => { - let convert_func = |v: bool| Result::::Ok(Datum::bool(v)); - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::Int, Statistics::Int32(stat)) => { - let convert_func = |v: i32| Result::::Ok(Datum::int(v)); - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::Long, Statistics::Int64(stat)) => { - let convert_func = |v: i64| Result::::Ok(Datum::long(v)); - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::Float, Statistics::Float(stat)) => { - let convert_func = |v: f32| Result::::Ok(Datum::float(v)); - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::Double, Statistics::Double(stat)) => { - let convert_func = |v: f64| Result::::Ok(Datum::double(v)); - self.update_state::(field_id, &stat, convert_func) - } - 
(PrimitiveType::String, Statistics::ByteArray(stat)) => { - let convert_func = |v: ByteArray| { - Result::::Ok(Datum::string( - String::from_utf8(v.data().to_vec()).unwrap(), - )) - }; - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::Binary, Statistics::ByteArray(stat)) => { - let convert_func = - |v: ByteArray| Result::::Ok(Datum::binary(v.data().to_vec())); - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::Date, Statistics::Int32(stat)) => { - let convert_func = |v: i32| Result::::Ok(Datum::date(v)); - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::Time, Statistics::Int64(stat)) => { - let convert_func = |v: i64| Datum::time_micros(v); - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::Timestamp, Statistics::Int64(stat)) => { - let convert_func = |v: i64| Result::::Ok(Datum::timestamp_micros(v)); - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::Timestamptz, Statistics::Int64(stat)) => { - let convert_func = |v: i64| Result::::Ok(Datum::timestamptz_micros(v)); - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::TimestampNs, Statistics::Int64(stat)) => { - let convert_func = |v: i64| Result::::Ok(Datum::timestamp_nanos(v)); - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::TimestamptzNs, Statistics::Int64(stat)) => { - let convert_func = |v: i64| Result::::Ok(Datum::timestamptz_nanos(v)); - self.update_state::(field_id, &stat, convert_func) - } - ( - PrimitiveType::Decimal { - precision: _, - scale: _, - }, - Statistics::ByteArray(stat), - ) => { - let convert_func = |v: ByteArray| -> Result { - Result::::Ok(Datum::new( - ty.clone(), - PrimitiveLiteral::Int128(i128::from_le_bytes(v.data().try_into().unwrap())), - )) - }; - self.update_state::(field_id, &stat, convert_func) - } - ( - PrimitiveType::Decimal { - precision: _, - scale: _, - }, - Statistics::Int32(stat), - ) => { - let convert_func 
= |v: i32| { - Result::::Ok(Datum::new( - ty.clone(), - PrimitiveLiteral::Int128(i128::from(v)), - )) - }; - self.update_state::(field_id, &stat, convert_func) - } - ( - PrimitiveType::Decimal { - precision: _, - scale: _, - }, - Statistics::Int64(stat), - ) => { - let convert_func = |v: i64| { - Result::::Ok(Datum::new( - ty.clone(), - PrimitiveLiteral::Int128(i128::from(v)), - )) - }; - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::Uuid, Statistics::FixedLenByteArray(stat)) => { - let convert_func = |v: FixedLenByteArray| { - if v.len() != 16 { - return Err(Error::new( - ErrorKind::Unexpected, - "Invalid length of uuid bytes.", - )); - } - Ok(Datum::uuid(Uuid::from_bytes( - v.data()[..16].try_into().unwrap(), - ))) - }; - self.update_state::(field_id, &stat, convert_func) - } - (PrimitiveType::Fixed(len), Statistics::FixedLenByteArray(stat)) => { - let convert_func = |v: FixedLenByteArray| { - if v.len() != *len as usize { - return Err(Error::new( - ErrorKind::Unexpected, - "Invalid length of fixed bytes.", - )); - } - Ok(Datum::fixed(v.data().to_vec())) - }; - self.update_state::(field_id, &stat, convert_func) - } - (ty, value) => { + if value.min_is_exact() { + let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else { return Err(Error::new( ErrorKind::Unexpected, format!("Statistics {} is not match with field type {}.", value, ty), - )) - } + )); + }; + + self.update_state_min(field_id, min_datum); } + + if value.max_is_exact() { + let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Statistics {} is not match with field type {}.", value, ty), + )); + }; + + self.update_state_max(field_id, max_datum); + } + Ok(()) } @@ -609,6 +484,7 @@ mod tests { use arrow_select::concat::concat_batches; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use tempfile::TempDir; + use uuid::Uuid; use super::*; use crate::io::FileIOBuilder;