From fea1817c45e4da4c7d4a84bb2370aee6c0de1185 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Tue, 1 Oct 2024 12:56:10 +0200 Subject: [PATCH] Merge branch 'feat/safe-partition-spec' --- crates/catalog/memory/src/catalog.rs | 4 +- .../src/expr/visitors/expression_evaluator.rs | 131 ++--- .../visitors/inclusive_metrics_evaluator.rs | 11 +- .../src/expr/visitors/inclusive_projection.rs | 155 +++-- crates/iceberg/src/io/object_cache.rs | 2 +- crates/iceberg/src/scan.rs | 8 +- crates/iceberg/src/spec/manifest.rs | 512 ++++++++-------- crates/iceberg/src/spec/partition.rs | 550 ++++++++++++++---- crates/iceberg/src/spec/table_metadata.rs | 313 ++++++---- .../writer/file_writer/location_generator.rs | 5 +- 10 files changed, 1083 insertions(+), 608 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 1da044821..eebce36ff 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -355,7 +355,7 @@ mod tests { assert_eq!(metadata.current_schema().as_ref(), expected_schema); - let expected_partition_spec = PartitionSpec::builder(expected_schema) + let expected_partition_spec = PartitionSpec::builder((*expected_schema).clone()) .with_spec_id(0) .build() .unwrap(); @@ -365,7 +365,7 @@ mod tests { .partition_specs_iter() .map(|p| p.as_ref()) .collect_vec(), - vec![&expected_partition_spec] + vec![&expected_partition_spec.into_schemaless()] ); let expected_sorted_order = SortOrder::builder() diff --git a/crates/iceberg/src/expr/visitors/expression_evaluator.rs b/crates/iceberg/src/expr/visitors/expression_evaluator.rs index 8f3c2971c..94a003ef6 100644 --- a/crates/iceberg/src/expr/visitors/expression_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/expression_evaluator.rs @@ -259,14 +259,11 @@ mod tests { }; use crate::spec::{ DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionSpec, - PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type, - UnboundPartitionField, + PartitionSpecRef, PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField, }; use crate::Result; - fn create_schema_and_partition_spec( - r#type: PrimitiveType, - ) -> Result<(SchemaRef, PartitionSpecRef)> { + fn create_partition_spec(r#type: PrimitiveType) -> Result { let schema = Schema::builder() .with_fields(vec![Arc::new(NestedField::optional( 1, @@ -275,7 +272,7 @@ mod tests { ))]) .build()?; - let spec = PartitionSpec::builder(&schema) + let spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField::builder() .source_id(1) @@ -287,16 +284,15 @@ mod tests { .build() .unwrap(); - Ok((Arc::new(schema), Arc::new(spec))) + Ok(Arc::new(spec)) } fn create_partition_filter( - schema: &Schema, partition_spec: PartitionSpecRef, predicate: &BoundPredicate, case_sensitive: bool, ) -> Result { - let partition_type = partition_spec.partition_type(schema)?; + let partition_type = partition_spec.partition_type(); let partition_fields = partition_type.fields().to_owned(); let partition_schema = Schema::builder() @@ -304,7 +300,8 @@ mod tests { .with_fields(partition_fields) .build()?; - let mut inclusive_projection = InclusiveProjection::new(partition_spec); + let mut inclusive_projection = + InclusiveProjection::new((*partition_spec).clone().into_schemaless().into()); let partition_filter = inclusive_projection .project(predicate)? @@ -315,13 +312,11 @@ mod tests { } fn create_expression_evaluator( - schema: &Schema, partition_spec: PartitionSpecRef, predicate: &BoundPredicate, case_sensitive: bool, ) -> Result { - let partition_filter = - create_partition_filter(schema, partition_spec, predicate, case_sensitive)?; + let partition_filter = create_partition_filter(partition_spec, predicate, case_sensitive)?; Ok(ExpressionEvaluator::new(partition_filter)) } @@ -375,7 +370,7 @@ mod tests { #[test] fn test_expr_or() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::LessThan, @@ -387,10 +382,10 @@ mod tests { Reference::new("a"), Datum::float(0.4), ))) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -404,7 +399,7 @@ mod tests { #[test] fn test_expr_and() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::LessThan, @@ -416,10 +411,10 @@ mod tests { Reference::new("a"), Datum::float(0.4), ))) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -433,17 +428,17 @@ mod tests { #[test] fn test_expr_not_in() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Set(SetExpression::new( PredicateOperator::NotIn, Reference::new("a"), FnvHashSet::from_iter([Datum::float(0.9), Datum::float(1.2), Datum::float(2.4)]), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -457,17 +452,17 @@ mod tests { #[test] fn test_expr_in() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Set(SetExpression::new( PredicateOperator::In, Reference::new("a"), FnvHashSet::from_iter([Datum::float(1.0), Datum::float(1.2), Datum::float(2.4)]), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -481,17 +476,17 @@ mod tests { #[test] fn test_expr_not_starts_with() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::String)?; + let partition_spec = create_partition_spec(PrimitiveType::String)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::NotStartsWith, Reference::new("a"), Datum::string("not"), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_string(); @@ -505,17 +500,17 @@ mod tests { #[test] fn test_expr_starts_with() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::String)?; + let partition_spec = create_partition_spec(PrimitiveType::String)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::StartsWith, Reference::new("a"), Datum::string("test"), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_string(); @@ -529,17 +524,17 @@ mod tests { #[test] fn test_expr_not_eq() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::NotEq, Reference::new("a"), Datum::float(0.9), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -553,17 +548,17 @@ mod tests { #[test] fn test_expr_eq() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::Eq, Reference::new("a"), Datum::float(1.0), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -577,17 +572,17 @@ mod tests { #[test] fn test_expr_greater_than_or_eq() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::GreaterThanOrEq, Reference::new("a"), Datum::float(1.0), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -601,17 +596,17 @@ mod tests { #[test] fn test_expr_greater_than() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::GreaterThan, Reference::new("a"), Datum::float(0.9), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -625,17 +620,17 @@ mod tests { #[test] fn test_expr_less_than_or_eq() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::LessThanOrEq, Reference::new("a"), Datum::float(1.0), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -649,17 +644,17 @@ mod tests { #[test] fn test_expr_less_than() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Binary(BinaryExpression::new( PredicateOperator::LessThan, Reference::new("a"), Datum::float(1.1), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -673,15 +668,15 @@ mod tests { #[test] fn test_expr_is_not_nan() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Unary(UnaryExpression::new( PredicateOperator::NotNan, Reference::new("a"), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -695,15 +690,15 @@ mod tests { #[test] fn test_expr_is_nan() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Unary(UnaryExpression::new( PredicateOperator::IsNan, Reference::new("a"), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -717,15 +712,15 @@ mod tests { #[test] fn test_expr_is_not_null() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Unary(UnaryExpression::new( PredicateOperator::NotNull, Reference::new("a"), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -739,15 +734,15 @@ mod tests { #[test] fn test_expr_is_null() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; let predicate = Predicate::Unary(UnaryExpression::new( PredicateOperator::IsNull, Reference::new("a"), )) - .bind(schema.clone(), case_sensitive)?; + .bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -761,11 +756,12 @@ mod tests { #[test] fn test_expr_always_false() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; - let predicate = Predicate::AlwaysFalse.bind(schema.clone(), case_sensitive)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let predicate = + Predicate::AlwaysFalse.bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); @@ -779,11 +775,12 @@ mod tests { #[test] fn test_expr_always_true() -> Result<()> { let case_sensitive = true; - let (schema, partition_spec) = create_schema_and_partition_spec(PrimitiveType::Float)?; - let predicate = Predicate::AlwaysTrue.bind(schema.clone(), case_sensitive)?; + let partition_spec = create_partition_spec(PrimitiveType::Float)?; + let predicate = + Predicate::AlwaysTrue.bind(partition_spec.schema_ref().clone(), case_sensitive)?; let expression_evaluator = - create_expression_evaluator(&schema, partition_spec, &predicate, case_sensitive)?; + create_expression_evaluator(partition_spec, &predicate, case_sensitive)?; let data_file = create_data_file_float(); diff --git a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs index a2ee4722f..89bbd6f56 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs @@ -504,10 +504,10 @@ mod test { #[test] fn test_data_file_no_partitions() { - let (table_schema_ref, _partition_spec_ref) = create_test_schema_and_partition_spec(); + let partition_spec_ref = create_test_partition_spec(); let partition_filter = Predicate::AlwaysTrue - .bind(table_schema_ref.clone(), false) + .bind(partition_spec_ref.schema_ref().clone(), false) .unwrap(); let case_sensitive = false; @@ -1645,7 +1645,7 @@ mod test { assert!(result, "Should read: NotIn on no nulls column"); } - fn create_test_schema_and_partition_spec() -> (Arc, Arc) { + fn create_test_partition_spec() -> Arc { let table_schema = Schema::builder() .with_fields(vec![Arc::new(NestedField::optional( 1, @@ -1656,7 +1656,7 @@ mod test { .unwrap(); let table_schema_ref = Arc::new(table_schema); - let partition_spec = PartitionSpec::builder(&table_schema_ref) + let partition_spec = PartitionSpec::builder(table_schema_ref.clone()) .with_spec_id(1) .add_unbound_fields(vec![UnboundPartitionField::builder() .source_id(1) @@ -1667,8 +1667,7 @@ mod test { .unwrap() .build() .unwrap(); - let partition_spec_ref = Arc::new(partition_spec); - (table_schema_ref, partition_spec_ref) + Arc::new(partition_spec) } fn not_null(reference: &str) -> BoundPredicate { diff --git a/crates/iceberg/src/expr/visitors/inclusive_projection.rs b/crates/iceberg/src/expr/visitors/inclusive_projection.rs index 2087207ea..7594e6458 100644 --- a/crates/iceberg/src/expr/visitors/inclusive_projection.rs +++ b/crates/iceberg/src/expr/visitors/inclusive_projection.rs @@ -21,16 +21,16 @@ use fnv::FnvHashSet; use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor}; use crate::expr::{BoundPredicate, BoundReference, Predicate}; -use crate::spec::{Datum, PartitionField, PartitionSpecRef}; +use crate::spec::{Datum, PartitionField, SchemalessPartitionSpecRef}; use crate::Error; pub(crate) struct InclusiveProjection { - partition_spec: PartitionSpecRef, + partition_spec: SchemalessPartitionSpecRef, cached_parts: HashMap>, } impl InclusiveProjection { - pub(crate) fn new(partition_spec: PartitionSpecRef) -> Self { + pub(crate) fn new(partition_spec: SchemalessPartitionSpecRef) -> Self { Self { partition_spec, cached_parts: HashMap::new(), @@ -235,7 +235,7 @@ mod tests { use crate::expr::visitors::inclusive_projection::InclusiveProjection; use crate::expr::{Bind, Predicate, Reference}; use crate::spec::{ - Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Transform, Type, + Datum, NestedField, PartitionSpec, PrimitiveType, Schema, Transform, Type, UnboundPartitionField, }; @@ -265,13 +265,14 @@ mod tests { #[test] fn test_inclusive_projection_logic_ops() { let schema = build_test_schema(); + let arc_schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(&schema) + let partition_spec = PartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .build() - .unwrap(); + .unwrap() + .into_schemaless(); - let arc_schema = Arc::new(schema); let arc_partition_spec = Arc::new(partition_spec); // this predicate contains only logic operators, @@ -295,8 +296,9 @@ mod tests { #[test] fn test_inclusive_projection_identity_transform() { let schema = build_test_schema(); + let arc_schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(&schema) + let partition_spec = PartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_field( UnboundPartitionField::builder() @@ -308,9 +310,9 @@ mod tests { ) .unwrap() .build() - .unwrap(); + .unwrap() + .into_schemaless(); - let arc_schema = Arc::new(schema); let arc_partition_spec = Arc::new(partition_spec); let unbound_predicate = Reference::new("a").less_than(Datum::int(10)); @@ -321,7 +323,7 @@ mod tests { // should result in the same Predicate as the original // `unbound_predicate`, since we have just a single partition field, // and it has an Identity transform - let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); + let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec); let result = inclusive_projection.project(&bound_predicate).unwrap(); let expected = "a < 10".to_string(); @@ -330,34 +332,95 @@ mod tests { } #[test] - fn test_inclusive_projection_date_transforms() { + fn test_inclusive_projection_date_year_transform() { let schema = build_test_schema(); + let arc_schema = Arc::new(schema); + + let partition_spec = PartitionSpec::builder(arc_schema.clone()) + .with_spec_id(1) + .add_unbound_fields(vec![UnboundPartitionField { + source_id: 2, + name: "year".to_string(), + field_id: Some(1000), + transform: Transform::Year, + }]) + .unwrap() + .build() + .unwrap() + .into_schemaless(); + + let arc_partition_spec = Arc::new(partition_spec); + + let unbound_predicate = + Reference::new("date").less_than(Datum::date_from_str("2024-01-01").unwrap()); + + let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap(); + + // applying InclusiveProjection to bound_predicate + // should result in a predicate that correctly handles + // year, month and date + let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec); + let result = inclusive_projection.project(&bound_predicate).unwrap(); + + let expected = "year <= 53".to_string(); + + assert_eq!(result.to_string(), expected); + } + + #[test] + fn test_inclusive_projection_date_month_transform() { + let schema = build_test_schema(); + let arc_schema = Arc::new(schema); + + let partition_spec = PartitionSpec::builder(arc_schema.clone()) + .with_spec_id(1) + .add_unbound_fields(vec![UnboundPartitionField { + source_id: 2, + name: "month".to_string(), + field_id: Some(1000), + transform: Transform::Month, + }]) + .unwrap() + .build() + .unwrap() + .into_schemaless(); + + let arc_partition_spec = Arc::new(partition_spec); + + let unbound_predicate = + Reference::new("date").less_than(Datum::date_from_str("2024-01-01").unwrap()); + + let bound_predicate = unbound_predicate.bind(arc_schema.clone(), false).unwrap(); - let partition_spec = PartitionSpec { - spec_id: 1, - fields: vec![ - PartitionField { - source_id: 2, - name: "year".to_string(), - field_id: 1000, - transform: Transform::Year, - }, - PartitionField { - source_id: 2, - name: "month".to_string(), - field_id: 1001, - transform: Transform::Month, - }, - PartitionField { - source_id: 2, - name: "day".to_string(), - field_id: 1002, - transform: Transform::Day, - }, - ], - }; + // applying InclusiveProjection to bound_predicate + // should result in a predicate that correctly handles + // year, month and date + let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec); + let result = inclusive_projection.project(&bound_predicate).unwrap(); + + let expected = "month <= 647".to_string(); + + assert_eq!(result.to_string(), expected); + } + #[test] + fn test_inclusive_projection_date_day_transform() { + let schema = build_test_schema(); let arc_schema = Arc::new(schema); + + let partition_spec = PartitionSpec::builder(arc_schema.clone()) + .with_spec_id(1) + .add_unbound_fields(vec![UnboundPartitionField { + source_id: 2, + name: "day".to_string(), + field_id: Some(1000), + transform: Transform::Day, + }]) + .unwrap() + .build() + .unwrap() + .into_schemaless(); + let arc_partition_spec = Arc::new(partition_spec); let unbound_predicate = @@ -368,10 +431,10 @@ mod tests { // applying InclusiveProjection to bound_predicate // should result in a predicate that correctly handles // year, month and date - let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); + let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec); let result = inclusive_projection.project(&bound_predicate).unwrap(); - let expected = "((year <= 53) AND (month <= 647)) AND (day <= 19722)".to_string(); + let expected = "day <= 19722".to_string(); assert_eq!(result.to_string(), expected); } @@ -379,8 +442,9 @@ mod tests { #[test] fn test_inclusive_projection_truncate_transform() { let schema = build_test_schema(); + let arc_schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(&schema) + let partition_spec = PartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_field( UnboundPartitionField::builder() @@ -392,9 +456,9 @@ mod tests { ) .unwrap() .build() - .unwrap(); + .unwrap() + .into_schemaless(); - let arc_schema = Arc::new(schema); let arc_partition_spec = Arc::new(partition_spec); let unbound_predicate = Reference::new("name").starts_with(Datum::string("Testy McTest")); @@ -408,7 +472,7 @@ mod tests { // name that start with "Testy McTest" into a partition // for values of name that start with the first four letters // of that, ie "Test". - let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); + let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec); let result = inclusive_projection.project(&bound_predicate).unwrap(); let expected = "name_truncate STARTS WITH \"Test\"".to_string(); @@ -419,8 +483,9 @@ mod tests { #[test] fn test_inclusive_projection_bucket_transform() { let schema = build_test_schema(); + let arc_schema = Arc::new(schema); - let partition_spec = PartitionSpec::builder(&schema) + let partition_spec = PartitionSpec::builder(arc_schema.clone()) .with_spec_id(1) .add_unbound_field( UnboundPartitionField::builder() @@ -432,9 +497,9 @@ mod tests { ) .unwrap() .build() - .unwrap(); + .unwrap() + .into_schemaless(); - let arc_schema = Arc::new(schema); let arc_partition_spec = Arc::new(partition_spec); let unbound_predicate = Reference::new("a").equal_to(Datum::int(10)); @@ -445,7 +510,7 @@ mod tests { // should result in the "a = 10" predicate being // transformed into "a = 2", since 10 gets bucketed // to 2 with a Bucket(7) partition - let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone()); + let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec); let result = inclusive_projection.project(&bound_predicate).unwrap(); let expected = "a_bucket[7] = 2".to_string(); diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 731072a5a..35b6a2c94 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -262,7 +262,7 @@ mod tests { ) .write(Manifest::new( ManifestMetadata::builder() - .schema((*current_schema).clone()) + .schema(current_schema.clone()) .content(ManifestContentType::Data) .format_version(FormatVersion::V2) .partition_spec((**current_partition_spec).clone()) diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index f5cbbcf06..ab00b1eec 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -688,7 +688,7 @@ impl PartitionFilterCache { &self, spec_id: i32, table_metadata: &TableMetadataRef, - schema: &SchemaRef, + schema: &Schema, case_sensitive: bool, filter: BoundPredicate, ) -> Result> { @@ -714,11 +714,11 @@ impl PartitionFilterCache { format!("Could not find partition spec for id {}", spec_id), ))?; - let partition_type = partition_spec.partition_type(schema.as_ref())?; + let partition_type = partition_spec.partition_type(schema)?; let partition_fields = partition_type.fields().to_owned(); let partition_schema = Arc::new( Schema::builder() - .with_schema_id(partition_spec.spec_id) + .with_schema_id(partition_spec.spec_id()) .with_fields(partition_fields) .build()?, ); @@ -1012,7 +1012,7 @@ mod tests { ) .write(Manifest::new( ManifestMetadata::builder() - .schema((*current_schema).clone()) + .schema(current_schema.clone()) .content(ManifestContentType::Data) .format_version(FormatVersion::V2) .partition_spec((**current_partition_spec).clone()) diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index f0dfdf47c..12d52e494 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -31,7 +31,7 @@ use typed_builder::TypedBuilder; use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; use super::{ Datum, FieldSummary, FormatVersion, ManifestContentType, ManifestFile, PartitionSpec, Schema, - SchemaId, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER, + SchemaId, SchemaRef, Struct, INITIAL_SEQUENCE_NUMBER, UNASSIGNED_SEQUENCE_NUMBER, }; use crate::error::Result; use crate::io::OutputFile; @@ -55,7 +55,7 @@ impl Manifest { let metadata = ManifestMetadata::parse(meta)?; // Parse manifest entries - let partition_type = metadata.partition_spec.partition_type(&metadata.schema)?; + let partition_type = metadata.partition_spec.partition_type(); let entries = match metadata.format_version { FormatVersion::V1 => { @@ -65,7 +65,7 @@ impl Manifest { .into_iter() .map(|value| { from_value::<_serde::ManifestEntryV1>(&value?)? - .try_into(&partition_type, &metadata.schema) + .try_into(partition_type, &metadata.schema) }) .collect::>>()? } @@ -76,7 +76,7 @@ impl Manifest { .into_iter() .map(|value| { from_value::<_serde::ManifestEntryV2>(&value?)? - .try_into(&partition_type, &metadata.schema) + .try_into(partition_type, &metadata.schema) }) .collect::>>()? } @@ -206,10 +206,7 @@ impl ManifestWriter { /// Write a manifest. pub async fn write(mut self, manifest: Manifest) -> Result { // Create the avro writer - let partition_type = manifest - .metadata - .partition_spec - .partition_type(&manifest.metadata.schema)?; + let partition_type = manifest.metadata.partition_spec.partition_type(); let table_schema = &manifest.metadata.schema; let avro_schema = match manifest.metadata.format_version { FormatVersion::V1 => manifest_schema_v1(partition_type.clone())?, @@ -284,12 +281,12 @@ impl ManifestWriter { let value = match manifest.metadata.format_version { FormatVersion::V1 => to_value(_serde::ManifestEntryV1::try_from( (*entry).clone(), - &partition_type, + partition_type, )?)? .resolve(&avro_schema)?, FormatVersion::V2 => to_value(_serde::ManifestEntryV2::try_from( (*entry).clone(), - &partition_type, + partition_type, )?)? .resolve(&avro_schema)?, }; @@ -705,7 +702,7 @@ mod _const_schema { pub struct ManifestMetadata { /// The table schema at the time the manifest /// was written - schema: Schema, + schema: SchemaRef, /// ID of the schema used to write the manifest as a string schema_id: SchemaId, /// The partition spec used to write the manifest @@ -719,7 +716,7 @@ pub struct ManifestMetadata { impl ManifestMetadata { /// Parse from metadata in avro file. pub fn parse(meta: &HashMap>) -> Result { - let schema = { + let schema = Arc::new({ let bs = meta.get("schema").ok_or_else(|| { Error::new( ErrorKind::DataInvalid, @@ -733,7 +730,7 @@ impl ManifestMetadata { ) .with_source(err) })? - }; + }); let schema_id: i32 = meta .get("schema-id") .map(|bs| { @@ -776,7 +773,10 @@ impl ManifestMetadata { }) .transpose()? .unwrap_or(0); - PartitionSpec { spec_id, fields } + PartitionSpec::builder(schema.clone()) + .with_spec_id(spec_id) + .add_unbound_fields(fields.into_iter().map(|f| f.into_unbound()))? + .build()? }; let format_version = if let Some(bs) = meta.get("format-version") { serde_json::from_slice::(bs).map_err(|err| { @@ -1514,82 +1514,82 @@ mod tests { #[tokio::test] async fn test_parse_manifest_v2_unpartition() { + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + // id v_int v_long v_float v_double v_varchar v_bool v_date v_timestamp v_decimal v_ts_ntz + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "v_int", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 3, + "v_long", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "v_float", + Type::Primitive(PrimitiveType::Float), + )), + Arc::new(NestedField::optional( + 5, + "v_double", + Type::Primitive(PrimitiveType::Double), + )), + Arc::new(NestedField::optional( + 6, + "v_varchar", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 7, + "v_bool", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 8, + "v_date", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::optional( + 9, + "v_timestamp", + Type::Primitive(PrimitiveType::Timestamptz), + )), + Arc::new(NestedField::optional( + 10, + "v_decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 36, + scale: 10, + }), + )), + Arc::new(NestedField::optional( + 11, + "v_ts_ntz", + Type::Primitive(PrimitiveType::Timestamp), + )), + Arc::new(NestedField::optional( + 12, + "v_ts_ns_ntz", + Type::Primitive(PrimitiveType::TimestampNs), + )), + ]) + .build() + .unwrap(), + ); let manifest = Manifest { metadata: ManifestMetadata { schema_id: 0, - schema: Schema::builder() - .with_fields(vec![ - // id v_int v_long v_float v_double v_varchar v_bool v_date v_timestamp v_decimal v_ts_ntz - Arc::new(NestedField::optional( - 1, - "id", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 2, - "v_int", - Type::Primitive(PrimitiveType::Int), - )), - Arc::new(NestedField::optional( - 3, - "v_long", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 4, - "v_float", - Type::Primitive(PrimitiveType::Float), - )), - Arc::new(NestedField::optional( - 5, - "v_double", - Type::Primitive(PrimitiveType::Double), - )), - Arc::new(NestedField::optional( - 6, - "v_varchar", - Type::Primitive(PrimitiveType::String), - )), - Arc::new(NestedField::optional( - 7, - "v_bool", - Type::Primitive(PrimitiveType::Boolean), - )), - Arc::new(NestedField::optional( - 8, - "v_date", - Type::Primitive(PrimitiveType::Date), - )), - Arc::new(NestedField::optional( - 9, - "v_timestamp", - Type::Primitive(PrimitiveType::Timestamptz), - )), - Arc::new(NestedField::optional( - 10, - "v_decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 36, - scale: 10, - }), - )), - Arc::new(NestedField::optional( - 11, - "v_ts_ntz", - Type::Primitive(PrimitiveType::Timestamp), - )), - Arc::new(NestedField::optional( - 12, - "v_ts_ns_ntz", - Type::Primitive(PrimitiveType::TimestampNs - ))), - ]) - .build() - .unwrap(), - partition_spec: PartitionSpec { - spec_id: 0, - fields: vec![], - }, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V2, }, @@ -1628,94 +1628,83 @@ mod tests { #[tokio::test] async fn test_parse_manifest_v2_partition() { + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "v_int", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 3, + "v_long", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 4, + "v_float", + Type::Primitive(PrimitiveType::Float), + )), + Arc::new(NestedField::optional( + 5, + "v_double", + Type::Primitive(PrimitiveType::Double), + )), + Arc::new(NestedField::optional( + 6, + "v_varchar", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 7, + "v_bool", + Type::Primitive(PrimitiveType::Boolean), + )), + Arc::new(NestedField::optional( + 8, + "v_date", + Type::Primitive(PrimitiveType::Date), + )), + Arc::new(NestedField::optional( + 9, + "v_timestamp", + Type::Primitive(PrimitiveType::Timestamptz), + )), + Arc::new(NestedField::optional( + 10, + "v_decimal", + Type::Primitive(PrimitiveType::Decimal { + precision: 36, + scale: 10, + }), + )), + Arc::new(NestedField::optional( + 11, + "v_ts_ntz", + Type::Primitive(PrimitiveType::Timestamp), + )), + Arc::new(NestedField::optional( + 12, + "v_ts_ns_ntz", + Type::Primitive(PrimitiveType::TimestampNs), + )), + ]) + .build() + .unwrap(), + ); let manifest = Manifest { metadata: ManifestMetadata { schema_id: 0, - schema: Schema::builder() - .with_fields(vec![ - Arc::new(NestedField::optional( - 1, - "id", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 2, - "v_int", - Type::Primitive(PrimitiveType::Int), - )), - Arc::new(NestedField::optional( - 3, - "v_long", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 4, - "v_float", - Type::Primitive(PrimitiveType::Float), - )), - Arc::new(NestedField::optional( - 5, - "v_double", - Type::Primitive(PrimitiveType::Double), - )), - Arc::new(NestedField::optional( - 6, - "v_varchar", - Type::Primitive(PrimitiveType::String), - )), - Arc::new(NestedField::optional( - 7, - "v_bool", - Type::Primitive(PrimitiveType::Boolean), - )), - Arc::new(NestedField::optional( - 8, - "v_date", - Type::Primitive(PrimitiveType::Date), - )), - Arc::new(NestedField::optional( - 9, - "v_timestamp", - Type::Primitive(PrimitiveType::Timestamptz), - )), - Arc::new(NestedField::optional( - 10, - "v_decimal", - Type::Primitive(PrimitiveType::Decimal { - precision: 36, - scale: 10, - }), - )), - Arc::new(NestedField::optional( - 11, - "v_ts_ntz", - Type::Primitive(PrimitiveType::Timestamp), - )), - Arc::new(NestedField::optional( - 12, - "v_ts_ns_ntz", - Type::Primitive(PrimitiveType::TimestampNs - ))) - ]) - .build() - .unwrap(), - partition_spec: PartitionSpec { - spec_id: 0, - fields: vec![ - PartitionField { - name: "v_int".to_string(), - transform: Transform::Identity, - source_id: 2, - field_id: 1000, - }, - PartitionField { - name: "v_long".to_string(), - transform: Transform::Identity, - source_id: 3, - field_id: 1001, - }, - ], - }, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema) + .with_spec_id(0).add_partition_field("v_int", "v_int", Transform::Identity).unwrap() + .add_partition_field("v_long", "v_long", Transform::Identity).unwrap().build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V2, }, @@ -1797,34 +1786,34 @@ mod tests { #[tokio::test] async fn test_parse_manifest_v1_unpartition() { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Int), + )), + Arc::new(NestedField::optional( + 2, + "data", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 3, + "comment", + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); let manifest = Manifest { metadata: ManifestMetadata { schema_id: 1, - schema: Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - Arc::new(NestedField::optional( - 1, - "id", - Type::Primitive(PrimitiveType::Int), - )), - Arc::new(NestedField::optional( - 2, - "data", - Type::Primitive(PrimitiveType::String), - )), - Arc::new(NestedField::optional( - 3, - "comment", - Type::Primitive(PrimitiveType::String), - )), - ]) - .build() - .unwrap(), - partition_spec: PartitionSpec { - spec_id: 0, - fields: vec![], - }, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V1, }, @@ -1862,38 +1851,33 @@ mod tests { #[tokio::test] async fn test_parse_manifest_v1_partition() { + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "data", + Type::Primitive(PrimitiveType::String), + )), + Arc::new(NestedField::optional( + 3, + "category", + Type::Primitive(PrimitiveType::String), + )), + ]) + .build() + .unwrap(), + ); let manifest = Manifest { metadata: ManifestMetadata { schema_id: 0, - schema: Schema::builder() - .with_fields(vec![ - Arc::new(NestedField::optional( - 1, - "id", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 2, - "data", - Type::Primitive(PrimitiveType::String), - )), - Arc::new(NestedField::optional( - 3, - "category", - Type::Primitive(PrimitiveType::String), - )), - ]) - .build() - .unwrap(), - partition_spec: PartitionSpec { - spec_id: 0, - fields: vec![PartitionField { - name: "category".to_string(), - transform: Transform::Identity, - source_id: 3, - field_id: 1000, - }], - }, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema).add_partition_field("category", "category", Transform::Identity).unwrap().build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V1, }, @@ -1951,28 +1935,28 @@ mod tests { #[tokio::test] async fn test_parse_manifest_with_schema_evolution() { + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "v_int", + Type::Primitive(PrimitiveType::Int), + )), + ]) + .build() + .unwrap(), + ); let manifest = Manifest { metadata: ManifestMetadata { schema_id: 0, - schema: Schema::builder() - .with_fields(vec![ - Arc::new(NestedField::optional( - 1, - "id", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 2, - "v_int", - Type::Primitive(PrimitiveType::Int), - )), - ]) - .build() - .unwrap(), - partition_spec: PartitionSpec { - spec_id: 0, - fields: vec![], - }, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V2, }, @@ -2023,28 +2007,28 @@ mod tests { // Compared with original manifest, the lower_bounds and upper_bounds no longer has data for field 3, and // other parts should be same. + let schema = Arc::new( + Schema::builder() + .with_fields(vec![ + Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + )), + Arc::new(NestedField::optional( + 2, + "v_int", + Type::Primitive(PrimitiveType::Int), + )), + ]) + .build() + .unwrap(), + ); let expected_manifest = Manifest { metadata: ManifestMetadata { schema_id: 0, - schema: Schema::builder() - .with_fields(vec![ - Arc::new(NestedField::optional( - 1, - "id", - Type::Primitive(PrimitiveType::Long), - )), - Arc::new(NestedField::optional( - 2, - "v_int", - Type::Primitive(PrimitiveType::Int), - )), - ]) - .build() - .unwrap(), - partition_spec: PartitionSpec { - spec_id: 0, - fields: vec![], - }, + schema: schema.clone(), + partition_spec: PartitionSpec::builder(schema).with_spec_id(0).build().unwrap(), content: ManifestContentType::Data, format_version: FormatVersion::V2, }, diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 36763df7e..72e398768 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use super::transform::Transform; -use super::{NestedField, Schema, StructType}; +use super::{NestedField, Schema, SchemaRef, StructType}; use crate::{Error, ErrorKind, Result}; pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; @@ -54,22 +54,51 @@ impl PartitionField { } } -/// Partition spec that defines how to produce a tuple of partition values from a record. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)] -#[serde(rename_all = "kebab-case")] +/// Partition spec that defines how to produce a tuple of partition values from a record. +/// `PartitionSpec` is bound to a specific schema. +#[derive(Debug, PartialEq, Eq, Clone)] pub struct PartitionSpec { /// Identifier for PartitionSpec - pub(crate) spec_id: i32, + spec_id: i32, /// Details of the partition spec - pub(crate) fields: Vec, + fields: Vec, + /// The schema this partition spec is bound to + schema: SchemaRef, + /// Type of the partition spec + partition_type: StructType, +} + +/// Reference to [`SchemalessPartitionSpec`]. +pub type SchemalessPartitionSpecRef = Arc; +/// Partition spec that defines how to produce a tuple of partition values from a record. +/// Schemaless partition specs are never constructed manually. They occur when a table is mutated +/// and partition spec and schemas are updated. While old partition specs are retained, the bound +/// schema might not be available anymore as part of the table metadata. +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct SchemalessPartitionSpec { + /// Identifier for PartitionSpec + spec_id: i32, + /// Details of the partition spec + fields: Vec, } impl PartitionSpec { /// Create partition spec builder - pub fn builder(schema: &Schema) -> PartitionSpecBuilder { + pub fn builder(schema: impl Into) -> PartitionSpecBuilder { PartitionSpecBuilder::new(schema) } + /// Get a new unpatitioned partition spec + pub fn unpartition_spec(schema: impl Into) -> Self { + Self { + spec_id: DEFAULT_PARTITION_SPEC_ID, + fields: vec![], + schema: schema.into(), + partition_type: StructType::new(vec![]), + } + } + /// Spec id of the partition spec pub fn spec_id(&self) -> i32 { self.spec_id @@ -80,45 +109,32 @@ impl PartitionSpec { &self.fields } + /// The schema this partition spec is bound to + pub fn schema(&self) -> &Schema { + &self.schema + } + + /// The schema ref this partition spec is bound to + pub fn schema_ref(&self) -> &SchemaRef { + &self.schema + } + /// Returns if the partition spec is unpartitioned. /// /// A [`PartitionSpec`] is unpartitioned if it has no fields or all fields are [`Transform::Void`] transform. pub fn is_unpartitioned(&self) -> bool { - self.fields.is_empty() - || self - .fields - .iter() - .all(|f| matches!(f.transform, Transform::Void)) - } - - /// Returns the partition type of this partition spec. - pub fn partition_type(&self, schema: &Schema) -> Result { - let mut fields = Vec::with_capacity(self.fields.len()); - for partition_field in &self.fields { - let field = schema - .field_by_id(partition_field.source_id) - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!( - "No column with source column id {} in schema {:?}", - partition_field.source_id, schema - ), - ) - })?; - let res_type = partition_field.transform.result_type(&field.field_type)?; - let field = - NestedField::optional(partition_field.field_id, &partition_field.name, res_type) - .into(); - fields.push(field); - } - Ok(StructType::new(fields)) + >::is_unpartitioned(self) } /// Turn this partition spec into an unbound partition spec. /// /// The `field_id` is retained as `partition_id` in the unbound partition spec. pub fn into_unbound(self) -> UnboundPartitionSpec { + >::into_unbound(self) + } + + /// Turn this partition spec into a preserved partition spec. + pub fn into_schemaless(self) -> SchemalessPartitionSpec { self.into() } @@ -131,49 +147,59 @@ impl PartitionSpec { /// * Field names /// * Source column ids /// * Transforms - pub fn is_compatible_with(&self, other: &UnboundPartitionSpec) -> bool { - if self.fields.len() != other.fields.len() { - return false; - } - - for (this_field, other_field) in self.fields.iter().zip(&other.fields) { - if this_field.source_id != other_field.source_id - || this_field.transform != other_field.transform - || this_field.name != other_field.name - { - return false; - } - } - - true + pub fn is_compatible_with>( + &self, + other: &T, + ) -> bool { + >::is_compatible_with(self, other) } /// Check if this partition spec has sequential partition ids. /// Sequential ids start from 1000 and increment by 1 for each field. /// This is required for spec version 1 pub fn has_sequential_ids(&self) -> bool { - for (index, field) in self.fields.iter().enumerate() { - let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64) - .checked_add(1) - .and_then(|id| id.checked_add(index as i64)) - .unwrap_or(i64::MAX); - - if field.field_id as i64 != expected_id { - return false; - } - } - - true + ::has_sequential_ids(self) } /// Get the highest field id in the partition spec. /// If the partition spec is unpartitioned, it returns the last unpartitioned last assigned id (999). pub fn highest_field_id(&self) -> i32 { - self.fields - .iter() - .map(|f| f.field_id) - .max() - .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID) + ::highest_field_id(self) + } + + /// Returns the partition type of this partition spec. + pub fn partition_type(&self) -> &StructType { + &self.partition_type + } +} + +impl SchemalessPartitionSpec { + /// Fields of the partition spec + pub fn fields(&self) -> &[PartitionField] { + &self.fields + } + + /// Spec id of the partition spec + pub fn spec_id(&self) -> i32 { + self.spec_id + } + + /// Bind this schemaless partition spec to a schema. + pub fn bind(self, schema: impl Into) -> Result { + PartitionSpecBuilder::new_from_unbound(self.into_unbound(), schema)?.build() + } + + /// Get a new unpatitioned partition spec + pub fn unpartition_spec() -> Self { + Self { + spec_id: DEFAULT_PARTITION_SPEC_ID, + fields: vec![], + } + } + + /// Returns the partition type of this partition spec. + pub fn partition_type(&self, schema: &Schema) -> Result { + PartitionSpecBuilder::partition_type(&self.fields, schema) } } @@ -212,7 +238,7 @@ impl UnboundPartitionSpec { } /// Bind this unbound partition spec to a schema. - pub fn bind(self, schema: &Schema) -> Result { + pub fn bind(self, schema: impl Into) -> Result { PartitionSpecBuilder::new_from_unbound(self, schema)?.build() } @@ -235,6 +261,179 @@ impl UnboundPartitionSpec { } } +/// Trait for common functions between [`PartitionSpec`], [`UnboundPartitionSpec`] and [`PreservedPartitionSpec`] +pub trait UnboundPartitionSpecInterface { + /// Fields of the partition spec + fn fields(&self) -> &[T]; + + /// Turn this partition spec into an unbound partition spec. + fn into_unbound(self) -> UnboundPartitionSpec; + + /// Returns if the partition spec is unpartitioned. + /// + /// A spec is unpartitioned if it has no fields or all fields are [`Transform::Void`] transform. + fn is_unpartitioned(&self) -> bool { + self.fields().is_empty() + || self + .fields() + .iter() + .all(|f| matches!(f.transform(), Transform::Void)) + } + + /// Check if this partition spec is compatible with another partition spec. + /// + /// Returns true if the partition spec is equal to the other spec with partition field ids ignored and + /// spec_id ignored. The following must be identical: + /// * The number of fields + /// * Field order + /// * Field names + /// * Source column ids + /// * Transforms + fn is_compatible_with>( + &self, + other: &O, + ) -> bool { + if self.fields().len() != other.fields().len() { + return false; + } + + for (this_field, other_field) in self.fields().iter().zip(other.fields()) { + if this_field.source_id() != other_field.source_id() + || this_field.transform() != other_field.transform() + || this_field.name() != other_field.name() + { + return false; + } + } + + true + } +} + +/// Trait for common functions between [`PartitionSpec`] and [`PreservedPartitionSpec`] +pub trait PartitionSpecInterface: UnboundPartitionSpecInterface { + /// Spec id of the partition spec + fn spec_id(&self) -> i32; + + /// Check if this partition spec has sequential partition ids. + /// Sequential ids start from 1000 and increment by 1 for each field. + /// This is required for spec version 1 + fn has_sequential_ids(&self) -> bool { + for (index, field) in self.fields().iter().enumerate() { + let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64) + .checked_add(1) + .and_then(|id| id.checked_add(index as i64)) + .unwrap_or(i64::MAX); + + if field.field_id as i64 != expected_id { + return false; + } + } + + true + } + + /// Get the highest field id in the partition spec. + /// If the partition spec is unpartitioned, it returns the last unpartitioned last assigned id (999). + fn highest_field_id(&self) -> i32 { + self.fields() + .iter() + .map(|f| f.field_id) + .max() + .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID) + } +} + +impl PartitionSpecInterface for PartitionSpec { + fn spec_id(&self) -> i32 { + self.spec_id + } +} + +impl PartitionSpecInterface for SchemalessPartitionSpec { + fn spec_id(&self) -> i32 { + self.spec_id + } +} + +impl UnboundPartitionSpecInterface for PartitionSpec { + fn fields(&self) -> &[PartitionField] { + &self.fields + } + + fn into_unbound(self) -> UnboundPartitionSpec { + self.into() + } +} + +impl UnboundPartitionSpecInterface for UnboundPartitionSpec { + fn fields(&self) -> &[UnboundPartitionField] { + &self.fields + } + + fn into_unbound(self) -> UnboundPartitionSpec { + self + } +} + +impl UnboundPartitionSpecInterface for SchemalessPartitionSpec { + fn fields(&self) -> &[PartitionField] { + &self.fields + } + + fn into_unbound(self) -> UnboundPartitionSpec { + self.into() + } +} + +/// Trait for common functions between [`PartitionField`] and [`UnboundPartitionField`] +pub trait PartitionFieldInterface { + /// A source column id from the table’s schema + fn source_id(&self) -> i32; + /// A partition name. + fn name(&self) -> &str; + /// A transform that is applied to the source column to produce a partition value. + fn transform(&self) -> &Transform; + /// Convert to unbound partition field + fn into_unbound(self) -> UnboundPartitionField; +} + +impl PartitionFieldInterface for PartitionField { + fn source_id(&self) -> i32 { + self.source_id + } + + fn name(&self) -> &str { + &self.name + } + + fn transform(&self) -> &Transform { + &self.transform + } + + fn into_unbound(self) -> UnboundPartitionField { + self.into() + } +} + +impl PartitionFieldInterface for UnboundPartitionField { + fn source_id(&self) -> i32 { + self.source_id + } + + fn name(&self) -> &str { + &self.name + } + + fn transform(&self) -> &Transform { + &self.transform + } + + fn into_unbound(self) -> UnboundPartitionField { + self + } +} + impl From for UnboundPartitionField { fn from(field: PartitionField) -> Self { UnboundPartitionField { @@ -255,6 +454,24 @@ impl From for UnboundPartitionSpec { } } +impl From for UnboundPartitionSpec { + fn from(spec: SchemalessPartitionSpec) -> Self { + UnboundPartitionSpec { + spec_id: Some(spec.spec_id), + fields: spec.fields.into_iter().map(Into::into).collect(), + } + } +} + +impl From for SchemalessPartitionSpec { + fn from(spec: PartitionSpec) -> Self { + SchemalessPartitionSpec { + spec_id: spec.spec_id, + fields: spec.fields, + } + } +} + /// Create a new UnboundPartitionSpec #[derive(Debug, Default)] pub struct UnboundPartitionSpecBuilder { @@ -326,26 +543,29 @@ impl UnboundPartitionSpecBuilder { /// Create valid partition specs for a given schema. #[derive(Debug)] -pub struct PartitionSpecBuilder<'a> { +pub struct PartitionSpecBuilder { spec_id: Option, last_assigned_field_id: i32, fields: Vec, - schema: &'a Schema, + schema: SchemaRef, } -impl<'a> PartitionSpecBuilder<'a> { +impl PartitionSpecBuilder { /// Create a new partition spec builder with the given schema. - pub fn new(schema: &'a Schema) -> Self { + pub fn new(schema: impl Into) -> Self { Self { spec_id: None, fields: vec![], last_assigned_field_id: UNPARTITIONED_LAST_ASSIGNED_ID, - schema, + schema: schema.into(), } } /// Create a new partition spec builder from an existing unbound partition spec. - pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema) -> Result { + pub fn new_from_unbound( + unbound: UnboundPartitionSpec, + schema: impl Into, + ) -> Result { let mut builder = Self::new(schema).with_spec_id(unbound.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID)); @@ -408,8 +628,8 @@ impl<'a> PartitionSpecBuilder<'a> { pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result { self.check_name_set_and_unique(&field.name)?; self.check_for_redundant_partitions(field.source_id, &field.transform)?; - Self::check_name_does_not_collide_with_schema(&field, self.schema)?; - Self::check_transform_compatibility(&field, self.schema)?; + Self::check_name_does_not_collide_with_schema(&field, &self.schema)?; + Self::check_transform_compatibility(&field, &self.schema)?; if let Some(partition_field_id) = field.field_id { self.check_partition_id_unique(partition_field_id)?; } @@ -434,9 +654,12 @@ impl<'a> PartitionSpecBuilder<'a> { /// Build a bound partition spec with the given schema. pub fn build(self) -> Result { let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?; + let partition_type = Self::partition_type(&fields, &self.schema)?; Ok(PartitionSpec { spec_id: self.spec_id.unwrap_or(DEFAULT_PARTITION_SPEC_ID), fields, + partition_type, + schema: self.schema, }) } @@ -485,6 +708,32 @@ impl<'a> PartitionSpecBuilder<'a> { Ok(bound_fields) } + /// Returns the partition type of this partition spec. + fn partition_type(fields: &Vec, schema: &Schema) -> Result { + let mut struct_fields = Vec::with_capacity(fields.len()); + for partition_field in fields { + let field = schema + .field_by_id(partition_field.source_id) + .ok_or_else(|| { + Error::new( + // This should never occur as check_transform_compatibility + // already ensures that the source field exists in the schema + ErrorKind::Unexpected, + format!( + "No column with source column id {} in schema {:?}", + partition_field.source_id, schema + ), + ) + })?; + let res_type = partition_field.transform.result_type(&field.field_type)?; + let field = + NestedField::optional(partition_field.field_id, &partition_field.name, res_type) + .into(); + struct_fields.push(field); + } + Ok(StructType::new(struct_fields)) + } + /// Ensure that the partition name is unique among columns in the schema. /// Duplicate names are allowed if: /// 1. The column is sourced from the column with the same name. @@ -622,7 +871,7 @@ trait CorePartitionSpecValidator { fn fields(&self) -> &Vec; } -impl CorePartitionSpecValidator for PartitionSpecBuilder<'_> { +impl CorePartitionSpecValidator for PartitionSpecBuilder { fn fields(&self) -> &Vec { &self.fields } @@ -637,7 +886,7 @@ impl CorePartitionSpecValidator for UnboundPartitionSpecBuilder { #[cfg(test)] mod tests { use super::*; - use crate::spec::Type; + use crate::spec::{PrimitiveType, Type}; #[test] fn test_partition_spec() { @@ -663,7 +912,7 @@ mod tests { } "#; - let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: SchemalessPartitionSpec = serde_json::from_str(spec).unwrap(); assert_eq!(4, partition_spec.fields[0].source_id); assert_eq!(1000, partition_spec.fields[0].field_id); assert_eq!("ts_day", partition_spec.fields[0].name); @@ -695,7 +944,7 @@ mod tests { ]) .build() .unwrap(); - let partition_spec = PartitionSpec::builder(&schema) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); @@ -704,7 +953,7 @@ mod tests { "Empty partition spec should be unpartitioned" ); - let partition_spec = PartitionSpec::builder(&schema) + let partition_spec = PartitionSpec::builder(schema.clone()) .add_unbound_fields(vec![ UnboundPartitionField::builder() .source_id(1) @@ -726,7 +975,7 @@ mod tests { "Partition spec with one non void transform should not be unpartitioned" ); - let partition_spec = PartitionSpec::builder(&schema) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![ UnboundPartitionField::builder() @@ -809,6 +1058,32 @@ mod tests { assert_eq!(Transform::Day, partition_spec.fields[0].transform); } + #[test] + fn test_new_unpartition() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .build() + .unwrap(); + let partition_type = partition_spec.partition_type(); + assert_eq!(0, partition_type.fields().len()); + + let unpartition_spec = PartitionSpec::unpartition_spec(schema); + assert_eq!(partition_spec, unpartition_spec); + } + #[test] fn test_partition_type() { let spec = r#" @@ -833,7 +1108,7 @@ mod tests { } "#; - let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: SchemalessPartitionSpec = serde_json::from_str(spec).unwrap(); let schema = Schema::builder() .with_fields(vec![ NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) @@ -909,7 +1184,7 @@ mod tests { } "#; - let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: SchemalessPartitionSpec = serde_json::from_str(spec).unwrap(); let schema = Schema::builder() .with_fields(vec![ NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) @@ -976,7 +1251,7 @@ mod tests { } "#; - let partition_spec: PartitionSpec = serde_json::from_str(spec).unwrap(); + let partition_spec: SchemalessPartitionSpec = serde_json::from_str(spec).unwrap(); let schema = Schema::builder() .with_fields(vec![ NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) @@ -994,6 +1269,50 @@ mod tests { assert!(partition_spec.partition_type(&schema).is_err()); } + #[test] + fn test_schemaless_bind_schema_keeps_field_ids_and_spec_id() { + let schema: Schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(99) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: Some(1010), + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: Some(1001), + name: "name_void".to_string(), + transform: Transform::Void, + }) + .unwrap() + .build() + .unwrap(); + + let schemaless_partition_spec = SchemalessPartitionSpec::from(partition_spec.clone()); + let bound_partition_spec = schemaless_partition_spec.bind(schema).unwrap(); + + assert_eq!(partition_spec, bound_partition_spec); + assert_eq!(partition_spec.fields[0].field_id, 1010); + assert_eq!(partition_spec.fields[1].field_id, 1001); + assert_eq!(bound_partition_spec.spec_id(), 99); + } + #[test] fn test_builder_disallow_duplicate_names() { UnboundPartitionSpec::builder() @@ -1018,7 +1337,7 @@ mod tests { ]) .build() .unwrap(); - PartitionSpec::builder(&schema) + PartitionSpec::builder(schema.clone()) .add_unbound_field(UnboundPartitionField { source_id: 1, field_id: Some(1000), @@ -1056,7 +1375,7 @@ mod tests { ]) .build() .unwrap(); - let spec = PartitionSpec::builder(&schema) + let spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1104,12 +1423,12 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder(&schema) + PartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); - let spec = PartitionSpec::builder(&schema) + let spec = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_partition_field("id", "id_bucket[16]", Transform::Bucket(16)) .unwrap() @@ -1118,12 +1437,19 @@ mod tests { assert_eq!(spec, PartitionSpec { spec_id: 1, + schema: schema.into(), fields: vec![PartitionField { source_id: 1, field_id: 1000, name: "id_bucket[16]".to_string(), transform: Transform::Bucket(16), - }] + }], + partition_type: StructType::new(vec![NestedField::optional( + 1000, + "id_bucket[16]", + Type::Primitive(PrimitiveType::Int) + ) + .into()]) }); } @@ -1139,12 +1465,12 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder(&schema) + PartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); - let err = PartitionSpec::builder(&schema) + let err = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1172,12 +1498,12 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder(&schema) + PartitionSpec::builder(schema.clone()) .with_spec_id(1) .build() .unwrap(); - PartitionSpec::builder(&schema) + PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1190,7 +1516,7 @@ mod tests { .unwrap(); // Not OK for different source id - PartitionSpec::builder(&schema) + PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 2, @@ -1224,7 +1550,7 @@ mod tests { .unwrap(); // Valid - PartitionSpec::builder(&schema) + PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_fields(vec![ UnboundPartitionField { @@ -1245,7 +1571,7 @@ mod tests { .unwrap(); // Invalid - PartitionSpec::builder(&schema) + PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_fields(vec![ UnboundPartitionField { @@ -1291,7 +1617,7 @@ mod tests { .build() .unwrap(); - PartitionSpec::builder(&schema) + PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1342,7 +1668,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = PartitionSpec::builder(&schema) + let partition_spec_1 = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1354,7 +1680,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = PartitionSpec::builder(&schema) + let partition_spec_2 = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1381,7 +1707,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = PartitionSpec::builder(&schema) + let partition_spec_1 = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1393,7 +1719,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = PartitionSpec::builder(&schema) + let partition_spec_2 = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1424,7 +1750,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = PartitionSpec::builder(&schema) + let partition_spec_1 = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1436,7 +1762,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = PartitionSpec::builder(&schema) + let partition_spec_2 = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 2, @@ -1467,7 +1793,7 @@ mod tests { .build() .unwrap(); - let partition_spec_1 = PartitionSpec::builder(&schema) + let partition_spec_1 = PartitionSpec::builder(schema.clone()) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1486,7 +1812,7 @@ mod tests { .build() .unwrap(); - let partition_spec_2 = PartitionSpec::builder(&schema) + let partition_spec_2 = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 2, @@ -1510,7 +1836,7 @@ mod tests { #[test] fn test_highest_field_id_unpartitioned() { - let spec = PartitionSpec::builder(&Schema::builder().with_fields(vec![]).build().unwrap()) + let spec = PartitionSpec::builder(Schema::builder().with_fields(vec![]).build().unwrap()) .with_spec_id(1) .build() .unwrap(); @@ -1534,7 +1860,7 @@ mod tests { .build() .unwrap(); - let spec = PartitionSpec::builder(&schema) + let spec = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1572,7 +1898,7 @@ mod tests { .build() .unwrap(); - let spec = PartitionSpec::builder(&schema) + let spec = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1612,7 +1938,7 @@ mod tests { .build() .unwrap(); - let spec = PartitionSpec::builder(&schema) + let spec = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, @@ -1652,7 +1978,7 @@ mod tests { .build() .unwrap(); - let spec = PartitionSpec::builder(&schema) + let spec = PartitionSpec::builder(schema) .with_spec_id(1) .add_unbound_field(UnboundPartitionField { source_id: 1, diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index cde709375..7707b13b1 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -31,8 +31,8 @@ use uuid::Uuid; use super::snapshot::SnapshotReference; use super::{ - PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, Snapshot, SnapshotRef, SnapshotRetention, - SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID, + PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SchemalessPartitionSpecRef, Snapshot, + SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID, }; use crate::error::{timestamp_ms_to_utc, Result}; use crate::{Error, ErrorKind, TableCreation}; @@ -118,9 +118,9 @@ pub struct TableMetadata { /// ID of the table’s current schema. pub(crate) current_schema_id: i32, /// A list of partition specs, stored as full partition spec objects. - pub(crate) partition_specs: HashMap, + pub(crate) partition_specs: HashMap, /// ID of the “current” spec that writers should use by default. - pub(crate) default_spec_id: i32, + pub(crate) default_spec: PartitionSpecRef, /// An integer; the highest assigned partition field ID across all partition specs for the table. pub(crate) last_partition_id: i32, ///A string to string map of table properties. This is used to control settings that @@ -222,21 +222,26 @@ impl TableMetadata { /// Returns all partition specs. #[inline] - pub fn partition_specs_iter(&self) -> impl Iterator { + pub fn partition_specs_iter(&self) -> impl Iterator { self.partition_specs.values() } /// Lookup partition spec by id. #[inline] - pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&PartitionSpecRef> { + pub fn partition_spec_by_id(&self, spec_id: i32) -> Option<&SchemalessPartitionSpecRef> { self.partition_specs.get(&spec_id) } /// Get default partition spec #[inline] pub fn default_partition_spec(&self) -> &PartitionSpecRef { - self.partition_spec_by_id(self.default_spec_id) - .expect("Default partition spec id set, but not found in table metadata") + &self.default_spec + } + + #[inline] + /// Returns spec id of the "current" partition spec. + pub fn default_partition_spec_id(&self) -> i32 { + self.default_spec.spec_id() } /// Returns all snapshots @@ -352,29 +357,18 @@ impl TableMetadata { Ok(self) } - /// If the default partition spec is specified but the spec is not present in specs, add it + /// If the default partition spec is not present in specs, add it fn try_normalize_partition_spec(&mut self) -> Result<()> { - if self.partition_spec_by_id(self.default_spec_id).is_some() { - return Ok(()); - } - - if self.default_spec_id != DEFAULT_PARTITION_SPEC_ID { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "No partition spec exists with the default spec id {}.", - self.default_spec_id - ), - )); + if self + .partition_spec_by_id(self.default_spec.spec_id()) + .is_none() + { + self.partition_specs.insert( + self.default_spec.spec_id(), + Arc::new(Arc::unwrap_or_clone(self.default_spec.clone()).into_schemaless()), + ); } - let partition_spec = PartitionSpec { - spec_id: DEFAULT_PARTITION_SPEC_ID, - fields: vec![], - }; - self.partition_specs - .insert(DEFAULT_PARTITION_SPEC_ID, Arc::new(partition_spec)); - Ok(()) } @@ -565,6 +559,8 @@ impl TableMetadataBuilder { properties, } = table_creation; + let schema: Arc = Arc::new(schema); + let unpartition_spec = PartitionSpec::unpartition_spec(schema.clone()); let partition_specs = match partition_spec { Some(_) => { return Err(Error::new( @@ -573,11 +569,8 @@ impl TableMetadataBuilder { )) } None => HashMap::from([( - DEFAULT_PARTITION_SPEC_ID, - Arc::new(PartitionSpec { - spec_id: DEFAULT_PARTITION_SPEC_ID, - fields: vec![], - }), + unpartition_spec.spec_id(), + Arc::new(unpartition_spec.clone().into_schemaless()), )]), }; @@ -607,9 +600,9 @@ impl TableMetadataBuilder { last_updated_ms: Utc::now().timestamp_millis(), last_column_id: schema.highest_field_id(), current_schema_id: schema.schema_id(), - schemas: HashMap::from([(schema.schema_id(), Arc::new(schema))]), + schemas: HashMap::from([(schema.schema_id(), schema)]), partition_specs, - default_spec_id: DEFAULT_PARTITION_SPEC_ID, + default_spec: PartitionSpecRef::new(unpartition_spec), last_partition_id: 0, properties, current_snapshot_id: None, @@ -661,8 +654,8 @@ pub(super) mod _serde { use crate::spec::schema::_serde::{SchemaV1, SchemaV2}; use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2}; use crate::spec::{ - PartitionField, PartitionSpec, Schema, Snapshot, SnapshotReference, SnapshotRetention, - SortOrder, + PartitionField, PartitionSpec, Schema, SchemaRef, SchemalessPartitionSpec, Snapshot, + SnapshotReference, SnapshotRetention, SortOrder, }; use crate::{Error, ErrorKind}; @@ -685,7 +678,7 @@ pub(super) mod _serde { pub last_column_id: i32, pub schemas: Vec, pub current_schema_id: i32, - pub partition_specs: Vec, + pub partition_specs: Vec, pub default_spec_id: i32, pub last_partition_id: i32, #[serde(skip_serializing_if = "Option::is_none")] @@ -721,7 +714,7 @@ pub(super) mod _serde { pub current_schema_id: Option, pub partition_spec: Vec, #[serde(skip_serializing_if = "Option::is_none")] - pub partition_specs: Option>, + pub partition_specs: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub default_spec_id: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -809,6 +802,44 @@ pub(super) mod _serde { .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?)))) .collect::, Error>>()?, ); + + let current_schema: &SchemaRef = + schemas.get(&value.current_schema_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "No schema exists with the current schema id {}.", + value.current_schema_id + ), + ) + })?; + let partition_specs = HashMap::from_iter( + value + .partition_specs + .into_iter() + .map(|x| (x.spec_id(), Arc::new(x))), + ); + let default_spec_id = value.default_spec_id; + let default_spec = partition_specs + .get(&value.default_spec_id) + .map(|schemaless_spec| { + (*schemaless_spec.clone()) + .clone() + .bind(current_schema.clone()) + }) + .transpose()? + .or_else(|| { + (DEFAULT_PARTITION_SPEC_ID == default_spec_id) + .then(|| PartitionSpec::unpartition_spec(current_schema.clone())) + }) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Default partition spec {default_spec_id} not found"), + ) + })? + .into(); + let mut metadata = TableMetadata { format_version: FormatVersion::V2, table_uuid: value.table_uuid, @@ -818,13 +849,8 @@ pub(super) mod _serde { last_column_id: value.last_column_id, current_schema_id: value.current_schema_id, schemas, - partition_specs: HashMap::from_iter( - value - .partition_specs - .into_iter() - .map(|x| (x.spec_id(), Arc::new(x))), - ), - default_spec_id: value.default_spec_id, + partition_specs, + default_spec, last_partition_id: value.last_partition_id, properties: value.properties.unwrap_or_default(), current_snapshot_id, @@ -876,6 +902,7 @@ pub(super) mod _serde { } else { value.current_snapshot_id }; + let schemas = value .schemas .map(|schemas| { @@ -900,18 +927,49 @@ pub(super) mod _serde { }) .transpose()? .unwrap_or_default(); - let partition_specs = HashMap::from_iter( - value - .partition_specs - .unwrap_or_else(|| { - vec![PartitionSpec { - spec_id: DEFAULT_PARTITION_SPEC_ID, - fields: value.partition_spec, - }] - }) - .into_iter() - .map(|x| (x.spec_id(), Arc::new(x))), - ); + let current_schema_id = value + .current_schema_id + .unwrap_or_else(|| schemas.keys().copied().max().unwrap_or_default()); + let current_schema = schemas + .get(¤t_schema_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "No schema exists with the current schema id {}.", + current_schema_id + ), + ) + })? + .clone(); + + let partition_specs = match value.partition_specs { + Some(partition_specs) => partition_specs, + None => vec![PartitionSpec::builder(current_schema.clone()) + .with_spec_id(DEFAULT_PARTITION_SPEC_ID) + .add_unbound_fields(value.partition_spec.into_iter().map(|f| f.into_unbound()))? + .build()? + .into_schemaless()], + } + .into_iter() + .map(|x| (x.spec_id(), Arc::new(x))) + .collect::>(); + + let default_spec_id = value + .default_spec_id + .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()); + let default_spec = partition_specs + .get(&default_spec_id) + .map(|x| Arc::unwrap_or_clone(x.clone()).bind(current_schema.clone())) + .transpose()? + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Default partition spec {default_spec_id} not found"), + ) + })? + .into(); + let mut metadata = TableMetadata { format_version: FormatVersion::V1, table_uuid: value.table_uuid.unwrap_or_default(), @@ -919,12 +977,8 @@ pub(super) mod _serde { last_sequence_number: 0, last_updated_ms: value.last_updated_ms, last_column_id: value.last_column_id, - current_schema_id: value - .current_schema_id - .unwrap_or_else(|| schemas.keys().copied().max().unwrap_or_default()), - default_spec_id: value - .default_spec_id - .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()), + current_schema_id, + default_spec, last_partition_id: value .last_partition_id .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()), @@ -998,7 +1052,7 @@ pub(super) mod _serde { .into_values() .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone())) .collect(), - default_spec_id: v.default_spec_id, + default_spec_id: v.default_spec.spec_id(), last_partition_id: v.last_partition_id, properties: Some(v.properties), current_snapshot_id: v.current_snapshot_id.or(Some(-1)), @@ -1067,18 +1121,14 @@ pub(super) mod _serde { .collect(), ), current_schema_id: Some(v.current_schema_id), - partition_spec: v - .partition_specs - .get(&v.default_spec_id) - .map(|x| x.fields().to_vec()) - .unwrap_or_default(), + partition_spec: v.default_spec.fields().to_vec(), partition_specs: Some( v.partition_specs .into_values() .map(|x| Arc::try_unwrap(x).unwrap_or_else(|s| s.as_ref().clone())) .collect(), ), - default_spec_id: Some(v.default_spec_id), + default_spec_id: Some(v.default_spec.spec_id()), last_partition_id: Some(v.last_partition_id), properties: if v.properties.is_empty() { None @@ -1195,9 +1245,9 @@ mod tests { use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder}; use crate::spec::table_metadata::TableMetadata; use crate::spec::{ - NestedField, NullOrder, Operation, PartitionField, PartitionSpec, PrimitiveType, Schema, - Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, - Summary, Transform, Type, UnboundPartitionField, + NestedField, NullOrder, Operation, PartitionSpec, PrimitiveType, Schema, Snapshot, + SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, + Transform, Type, UnboundPartitionField, }; use crate::TableCreation; @@ -1238,6 +1288,12 @@ mod tests { "name": "struct_name", "required": true, "type": "fixed[1]" + }, + { + "id": 4, + "name": "ts", + "required": true, + "type": "timestamp" } ] } @@ -1279,23 +1335,32 @@ mod tests { let schema = Schema::builder() .with_schema_id(1) - .with_fields(vec![Arc::new(NestedField::required( - 1, - "struct_name", - Type::Primitive(PrimitiveType::Fixed(1)), - ))]) + .with_fields(vec![ + Arc::new(NestedField::required( + 1, + "struct_name", + Type::Primitive(PrimitiveType::Fixed(1)), + )), + Arc::new(NestedField::required( + 4, + "ts", + Type::Primitive(PrimitiveType::Timestamp), + )), + ]) .build() .unwrap(); - let partition_spec = PartitionSpec { - spec_id: 0, - fields: vec![PartitionField { + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .add_unbound_field(UnboundPartitionField { name: "ts_day".to_string(), transform: Transform::Day, source_id: 4, - field_id: 1000, - }], - }; + field_id: Some(1000), + }) + .unwrap() + .build() + .unwrap(); let expected = TableMetadata { format_version: FormatVersion::V2, @@ -1305,8 +1370,11 @@ mod tests { last_column_id: 1, schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]), current_schema_id: 1, - partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]), - default_spec_id: 0, + partition_specs: HashMap::from_iter(vec![( + 0, + partition_spec.clone().into_schemaless().into(), + )]), + default_spec: partition_spec.into(), last_partition_id: 1000, default_sort_order_id: 0, sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]), @@ -1445,7 +1513,8 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder(&schema) + let schema = Arc::new(schema); + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(0) .add_partition_field("vendor_id", "vendor_id", Transform::Identity) .unwrap() @@ -1472,10 +1541,10 @@ mod tests { location: "/home/iceberg/warehouse/nyc/taxis".to_string(), last_updated_ms: 1662532818843, last_column_id: 5, - schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]), + schemas: HashMap::from_iter(vec![(0, schema)]), current_schema_id: 0, - partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]), - default_spec_id: 0, + partition_specs: HashMap::from_iter(vec![(0, partition_spec.clone().into_schemaless().into())]), + default_spec: Arc::new(partition_spec), last_partition_id: 1000, default_sort_order_id: 0, sort_orders: HashMap::from_iter(vec![(0, sort_order.into())]), @@ -1514,6 +1583,12 @@ mod tests { "name": "struct_name", "required": true, "type": "fixed[1]" + }, + { + "id": 4, + "name": "ts", + "required": true, + "type": "timestamp" } ] } @@ -1614,6 +1689,12 @@ mod tests { "name": "struct_name", "required": true, "type": "fixed[1]" + }, + { + "id": 4, + "name": "ts", + "required": true, + "type": "timestamp" } ] } @@ -1699,6 +1780,12 @@ mod tests { "name": "struct_name", "required": true, "type": "fixed[1]" + }, + { + "id": 4, + "name": "ts", + "required": true, + "type": "timestamp" } ] } @@ -1828,7 +1915,7 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder(&schema1) + let partition_spec = PartitionSpec::builder(schema2.clone()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "x".to_string(), @@ -1889,8 +1976,11 @@ mod tests { last_column_id: 3, schemas: HashMap::from_iter(vec![(0, Arc::new(schema1)), (1, Arc::new(schema2))]), current_schema_id: 1, - partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]), - default_spec_id: 0, + partition_specs: HashMap::from_iter(vec![( + 0, + partition_spec.clone().into_schemaless().into(), + )]), + default_spec: Arc::new(partition_spec), last_partition_id: 1000, default_sort_order_id: 3, sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]), @@ -1951,7 +2041,7 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder(&schema) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "x".to_string(), @@ -1988,8 +2078,11 @@ mod tests { last_column_id: 3, schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]), current_schema_id: 0, - partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]), - default_spec_id: 0, + partition_specs: HashMap::from_iter(vec![( + 0, + partition_spec.clone().into_schemaless().into(), + )]), + default_spec: Arc::new(partition_spec), last_partition_id: 1000, default_sort_order_id: 3, sort_orders: HashMap::from_iter(vec![(3, sort_order.into())]), @@ -2031,7 +2124,7 @@ mod tests { .build() .unwrap(); - let partition_spec = PartitionSpec::builder(&schema) + let partition_spec = PartitionSpec::builder(schema.clone()) .with_spec_id(0) .add_unbound_field(UnboundPartitionField { name: "x".to_string(), @@ -2051,8 +2144,11 @@ mod tests { last_column_id: 3, schemas: HashMap::from_iter(vec![(0, Arc::new(schema))]), current_schema_id: 0, - partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]), - default_spec_id: 0, + partition_specs: HashMap::from_iter(vec![( + 0, + partition_spec.clone().into_schemaless().into(), + )]), + default_spec: Arc::new(partition_spec), last_partition_id: 0, default_sort_order_id: 0, // Sort order is added during deserialization for V2 compatibility @@ -2165,16 +2261,22 @@ mod tests { fn test_default_partition_spec() { let default_spec_id = 1234; let mut table_meta_data = get_test_table_metadata("TableMetadataV2Valid.json"); - table_meta_data.default_spec_id = default_spec_id; + let partition_spec = + PartitionSpec::unpartition_spec(table_meta_data.current_schema().clone()); + table_meta_data.default_spec = partition_spec.clone().into(); table_meta_data .partition_specs - .insert(default_spec_id, Arc::new(PartitionSpec::default())); + .insert(default_spec_id, Arc::new(partition_spec.into_schemaless())); assert_eq!( - table_meta_data.default_partition_spec(), - table_meta_data + (*table_meta_data.default_partition_spec().clone()) + .clone() + .into_schemaless(), + (*table_meta_data .partition_spec_by_id(default_spec_id) .unwrap() + .clone()) + .clone() ); } #[test] @@ -2225,10 +2327,11 @@ mod tests { HashMap::from([( 0, Arc::new( - PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap()) + PartitionSpec::builder(table_metadata.schemas.get(&0).unwrap().clone()) .with_spec_id(0) .build() .unwrap() + .into_schemaless() ) )]) ); diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs index 44326190d..1d5dedda1 100644 --- a/crates/iceberg/src/writer/file_writer/location_generator.rs +++ b/crates/iceberg/src/writer/file_writer/location_generator.rs @@ -132,7 +132,7 @@ pub(crate) mod test { use uuid::Uuid; use super::LocationGenerator; - use crate::spec::{FormatVersion, TableMetadata}; + use crate::spec::{FormatVersion, PartitionSpec, TableMetadata}; use crate::writer::file_writer::location_generator::{ FileNameGenerator, WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION, }; @@ -156,6 +156,7 @@ pub(crate) mod test { #[test] fn test_default_location_generate() { + let schema = crate::spec::Schema::builder().build().unwrap(); let mut table_metadata = TableMetadata { format_version: FormatVersion::V2, table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(), @@ -165,7 +166,7 @@ pub(crate) mod test { schemas: HashMap::new(), current_schema_id: 1, partition_specs: HashMap::new(), - default_spec_id: 1, + default_spec: PartitionSpec::unpartition_spec(schema).into(), last_partition_id: 1000, default_sort_order_id: 0, sort_orders: HashMap::from_iter(vec![]),