diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 21d25fb03..aaf6f2e9a 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -373,7 +373,7 @@ mod tests { let expected_sorted_order = SortOrder::builder() .with_order_id(0) .with_fields(vec![]) - .build(&expected_schema) + .build(expected_schema) .unwrap(); assert_eq!( diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 60416692e..73986bcc4 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -44,10 +44,9 @@ pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1; pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0; /// Property key for the format version. -pub const PROPERTY_FORMAT_VERSION: &'static str = "format-version"; +pub const PROPERTY_FORMAT_VERSION: &str = "format-version"; /// Property key for max number of previous versions to keep. -pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &'static str = - "write.metadata.previous-versions-max"; +pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previous-versions-max"; /// Default value for max number of previous versions to keep. pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100; @@ -137,9 +136,9 @@ impl TableMetadata { format_version: FormatVersion, properties: HashMap, ) -> Result { - super::table_metadata_builder::TableMetadataBuilder::new( + Self::builder( schema, - spec.into(), + spec, sort_order, location, format_version, @@ -149,19 +148,32 @@ impl TableMetadata { .map(|x| x.metadata) } + /// Create a new table metadata builder + pub fn builder( + schema: Schema, + spec: impl Into, + sort_order: SortOrder, + location: String, + format_version: FormatVersion, + properties: HashMap, + ) -> Result { + TableMetadataBuilder::new( + schema, + spec.into(), + sort_order, + location, + format_version, + properties, + ) + } + /// Convert this Table Metadata into a builder for modification. /// /// `current_file_location` is the location where the current version /// of the metadata file is stored. This is used to update the metadata log. #[must_use] - pub fn into_builder( - self, - current_file_location: impl Into, - ) -> super::table_metadata_builder::TableMetadataBuilder { - super::table_metadata_builder::TableMetadataBuilder::new_from_metadata( - self, - current_file_location, - ) + pub fn into_builder(self, current_file_location: impl Into) -> TableMetadataBuilder { + TableMetadataBuilder::new_from_metadata(self, current_file_location) } /// Returns format version of this metadata. @@ -277,7 +289,7 @@ impl TableMetadata { pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> { self.refs.get(ref_name).map(|r| { self.snapshot_by_id(r.snapshot_id) - .expect(format!("Snapshot id of ref {} doesn't exist", ref_name).as_str()) + .unwrap_or_else(|| panic!("Snapshot id of ref {} doesn't exist", ref_name)) }) } @@ -397,16 +409,14 @@ impl TableMetadata { if let Some(current_snapshot_id) = self.current_snapshot_id { if current_snapshot_id == EMPTY_SNAPSHOT_ID { self.current_snapshot_id = None; - } else { - if self.snapshot_by_id(current_snapshot_id).is_none() { - return Err(Error::new( + } else if self.snapshot_by_id(current_snapshot_id).is_none() { + return Err(Error::new( ErrorKind::DataInvalid, format!( "Snapshot for current snapshot id {} does not exist in the existing snapshots list", current_snapshot_id ), )); - } } } Ok(()) @@ -438,30 +448,27 @@ impl TableMetadata { )); } } - } else { - if main_ref.is_none() { - return Err(Error::new( - ErrorKind::DataInvalid, - "Current snapshot is not set, but main branch exists", - )); - } + } else if main_ref.is_some() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Current snapshot is not set, but main branch exists", + )); } Ok(()) } fn validate_format_version_specifics(&self) -> Result<()> { - if self.format_version < FormatVersion::V2 { - if self.last_sequence_number != 0 { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Last sequence number must be 0 in v1. Found {}", - self.last_sequence_number - ), - )); - } + if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Last sequence number must be 0 in v1. Found {}", + self.last_sequence_number + ), + )); } + Ok(()) } @@ -1132,7 +1139,7 @@ mod tests { "current-schema-id" : 1, "partition-specs": [ { - "spec-id": 1, + "spec-id": 0, "fields": [ { "source-id": 4, @@ -1143,7 +1150,7 @@ mod tests { ] } ], - "default-spec-id": 1, + "default-spec-id": 0, "last-partition-id": 1000, "properties": { "commit.retry.num-retries": "1" @@ -1154,7 +1161,12 @@ mod tests { "timestamp-ms": 1515100 } ], - "sort-orders": [], + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], "default-sort-order-id": 0 } "#; @@ -1170,7 +1182,7 @@ mod tests { .unwrap(); let partition_spec = PartitionSpec { - spec_id: 1, + spec_id: 0, fields: vec![PartitionField { name: "ts_day".to_string(), transform: Transform::Day, @@ -1187,11 +1199,11 @@ mod tests { last_column_id: 1, schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]), current_schema_id: 1, - partition_specs: HashMap::from_iter(vec![(1, partition_spec.into())]), - default_spec_id: 1, + partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]), + default_spec_id: 0, last_partition_id: 1000, default_sort_order_id: 0, - sort_orders: HashMap::from_iter(vec![]), + sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]), snapshots: HashMap::default(), current_snapshot_id: None, last_sequence_number: 1, @@ -1663,21 +1675,15 @@ mod tests { default_spec_id: 0, last_partition_id: 0, default_sort_order_id: 0, - sort_orders: HashMap::new(), + // Sort order is added during deserialization for V2 compatibility + sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]), snapshots: HashMap::new(), current_snapshot_id: None, last_sequence_number: 0, properties: HashMap::new(), snapshot_log: vec![], metadata_log: Vec::new(), - refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { - snapshot_id: -1, - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, - })]), + refs: HashMap::new(), }; check_table_metadata_serde(&metadata, expected); diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index fcfbe31db..d2d72cd49 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -45,7 +45,6 @@ pub struct TableMetadataBuildResult { impl TableMetadataBuilder { const LAST_ADDED: i32 = -1; - #[must_use] /// Create a `TableMetadata` object from scratch. /// /// This method re-assign ids of fields in the schema, schema.id, sort_order.id and @@ -76,7 +75,7 @@ impl TableMetadataBuilder { current_schema_id: -1, // Overwritten immediately by add_and_set_current_schema schemas: HashMap::new(), partition_specs: HashMap::new(), - default_spec_id: DEFAULT_PARTITION_SPEC_ID, + default_spec_id: DEFAULT_PARTITION_SPEC_ID, // Overwritten immediately by add_default_partition_spec last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID, properties: HashMap::new(), current_snapshot_id: None, @@ -160,7 +159,7 @@ impl TableMetadataBuilder { /// Get the current schema with all changes applied up to this point. #[inline] pub fn current_schema(&self) -> &SchemaRef { - &self.metadata.current_schema() + self.metadata.current_schema() } /// Get the current last column id @@ -534,7 +533,7 @@ impl TableMetadataBuilder { // Set schema-id let schema = match new_schema_id == schema.schema_id() { true => schema, - false => schema.with_schema_id(new_schema_id.into()), + false => schema.with_schema_id(new_schema_id), }; self.metadata @@ -826,10 +825,10 @@ impl TableMetadataBuilder { let partition_spec = Arc::unwrap_or_clone(self.get_default_partition_spec()?); let sort_order = Arc::unwrap_or_clone(self.get_default_sort_order()?); - partition_spec.to_unbound().bind(&schema)?; + partition_spec.to_unbound().bind(schema)?; SortOrder::builder() .with_fields(sort_order.fields) - .build(&schema)?; + .build(schema)?; self.expire_metadata_log(); self.update_snapshot_log()?; @@ -857,7 +856,7 @@ impl TableMetadataBuilder { if self.metadata.metadata_log.len() > max_size { self.metadata .metadata_log - .drain(0..self.metadata.metadata_log.len() - max_size as usize); + .drain(0..self.metadata.metadata_log.len() - max_size); } } @@ -958,7 +957,7 @@ impl TableMetadataBuilder { // Re-build partition spec with new ids let mut fresh_spec = PartitionSpecBuilder::new(&fresh_schema); - for field in spec.fields().to_owned() { + for field in spec.fields() { let source_field_name = previous_id_to_name.get(&field.source_id).ok_or_else(|| { Error::new( ErrorKind::DataInvalid, @@ -969,7 +968,7 @@ impl TableMetadataBuilder { ) })?; fresh_spec = - fresh_spec.add_partition_field(source_field_name, field.name, field.transform)?; + fresh_spec.add_partition_field(source_field_name, &field.name, field.transform)?; } let fresh_spec = fresh_spec.build()?; @@ -1007,7 +1006,7 @@ impl TableMetadataBuilder { fn handle_set_format_version_property(self, format_version: &str) -> Result { // format_version is either "1" or "2" and should not be persisted in the properties. let format_version = - serde_json::from_str::(&format_version).map_err(|_e| { + serde_json::from_str::(format_version).map_err(|_e| { Error::new( ErrorKind::DataInvalid, format!( @@ -1093,7 +1092,6 @@ impl TableMetadataBuilder { .unwrap_or_else(|| { self.get_highest_spec_id() .unwrap_or(DEFAULT_PARTITION_SPEC_ID) - + 1 }) } @@ -1124,3 +1122,9 @@ impl TableMetadataBuilder { self.metadata.sort_orders.keys().max().copied() } } + +impl From for TableMetadata { + fn from(result: TableMetadataBuildResult) -> Self { + result.metadata + } +}