From 58123990c304b72139651a16f45f6505bc868cc3 Mon Sep 17 00:00:00 2001
From: Christian
Date: Sun, 8 Sep 2024 18:18:41 +0200
Subject: [PATCH] feat: partition compatibility (#612)

* Partition compatibility

* Partition compatibility

* Rename compatible_with -> is_compatible_with
---
 crates/iceberg/src/spec/partition.rs | 414 ++++++++++++++++++++++++++-
 1 file changed, 413 insertions(+), 1 deletion(-)

diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs
index f262a8c7c..36763df7e 100644
--- a/crates/iceberg/src/spec/partition.rs
+++ b/crates/iceberg/src/spec/partition.rs
@@ -118,9 +118,63 @@ impl PartitionSpec {
     /// Turn this partition spec into an unbound partition spec.
     ///
     /// The `field_id` is retained as `partition_id` in the unbound partition spec.
-    pub fn to_unbound(self) -> UnboundPartitionSpec {
+    pub fn into_unbound(self) -> UnboundPartitionSpec {
         self.into()
     }
+
+    /// Check if this partition spec is compatible with another partition spec.
+    ///
+    /// Returns true if the partition spec is equal to the other spec with partition field ids ignored and
+    /// spec_id ignored. The following must be identical:
+    /// * The number of fields
+    /// * Field order
+    /// * Field names
+    /// * Source column ids
+    /// * Transforms
+    pub fn is_compatible_with(&self, other: &UnboundPartitionSpec) -> bool {
+        if self.fields.len() != other.fields.len() {
+            return false;
+        }
+
+        for (this_field, other_field) in self.fields.iter().zip(&other.fields) {
+            if this_field.source_id != other_field.source_id
+                || this_field.transform != other_field.transform
+                || this_field.name != other_field.name
+            {
+                return false;
+            }
+        }
+
+        true
+    }
+
+    /// Check if this partition spec has sequential partition ids.
+    /// Sequential ids start from 1000 and increment by 1 for each field.
+    /// This is required for spec version 1.
+    pub fn has_sequential_ids(&self) -> bool {
+        for (index, field) in self.fields.iter().enumerate() {
+            let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64)
+                .checked_add(1)
+                .and_then(|id| id.checked_add(index as i64))
+                .unwrap_or(i64::MAX);
+
+            if field.field_id as i64 != expected_id {
+                return false;
+            }
+        }
+
+        true
+    }
+
+    /// Get the highest field id in the partition spec.
+    /// If the partition spec has no fields, returns `UNPARTITIONED_LAST_ASSIGNED_ID` (999).
+    pub fn highest_field_id(&self) -> i32 {
+        self.fields
+            .iter()
+            .map(|f| f.field_id)
+            .max()
+            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID)
+    }
 }
 
 /// Reference to [`UnboundPartitionSpec`].
@@ -171,6 +225,14 @@ impl UnboundPartitionSpec {
     pub fn fields(&self) -> &[UnboundPartitionField] {
         &self.fields
     }
+
+    /// Change the spec id of the partition spec.
+    pub fn with_spec_id(self, spec_id: i32) -> Self {
+        Self {
+            spec_id: Some(spec_id),
+            ..self
+        }
+    }
 }
 
 impl From<PartitionField> for UnboundPartitionField {
@@ -1263,4 +1325,354 @@ mod tests {
             }]
         });
     }
+
+    #[test]
+    fn test_is_compatible_with() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec_1 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let partition_spec_2 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert!(partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+    }
+
+    #[test]
+    fn test_not_compatible_with_transform_different() {
+        let schema = Schema::builder()
+            .with_fields(vec![NestedField::required(
+                1,
+                "id",
+                Type::Primitive(crate::spec::PrimitiveType::Int),
+            )
+            .into()])
+            .build()
+            .unwrap();
+
+        let partition_spec_1 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let partition_spec_2 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(32),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+    }
+
+    #[test]
+    fn test_not_compatible_with_source_id_different() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec_1 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let partition_spec_2 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+    }
+
+    #[test]
+    fn test_not_compatible_with_order_different() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let partition_spec_1 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: None,
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        let partition_spec_2 = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: None,
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: None,
+                name: "id_bucket".to_string(),
+                transform: Transform::Bucket(16),
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound()));
+    }
+
+    #[test]
+    fn test_highest_field_id_unpartitioned() {
+        let spec = PartitionSpec::builder(&Schema::builder().with_fields(vec![]).build().unwrap())
+            .with_spec_id(1)
+            .build()
+            .unwrap();
+
+        assert_eq!(UNPARTITIONED_LAST_ASSIGNED_ID, spec.highest_field_id());
+    }
+
+    #[test]
+    fn test_highest_field_id() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let spec = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: Some(1001),
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: Some(1000),
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(1001, spec.highest_field_id());
+    }
+
+    #[test]
+    fn test_has_sequential_ids() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let spec = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: Some(1000),
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: Some(1001),
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(1000, spec.fields[0].field_id);
+        assert_eq!(1001, spec.fields[1].field_id);
+        assert!(spec.has_sequential_ids());
+    }
+
+    #[test]
+    fn test_sequential_ids_must_start_at_1000() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let spec = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: Some(999),
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: Some(1000),
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(999, spec.fields[0].field_id);
+        assert_eq!(1000, spec.fields[1].field_id);
+        assert!(!spec.has_sequential_ids());
+    }
+
+    #[test]
+    fn test_sequential_ids_must_have_no_gaps() {
+        let schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int))
+                    .into(),
+                NestedField::required(
+                    2,
+                    "name",
+                    Type::Primitive(crate::spec::PrimitiveType::String),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap();
+
+        let spec = PartitionSpec::builder(&schema)
+            .with_spec_id(1)
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 1,
+                field_id: Some(1000),
+                name: "id".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .add_unbound_field(UnboundPartitionField {
+                source_id: 2,
+                field_id: Some(1002),
+                name: "name".to_string(),
+                transform: Transform::Identity,
+            })
+            .unwrap()
+            .build()
+            .unwrap();
+
+        assert_eq!(1000, spec.fields[0].field_id);
+        assert_eq!(1002, spec.fields[1].field_id);
+        assert!(!spec.has_sequential_ids());
+    }
 }