From 4193131a79c7b2a5add64019cc448421f412f89d Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Fri, 30 Aug 2024 12:48:11 +0200 Subject: [PATCH] Implement TableMetadataBuilder --- crates/catalog/glue/src/catalog.rs | 4 +- crates/catalog/glue/src/schema.rs | 4 +- crates/catalog/glue/src/utils.rs | 4 +- crates/catalog/hms/src/catalog.rs | 4 +- crates/catalog/memory/src/catalog.rs | 6 +- .../catalog/rest/tests/rest_catalog_test.rs | 8 +- crates/iceberg/src/catalog/mod.rs | 53 +- crates/iceberg/src/error.rs | 6 + crates/iceberg/src/io/object_cache.rs | 2 +- crates/iceberg/src/scan.rs | 2 +- crates/iceberg/src/spec/datatypes.rs | 6 + crates/iceberg/src/spec/mod.rs | 1 + crates/iceberg/src/spec/partition.rs | 67 +- crates/iceberg/src/spec/schema.rs | 330 ++- crates/iceberg/src/spec/snapshot.rs | 28 +- crates/iceberg/src/spec/sort.rs | 53 +- crates/iceberg/src/spec/table_metadata.rs | 800 ++++++-- .../src/spec/table_metadata_builder.rs | 1796 +++++++++++++++++ crates/iceberg/src/transaction.rs | 11 +- 19 files changed, 2950 insertions(+), 235 deletions(-) create mode 100644 crates/iceberg/src/spec/table_metadata_builder.rs diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 18e30f3d0..ad216a3a7 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -355,7 +355,9 @@ impl Catalog for GlueCatalog { } }; - let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?; + let metadata = TableMetadataBuilder::from_table_creation(creation)? + .build()? 
+ .metadata; let metadata_location = create_metadata_location(&location, 0)?; self.file_io diff --git a/crates/catalog/glue/src/schema.rs b/crates/catalog/glue/src/schema.rs index bb676e36e..1b490d13d 100644 --- a/crates/catalog/glue/src/schema.rs +++ b/crates/catalog/glue/src/schema.rs @@ -198,7 +198,9 @@ mod tests { .location("my_location".to_string()) .schema(schema) .build(); - let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + let metadata = TableMetadataBuilder::from_table_creation(table_creation)? + .build()? + .metadata; Ok(metadata) } diff --git a/crates/catalog/glue/src/utils.rs b/crates/catalog/glue/src/utils.rs index a99fb19c7..c43af500b 100644 --- a/crates/catalog/glue/src/utils.rs +++ b/crates/catalog/glue/src/utils.rs @@ -299,7 +299,9 @@ mod tests { .location("my_location".to_string()) .schema(schema) .build(); - let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + let metadata = TableMetadataBuilder::from_table_creation(table_creation)? + .build()? + .metadata; Ok(metadata) } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 6e5db1968..e19cf4ce5 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -346,7 +346,9 @@ impl Catalog for HmsCatalog { } }; - let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?; + let metadata = TableMetadataBuilder::from_table_creation(creation)? + .build()? 
+ .metadata; let metadata_location = create_metadata_location(&location, 0)?; self.file_io diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 05038cb66..aaf6f2e9a 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -194,7 +194,9 @@ impl Catalog for MemoryCatalog { } }; - let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + let metadata = TableMetadataBuilder::from_table_creation(table_creation)? + .build()? + .metadata; let metadata_location = format!( "{}/metadata/{}-{}.metadata.json", &location, @@ -371,7 +373,7 @@ mod tests { let expected_sorted_order = SortOrder::builder() .with_order_id(0) .with_fields(vec![]) - .build(expected_schema.clone()) + .build(expected_schema) .unwrap(); assert_eq!( diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index e98890a86..eee58fc19 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -293,12 +293,8 @@ async fn test_create_table() { assert_eq!(table.metadata().format_version(), FormatVersion::V2); assert!(table.metadata().current_snapshot().is_none()); assert!(table.metadata().history().is_empty()); - assert!(table.metadata().default_sort_order().unwrap().is_unsorted()); - assert!(table - .metadata() - .default_partition_spec() - .unwrap() - .is_unpartitioned()); + assert!(table.metadata().default_sort_order().is_unsorted()); + assert!(table.metadata().default_partition_spec().is_unpartitioned()); } #[tokio::test] diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index aa2311b4a..877db74c8 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -433,8 +433,46 @@ impl TableUpdate { /// Applies the update to the table metadata builder. 
pub fn apply(self, builder: TableMetadataBuilder) -> Result { match self { - TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid), - _ => unimplemented!(), + TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)), + TableUpdate::AddSchema { + schema, + last_column_id, + } => { + if let Some(last_column_id) = last_column_id { + if builder.last_column_id() < last_column_id { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid last column ID: {last_column_id} < {} (previous last column ID)", + builder.last_column_id() + ), + )); + } + }; + Ok(builder.add_schema(schema)) + } + TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id), + TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec), + TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id), + TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order), + TableUpdate::SetDefaultSortOrder { sort_order_id } => { + builder.set_default_sort_order(sort_order_id) + } + TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot), + TableUpdate::SetSnapshotRef { + ref_name, + reference, + } => builder.set_ref(&ref_name, reference), + TableUpdate::RemoveSnapshots { snapshot_ids } => { + Ok(builder.remove_snapshots(&snapshot_ids)) + } + TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)), + TableUpdate::SetLocation { location } => Ok(builder.set_location(location)), + TableUpdate::SetProperties { updates } => builder.set_properties(updates), + TableUpdate::RemoveProperties { removals } => Ok(builder.remove_properties(&removals)), + TableUpdate::UpgradeFormatVersion { format_version } => { + builder.upgrade_format_version(format_version) + } } } } @@ -1102,8 +1140,12 @@ mod tests { let table_metadata = TableMetadataBuilder::from_table_creation(table_creation) .unwrap() .build() - .unwrap(); - let table_metadata_builder = 
TableMetadataBuilder::new(table_metadata); + .unwrap() + .metadata; + let table_metadata_builder = TableMetadataBuilder::new_from_metadata( + table_metadata, + "s3://db/table/metadata/metadata1.gz.json", + ); let uuid = uuid::Uuid::new_v4(); let update = TableUpdate::AssignUuid { uuid }; @@ -1111,7 +1153,8 @@ mod tests { .apply(table_metadata_builder) .unwrap() .build() - .unwrap(); + .unwrap() + .metadata; assert_eq!(updated_metadata.uuid(), uuid); } } diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 2b69b4706..d4910a05f 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -44,6 +44,11 @@ pub enum ErrorKind { /// /// This error is returned when given iceberg feature is not supported. FeatureUnsupported, + /// Validation failed. + /// + /// This error is returned when Table or View Metadata is manipulated + /// in forbidden ways that would produce corrupt metadata. + ValidationFailed, } impl ErrorKind { @@ -59,6 +64,7 @@ impl From for &'static str { ErrorKind::Unexpected => "Unexpected", ErrorKind::DataInvalid => "DataInvalid", ErrorKind::FeatureUnsupported => "FeatureUnsupported", + ErrorKind::ValidationFailed => "ValidationFailed", } } } diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 3b89a4e6a..731072a5a 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -252,7 +252,7 @@ mod tests { async fn setup_manifest_files(&mut self) { let current_snapshot = self.table.metadata().current_snapshot().unwrap(); let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); - let current_partition_spec = self.table.metadata().default_partition_spec().unwrap(); + let current_partition_spec = self.table.metadata().default_partition_spec(); // Write data files let data_file_manifest = ManifestWriter::new( diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index 45d7d4fd1..67805766e 100644 --- 
a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -979,7 +979,7 @@ mod tests { .parent_snapshot(self.table.metadata()) .unwrap(); let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); - let current_partition_spec = self.table.metadata().default_partition_spec().unwrap(); + let current_partition_spec = self.table.metadata().default_partition_spec(); // Write data files let data_file_manifest = ManifestWriter::new( diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index d38245960..bce10ad5f 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -668,6 +668,12 @@ impl NestedField { self.write_default = Some(value); self } + + /// Set the id of the field. + pub(crate) fn with_id(mut self, id: i32) -> Self { + self.id = id; + self + } } impl fmt::Display for NestedField { diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index 793f00d34..9b91d5443 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -25,6 +25,7 @@ mod schema; mod snapshot; mod sort; mod table_metadata; +mod table_metadata_builder; mod transform; mod values; mod view_metadata; diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index 91aaf4951..a889fba8a 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -118,9 +118,63 @@ impl PartitionSpec { /// Turn this partition spec into an unbound partition spec. /// /// The `field_id` is retained as `partition_id` in the unbound partition spec. - pub fn to_unbound(self) -> UnboundPartitionSpec { + pub fn into_unbound(self) -> UnboundPartitionSpec { self.into() } + + /// Check if this partition spec is compatible with another partition spec. + /// + /// Returns true if the partition spec is equal to the other spec with partition field ids ignored and + /// spec_id ignored. 
The following must be identical: + /// * The number of fields + /// * Field order + /// * Field names + /// * Source column ids + /// * Transforms + pub fn compatible_with(&self, other: &UnboundPartitionSpec) -> bool { + if self.fields.len() != other.fields.len() { + return false; + } + + for (this_field, other_field) in self.fields.iter().zip(&other.fields) { + if this_field.source_id != other_field.source_id + || this_field.transform != other_field.transform + || this_field.name != other_field.name + { + return false; + } + } + + true + } + + /// Check if this partition spec has sequential partition ids. + /// Sequential ids start from 1000 and increment by 1 for each field. + /// This is required for spec version 1 + pub fn has_sequential_ids(&self) -> bool { + for (index, field) in self.fields.iter().enumerate() { + let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64) + .checked_add(1) + .and_then(|id| id.checked_add(index as i64)) + .unwrap_or(i64::MAX); + + if field.field_id as i64 != expected_id { + return false; + } + } + + true + } + + /// Get the highest field id in the partition spec. + /// If the partition spec is unpartitioned, it returns the last unpartitioned last assigned id (999). + pub fn highest_field_id(&self) -> i32 { + self.fields + .iter() + .map(|f| f.field_id) + .max() + .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID) + } } /// Reference to [`UnboundPartitionSpec`]. 
@@ -171,6 +225,14 @@ impl UnboundPartitionSpec { pub fn fields(&self) -> &[UnboundPartitionField] { &self.fields } + + /// Change the spec id of the partition spec + pub fn with_spec_id(self, spec_id: i32) -> Self { + Self { + spec_id: Some(spec_id), + ..self + } + } } impl From for UnboundPartitionField { @@ -1263,4 +1325,7 @@ mod tests { }] }); } + + #[test] + fn test_has_sequential_ids() {} } diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 106bfb1d8..7c9d19f10 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -39,7 +39,7 @@ use crate::{ensure_data_valid, Error, ErrorKind}; pub type SchemaId = i32; /// Reference to [`Schema`]. pub type SchemaRef = Arc; -const DEFAULT_SCHEMA_ID: SchemaId = 0; +pub(crate) const DEFAULT_SCHEMA_ID: SchemaId = 0; /// Defines schema in iceberg. #[derive(Debug, Serialize, Deserialize, Clone)] @@ -77,6 +77,7 @@ pub struct SchemaBuilder { fields: Vec, alias_to_id: BiHashMap, identifier_field_ids: HashSet, + reassign_field_ids_from: Option, } impl SchemaBuilder { @@ -86,6 +87,16 @@ impl SchemaBuilder { self } + /// Reassign all field-ids (nested) on build. + /// If `start_from` is provided, it will start reassigning from that id (inclusive). + /// If not provided, it will start from 0. + /// + /// All specified aliases and identifier fields will be updated to the new field-ids. + pub fn with_reassigned_field_ids(mut self, start_from: Option) -> Self { + self.reassign_field_ids_from = Some(start_from.unwrap_or(0)); + self + } + /// Set schema id. pub fn with_schema_id(mut self, schema_id: i32) -> Self { self.schema_id = schema_id; @@ -105,13 +116,24 @@ impl SchemaBuilder { } /// Builds the schema. 
- pub fn build(self) -> Result { - let highest_field_id = self.fields.iter().map(|f| f.id).max().unwrap_or(0); + pub fn build(mut self) -> Result { + let mut highest_field_id = None; + if let Some(start_from) = self.reassign_field_ids_from { + let mut id_reassigner = ReassignFieldIds::new(start_from); + self.fields = id_reassigner.reassign_field_ids(self.fields); + highest_field_id = Some(id_reassigner.next_field_id - 1); + + self.identifier_field_ids = + id_reassigner.apply_to_identifier_fields(self.identifier_field_ids)?; + self.alias_to_id = id_reassigner.apply_to_aliases(self.alias_to_id)?; + } let field_id_to_accessor = self.build_accessors(); let r#struct = StructType::new(self.fields); let id_to_field = index_by_id(&r#struct)?; + let highest_field_id = + highest_field_id.unwrap_or(id_to_field.keys().max().cloned().unwrap_or(0)); Self::validate_identifier_ids( &r#struct, @@ -266,6 +288,7 @@ impl Schema { fields: vec![], identifier_field_ids: HashSet::default(), alias_to_id: BiHashMap::default(), + reassign_field_ids_from: None, } } @@ -276,6 +299,7 @@ impl Schema { fields: self.r#struct.fields().to_vec(), alias_to_id: self.alias_to_id, identifier_field_ids: self.identifier_field_ids, + reassign_field_ids_from: None, } } @@ -343,10 +367,28 @@ impl Schema { self.id_to_name.get(&field_id).map(String::as_str) } + /// Return A HashMap matching field ids to field names. + pub(crate) fn field_id_to_name_map(&self) -> &HashMap { + &self.id_to_name + } + /// Get an accessor for retrieving data in a struct pub fn accessor_by_field_id(&self, field_id: i32) -> Option> { self.field_id_to_accessor.get(&field_id).cloned() } + + /// Check if this schema is identical to another schema semantically - excluding schema id. + pub(crate) fn is_same_schema(&self, other: &SchemaRef) -> bool { + self.as_struct().eq(other.as_struct()) + && self.identifier_field_ids().eq(other.identifier_field_ids()) + } + + /// Change the schema id of this schema. 
+ // This is redundant with the `with_schema_id` method on the builder, but useful + // as it is infallible in contrast to the builder `build()` method. + pub(crate) fn with_schema_id(self, schema_id: SchemaId) -> Self { + Self { schema_id, ..self } + } } impl Display for Schema { @@ -944,6 +986,122 @@ impl SchemaVisitor for PruneColumn { } } +struct ReassignFieldIds { + next_field_id: i32, + old_to_new_id: HashMap, +} + +// We are not using the visitor here, as post order traversal is not desired. +// Instead we want to re-assign all fields on one level first before diving deeper. +impl ReassignFieldIds { + fn new(start_from: i32) -> Self { + Self { + next_field_id: start_from, + old_to_new_id: HashMap::new(), + } + } + + fn reassign_field_ids(&mut self, fields: Vec) -> Vec { + // Visit fields on the same level first + let outer_fields = fields + .into_iter() + .map(|field| { + self.old_to_new_id.insert(field.id, self.next_field_id); + let new_field = Arc::unwrap_or_clone(field).with_id(self.next_field_id); + self.next_field_id += 1; + Arc::new(new_field) + }) + .collect::>(); + + // Now visit nested fields + outer_fields + .into_iter() + .map(|field| { + if field.field_type.is_primitive() { + field + } else { + let mut new_field = Arc::unwrap_or_clone(field); + *new_field.field_type = self.reassign_ids_visit_type(*new_field.field_type); + Arc::new(new_field) + } + }) + .collect() + } + + fn reassign_ids_visit_type(&mut self, field_type: Type) -> Type { + match field_type { + Type::Primitive(s) => Type::Primitive(s), + Type::Struct(s) => { + let new_fields = self.reassign_field_ids(s.fields().to_vec()); + Type::Struct(StructType::new(new_fields)) + } + Type::List(l) => { + self.old_to_new_id + .insert(l.element_field.id, self.next_field_id); + let mut element_field = Arc::unwrap_or_clone(l.element_field); + element_field.id = self.next_field_id; + self.next_field_id += 1; + *element_field.field_type = self.reassign_ids_visit_type(*element_field.field_type); + 
Type::List(ListType { + element_field: Arc::new(element_field), + }) + } + Type::Map(m) => { + self.old_to_new_id + .insert(m.key_field.id, self.next_field_id); + let mut key_field = Arc::unwrap_or_clone(m.key_field); + key_field.id = self.next_field_id; + self.next_field_id += 1; + *key_field.field_type = self.reassign_ids_visit_type(*key_field.field_type); + + self.old_to_new_id + .insert(m.value_field.id, self.next_field_id); + let mut value_field = Arc::unwrap_or_clone(m.value_field); + value_field.id = self.next_field_id; + self.next_field_id += 1; + *value_field.field_type = self.reassign_ids_visit_type(*value_field.field_type); + + Type::Map(MapType { + key_field: Arc::new(key_field), + value_field: Arc::new(value_field), + }) + } + } + } + + fn apply_to_identifier_fields(&self, field_ids: HashSet) -> Result> { + field_ids + .into_iter() + .map(|id| { + self.old_to_new_id.get(&id).copied().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Identifier Field ID {} not found", id), + ) + }) + }) + .collect() + } + + fn apply_to_aliases(&self, alias: BiHashMap) -> Result> { + alias + .into_iter() + .map(|(name, id)| { + self.old_to_new_id + .get(&id) + .copied() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Field with id {} for alias {} not found", id, name), + ) + }) + .map(|new_id| (name, new_id)) + }) + .collect() + } +} + pub(super) mod _serde { /// This is a helper module that defines types to help with serialization/deserialization. 
/// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct @@ -1063,6 +1221,8 @@ pub(super) mod _serde { mod tests { use std::collections::{HashMap, HashSet}; + use bimap::BiHashMap; + use super::DEFAULT_SCHEMA_ID; use crate::spec::datatypes::Type::{List, Map, Primitive, Struct}; use crate::spec::datatypes::{ @@ -1335,6 +1495,12 @@ table { assert_eq!(original_schema, schema); } + #[test] + fn test_highest_field_id() { + let schema = table_schema_nested(); + assert_eq!(17, schema.highest_field_id()); + } + #[test] fn test_schema_index_by_name() { let expected_name_to_id = HashMap::from( @@ -2229,4 +2395,162 @@ table { assert!(result.is_ok()); assert_eq!(result.unwrap(), Type::Struct(schema.as_struct().clone())); } + + #[test] + fn test_reassign_ids() { + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![3]) + .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 3)])) + .with_fields(vec![ + NestedField::optional(5, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(4, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + let reassigned_schema = schema + .into_builder() + .with_reassigned_field_ids(Some(0)) + .build() + .unwrap(); + + let expected = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 1)])) + .with_fields(vec![ + NestedField::optional(0, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(1, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + pretty_assertions::assert_eq!(expected, reassigned_schema); + assert_eq!(reassigned_schema.highest_field_id(), 2); + } + + #[test] + fn 
test_reassigned_ids_nested() { + let schema = table_schema_nested(); + let reassigned_schema = schema + .into_builder() + .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 2)])) + .with_reassigned_field_ids(Some(0)) + .build() + .unwrap(); + + let expected = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 1)])) + .with_fields(vec![ + NestedField::optional(0, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(1, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + NestedField::required( + 3, + "qux", + Type::List(ListType { + element_field: NestedField::list_element( + 7, + Type::Primitive(PrimitiveType::String), + true, + ) + .into(), + }), + ) + .into(), + NestedField::required( + 4, + "quux", + Type::Map(MapType { + key_field: NestedField::map_key_element( + 8, + Type::Primitive(PrimitiveType::String), + ) + .into(), + value_field: NestedField::map_value_element( + 9, + Type::Map(MapType { + key_field: NestedField::map_key_element( + 10, + Type::Primitive(PrimitiveType::String), + ) + .into(), + value_field: NestedField::map_value_element( + 11, + Type::Primitive(PrimitiveType::Int), + true, + ) + .into(), + }), + true, + ) + .into(), + }), + ) + .into(), + NestedField::required( + 5, + "location", + Type::List(ListType { + element_field: NestedField::list_element( + 12, + Type::Struct(StructType::new(vec![ + NestedField::optional( + 13, + "latitude", + Type::Primitive(PrimitiveType::Float), + ) + .into(), + NestedField::optional( + 14, + "longitude", + Type::Primitive(PrimitiveType::Float), + ) + .into(), + ])), + true, + ) + .into(), + }), + ) + .into(), + NestedField::optional( + 6, + "person", + Type::Struct(StructType::new(vec![ + NestedField::optional(15, "name", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(16, 
"age", Type::Primitive(PrimitiveType::Int)) + .into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(); + + pretty_assertions::assert_eq!(expected, reassigned_schema); + assert_eq!(reassigned_schema.highest_field_id(), 16); + assert_eq!(reassigned_schema.field_by_id(6).unwrap().name, "person"); + assert_eq!(reassigned_schema.field_by_id(16).unwrap().name, "age"); + } + + #[test] + fn test_reassign_ids_empty_schema() { + let schema = Schema::builder().with_schema_id(1).build().unwrap(); + let reassigned_schema = schema + .clone() + .into_builder() + .with_reassigned_field_ids(Some(0)) + .build() + .unwrap(); + + assert_eq!(schema, reassigned_schema); + assert_eq!(schema.highest_field_id(), 0); + } } diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 704a43b5f..87af3093e 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -22,16 +22,18 @@ use std::collections::HashMap; use std::sync::Arc; use _serde::SnapshotV2; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; -use super::table_metadata::SnapshotLog; -use crate::error::Result; +use crate::error::{timestamp_ms_to_utc, Result}; use crate::io::FileIO; use crate::spec::{ManifestList, SchemaId, SchemaRef, StructType, TableMetadata}; use crate::{Error, ErrorKind}; +/// The ref name of the main branch of the table. +pub const MAIN_BRANCH: &str = "main"; + /// Reference to [`Snapshot`]. 
pub type SnapshotRef = Arc; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] @@ -125,8 +127,14 @@ impl Snapshot { } /// Get the timestamp of when the snapshot was created #[inline] - pub fn timestamp(&self) -> DateTime { - Utc.timestamp_millis_opt(self.timestamp_ms).unwrap() + pub fn timestamp(&self) -> Result> { + timestamp_ms_to_utc(self.timestamp_ms) + } + + /// Get the timestamp of when the snapshot was created in milliseconds + #[inline] + pub fn timestamp_ms(&self) -> i64 { + self.timestamp_ms } /// Get the schema id of this snapshot. @@ -183,13 +191,6 @@ impl Snapshot { partition_type_provider, ) } - - pub(crate) fn log(&self) -> SnapshotLog { - SnapshotLog { - timestamp_ms: self.timestamp_ms, - snapshot_id: self.snapshot_id, - } - } } pub(super) mod _serde { @@ -386,8 +387,9 @@ mod tests { assert_eq!(3051729675574597004, result.snapshot_id()); assert_eq!( Utc.timestamp_millis_opt(1515100955770).unwrap(), - result.timestamp() + result.timestamp().unwrap() ); + assert_eq!(1515100955770, result.timestamp_ms()); assert_eq!( Summary { operation: Operation::Append, diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs index 82909344a..706a76deb 100644 --- a/crates/iceberg/src/spec/sort.rs +++ b/crates/iceberg/src/spec/sort.rs @@ -112,7 +112,7 @@ pub struct SortOrder { } impl SortOrder { - const UNSORTED_ORDER_ID: i64 = 0; + pub(crate) const UNSORTED_ORDER_ID: i64 = 0; /// Create sort order builder pub fn builder() -> SortOrderBuilder { @@ -133,12 +133,20 @@ impl SortOrder { pub fn is_unsorted(&self) -> bool { self.fields.is_empty() } + + /// Set the order id for the sort order + pub fn with_order_id(&self, order_id: i64) -> SortOrder { + SortOrder { + order_id, + fields: self.fields.clone(), + } + } } impl SortOrderBuilder { /// Creates a new unbound sort order. 
pub fn build_unbound(&self) -> Result { - let fields = self.fields.clone().unwrap_or_default(); + let fields: Vec = self.fields.clone().unwrap_or_default(); return match (self.order_id, fields.as_slice()) { (Some(SortOrder::UNSORTED_ORDER_ID) | None, []) => Ok(SortOrder::unsorted_order()), (_, []) => Err(Error::new( @@ -160,13 +168,13 @@ impl SortOrderBuilder { } /// Creates a new bound sort order. - pub fn build(&self, schema: Schema) -> Result { + pub fn build(&self, schema: &Schema) -> Result { let unbound_sort_order = self.build_unbound()?; SortOrderBuilder::check_compatibility(unbound_sort_order, schema) } /// Returns the given sort order if it is compatible with the given schema - fn check_compatibility(sort_order: SortOrder, schema: Schema) -> Result { + fn check_compatibility(sort_order: SortOrder, schema: &Schema) -> Result { let sort_fields = &sort_order.fields; for sort_field in sort_fields { match schema.field_by_id(sort_field.source_id) { @@ -290,6 +298,35 @@ mod tests { ) } + #[test] + fn test_build_unbound_returns_correct_default_order_id_for_no_fields() { + assert_eq!( + SortOrder::builder() + .build_unbound() + .expect("Expected an Ok value") + .order_id, + SortOrder::UNSORTED_ORDER_ID + ) + } + + #[test] + fn test_build_unbound_returns_correct_default_order_id_for_fields() { + let sort_field = SortField::builder() + .source_id(2) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .transform(Transform::Identity) + .build(); + assert_ne!( + SortOrder::builder() + .with_sort_field(sort_field.clone()) + .build_unbound() + .expect("Expected an Ok value") + .order_id, + SortOrder::UNSORTED_ORDER_ID + ) + } + #[test] fn test_build_unbound_should_return_unsorted_sort_order() { assert_eq!( @@ -367,7 +404,7 @@ mod tests { .transform(Transform::Identity) .build(), ) - .build(schema); + .build(&schema); assert_eq!( sort_order_builder_result @@ -406,7 +443,7 @@ mod tests { .transform(Transform::Identity) .build(), ) - .build(schema); + 
.build(&schema); assert_eq!( sort_order_builder_result @@ -438,7 +475,7 @@ mod tests { .transform(Transform::Year) .build(), ) - .build(schema); + .build(&schema); assert_eq!( sort_order_builder_result @@ -468,7 +505,7 @@ mod tests { let sort_order_builder_result = SortOrder::builder() .with_sort_field(sort_field.clone()) - .build(schema); + .build(&schema); assert_eq!( sort_order_builder_result.expect("Expected an Ok value"), diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index dacd5bcd7..d2d22c847 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -17,7 +17,6 @@ //! Defines the [table metadata](https://iceberg.apache.org/spec/#table-metadata). //! The main struct here is [TableMetadataV2] which defines the data for a table. - use std::cmp::Ordering; use std::collections::HashMap; use std::fmt::{Display, Formatter}; @@ -29,20 +28,28 @@ use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use uuid::Uuid; -use super::snapshot::{Snapshot, SnapshotReference, SnapshotRetention}; +use super::snapshot::SnapshotReference; +pub use super::table_metadata_builder::TableMetadataBuilder; use super::{ - PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef, SortOrder, SortOrderRef, - DEFAULT_PARTITION_SPEC_ID, + PartitionSpec, PartitionSpecRef, Schema, SchemaId, SchemaRef, SnapshotRef, SortOrder, + SortOrderRef, UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, }; use crate::error::{timestamp_ms_to_utc, Result}; -use crate::{Error, ErrorKind, TableCreation}; +use crate::{Error, ErrorKind}; static MAIN_BRANCH: &str = "main"; -static DEFAULT_SORT_ORDER_ID: i64 = 0; +pub(crate) static ONE_MINUTE_MS: i64 = 60_000; pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1; pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0; +/// Property key for the format version. 
+pub const PROPERTY_FORMAT_VERSION: &str = "format-version"; +/// Property key for max number of previous versions to keep. +pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previous-versions-max"; +/// Default value for max number of previous versions to keep. +pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100; + /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; @@ -117,6 +124,58 @@ pub struct TableMetadata { } impl TableMetadata { + /// Creates a new table metadata. + /// + /// This method re-assign ids of fields in the schema, schema.id, sort_order.id and + /// spec.id. It should only be used to create new table metadata from scratch. + pub fn new( + schema: Schema, + spec: impl Into, + sort_order: SortOrder, + location: String, + format_version: FormatVersion, + properties: HashMap, + ) -> Result { + Self::builder( + schema, + spec, + sort_order, + location, + format_version, + properties, + )? + .build() + .map(|x| x.metadata) + } + + /// Create a new table metadata builder + pub fn builder( + schema: Schema, + spec: impl Into, + sort_order: SortOrder, + location: String, + format_version: FormatVersion, + properties: HashMap, + ) -> Result { + TableMetadataBuilder::new( + schema, + spec.into(), + sort_order, + location, + format_version, + properties, + ) + } + + /// Convert this Table Metadata into a builder for modification. + /// + /// `current_file_location` is the location where the current version + /// of the metadata file is stored. This is used to update the metadata log. + #[must_use] + pub fn into_builder(self, current_file_location: impl Into) -> TableMetadataBuilder { + TableMetadataBuilder::new_from_metadata(self, current_file_location) + } + /// Returns format version of this metadata. 
#[inline] pub fn format_version(&self) -> FormatVersion { @@ -186,15 +245,9 @@ impl TableMetadata { /// Get default partition spec #[inline] - pub fn default_partition_spec(&self) -> Option<&PartitionSpecRef> { - if self.default_spec_id == DEFAULT_PARTITION_SPEC_ID { - self.partition_spec_by_id(DEFAULT_PARTITION_SPEC_ID) - } else { - Some( - self.partition_spec_by_id(self.default_spec_id) - .expect("Default partition spec id set, but not found in table metadata"), - ) - } + pub fn default_partition_spec(&self) -> &PartitionSpecRef { + self.partition_spec_by_id(self.default_spec_id) + .expect("Default partition spec id set, but not found in table metadata") } /// Returns all snapshots @@ -215,6 +268,12 @@ impl TableMetadata { &self.snapshot_log } + /// Returns the metadata log. + #[inline] + pub fn metadata_log(&self) -> &[MetadataLog] { + &self.metadata_log + } + /// Get current snapshot #[inline] pub fn current_snapshot(&self) -> Option<&SnapshotRef> { @@ -224,6 +283,16 @@ impl TableMetadata { }) } + /// Get the snapshot for a reference + /// Returns an option if the `ref_name` is not found + #[inline] + pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> { + self.refs.get(ref_name).map(|r| { + self.snapshot_by_id(r.snapshot_id) + .unwrap_or_else(|| panic!("Snapshot id of ref {} doesn't exist", ref_name)) + }) + } + /// Return all sort orders. #[inline] pub fn sort_orders_iter(&self) -> impl Iterator { @@ -238,16 +307,10 @@ impl TableMetadata { /// Returns default sort order id. 
#[inline] - pub fn default_sort_order(&self) -> Option<&SortOrderRef> { - if self.default_sort_order_id == DEFAULT_SORT_ORDER_ID { - self.sort_orders.get(&DEFAULT_SORT_ORDER_ID) - } else { - Some( - self.sort_orders - .get(&self.default_sort_order_id) - .expect("Default order id has been set, but not found in table metadata!"), - ) - } + pub fn default_sort_order(&self) -> &SortOrderRef { + self.sort_orders + .get(&self.default_sort_order_id) + .expect("Default order id has been set, but not found in table metadata!") } /// Returns properties of table. @@ -256,125 +319,221 @@ impl TableMetadata { &self.properties } - /// Append snapshot to table - pub fn append_snapshot(&mut self, snapshot: Snapshot) { - self.last_updated_ms = snapshot.timestamp().timestamp_millis(); - self.last_sequence_number = snapshot.sequence_number(); - - self.refs - .entry(MAIN_BRANCH.to_string()) - .and_modify(|s| { - s.snapshot_id = snapshot.snapshot_id(); - }) - .or_insert_with(|| { - SnapshotReference::new(snapshot.snapshot_id(), SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }) - }); + /// Normalize this table metadata. This is an internal method + /// meant to be called after constructing table metadata from untrusted sources. + /// We run this method after json deserialization. + /// All constructors for `TableMetadata` which are part of `iceberg-rust` + /// should return normalized `TableMetadata`. 
+ pub(super) fn try_normalize(&mut self) -> Result<&mut Self> { + self.validate_current_schema()?; + self.normalize_current_snapshot()?; + self.validate_refs()?; + self.validate_chronological_snapshot_logs()?; + self.validate_chronological_metadata_logs()?; + self.location = self.location.trim_end_matches('/').to_string(); + self.validate_format_version_specifics()?; + self.try_normalize_partition_spec()?; + self.try_normalize_sort_order()?; + Ok(self) + } - self.snapshot_log.push(snapshot.log()); - self.snapshots - .insert(snapshot.snapshot_id(), Arc::new(snapshot)); + fn try_normalize_partition_spec(&mut self) -> Result<()> { + if self.partition_spec_by_id(self.default_spec_id).is_none() { + if self.default_spec_id == DEFAULT_PARTITION_SPEC_ID { + let partition_spec = PartitionSpec { + spec_id: DEFAULT_PARTITION_SPEC_ID, + fields: vec![], + }; + self.partition_specs + .insert(DEFAULT_PARTITION_SPEC_ID, Arc::new(partition_spec)); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "No partition spec exists with the default spec id {}.", + self.default_spec_id + ), + )); + } + } + if self.partition_specs.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Partition specs cannot be null or empty", + )); + } + Ok(()) } -} -/// Manipulating table metadata. -pub struct TableMetadataBuilder(TableMetadata); + fn try_normalize_sort_order(&mut self) -> Result<()> { + if self.sort_order_by_id(self.default_sort_order_id).is_none() { + if self.default_sort_order_id == SortOrder::UNSORTED_ORDER_ID { + let sort_order = SortOrder::unsorted_order(); + self.sort_orders + .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order)); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "No sort order exists with the default sort order id {}.", + self.default_sort_order_id + ), + )); + } + } -impl TableMetadataBuilder { - /// Creates a new table metadata builder from the given table metadata. 
- pub fn new(origin: TableMetadata) -> Self { - Self(origin) + if self.sort_orders.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Sort orders cannot be null or empty", + )); + } + Ok(()) } - /// Creates a new table metadata builder from the given table creation. - pub fn from_table_creation(table_creation: TableCreation) -> Result { - let TableCreation { - name: _, - location, - schema, - partition_spec, - sort_order, - properties, - } = table_creation; + fn validate_current_schema(&self) -> Result<()> { + if self.schema_by_id(self.current_schema_id).is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "No schema exists with the current schema id {}.", + self.current_schema_id + ), + )); + } + Ok(()) + } - let partition_specs = match partition_spec { - Some(_) => { + /// If current snapshot is Some(-1) then set it to None. + fn normalize_current_snapshot(&mut self) -> Result<()> { + if let Some(current_snapshot_id) = self.current_snapshot_id { + if current_snapshot_id == EMPTY_SNAPSHOT_ID { + self.current_snapshot_id = None; + } else if self.snapshot_by_id(current_snapshot_id).is_none() { return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Can't create table with partition spec now", - )) + ErrorKind::DataInvalid, + format!( + "Snapshot for current snapshot id {} does not exist in the existing snapshots list", + current_snapshot_id + ), + )); } - None => HashMap::from([( - DEFAULT_PARTITION_SPEC_ID, - Arc::new(PartitionSpec { - spec_id: DEFAULT_PARTITION_SPEC_ID, - fields: vec![], - }), - )]), - }; + } + Ok(()) + } - let sort_orders = match sort_order { - Some(_) => { + fn validate_refs(&self) -> Result<()> { + for (name, snapshot_ref) in self.refs.iter() { + if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() { return Err(Error::new( - ErrorKind::FeatureUnsupported, - "Can't create table with sort order now", - )) + ErrorKind::DataInvalid, + format!( + "Snapshot for reference {name} does not exist in the 
existing snapshots list" + ), + )); } - None => HashMap::from([( - DEFAULT_SORT_ORDER_ID, - Arc::new(SortOrder { - order_id: DEFAULT_SORT_ORDER_ID, - fields: vec![], - }), - )]), - }; + } - let table_metadata = TableMetadata { - format_version: FormatVersion::V2, - table_uuid: Uuid::now_v7(), - location: location.ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - "Can't create table without location", - ) - })?, - last_sequence_number: 0, - last_updated_ms: Utc::now().timestamp_millis(), - last_column_id: schema.highest_field_id(), - current_schema_id: schema.schema_id(), - schemas: HashMap::from([(schema.schema_id(), Arc::new(schema))]), - partition_specs, - default_spec_id: DEFAULT_PARTITION_SPEC_ID, - last_partition_id: 0, - properties, - current_snapshot_id: None, - snapshots: Default::default(), - snapshot_log: vec![], - sort_orders, - metadata_log: vec![], - default_sort_order_id: DEFAULT_SORT_ORDER_ID, - refs: Default::default(), - }; + let main_ref = self.refs.get(MAIN_BRANCH); + if self.current_snapshot_id.is_some() { + if let Some(main_ref) = main_ref { + if main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Current snapshot id does not match main branch ({:?} != {:?})", + self.current_snapshot_id.unwrap_or_default(), + main_ref.snapshot_id + ), + )); + } + } + } else if main_ref.is_some() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Current snapshot is not set, but main branch exists", + )); + } - Ok(Self(table_metadata)) + Ok(()) } - /// Changes uuid of table metadata. - pub fn assign_uuid(mut self, uuid: Uuid) -> Result { - self.0.table_uuid = uuid; - Ok(self) + fn validate_format_version_specifics(&self) -> Result<()> { + if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Last sequence number must be 0 in v1. 
Found {}", + self.last_sequence_number + ), + )); + } + + Ok(()) } - /// Returns the new table metadata after changes. - pub fn build(self) -> Result { - Ok(self.0) + fn validate_chronological_snapshot_logs(&self) -> Result<()> { + for window in self.snapshot_log.windows(2) { + let (prev, curr) = (&window[0], &window[1]); + // commits can happen concurrently from different machines. + // A tolerance helps us avoid failure for small clock skew + if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + "Expected sorted snapshot log entries", + )); + } + } + + if let Some(last) = self.snapshot_log.last() { + // commits can happen concurrently from different machines. + // A tolerance helps us avoid failure for small clock skew + if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid update timestamp {}: before last snapshot log entry at {}", + self.last_updated_ms, last.timestamp_ms + ), + )); + } + } + Ok(()) + } + + fn validate_chronological_metadata_logs(&self) -> Result<()> { + for window in self.metadata_log.windows(2) { + let (prev, curr) = (&window[0], &window[1]); + // commits can happen concurrently from different machines. + // A tolerance helps us avoid failure for small clock skew + if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + "Expected sorted metadata log entries", + )); + } + } + + if let Some(last) = self.metadata_log.last() { + // commits can happen concurrently from different machines. 
+ // A tolerance helps us avoid failure for small clock skew + if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid update timestamp {}: before last metadata log entry at {}", + self.last_updated_ms, last.timestamp_ms + ), + )); + } + } + + Ok(()) } } pub(super) mod _serde { + use std::borrow::BorrowMut; /// This is a helper module that defines types to help with serialization/deserialization. /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct /// and then converted into the [TableMetadata] struct. Serialization works the other way around. @@ -386,19 +545,18 @@ pub(super) mod _serde { /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization. use std::sync::Arc; - use itertools::Itertools; use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::{ FormatVersion, MetadataLog, SnapshotLog, TableMetadata, DEFAULT_PARTITION_SPEC_ID, - DEFAULT_SORT_ORDER_ID, MAIN_BRANCH, + MAIN_BRANCH, }; use crate::spec::schema::_serde::{SchemaV1, SchemaV2}; use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2}; use crate::spec::{ PartitionField, PartitionSpec, Schema, Snapshot, SnapshotReference, SnapshotRetention, - SortOrder, EMPTY_SNAPSHOT_ID, + SortOrder, }; use crate::{Error, ErrorKind}; @@ -545,24 +703,14 @@ pub(super) mod _serde { .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?)))) .collect::, Error>>()?, ); - Ok(TableMetadata { + let mut metadata = TableMetadata { format_version: FormatVersion::V2, table_uuid: value.table_uuid, location: value.location, last_sequence_number: value.last_sequence_number, last_updated_ms: value.last_updated_ms, last_column_id: value.last_column_id, - current_schema_id: if schemas.keys().contains(&value.current_schema_id) { - Ok(value.current_schema_id) - } else { - Err(self::Error::new( - ErrorKind::DataInvalid, - 
format!( - "No schema exists with the current schema id {}.", - value.current_schema_id - ), - )) - }?, + current_schema_id: value.current_schema_id, schemas, partition_specs: HashMap::from_iter( value @@ -607,13 +755,21 @@ pub(super) mod _serde { HashMap::new() } }), - }) + }; + + metadata.borrow_mut().try_normalize()?; + Ok(metadata) } } impl TryFrom for TableMetadata { type Error = Error; fn try_from(value: TableMetadataV1) -> Result { + let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id { + None + } else { + value.current_snapshot_id + }; let schemas = value .schemas .map(|schemas| { @@ -650,7 +806,7 @@ pub(super) mod _serde { .into_iter() .map(|x| (x.spec_id(), Arc::new(x))), ); - Ok(TableMetadata { + let mut metadata = TableMetadata { format_version: FormatVersion::V1, table_uuid: value.table_uuid.unwrap_or_default(), location: value.location, @@ -668,17 +824,8 @@ pub(super) mod _serde { .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()), partition_specs, schemas, - properties: value.properties.unwrap_or_default(), - current_snapshot_id: if let &Some(id) = &value.current_snapshot_id { - if id == EMPTY_SNAPSHOT_ID { - None - } else { - Some(id) - } - } else { - value.current_snapshot_id - }, + current_snapshot_id, snapshots: value .snapshots .map(|snapshots| { @@ -699,16 +846,25 @@ pub(super) mod _serde { ), None => HashMap::new(), }, - default_sort_order_id: value.default_sort_order_id.unwrap_or(DEFAULT_SORT_ORDER_ID), - refs: HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference { - snapshot_id: value.current_snapshot_id.unwrap_or_default(), - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, - })]), - }) + default_sort_order_id: value + .default_sort_order_id + .unwrap_or(SortOrder::UNSORTED_ORDER_ID), + refs: if let Some(snapshot_id) = current_snapshot_id { + HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), 
SnapshotReference { + snapshot_id, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + })]) + } else { + HashMap::new() + }, + }; + + metadata.borrow_mut().try_normalize()?; + Ok(metadata) } } @@ -983,7 +1139,7 @@ mod tests { "current-schema-id" : 1, "partition-specs": [ { - "spec-id": 1, + "spec-id": 0, "fields": [ { "source-id": 4, @@ -994,7 +1150,7 @@ mod tests { ] } ], - "default-spec-id": 1, + "default-spec-id": 0, "last-partition-id": 1000, "properties": { "commit.retry.num-retries": "1" @@ -1005,7 +1161,12 @@ mod tests { "timestamp-ms": 1515100 } ], - "sort-orders": [], + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], "default-sort-order-id": 0 } "#; @@ -1021,7 +1182,7 @@ mod tests { .unwrap(); let partition_spec = PartitionSpec { - spec_id: 1, + spec_id: 0, fields: vec![PartitionField { name: "ts_day".to_string(), transform: Transform::Day, @@ -1038,11 +1199,11 @@ mod tests { last_column_id: 1, schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]), current_schema_id: 1, - partition_specs: HashMap::from_iter(vec![(1, partition_spec.into())]), - default_spec_id: 1, + partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]), + default_spec_id: 0, last_partition_id: 1000, default_sort_order_id: 0, - sort_orders: HashMap::from_iter(vec![]), + sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]), snapshots: HashMap::default(), current_snapshot_id: None, last_sequence_number: 1, @@ -1227,6 +1388,280 @@ mod tests { check_table_metadata_serde(data, expected); } + #[test] + fn test_current_snapshot_id_must_match_main_branch() { + let data = r#" + { + "format-version" : 2, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "schemas": [ + { + "schema-id" : 1, + "type" : "struct", + "fields" :[ + { + 
"id": 1, + "name": "struct_name", + "required": true, + "type": "fixed[1]" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 0, + "last-partition-id": 1000, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "default-sort-order-id": 0, + "current-snapshot-id" : 1, + "refs" : { + "main" : { + "snapshot-id" : 2, + "type" : "branch" + } + }, + "snapshots" : [ { + "snapshot-id" : 1, + "timestamp-ms" : 1662532818843, + "sequence-number" : 0, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1662532784305", + "added-data-files" : "4", + "added-records" : "4", + "added-files-size" : "6001" + }, + "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro", + "schema-id" : 0 + }, + { + "snapshot-id" : 2, + "timestamp-ms" : 1662532818844, + "sequence-number" : 0, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1662532784305", + "added-data-files" : "4", + "added-records" : "4", + "added-files-size" : "6001" + }, + "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro", + "schema-id" : 0 + } ] + } + "#; + + let err = serde_json::from_str::(data).unwrap_err(); + assert!(err + .to_string() + .contains("Current snapshot id does not match main branch")); + } + + #[test] + fn test_main_without_current() { + let data = r#" + { + "format-version" : 2, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "schemas": [ + { + 
"schema-id" : 1, + "type" : "struct", + "fields" :[ + { + "id": 1, + "name": "struct_name", + "required": true, + "type": "fixed[1]" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 0, + "last-partition-id": 1000, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "default-sort-order-id": 0, + "refs" : { + "main" : { + "snapshot-id" : 1, + "type" : "branch" + } + }, + "snapshots" : [ { + "snapshot-id" : 1, + "timestamp-ms" : 1662532818843, + "sequence-number" : 0, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1662532784305", + "added-data-files" : "4", + "added-records" : "4", + "added-files-size" : "6001" + }, + "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro", + "schema-id" : 0 + } ] + } + "#; + + let err = serde_json::from_str::(data).unwrap_err(); + assert!(err + .to_string() + .contains("Current snapshot is not set, but main branch exists")); + } + + #[test] + fn test_branch_snapshot_missing() { + let data = r#" + { + "format-version" : 2, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "schemas": [ + { + "schema-id" : 1, + "type" : "struct", + "fields" :[ + { + "id": 1, + "name": "struct_name", + "required": true, + "type": "fixed[1]" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 0, + 
"last-partition-id": 1000, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "default-sort-order-id": 0, + "refs" : { + "main" : { + "snapshot-id" : 1, + "type" : "branch" + }, + "foo" : { + "snapshot-id" : 2, + "type" : "branch" + } + }, + "snapshots" : [ { + "snapshot-id" : 1, + "timestamp-ms" : 1662532818843, + "sequence-number" : 0, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1662532784305", + "added-data-files" : "4", + "added-records" : "4", + "added-files-size" : "6001" + }, + "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro", + "schema-id" : 0 + } ] + } + "#; + + let err = serde_json::from_str::(data).unwrap_err(); + assert!(err + .to_string() + .contains("Snapshot for reference foo does not exist in the existing snapshots list")); + } + #[test] fn test_invalid_table_uuid() -> Result<()> { let data = r#" @@ -1514,21 +1949,15 @@ mod tests { default_spec_id: 0, last_partition_id: 0, default_sort_order_id: 0, - sort_orders: HashMap::new(), + // Sort order is added during deserialization for V2 compatibility + sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]), snapshots: HashMap::new(), current_snapshot_id: None, last_sequence_number: 0, properties: HashMap::new(), snapshot_log: vec![], metadata_log: Vec::new(), - refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { - snapshot_id: -1, - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, - })]), + refs: HashMap::new(), }; check_table_metadata_serde(&metadata, expected); @@ -1637,7 +2066,9 @@ mod tests { assert_eq!( table_meta_data.default_partition_spec(), - table_meta_data.partition_spec_by_id(default_spec_id) + 
table_meta_data + .partition_spec_by_id(default_spec_id) + .unwrap() ); } #[test] @@ -1651,7 +2082,10 @@ mod tests { assert_eq!( table_meta_data.default_sort_order(), - table_meta_data.sort_orders.get(&default_sort_order_id) + table_meta_data + .sort_orders + .get(&default_sort_order_id) + .unwrap() ) } @@ -1666,7 +2100,8 @@ mod tests { let table_metadata = TableMetadataBuilder::from_table_creation(table_creation) .unwrap() .build() - .unwrap(); + .unwrap() + .metadata; assert_eq!(table_metadata.location, "s3://db/table"); assert_eq!(table_metadata.schemas.len(), 1); assert_eq!( @@ -1707,13 +2142,16 @@ mod tests { #[test] fn test_table_builder_from_table_metadata() { let table_metadata = get_test_table_metadata("TableMetadataV2Valid.json"); - let table_metadata_builder = TableMetadataBuilder::new(table_metadata); + let table_metadata_builder = TableMetadataBuilder::new_from_metadata( + table_metadata.clone(), + "TableMetadataV2Valid.json", + ); let uuid = Uuid::new_v4(); let table_metadata = table_metadata_builder .assign_uuid(uuid) - .unwrap() .build() - .unwrap(); + .unwrap() + .metadata; assert_eq!(table_metadata.uuid(), uuid); } } diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs new file mode 100644 index 000000000..d1108fe8f --- /dev/null +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -0,0 +1,1796 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use uuid::Uuid; + +use super::{ + FormatVersion, MetadataLog, PartitionSpec, PartitionSpecBuilder, PartitionSpecRef, Schema, + SchemaRef, Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, TableMetadata, + UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, MAIN_BRANCH, ONE_MINUTE_MS, + PROPERTY_FORMAT_VERSION, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT, + UNPARTITIONED_LAST_ASSIGNED_ID, +}; +use crate::error::{Error, ErrorKind, Result}; +use crate::{TableCreation, TableUpdate}; + +/// 
Manipulating table metadata. +/// +/// For this builder the order of called functions matters. Functions are applied in-order. +/// All operations applied to the `TableMetadata` are tracked in `changes` as a chronologically +/// ordered vec of `TableUpdate`. +/// If an operation does not lead to a change of the `TableMetadata`, the corresponding update +/// is omitted from `changes`. +/// +/// Unlike a typical builder pattern, the order of function calls matters. +/// Some basic rules: +/// - `add_schema` must be called before `set_current_schema`. +/// - If a new partition spec and schema are added, the schema should be added first. +#[derive(Debug, Clone)] +pub struct TableMetadataBuilder { + metadata: TableMetadata, + changes: Vec, + last_added_schema_id: Option, + last_added_spec_id: Option, + last_added_order_id: Option, + // None if this is a new table (from_metadata) method not used + previous_history_entry: Option, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct TableMetadataBuildResult { + pub metadata: TableMetadata, + pub changes: Vec, +} + +impl TableMetadataBuilder { + const LAST_ADDED: i32 = -1; + + /// Create a `TableMetadata` object from scratch. + /// + /// This method re-assign ids of fields in the schema, schema.id, sort_order.id and + /// spec.id. It should only be used to create new table metadata from scratch. + pub fn new( + schema: Schema, + spec: impl Into, + sort_order: SortOrder, + location: String, + format_version: FormatVersion, + properties: HashMap, + ) -> Result { + // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table. 
+ let (fresh_schema, fresh_spec, fresh_sort_order) = + Self::reassign_ids(schema, spec.into(), sort_order)?; + let schema_id = fresh_schema.schema_id(); + + let builder = Self { + metadata: TableMetadata { + format_version, + table_uuid: Uuid::now_v7(), + location: "".to_string(), // Overwritten immediately by set_location + last_sequence_number: 0, + last_updated_ms: 0, // Overwritten by build() if not set before + last_column_id: -1, // Overwritten immediately by add_current_schema + current_schema_id: -1, // Overwritten immediately by add_current_schema + schemas: HashMap::new(), + partition_specs: HashMap::new(), + default_spec_id: Self::LAST_ADDED, // Overwritten immediately by add_default_partition_spec + last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID, + properties: HashMap::new(), + current_snapshot_id: None, + snapshots: HashMap::new(), + snapshot_log: vec![], + sort_orders: HashMap::new(), + metadata_log: vec![], + default_sort_order_id: -1, // Overwritten immediately by add_default_sort_order + refs: HashMap::default(), + }, + changes: vec![], + last_added_schema_id: Some(schema_id), + last_added_spec_id: None, + last_added_order_id: None, + previous_history_entry: None, + }; + + builder + .set_location(location) + .add_current_schema(fresh_schema)? + .add_default_partition_spec(fresh_spec.into_unbound())? + .add_default_sort_order(fresh_sort_order)? + .set_properties(properties) + } + + /// Creates a new table metadata builder from the given metadata to modify it. + /// Previous file location is used to populate the Metadata Log. 
+ #[must_use] + pub fn new_from_metadata( + previous: TableMetadata, + previous_file_location: impl Into, + ) -> Self { + Self { + previous_history_entry: Some(MetadataLog { + metadata_file: previous_file_location.into(), + timestamp_ms: previous.last_updated_ms, + }), + metadata: previous, + changes: Vec::default(), + last_added_schema_id: None, + last_added_spec_id: None, + last_added_order_id: None, + } + } + + /// Creates a new table metadata builder from the given table creation. + pub fn from_table_creation(table_creation: TableCreation) -> Result { + let TableCreation { + name: _, + location, + schema, + partition_spec, + sort_order, + properties, + } = table_creation; + + let location = location.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't create table without location", + ) + })?; + let partition_spec = partition_spec.unwrap_or( + PartitionSpec { + spec_id: DEFAULT_PARTITION_SPEC_ID, + fields: vec![], + } + .into(), + ); + + Self::new( + schema, + partition_spec, + sort_order.unwrap_or(SortOrder::unsorted_order()), + location, + FormatVersion::V1, + properties, + ) + } + + /// Get the current schema with all changes applied up to this point. + #[inline] + pub fn current_schema(&self) -> &SchemaRef { + self.metadata.current_schema() + } + + /// Get the current last column id + #[inline] + pub fn last_column_id(&self) -> i32 { + self.metadata.last_column_id + } + + /// Get the current last updated timestamp + #[inline] + pub fn last_updated_ms(&self) -> i64 { + self.metadata.last_updated_ms + } + + /// Changes uuid of table metadata. + pub fn assign_uuid(mut self, uuid: Uuid) -> Self { + if self.metadata.table_uuid != uuid { + self.metadata.table_uuid = uuid; + self.changes.push(TableUpdate::AssignUuid { uuid }); + } + + self + } + + /// Upgrade `FormatVersion`. Downgrades are not allowed. + /// + /// # Errors + /// - Cannot downgrade to older format versions. 
+ pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result { + if format_version < self.metadata.format_version { + return Err(Error::new( + ErrorKind::ValidationFailed, + format!( + "Cannot downgrade FormatVersion from {} to {}", + self.metadata.format_version, format_version + ), + )); + } + + if format_version != self.metadata.format_version { + self.metadata.format_version = format_version; + self.changes + .push(TableUpdate::UpgradeFormatVersion { format_version }); + } + + Ok(self) + } + + /// Set properties. If a property already exists, it will be overwritten. + /// + /// If a reserved property is set, the corresponding action is performed and the property is not persisted. + /// Currently the following reserved properties are supported: + /// * format-version: Set the format version of the table. + /// + /// # Errors + /// - If format-version property is set to a lower version than the current format version. + pub fn set_properties(mut self, properties: HashMap) -> Result { + // ToDo: Reserved properties? + if properties.is_empty() { + return Ok(self); + } + + self.metadata.properties.extend(properties.clone()); + self.changes.push(TableUpdate::SetProperties { + updates: properties, + }); + + Ok(self) + } + + /// Remove properties from the table metadata. + /// Does nothing if the key is not present. + pub fn remove_properties(mut self, properties: &[String]) -> Self { + for property in properties { + self.metadata.properties.remove(property); + } + + if !properties.is_empty() { + self.changes.push(TableUpdate::RemoveProperties { + removals: properties.to_vec(), + }); + } + + self + } + + /// Set the location of the table metadata, stripping any trailing slashes. 
+ pub fn set_location(mut self, location: String) -> Self { + let location = location.trim_end_matches('/').to_string(); + if self.metadata.location != location { + self.changes.push(TableUpdate::SetLocation { + location: location.clone(), + }); + self.metadata.location = location; + } + + self + } + + /// Add a snapshot to the table metadata. + /// + /// # Errors + /// - No schema has been added to the table metadata. + /// - No partition spec has been added to the table metadata. + /// - No sort order has been added to the table metadata. + /// - Snapshot id already exists. + /// - For format version > 1: the sequence number of the snapshot is lower than the highest sequence number specified so far. + pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result { + if self.metadata.partition_specs.is_empty() { + return Err(Error::new( + ErrorKind::ValidationFailed, + "Attempting to add a snapshot before a partition spec is added", + )); + } + + if self.metadata.sort_orders.is_empty() { + return Err(Error::new( + ErrorKind::ValidationFailed, + "Attempting to add a snapshot before a sort order is added", + )); + } + + if self + .metadata + .snapshots + .contains_key(&snapshot.snapshot_id()) + { + return Err(Error::new( + ErrorKind::ValidationFailed, + format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()), + )); + } + + if self.metadata.format_version != FormatVersion::V1 + && snapshot.sequence_number() <= self.metadata.last_sequence_number + && snapshot.parent_snapshot_id().is_some() + { + return Err(Error::new( + ErrorKind::ValidationFailed, + format!( + "Cannot add snapshot with sequence number {} older than last sequence number {}", + snapshot.sequence_number(), + self.metadata.last_sequence_number + ) + )); + } + + if let Some(last) = self.metadata.snapshot_log.last() { + // commits can happen concurrently from different machines. 
+ // A tolerance helps us avoid failure for small clock skew + if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid snapshot timestamp {}: before last snapshot timestamp {}", + snapshot.timestamp_ms(), + last.timestamp_ms + ), + )); + } + } + + if snapshot.timestamp_ms() - self.metadata.last_updated_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid snapshot timestamp {}: before last updated timestamp {}", + snapshot.timestamp_ms(), + self.metadata.last_updated_ms + ), + )); + } + + // Mutation happens in next line - must be infallible from here + self.changes.push(TableUpdate::AddSnapshot { + snapshot: snapshot.clone(), + }); + + self.metadata.last_updated_ms = snapshot.timestamp_ms(); + self.metadata.last_sequence_number = snapshot.sequence_number(); + self.metadata + .snapshots + .insert(snapshot.snapshot_id(), snapshot.into()); + + Ok(self) + } + + /// Append a snapshot to the specified branch. + /// If branch is not specified, the snapshot is appended to the main branch. + /// The `ref` must already exist. Retention settings from the `ref` are re-used. + /// + /// # Errors + /// - The ref is unknown. + /// - Any of the preconditions of `self.add_snapshot` are not met. + pub fn append_snapshot(self, snapshot: Snapshot, ref_name: Option<&str>) -> Result { + let ref_name = ref_name.unwrap_or(MAIN_BRANCH); + let mut reference = self + .metadata + .refs + .get(ref_name) + .ok_or_else(|| { + Error::new( + ErrorKind::ValidationFailed, + format!("Cannot append snapshot to unknown ref: '{}'", ref_name), + ) + })? + .clone(); + + reference.snapshot_id = snapshot.snapshot_id(); + + self.add_snapshot(snapshot)?.set_ref(ref_name, reference) + } + + /// Remove snapshots by its ids from the table metadata. + /// Does nothing if a snapshot id is not present. + /// Keeps as changes only the snapshots that were actually removed. 
    pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
        let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());

        self.metadata.snapshots.retain(|k, _| {
            if snapshot_ids.contains(k) {
                removed_snapshots.push(*k);
                false
            } else {
                true
            }
        });

        // Only record ids that actually existed, so a replay stays faithful.
        if !removed_snapshots.is_empty() {
            self.changes.push(TableUpdate::RemoveSnapshots {
                snapshot_ids: removed_snapshots,
            });
        }

        // Remove refs that are no longer valid
        // (relies on edition-2021 disjoint closure capture: `refs` is borrowed
        // mutably while `snapshots` is read).
        self.metadata
            .refs
            .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));

        self
    }

    /// Set a reference to a snapshot.
    ///
    /// # Errors
    /// - The snapshot id is unknown.
    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
        // Setting a ref to its current value is a no-op and records no change.
        if self
            .metadata
            .refs
            .get(ref_name)
            .is_some_and(|snap_ref| snap_ref.eq(&reference))
        {
            return Ok(self);
        }

        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
            return Err(Error::new(
                ErrorKind::ValidationFailed,
                format!(
                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
                    reference.snapshot_id
                ),
            ));
        };

        // Update last_updated_ms to the exact timestamp of the snapshot if it was added in this commit
        let is_added_snapshot = self.changes.iter().any(|update| {
            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
        });
        if is_added_snapshot {
            self.metadata.last_updated_ms = snapshot.timestamp_ms();
        }

        // Current snapshot id is set only for the main branch
        if ref_name == MAIN_BRANCH {
            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
            // i64::default() (0) is used as the "not yet set" sentinel for
            // last_updated_ms — presumably never a valid commit timestamp.
            if self.metadata.last_updated_ms == i64::default() {
                self.metadata.last_updated_ms = chrono::Utc::now().timestamp_millis();
            };

            self.metadata.snapshot_log.push(SnapshotLog {
                snapshot_id: snapshot.snapshot_id(),
                timestamp_ms: self.metadata.last_updated_ms,
            });
        }

        self.changes.push(TableUpdate::SetSnapshotRef {
            ref_name: ref_name.to_string(),
            reference: reference.clone(),
        });
        self.metadata.refs.insert(ref_name.to_string(), reference);

        Ok(self)
    }

    /// Remove a reference
    ///
    /// If `ref_name='main'` the current snapshot id is set to -1.
    pub fn remove_ref(mut self, ref_name: &str) -> Self {
        if ref_name == MAIN_BRANCH {
            // LAST_ADDED (-1) doubles as the "no current snapshot" marker here.
            self.metadata.current_snapshot_id = Some(i64::from(Self::LAST_ADDED));
            self.metadata.snapshot_log.clear();
        }

        // Removing `main` is always recorded, even if the ref did not exist,
        // because the current snapshot id / log were reset above.
        if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH {
            self.changes.push(TableUpdate::RemoveSnapshotRef {
                ref_name: ref_name.to_string(),
            });
        }

        self
    }

    /// Add a schema to the table metadata.
    ///
    /// The provided `schema.schema_id` may not be used.

    // ToDo Discuss: Should we add `new_last_column_id` argument?
    // TLDR; I believe not as it acts as an assertion and its purpose (and source) is not clear. We shouldn't add it.
    //
    // Schemas can contain only old columns or a mix of old and new columns.
    // In Java, if `new_last_column_id` is set but too low, the function would fail, basically hinting
    // at the schema having been built for an older metadata version. `new_last_column_id` is typically obtained
    // in the schema building process.
    //
    // This assertion is not required if the user controls the flow - they know for which
    // metadata they created a schema. If asserting the `new_last_column_id` was semantically important, it should be part of the schema and
    // not be passed around alongside it.
    //
    // Specifying `new_last_column_id` in java also allows to set `metadata.last_column_id` to any arbitrary value
    // even if it's not present as a column. I believe this to be undesired behavior. This is not possible with the current Rust interface.
    //
    // If the schema is built out of sync with the TableMetadata, for example in a REST Catalog setting, the assertion of
    // the provided `last_column_id` as part of the `TableUpdate::AddSchema` is still done in its `.apply` method.
    pub fn add_schema(mut self, schema: Schema) -> Self {
        // NOTE(review): a previous comment suggested returning a Result to
        // check field-id <-> type compatibility with ids still present in the
        // metadata (not done in Java either); the fn currently returns Self.
        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);

        if schema_found {
            // ToDo Discuss: The Java code is a bit convoluted and I think it might be wrong for an edge case.
            // Why is it wrong: The baseline is, that if something changes the state of the builder, it has an effect on it and
            // must be recorded in the changes.
            // The Java code might or might not change `lastAddedSchemaId`, and does not record this change in `changes`.
            // Thus, replaying the changes, would lead to a different result if a schema is added twice in unfavorable
            // conditions.
            // Here we do it differently, but a check from a Java maintainer would be nice.

            if self.last_added_schema_id != Some(new_schema_id) {
                self.changes.push(TableUpdate::AddSchema {
                    last_column_id: Some(self.metadata.last_column_id),
                    schema: schema.clone(),
                });
                self.last_added_schema_id = Some(new_schema_id);
            }

            return self;
        }

        // New schemas might contain only old columns. In this case last_column_id should not be
        // reduced.
        self.metadata.last_column_id =
            std::cmp::max(self.metadata.last_column_id, schema.highest_field_id());

        // Set schema-id
        let schema = match new_schema_id == schema.schema_id() {
            true => schema,
            false => schema.with_schema_id(new_schema_id),
        };

        self.metadata
            .schemas
            .insert(new_schema_id, schema.clone().into());

        self.changes.push(TableUpdate::AddSchema {
            schema,
            last_column_id: Some(self.metadata.last_column_id),
        });

        self.last_added_schema_id = Some(new_schema_id);

        self
    }

    /// Set the current schema id.
    ///
    /// Errors:
    /// - provided `schema_id` is -1 but no schema has been added via `add_schema`.
    /// - No schema with the provided `schema_id` exists.
    pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
        // Resolve the LAST_ADDED (-1) sentinel to the id assigned by `add_schema`.
        if schema_id == Self::LAST_ADDED {
            schema_id = self.last_added_schema_id.ok_or_else(|| {
                Error::new(
                    ErrorKind::ValidationFailed,
                    "Cannot set current schema to last added schema: no schema has been added.",
                )
            })?;
        };
        let schema_id = schema_id; // Make immutable

        if schema_id == self.metadata.current_schema_id {
            return Ok(self);
        }

        let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
            Error::new(
                ErrorKind::ValidationFailed,
                format!(
                    "Cannot set current schema to unknown schema with id: '{}'",
                    schema_id
                ),
            )
        })?;

        // Old partition specs should be preserved even if they are not compatible with the new schema,
        // so that older metadata can still be interpreted.
        // We don't need to re-bind the partition specs to the new schema unchecked, because the
        // `PartitionSpec` object holds no reference to the schema itself.
        // ToDo Discuss: Java fails to add a schema if checkAndAddPartitionName fails (https://github.com/apache/iceberg/blob/64b36999d7ff716ae2534fb0972fcc10d22a64c2/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L393)
        // , i.e. if there is a collision between an old partition spec and a column name in the new schema.
        // Is this desired behavior even if the partition spec is no longer active?

        // Old sort orders should be preserved even if they are not compatible with the new schema,
        // so that older metadata can still be interpreted.
        // We don't need to re-bind the sort orders to the new schema unchecked, because the
        // `SortOrder` object holds no reference to the schema itself.

        // The currently active spec and sort-order are checked in the build() method.

        self.metadata.current_schema_id = schema_id;

        // Record -1 when the schema was added in this round of changes, so a
        // replay resolves the id the same way this builder did.
        if self.last_added_schema_id == Some(schema_id) {
            self.changes.push(TableUpdate::SetCurrentSchema {
                schema_id: Self::LAST_ADDED,
            });
        } else {
            self.changes
                .push(TableUpdate::SetCurrentSchema { schema_id });
        }

        Ok(self)
    }

    /// Add a schema and set it as the current schema.
    pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
        self.add_schema(schema).set_current_schema(Self::LAST_ADDED)
    }

    /// Add a partition spec to the table metadata.
    ///
    /// The spec is bound eagerly to the current schema.
    /// If a schema is added in the same set of changes, the schema should be added first.
    ///
    /// Even if `unbound_spec.spec_id` is provided as `Some`, it may not be used.
    ///
    /// # Errors
    /// - The partition spec cannot be bound to the current schema.
    /// - The partition spec has non-sequential field ids and the table format version is 1.
    pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> {
        // Re-use the id of an equivalent existing spec; otherwise allocate a new one.
        let new_spec_id = self.reuse_or_create_new_spec_id(&unbound_spec);
        let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id);
        let unbound_spec = unbound_spec.with_spec_id(new_spec_id);

        if spec_found {
            // Spec already present: only record the change (once) so replays
            // track `last_added_spec_id` the same way.
            if self.last_added_spec_id != Some(new_spec_id) {
                self.changes
                    .push(TableUpdate::AddSpec { spec: unbound_spec });
                self.last_added_spec_id = Some(new_spec_id);
            }

            return Ok(self);
        }

        // Bind eagerly against the current schema; this assigns field ids
        // starting after the highest previously assigned partition field id.
        let schema = self.get_current_schema()?;
        let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
            .with_last_assigned_field_id(self.metadata.last_partition_id)
            .with_spec_id(new_spec_id)
            .build()?;

        // V1 requires partition field ids to be sequential.
        if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() {
            return Err(Error::new(
                ErrorKind::ValidationFailed,
                "Cannot add partition spec with non-sequential field ids to format version 1 table",
            ));
        }

        let highest_field_id = spec.highest_field_id();
        self.metadata
            .partition_specs
            .insert(new_spec_id, spec.into());
        self.changes
            .push(TableUpdate::AddSpec { spec: unbound_spec });

        self.last_added_spec_id = Some(new_spec_id);
        // Never reduce last_partition_id: old specs may own higher ids.
        self.metadata.last_partition_id =
            std::cmp::max(self.metadata.last_partition_id, highest_field_id);

        Ok(self)
    }

    /// Set the default partition spec.
    ///
    /// # Errors
    /// - spec_id is -1 but no spec has been added via `add_partition_spec`.
    /// - No partition spec with the provided `spec_id` exists.
+ pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result { + if spec_id == Self::LAST_ADDED { + spec_id = self.last_added_spec_id.ok_or_else(|| { + Error::new( + ErrorKind::ValidationFailed, + "Cannot set default partition spec to last added spec: no spec has been added.", + ) + })?; + } + + if self.metadata.default_spec_id == spec_id { + return Ok(self); + } + + if !self.metadata.partition_specs.contains_key(&spec_id) { + return Err(Error::new( + ErrorKind::ValidationFailed, + format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",), + )); + } + + self.metadata.default_spec_id = spec_id; + + if self.last_added_spec_id == Some(spec_id) { + self.changes.push(TableUpdate::SetDefaultSpec { + spec_id: Self::LAST_ADDED, + }); + } else { + self.changes.push(TableUpdate::SetDefaultSpec { spec_id }); + } + + Ok(self) + } + + /// Add a partition spec and set it as the default + pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result { + self.add_partition_spec(unbound_spec)? + .set_default_partition_spec(Self::LAST_ADDED) + } + + /// Add a sort order to the table metadata. + /// + /// The spec is bound eagerly to the current schema and must be valid for it. + /// If a schema is added in the same set of changes, the schema should be added first. + /// + /// Even if `sort_order.order_id` is provided, it may not be used. + /// + /// # Errors + /// - Sort order id to add already exists. + /// - Sort order is incompatible with the current schema. 
    pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
        // Re-use the id of an existing order with identical fields; otherwise
        // allocate a new one.
        let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
        let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id);

        if sort_order_found {
            // Order already present: only record the change (once) so replays
            // track `last_added_order_id` the same way.
            if self.last_added_order_id != Some(new_order_id) {
                self.changes.push(TableUpdate::AddSortOrder {
                    sort_order: sort_order.clone(),
                });
                self.last_added_order_id = Some(new_order_id);
            }

            return Ok(self);
        }

        // ToDo Discuss: Java builds a fresh spec here:
        // https://github.com/apache/iceberg/blob/64b36999d7ff716ae2534fb0972fcc10d22a64c2/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1613
        // For rust we make the assumption
        // that the order that is added refers to the current schema and check compatibility with it.

        // Re-build the order against the current schema to validate it.
        let schema = self.get_current_schema()?.clone().as_ref().clone();
        let sort_order = SortOrder::builder()
            .with_order_id(new_order_id)
            .with_fields(sort_order.fields)
            .build(&schema)
            .map_err(|e| {
                Error::new(
                    ErrorKind::ValidationFailed,
                    format!("Sort order to add is incompatible with current schema: {e}"),
                )
                .with_source(e)
            })?;

        self.last_added_order_id = Some(new_order_id);
        self.metadata
            .sort_orders
            .insert(new_order_id, sort_order.clone().into());
        self.changes.push(TableUpdate::AddSortOrder { sort_order });

        Ok(self)
    }

    /// Set the default sort order. If `sort_order_id` is -1, the last added sort order is set as default.
    ///
    /// # Errors
    /// - sort_order_id is -1 but no sort order has been added via `add_sort_order`.
    /// - No sort order with the provided `sort_order_id` exists.
+ pub fn set_default_sort_order(mut self, mut sort_order_id: i64) -> Result { + if sort_order_id == Self::LAST_ADDED as i64 { + sort_order_id = self.last_added_order_id.ok_or_else(|| { + Error::new( + ErrorKind::ValidationFailed, + "Cannot set default sort order to last added order: no order has been added.", + ) + })?; + } + + if self.metadata.default_sort_order_id == sort_order_id { + return Ok(self); + } + + if !self.metadata.sort_orders.contains_key(&sort_order_id) { + return Err(Error::new( + ErrorKind::ValidationFailed, + format!( + "Cannot set default sort order to unknown order with id: '{sort_order_id}'" + ), + )); + } + + self.metadata.default_sort_order_id = sort_order_id; + + if self.last_added_order_id == Some(sort_order_id) { + self.changes.push(TableUpdate::SetDefaultSortOrder { + sort_order_id: Self::LAST_ADDED as i64, + }); + } else { + self.changes + .push(TableUpdate::SetDefaultSortOrder { sort_order_id }); + } + + Ok(self) + } + + /// Add a sort order and set it as the default + fn add_default_sort_order(self, sort_order: SortOrder) -> Result { + self.add_sort_order(sort_order)? + .set_default_sort_order(Self::LAST_ADDED as i64) + } + + /// Build the table metadata. + pub fn build(mut self) -> Result { + if self.metadata.last_updated_ms == i64::default() { + self.metadata.last_updated_ms = chrono::Utc::now().timestamp_millis(); + } + + // Check compatibility of the current schema to the default partition spec and sort order. + // We use the `get_xxx` methods from the builder to avoid using the panicking + // `TableMetadata.default_partition_spec` etc. methods. 
+ let schema = self.get_current_schema()?; + let partition_spec = Arc::unwrap_or_clone(self.get_default_partition_spec()?); + let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?); + + partition_spec.into_unbound().bind(schema)?; + SortOrder::builder() + .with_fields(sort_order.fields) + .build(schema)?; + + self.expire_metadata_log(); + self.update_snapshot_log()?; + self.metadata.try_normalize()?; + + if let Some(hist_entry) = self.previous_history_entry.take() { + self.metadata.metadata_log.push(hist_entry); + } + + Ok(TableMetadataBuildResult { + metadata: self.metadata, + changes: self.changes, + }) + } + + fn expire_metadata_log(&mut self) { + let max_size = self + .metadata + .properties + .get(PROPERTY_FORMAT_VERSION) + .and_then(|v| v.parse::().ok()) + .unwrap_or(PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT) + .max(1); + + if self.metadata.metadata_log.len() > max_size { + self.metadata + .metadata_log + .drain(0..self.metadata.metadata_log.len() - max_size); + } + } + + fn update_snapshot_log(&mut self) -> Result<()> { + let intermediate_snapshots = self.get_intermediate_snapshots(); + let has_removed_snapshots = self + .changes + .iter() + .any(|update| matches!(update, TableUpdate::RemoveSnapshots { .. })); + + if intermediate_snapshots.is_empty() && !has_removed_snapshots { + return Ok(()); + } + + let mut new_snapshot_log = Vec::new(); + for log_entry in &self.metadata.snapshot_log { + let snapshot_id = log_entry.snapshot_id; + if self.metadata.snapshots.contains_key(&snapshot_id) { + if !intermediate_snapshots.contains(&snapshot_id) { + new_snapshot_log.push(log_entry.clone()); + } + } else if has_removed_snapshots { + // any invalid entry causes the history before it to be removed. otherwise, there could be + // history gaps that cause time-travel queries to produce incorrect results. 
for example, + // if history is [(t1, s1), (t2, s2), (t3, s3)] and s2 is removed, the history cannot be + // [(t1, s1), (t3, s3)] because it appears that s3 was current during the time between t2 + // and t3 when in fact s2 was the current snapshot. + new_snapshot_log.clear(); + } + } + + if let Some(current_snapshot_id) = self.metadata.current_snapshot_id { + let last_id = new_snapshot_log.last().map(|entry| entry.snapshot_id); + if last_id != Some(current_snapshot_id) { + return Err(Error::new( + ErrorKind::ValidationFailed, + "Cannot set invalid snapshot log: latest entry is not the current snapshot", + )); + } + }; + + self.metadata.snapshot_log = new_snapshot_log; + Ok(()) + } + + /// Finds intermediate snapshots that have not been committed as the current snapshot. + /// + /// Transactions can create snapshots that are never the current snapshot because several + /// changes are combined by the transaction into one table metadata update. when each + /// intermediate snapshot is added to table metadata, it is added to the snapshot log, assuming + /// that it will be the current snapshot. when there are multiple snapshot updates, the log must + /// be corrected by suppressing the intermediate snapshot entries. + /// + /// A snapshot is an intermediate snapshot if it was added but is not the current snapshot. 
    fn get_intermediate_snapshots(&self) -> HashSet<i64> {
        // Ids of all snapshots added in this round of changes.
        let added_snapshot_ids = self
            .changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::AddSnapshot { snapshot } => Some(snapshot.snapshot_id()),
                _ => None,
            })
            .collect::<HashSet<_>>();

        // A snapshot is intermediate if it was added, pointed at by `main` at
        // some point during this change set, but is not the final current snapshot.
        self.changes
            .iter()
            .filter_map(|update| match update {
                TableUpdate::SetSnapshotRef {
                    ref_name,
                    reference,
                } => {
                    if added_snapshot_ids.contains(&reference.snapshot_id)
                        && ref_name == MAIN_BRANCH
                        && reference.snapshot_id != self.metadata.current_snapshot_id.unwrap_or(-1)
                    {
                        Some(reference.snapshot_id)
                    } else {
                        None
                    }
                }
                _ => None,
            })
            .collect()
    }

    /// Re-assign schema, partition-spec, and sort-order ids for a brand new table.
    ///
    /// Field ids are compacted starting from the defaults; spec and sort-order
    /// fields are re-bound to the freshly numbered schema by column *name*.
    ///
    /// # Errors
    /// - A partition or sort field references a column id not present in `schema`.
    fn reassign_ids(
        schema: Schema,
        spec: UnboundPartitionSpec,
        sort_order: SortOrder,
    ) -> Result<(Schema, PartitionSpec, SortOrder)> {
        // Re-assign field ids and schema ids for a new table.
        // Keep the old id -> name mapping so spec/sort fields can be re-bound by name.
        let previous_id_to_name = schema.field_id_to_name_map().clone();
        let fresh_schema = schema
            .into_builder()
            .with_schema_id(DEFAULT_SCHEMA_ID)
            .with_reassigned_field_ids(None)
            .build()?;

        // Re-build partition spec with new ids
        let mut fresh_spec = PartitionSpecBuilder::new(&fresh_schema);
        for field in spec.fields() {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for partition column {} in schema.",
                        field.source_id, field.name
                    ),
                )
            })?;
            fresh_spec =
                fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?;
        }
        let fresh_spec = fresh_spec.build()?;

        // Re-build sort order with new ids
        let mut fresh_order = SortOrder::builder();
        for mut field in sort_order.fields {
            let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Cannot find source column with id {} for sort column in schema.",
                        field.source_id
                    ),
                )
            })?;
            let new_field_id = fresh_schema
                .field_by_name(source_field_name)
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!(
                            "Cannot find source column with name {} for sort column in re-assigned schema.",
                            source_field_name
                        ),
                    )
                })?.id;
            field.source_id = new_field_id;
            fresh_order.with_sort_field(field);
        }
        let fresh_sort_order = fresh_order.build(&fresh_schema)?;

        Ok((fresh_schema, fresh_spec, fresh_sort_order))
    }

    /// Re-use the id of a structurally identical existing schema, otherwise
    /// allocate one past the highest known schema id.
    fn reuse_or_create_new_schema_id(&self, new_schema: &Schema) -> i32 {
        self.metadata
            .schemas
            .iter()
            .find_map(|(id, schema)| new_schema.is_same_schema(schema).then_some(*id))
            .unwrap_or_else(|| self.get_highest_schema_id() + 1)
    }

    // Falls back to current_schema_id when no schemas are registered yet.
    fn get_highest_schema_id(&self) -> i32 {
        *self
            .metadata
            .schemas
            .keys()
            .max()
            .unwrap_or(&self.metadata.current_schema_id)
    }

    /// Non-panicking lookup of the current schema (unlike `TableMetadata::current_schema`).
    fn get_current_schema(&self) -> Result<&SchemaRef> {
        self.metadata
            .schemas
            .get(&self.metadata.current_schema_id)
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Current schema with id '{}' not found in table metadata.",
                        self.metadata.current_schema_id
                    ),
                )
            })
    }

    /// Non-panicking lookup of the default partition spec.
    fn get_default_partition_spec(&self) -> Result<PartitionSpecRef> {
        self.metadata
            .partition_specs
            .get(&self.metadata.default_spec_id)
            .cloned()
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Default partition spec with id '{}' not found in table metadata.",
                        self.metadata.default_spec_id
                    ),
                )
            })
    }

    /// Non-panicking lookup of the default sort order.
    fn get_default_sort_order(&self) -> Result<SortOrderRef> {
        self.metadata
            .sort_orders
            .get(&self.metadata.default_sort_order_id)
            .cloned()
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::DataInvalid,
                    format!(
                        "Default sort order with id '{}' not found in table metadata.",
                        self.metadata.default_sort_order_id
                    ),
                )
            })
    }

    /// If a compatible spec already exists, use the same ID. Otherwise, use 1 more than the highest ID.
+ fn reuse_or_create_new_spec_id(&self, new_spec: &UnboundPartitionSpec) -> i32 { + self.metadata + .partition_specs + .iter() + .find_map(|(id, old_spec)| old_spec.compatible_with(new_spec).then_some(*id)) + .unwrap_or_else(|| { + self.get_highest_spec_id() + .map(|id| id + 1) + .unwrap_or(DEFAULT_PARTITION_SPEC_ID) + }) + } + + fn get_highest_spec_id(&self) -> Option { + self.metadata.partition_specs.keys().max().copied() + } + + /// If a compatible sort-order already exists, use the same ID. Otherwise, use 1 more than the highest ID. + fn reuse_or_create_new_sort_id(&self, new_sort_order: &SortOrder) -> i64 { + if new_sort_order.is_unsorted() { + return SortOrder::unsorted_order().order_id; + } + + self.metadata + .sort_orders + .iter() + .find_map(|(id, sort_order)| { + sort_order.fields.eq(&new_sort_order.fields).then_some(*id) + }) + .unwrap_or_else(|| { + self.highest_sort_order_id() + .unwrap_or(SortOrder::unsorted_order().order_id) + + 1 + }) + } + + fn highest_sort_order_id(&self) -> Option { + self.metadata.sort_orders.keys().max().copied() + } +} + +impl From for TableMetadata { + fn from(result: TableMetadataBuildResult) -> Self { + result.metadata + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::spec::{ + NestedField, NullOrder, Operation, PrimitiveType, Schema, SnapshotRetention, SortDirection, + SortField, StructType, Summary, Transform, Type, UnboundPartitionField, + }; + + const TEST_LOCATION: &str = "s3://bucket/test/location"; + const LAST_ASSIGNED_COLUMN_ID: i32 = 2; + + fn schema() -> Schema { + Schema::builder() + .with_fields(vec![ + NestedField::required(0, "x", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(1, "y", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "z", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap() + } + + fn sort_order() -> SortOrder { + let schema = schema(); + SortOrder::builder() + .with_order_id(1) + .with_sort_field(SortField { + 
source_id: 2, + transform: Transform::Bucket(4), + direction: SortDirection::Descending, + null_order: NullOrder::First, + }) + .build(&schema) + .unwrap() + } + + fn partition_spec() -> UnboundPartitionSpec { + UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(1, "y", Transform::Identity) + .unwrap() + .build() + } + + fn builder_without_changes(format_version: FormatVersion) -> TableMetadataBuilder { + TableMetadataBuilder::new( + schema(), + partition_spec(), + sort_order(), + TEST_LOCATION.to_string(), + format_version, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata + .into_builder("s3://bucket/test/location/metadata/metadata1.json") + } + + #[test] + fn test_minimal_build() { + let metadata = TableMetadataBuilder::new( + schema(), + partition_spec(), + sort_order(), + TEST_LOCATION.to_string(), + FormatVersion::V1, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + assert_eq!(metadata.format_version, FormatVersion::V1); + assert_eq!(metadata.location, TEST_LOCATION); + assert_eq!(metadata.current_schema_id, 0); + assert_eq!(metadata.default_spec_id, 0); + assert_eq!(metadata.default_sort_order_id, 1); + assert_eq!(metadata.last_partition_id, 1000); + assert_eq!(metadata.last_column_id, 2); + assert_eq!(metadata.snapshots.len(), 0); + assert_eq!(metadata.refs.len(), 0); + assert_eq!(metadata.properties.len(), 0); + assert_eq!(metadata.metadata_log.len(), 0); + assert_eq!(metadata.last_sequence_number, 0); + assert_eq!(metadata.last_column_id, LAST_ASSIGNED_COLUMN_ID); + + // Test can serialize v1 + let _ = serde_json::to_string(&metadata).unwrap(); + + // Test can serialize v2 + let metadata = metadata + .into_builder("s3://bucket/test/location/metadata/metadata1.json") + .upgrade_format_version(FormatVersion::V2) + .unwrap() + .build() + .unwrap() + .metadata; + + assert_eq!(metadata.format_version, FormatVersion::V2); + let _ = serde_json::to_string(&metadata).unwrap(); + } + + #[test] + fn 
test_reassigns_ids() { + let schema = Schema::builder() + .with_schema_id(10) + .with_fields(vec![ + NestedField::required(11, "a", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(12, "b", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required( + 13, + "struct", + Type::Struct(StructType::new(vec![NestedField::required( + 14, + "nested", + Type::Primitive(PrimitiveType::Long), + ) + .into()])), + ) + .into(), + NestedField::required(15, "c", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + let spec = PartitionSpec::builder(&schema) + .with_spec_id(20) + .add_partition_field("a", "a", Transform::Identity) + .unwrap() + .add_partition_field("struct.nested", "nested_partition", Transform::Identity) + .unwrap() + .build() + .unwrap(); + let sort_order = SortOrder::builder() + .with_fields( + vec![SortField { + source_id: 11, + transform: Transform::Identity, + direction: SortDirection::Ascending, + null_order: NullOrder::First, + }] + .into(), + ) + .with_order_id(10) + .build(&schema) + .unwrap(); + + let (fresh_schema, fresh_spec, fresh_sort_order) = + TableMetadataBuilder::reassign_ids(schema, spec.into_unbound(), sort_order).unwrap(); + + let expected_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(0, "a", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(1, "b", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required( + 2, + "struct", + Type::Struct(StructType::new(vec![NestedField::required( + 4, + "nested", + Type::Primitive(PrimitiveType::Long), + ) + .into()])), + ) + .into(), + NestedField::required(3, "c", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + let expected_spec = PartitionSpec::builder(&expected_schema) + .with_spec_id(0) + .add_partition_field("a", "a", Transform::Identity) + .unwrap() + .add_partition_field("struct.nested", "nested_partition", Transform::Identity) + .unwrap() + .build() + 
.unwrap(); + + let expected_sort_order = SortOrder::builder() + .with_fields( + vec![SortField { + source_id: 0, + transform: Transform::Identity, + direction: SortDirection::Ascending, + null_order: NullOrder::First, + }] + .into(), + ) + .with_order_id(1) + .build(&expected_schema) + .unwrap(); + + assert_eq!(fresh_schema, expected_schema); + assert_eq!(fresh_spec, expected_spec); + assert_eq!(fresh_sort_order, expected_sort_order); + } + + #[test] + fn test_ids_are_reassigned_for_new_metadata() { + let schema = schema().into_builder().with_schema_id(10).build().unwrap(); + + let metadata = TableMetadataBuilder::new( + schema, + partition_spec(), + sort_order(), + TEST_LOCATION.to_string(), + FormatVersion::V1, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + assert_eq!(metadata.current_schema_id, 0); + assert_eq!(metadata.current_schema().schema_id(), 0); + } + + #[test] + fn test_new_metadata_changes() { + let changes = TableMetadataBuilder::new( + schema(), + partition_spec(), + sort_order(), + TEST_LOCATION.to_string(), + FormatVersion::V1, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .changes; + + pretty_assertions::assert_eq!(changes, vec![ + TableUpdate::SetLocation { + location: TEST_LOCATION.to_string() + }, + TableUpdate::AddSchema { + last_column_id: Some(LAST_ASSIGNED_COLUMN_ID), + schema: schema(), + }, + TableUpdate::SetCurrentSchema { schema_id: -1 }, + TableUpdate::AddSpec { + // Because this is a new tables, field-ids are assigned + // partition_spec() has None set for field-id + spec: PartitionSpec::builder(&schema()) + .with_spec_id(0) + .add_unbound_field(UnboundPartitionField { + name: "y".to_string(), + transform: Transform::Identity, + source_id: 1, + field_id: Some(1000) + }) + .unwrap() + .build() + .unwrap() + .into_unbound(), + }, + TableUpdate::SetDefaultSpec { spec_id: -1 }, + TableUpdate::AddSortOrder { + sort_order: sort_order(), + }, + TableUpdate::SetDefaultSortOrder { sort_order_id: -1 }, + 
]); + } + + #[test] + fn test_add_partition_spec() { + let builder = builder_without_changes(FormatVersion::V2); + + let added_spec = UnboundPartitionSpec::builder() + .with_spec_id(10) + .add_partition_fields(vec![ + UnboundPartitionField { + // The previous field - has field_id set + name: "y".to_string(), + transform: Transform::Identity, + source_id: 1, + field_id: Some(1000), + }, + UnboundPartitionField { + // A new field without field id - should still be without field id in changes + name: "z".to_string(), + transform: Transform::Identity, + source_id: 2, + field_id: None, + }, + ]) + .unwrap() + .build(); + + let build_result = builder + .add_partition_spec(added_spec.clone()) + .unwrap() + .build() + .unwrap(); + + // Spec id should be re-assigned + let expected_change = added_spec.with_spec_id(1); + let expected_spec = PartitionSpec::builder(&schema()) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + name: "y".to_string(), + transform: Transform::Identity, + source_id: 1, + field_id: Some(1000), + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + name: "z".to_string(), + transform: Transform::Identity, + source_id: 2, + field_id: Some(1001), + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 1); + assert_eq!( + build_result.metadata.partition_spec_by_id(1), + Some(&Arc::new(expected_spec)) + ); + assert_eq!(build_result.metadata.last_partition_id, 1001); + pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSpec { + spec: expected_change + }); + } + + #[test] + fn test_add_sort_order() { + let builder = builder_without_changes(FormatVersion::V2); + + let added_sort_order = SortOrder::builder() + .with_order_id(10) + .with_fields(vec![SortField { + source_id: 1, + transform: Transform::Identity, + direction: SortDirection::Ascending, + null_order: NullOrder::First, + }]) + .build(&schema()) + .unwrap(); + + let build_result = builder + 
.add_sort_order(added_sort_order.clone()) + .unwrap() + .build() + .unwrap(); + + let expected_sort_order = added_sort_order.with_order_id(2); + + assert_eq!(build_result.changes.len(), 1); + assert_eq!(build_result.metadata.sort_orders.keys().max(), Some(&2)); + pretty_assertions::assert_eq!( + build_result.metadata.sort_order_by_id(2), + Some(&Arc::new(expected_sort_order.clone())) + ); + pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSortOrder { + sort_order: expected_sort_order + }); + } + + #[test] + fn test_add_compatible_schema() { + let builder = builder_without_changes(FormatVersion::V2); + + let added_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "x", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(1, "y", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "z", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(3, "a", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + let build_result = builder + .add_current_schema(added_schema.clone()) + .unwrap() + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 2); + assert_eq!(build_result.metadata.schemas.keys().max(), Some(&1)); + pretty_assertions::assert_eq!( + build_result.metadata.schema_by_id(1), + Some(&Arc::new(added_schema.clone())) + ); + pretty_assertions::assert_eq!(build_result.changes[0], TableUpdate::AddSchema { + last_column_id: Some(3), + schema: added_schema + }); + assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema { + schema_id: -1 + }); + } + + #[test] + fn test_set_current_schema_change_is_minus_one_if_schema_was_added_in_this_change() { + let builder = builder_without_changes(FormatVersion::V2); + + let added_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "x", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(1, "y", 
Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "z", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(3, "a", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + let build_result = builder + .add_schema(added_schema.clone()) + .set_current_schema(1) + .unwrap() + .build() + .unwrap(); + + assert_eq!(build_result.changes.len(), 2); + assert_eq!(build_result.changes[1], TableUpdate::SetCurrentSchema { + schema_id: -1 + }); + } + + #[test] + fn test_no_metadata_log_for_create_table() { + let build_result = TableMetadataBuilder::new( + schema(), + partition_spec(), + sort_order(), + TEST_LOCATION.to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap(); + + assert_eq!(build_result.metadata.metadata_log.len(), 0); + } + + #[test] + fn test_from_metadata_generates_metadata_log() { + let metadata_path = "s3://bucket/test/location/metadata/metadata1.json"; + let builder = TableMetadataBuilder::new( + schema(), + partition_spec(), + sort_order(), + TEST_LOCATION.to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata + .into_builder(metadata_path); + + let builder = builder + .add_default_sort_order(SortOrder::unsorted_order()) + .unwrap(); + + let build_result = builder.build().unwrap(); + + assert_eq!(build_result.metadata.metadata_log.len(), 1); + assert_eq!( + build_result.metadata.metadata_log[0].metadata_file, + metadata_path + ); + } + + #[test] + fn test_set_ref() { + let builder = builder_without_changes(FormatVersion::V2); + + let snapshot = Snapshot::builder() + .with_snapshot_id(1) + .with_timestamp_ms(builder.last_updated_ms() + 1) + .with_sequence_number(0) + .with_schema_id(0) + .with_manifest_list("/snap-1.avro") + .with_summary(Summary { + operation: Operation::Append, + other: HashMap::from_iter(vec![ + ( + "spark.app.id".to_string(), + "local-1662532784305".to_string(), + ), + 
("added-data-files".to_string(), "4".to_string()), + ("added-records".to_string(), "4".to_string()), + ("added-files-size".to_string(), "6001".to_string()), + ]), + }) + .build(); + + let builder = builder.add_snapshot(snapshot.clone()).unwrap(); + + assert!(builder + .clone() + .set_ref(MAIN_BRANCH, SnapshotReference { + snapshot_id: 10, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: Some(10), + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }) + .unwrap_err() + .to_string() + .contains("Cannot set 'main' to unknown snapshot: '10'")); + + let build_result = builder + .set_ref(MAIN_BRANCH, SnapshotReference { + snapshot_id: 1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: Some(10), + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }) + .unwrap() + .build() + .unwrap(); + assert_eq!(build_result.metadata.snapshots.len(), 1); + assert_eq!( + build_result.metadata.snapshot_by_id(1), + Some(&Arc::new(snapshot.clone())) + ); + assert_eq!(build_result.metadata.snapshot_log, vec![SnapshotLog { + snapshot_id: 1, + timestamp_ms: snapshot.timestamp_ms() + }]) + } + + #[test] + fn test_snapshot_log_skips_intermediates() { + let builder = builder_without_changes(FormatVersion::V2); + + let snapshot_1 = Snapshot::builder() + .with_snapshot_id(1) + .with_timestamp_ms(builder.last_updated_ms() + 1) + .with_sequence_number(0) + .with_schema_id(0) + .with_manifest_list("/snap-1.avro") + .with_summary(Summary { + operation: Operation::Append, + other: HashMap::from_iter(vec![ + ( + "spark.app.id".to_string(), + "local-1662532784305".to_string(), + ), + ("added-data-files".to_string(), "4".to_string()), + ("added-records".to_string(), "4".to_string()), + ("added-files-size".to_string(), "6001".to_string()), + ]), + }) + .build(); + + let snapshot_2 = Snapshot::builder() + .with_snapshot_id(2) + .with_timestamp_ms(builder.last_updated_ms() + 1) + .with_sequence_number(0) + .with_schema_id(0) + 
.with_manifest_list("/snap-1.avro") + .with_summary(Summary { + operation: Operation::Append, + other: HashMap::from_iter(vec![ + ( + "spark.app.id".to_string(), + "local-1662532784305".to_string(), + ), + ("added-data-files".to_string(), "4".to_string()), + ("added-records".to_string(), "4".to_string()), + ("added-files-size".to_string(), "6001".to_string()), + ]), + }) + .build(); + + let result = builder + .add_snapshot(snapshot_1) + .unwrap() + .set_ref(MAIN_BRANCH, SnapshotReference { + snapshot_id: 1, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: Some(10), + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + }) + .unwrap() + .append_snapshot(snapshot_2.clone(), Some(MAIN_BRANCH)) + .unwrap() + .build() + .unwrap(); + + assert_eq!(result.metadata.snapshot_log, vec![SnapshotLog { + snapshot_id: 2, + timestamp_ms: snapshot_2.timestamp_ms() + }]); + assert_eq!(result.metadata.current_snapshot().unwrap().snapshot_id(), 2); + } + + #[test] + fn test_cannot_add_duplicate_snapshot_id() { + let builder = builder_without_changes(FormatVersion::V2); + + let snapshot = Snapshot::builder() + .with_snapshot_id(2) + .with_timestamp_ms(builder.last_updated_ms() + 1) + .with_sequence_number(0) + .with_schema_id(0) + .with_manifest_list("/snap-1.avro") + .with_summary(Summary { + operation: Operation::Append, + other: HashMap::from_iter(vec![ + ( + "spark.app.id".to_string(), + "local-1662532784305".to_string(), + ), + ("added-data-files".to_string(), "4".to_string()), + ("added-records".to_string(), "4".to_string()), + ("added-files-size".to_string(), "6001".to_string()), + ]), + }) + .build(); + + let builder = builder.add_snapshot(snapshot.clone()).unwrap(); + builder.add_snapshot(snapshot).unwrap_err(); + } + + #[test] + fn test_add_incompatible_current_schema_fails() { + let builder = builder_without_changes(FormatVersion::V2); + + let added_schema = Schema::builder() + .with_schema_id(1) + .with_fields(vec![]) + .build() + .unwrap(); + + let 
err = builder + .add_current_schema(added_schema) + .unwrap() + .build() + .unwrap_err(); + + assert!(err + .to_string() + .contains("Cannot find partition source field")); + } + + #[test] + fn test_add_partition_spec_for_v1_requires_sequential_ids() { + let builder = builder_without_changes(FormatVersion::V1); + + let added_spec = UnboundPartitionSpec::builder() + .with_spec_id(10) + .add_partition_fields(vec![ + UnboundPartitionField { + name: "y".to_string(), + transform: Transform::Identity, + source_id: 1, + field_id: Some(1000), + }, + UnboundPartitionField { + name: "z".to_string(), + transform: Transform::Identity, + source_id: 2, + field_id: Some(1002), + }, + ]) + .unwrap() + .build(); + + let err = builder.add_partition_spec(added_spec).unwrap_err(); + + assert!(err.to_string().contains( + "Cannot add partition spec with non-sequential field ids to format version 1 table" + )); + } +} diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index d416383d7..db7c3f28f 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -157,16 +157,7 @@ impl<'a> ReplaceSortOrderAction<'a> { current_schema_id: self.tx.table.metadata().current_schema().schema_id() as i64, }, TableRequirement::DefaultSortOrderIdMatch { - default_sort_order_id: self - .tx - .table - .metadata() - .default_sort_order() - .ok_or(Error::new( - ErrorKind::Unexpected, - "default sort order impossible to be none", - ))? - .order_id, + default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id, }, ];