diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index e98890a86..eee58fc19 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -293,12 +293,8 @@ async fn test_create_table() { assert_eq!(table.metadata().format_version(), FormatVersion::V2); assert!(table.metadata().current_snapshot().is_none()); assert!(table.metadata().history().is_empty()); - assert!(table.metadata().default_sort_order().unwrap().is_unsorted()); - assert!(table - .metadata() - .default_partition_spec() - .unwrap() - .is_unpartitioned()); + assert!(table.metadata().default_sort_order().is_unsorted()); + assert!(table.metadata().default_partition_spec().is_unpartitioned()); } #[tokio::test] diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 3b89a4e6a..731072a5a 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -252,7 +252,7 @@ mod tests { async fn setup_manifest_files(&mut self) { let current_snapshot = self.table.metadata().current_snapshot().unwrap(); let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); - let current_partition_spec = self.table.metadata().default_partition_spec().unwrap(); + let current_partition_spec = self.table.metadata().default_partition_spec(); // Write data files let data_file_manifest = ManifestWriter::new( diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs index f1cb86ab3..bc7f10a0e 100644 --- a/crates/iceberg/src/scan.rs +++ b/crates/iceberg/src/scan.rs @@ -979,7 +979,7 @@ mod tests { .parent_snapshot(self.table.metadata()) .unwrap(); let current_schema = current_snapshot.schema(self.table.metadata()).unwrap(); - let current_partition_spec = self.table.metadata().default_partition_spec().unwrap(); + let current_partition_spec = self.table.metadata().default_partition_spec(); // Write data files let data_file_manifest = ManifestWriter::new( diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs index 5e50a175c..ebe7381bc 100644 --- a/crates/iceberg/src/spec/sort.rs +++ b/crates/iceberg/src/spec/sort.rs @@ -112,7 +112,7 @@ pub struct SortOrder { } impl SortOrder { - const UNSORTED_ORDER_ID: i64 = 0; + pub(crate) const UNSORTED_ORDER_ID: i64 = 0; /// Create sort order builder pub fn builder() -> SortOrderBuilder { diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 16deaac22..cde709375 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -29,20 +29,68 @@ use serde::{Deserialize, Serialize}; use serde_repr::{Deserialize_repr, Serialize_repr}; use uuid::Uuid; -use super::snapshot::{Snapshot, SnapshotReference, SnapshotRetention}; +use super::snapshot::SnapshotReference; use super::{ - PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef, SortOrder, SortOrderRef, - DEFAULT_PARTITION_SPEC_ID, + PartitionSpec, PartitionSpecRef, SchemaId, SchemaRef, Snapshot, SnapshotRef, SnapshotRetention, + SortOrder, SortOrderRef, DEFAULT_PARTITION_SPEC_ID, }; use crate::error::{timestamp_ms_to_utc, Result}; use crate::{Error, ErrorKind, TableCreation}; static MAIN_BRANCH: &str = "main"; -static DEFAULT_SORT_ORDER_ID: i64 = 0; +pub(crate) static ONE_MINUTE_MS: i64 = 60_000; pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1; pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0; +/// Reserved table property for table format version. +/// +/// Iceberg will default a new table's format version to the latest stable and recommended +/// version. This reserved property keyword allows users to override the Iceberg format version of +/// the table metadata. +/// +/// If this table property exists when creating a table, the table will use the specified format +/// version. If a table updates this property, it will try to upgrade to the specified format +/// version. +pub const PROPERTY_FORMAT_VERSION: &str = "format-version"; +/// Reserved table property for table UUID. +pub const PROPERTY_UUID: &str = "uuid"; +/// Reserved table property for the total number of snapshots. +pub const PROPERTY_SNAPSHOT_COUNT: &str = "snapshot-count"; +/// Reserved table property for current snapshot summary. +pub const PROPERTY_CURRENT_SNAPSHOT_SUMMARY: &str = "current-snapshot-summary"; +/// Reserved table property for current snapshot id. +pub const PROPERTY_CURRENT_SNAPSHOT_ID: &str = "current-snapshot-id"; +/// Reserved table property for current snapshot timestamp. +pub const PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP: &str = "current-snapshot-timestamp-ms"; +/// Reserved table property for the JSON representation of current schema. +pub const PROPERTY_CURRENT_SCHEMA: &str = "current-schema"; +/// Reserved table property for the JSON representation of current(default) partition spec. +pub const PROPERTY_DEFAULT_PARTITION_SPEC: &str = "default-partition-spec"; +/// Reserved table property for the JSON representation of current(default) sort order. +pub const PROPERTY_DEFAULT_SORT_ORDER: &str = "default-sort-order"; + +/// Property key for max number of previous versions to keep. +pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previous-versions-max"; +/// Default value for max number of previous versions to keep. +pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100; + +/// Reserved Iceberg table properties list. +/// +/// Reserved table properties are only used to control behaviors when creating or updating a +/// table. The value of these properties are not persisted as a part of the table metadata. +pub const RESERVED_PROPERTIES: [&str; 9] = [ + PROPERTY_FORMAT_VERSION, + PROPERTY_UUID, + PROPERTY_SNAPSHOT_COUNT, + PROPERTY_CURRENT_SNAPSHOT_ID, + PROPERTY_CURRENT_SNAPSHOT_SUMMARY, + PROPERTY_CURRENT_SNAPSHOT_TIMESTAMP, + PROPERTY_CURRENT_SCHEMA, + PROPERTY_DEFAULT_PARTITION_SPEC, + PROPERTY_DEFAULT_SORT_ORDER, +]; + /// Reference to [`TableMetadata`]. pub type TableMetadataRef = Arc; @@ -186,15 +234,9 @@ impl TableMetadata { /// Get default partition spec #[inline] - pub fn default_partition_spec(&self) -> Option<&PartitionSpecRef> { - if self.default_spec_id == DEFAULT_PARTITION_SPEC_ID { - self.partition_spec_by_id(DEFAULT_PARTITION_SPEC_ID) - } else { - Some( - self.partition_spec_by_id(self.default_spec_id) - .expect("Default partition spec id set, but not found in table metadata"), - ) - } + pub fn default_partition_spec(&self) -> &PartitionSpecRef { + self.partition_spec_by_id(self.default_spec_id) + .expect("Default partition spec id set, but not found in table metadata") } /// Returns all snapshots @@ -215,6 +257,12 @@ impl TableMetadata { &self.snapshot_log } + /// Returns the metadata log. + #[inline] + pub fn metadata_log(&self) -> &[MetadataLog] { + &self.metadata_log + } + /// Get current snapshot #[inline] pub fn current_snapshot(&self) -> Option<&SnapshotRef> { @@ -224,6 +272,16 @@ impl TableMetadata { }) } + /// Get the snapshot for a reference + /// Returns an option if the `ref_name` is not found + #[inline] + pub fn snapshot_for_ref(&self, ref_name: &str) -> Option<&SnapshotRef> { + self.refs.get(ref_name).map(|r| { + self.snapshot_by_id(r.snapshot_id) + .unwrap_or_else(|| panic!("Snapshot id of ref {} doesn't exist", ref_name)) + }) + } + /// Return all sort orders. #[inline] pub fn sort_orders_iter(&self) -> impl Iterator { @@ -238,16 +296,10 @@ impl TableMetadata { /// Returns default sort order id. #[inline] - pub fn default_sort_order(&self) -> Option<&SortOrderRef> { - if self.default_sort_order_id == DEFAULT_SORT_ORDER_ID { - self.sort_orders.get(&DEFAULT_SORT_ORDER_ID) - } else { - Some( - self.sort_orders - .get(&self.default_sort_order_id) - .expect("Default order id has been set, but not found in table metadata!"), - ) - } + pub fn default_sort_order(&self) -> &SortOrderRef { + self.sort_orders + .get(&self.default_sort_order_id) + .expect("Default order id has been set, but not found in table metadata!") } /// Returns properties of table. @@ -278,6 +330,219 @@ impl TableMetadata { self.snapshots .insert(snapshot.snapshot_id(), Arc::new(snapshot)); } + + /// Normalize this partition spec. + /// + /// This is an internal method + /// meant to be called after constructing table metadata from untrusted sources. + /// We run this method after json deserialization. + /// All constructors for `TableMetadata` which are part of `iceberg-rust` + /// should return normalized `TableMetadata`. + pub(super) fn try_normalize(&mut self) -> Result<&mut Self> { + self.validate_current_schema()?; + self.normalize_current_snapshot()?; + self.validate_refs()?; + self.validate_chronological_snapshot_logs()?; + self.validate_chronological_metadata_logs()?; + // Normalize location (remove trailing slash) + self.location = self.location.trim_end_matches('/').to_string(); + self.validate_format_version_specifics()?; + self.try_normalize_partition_spec()?; + self.try_normalize_sort_order()?; + Ok(self) + } + + /// If the default partition spec is specified but the spec is not present in specs, add it + fn try_normalize_partition_spec(&mut self) -> Result<()> { + if self.partition_spec_by_id(self.default_spec_id).is_some() { + return Ok(()); + } + + if self.default_spec_id != DEFAULT_PARTITION_SPEC_ID { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "No partition spec exists with the default spec id {}.", + self.default_spec_id + ), + )); + } + + let partition_spec = PartitionSpec { + spec_id: DEFAULT_PARTITION_SPEC_ID, + fields: vec![], + }; + self.partition_specs + .insert(DEFAULT_PARTITION_SPEC_ID, Arc::new(partition_spec)); + + Ok(()) + } + + /// If the default sort order is unsorted but the sort order is not present, add it + fn try_normalize_sort_order(&mut self) -> Result<()> { + if self.sort_order_by_id(self.default_sort_order_id).is_some() { + return Ok(()); + } + + if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "No sort order exists with the default sort order id {}.", + self.default_sort_order_id + ), + )); + } + + let sort_order = SortOrder::unsorted_order(); + self.sort_orders + .insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order)); + Ok(()) + } + + /// Validate the current schema is set and exists. + fn validate_current_schema(&self) -> Result<()> { + if self.schema_by_id(self.current_schema_id).is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "No schema exists with the current schema id {}.", + self.current_schema_id + ), + )); + } + Ok(()) + } + + /// If current snapshot is Some(-1) then set it to None. + fn normalize_current_snapshot(&mut self) -> Result<()> { + if let Some(current_snapshot_id) = self.current_snapshot_id { + if current_snapshot_id == EMPTY_SNAPSHOT_ID { + self.current_snapshot_id = None; + } else if self.snapshot_by_id(current_snapshot_id).is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Snapshot for current snapshot id {} does not exist in the existing snapshots list", + current_snapshot_id + ), + )); + } + } + Ok(()) + } + + /// Validate that all refs are valid (snapshot exists) + fn validate_refs(&self) -> Result<()> { + for (name, snapshot_ref) in self.refs.iter() { + if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Snapshot for reference {name} does not exist in the existing snapshots list" + ), + )); + } + } + + let main_ref = self.refs.get(MAIN_BRANCH); + if self.current_snapshot_id.is_some() { + if let Some(main_ref) = main_ref { + if main_ref.snapshot_id != self.current_snapshot_id.unwrap_or_default() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Current snapshot id does not match main branch ({:?} != {:?})", + self.current_snapshot_id.unwrap_or_default(), + main_ref.snapshot_id + ), + )); + } + } + } else if main_ref.is_some() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Current snapshot is not set, but main branch exists", + )); + } + + Ok(()) + } + + /// Validate that for V1 Metadata the last_sequence_number is 0 + fn validate_format_version_specifics(&self) -> Result<()> { + if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Last sequence number must be 0 in v1. Found {}", + self.last_sequence_number + ), + )); + } + + Ok(()) + } + + /// Validate snapshots logs are chronological and last updated is after the last snapshot log. + fn validate_chronological_snapshot_logs(&self) -> Result<()> { + for window in self.snapshot_log.windows(2) { + let (prev, curr) = (&window[0], &window[1]); + // commits can happen concurrently from different machines. + // A tolerance helps us avoid failure for small clock skew + if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + "Expected sorted snapshot log entries", + )); + } + } + + if let Some(last) = self.snapshot_log.last() { + // commits can happen concurrently from different machines. + // A tolerance helps us avoid failure for small clock skew + if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid update timestamp {}: before last snapshot log entry at {}", + self.last_updated_ms, last.timestamp_ms + ), + )); + } + } + Ok(()) + } + + fn validate_chronological_metadata_logs(&self) -> Result<()> { + for window in self.metadata_log.windows(2) { + let (prev, curr) = (&window[0], &window[1]); + // commits can happen concurrently from different machines. + // A tolerance helps us avoid failure for small clock skew + if curr.timestamp_ms - prev.timestamp_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + "Expected sorted metadata log entries", + )); + } + } + + if let Some(last) = self.metadata_log.last() { + // commits can happen concurrently from different machines. + // A tolerance helps us avoid failure for small clock skew + if self.last_updated_ms - last.timestamp_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid update timestamp {}: before last metadata log entry at {}", + self.last_updated_ms, last.timestamp_ms + ), + )); + } + } + + Ok(()) + } } /// Manipulating table metadata. @@ -324,15 +589,12 @@ impl TableMetadataBuilder { )) } None => HashMap::from([( - DEFAULT_SORT_ORDER_ID, - Arc::new(SortOrder { - order_id: DEFAULT_SORT_ORDER_ID, - fields: vec![], - }), + SortOrder::UNSORTED_ORDER_ID, + Arc::new(SortOrder::unsorted_order()), )]), }; - let table_metadata = TableMetadata { + let mut table_metadata = TableMetadata { format_version: FormatVersion::V2, table_uuid: Uuid::now_v7(), location: location.ok_or_else(|| { @@ -355,10 +617,12 @@ impl TableMetadataBuilder { snapshot_log: vec![], sort_orders, metadata_log: vec![], - default_sort_order_id: DEFAULT_SORT_ORDER_ID, + default_sort_order_id: SortOrder::UNSORTED_ORDER_ID, refs: Default::default(), }; + table_metadata.try_normalize()?; + Ok(Self(table_metadata)) } @@ -375,6 +639,7 @@ impl TableMetadataBuilder { } pub(super) mod _serde { + use std::borrow::BorrowMut; /// This is a helper module that defines types to help with serialization/deserialization. /// For deserialization the input first gets read into either the [TableMetadataV1] or [TableMetadataV2] struct /// and then converted into the [TableMetadata] struct. Serialization works the other way around. @@ -386,19 +651,18 @@ pub(super) mod _serde { /// [TableMetadataV1] and [TableMetadataV2] are internal struct that are only used for serialization and deserialization. use std::sync::Arc; - use itertools::Itertools; use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::{ FormatVersion, MetadataLog, SnapshotLog, TableMetadata, DEFAULT_PARTITION_SPEC_ID, - DEFAULT_SORT_ORDER_ID, MAIN_BRANCH, + MAIN_BRANCH, }; use crate::spec::schema::_serde::{SchemaV1, SchemaV2}; use crate::spec::snapshot::_serde::{SnapshotV1, SnapshotV2}; use crate::spec::{ PartitionField, PartitionSpec, Schema, Snapshot, SnapshotReference, SnapshotRetention, - SortOrder, EMPTY_SNAPSHOT_ID, + SortOrder, }; use crate::{Error, ErrorKind}; @@ -545,24 +809,14 @@ pub(super) mod _serde { .map(|schema| Ok((schema.schema_id, Arc::new(schema.try_into()?)))) .collect::, Error>>()?, ); - Ok(TableMetadata { + let mut metadata = TableMetadata { format_version: FormatVersion::V2, table_uuid: value.table_uuid, location: value.location, last_sequence_number: value.last_sequence_number, last_updated_ms: value.last_updated_ms, last_column_id: value.last_column_id, - current_schema_id: if schemas.keys().contains(&value.current_schema_id) { - Ok(value.current_schema_id) - } else { - Err(self::Error::new( - ErrorKind::DataInvalid, - format!( - "No schema exists with the current schema id {}.", - value.current_schema_id - ), - )) - }?, + current_schema_id: value.current_schema_id, schemas, partition_specs: HashMap::from_iter( value @@ -607,13 +861,21 @@ pub(super) mod _serde { HashMap::new() } }), - }) + }; + + metadata.borrow_mut().try_normalize()?; + Ok(metadata) } } impl TryFrom for TableMetadata { type Error = Error; fn try_from(value: TableMetadataV1) -> Result { + let current_snapshot_id = if let &Some(-1) = &value.current_snapshot_id { + None + } else { + value.current_snapshot_id + }; let schemas = value .schemas .map(|schemas| { @@ -650,7 +912,7 @@ pub(super) mod _serde { .into_iter() .map(|x| (x.spec_id(), Arc::new(x))), ); - Ok(TableMetadata { + let mut metadata = TableMetadata { format_version: FormatVersion::V1, table_uuid: value.table_uuid.unwrap_or_default(), location: value.location, @@ -668,17 +930,8 @@ pub(super) mod _serde { .unwrap_or_else(|| partition_specs.keys().copied().max().unwrap_or_default()), partition_specs, schemas, - properties: value.properties.unwrap_or_default(), - current_snapshot_id: if let &Some(id) = &value.current_snapshot_id { - if id == EMPTY_SNAPSHOT_ID { - None - } else { - Some(id) - } - } else { - value.current_snapshot_id - }, + current_snapshot_id, snapshots: value .snapshots .map(|snapshots| { @@ -699,16 +952,25 @@ pub(super) mod _serde { ), None => HashMap::new(), }, - default_sort_order_id: value.default_sort_order_id.unwrap_or(DEFAULT_SORT_ORDER_ID), - refs: HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference { - snapshot_id: value.current_snapshot_id.unwrap_or_default(), - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, - })]), - }) + default_sort_order_id: value + .default_sort_order_id + .unwrap_or(SortOrder::UNSORTED_ORDER_ID), + refs: if let Some(snapshot_id) = current_snapshot_id { + HashMap::from_iter(vec![(MAIN_BRANCH.to_string(), SnapshotReference { + snapshot_id, + retention: SnapshotRetention::Branch { + min_snapshots_to_keep: None, + max_snapshot_age_ms: None, + max_ref_age_ms: None, + }, + })]) + } else { + HashMap::new() + }, + }; + + metadata.borrow_mut().try_normalize()?; + Ok(metadata) } } @@ -983,7 +1245,7 @@ mod tests { "current-schema-id" : 1, "partition-specs": [ { - "spec-id": 1, + "spec-id": 0, "fields": [ { "source-id": 4, @@ -994,7 +1256,7 @@ mod tests { ] } ], - "default-spec-id": 1, + "default-spec-id": 0, "last-partition-id": 1000, "properties": { "commit.retry.num-retries": "1" @@ -1005,7 +1267,12 @@ mod tests { "timestamp-ms": 1515100 } ], - "sort-orders": [], + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], "default-sort-order-id": 0 } "#; @@ -1021,7 +1288,7 @@ mod tests { .unwrap(); let partition_spec = PartitionSpec { - spec_id: 1, + spec_id: 0, fields: vec![PartitionField { name: "ts_day".to_string(), transform: Transform::Day, @@ -1038,11 +1305,11 @@ mod tests { last_column_id: 1, schemas: HashMap::from_iter(vec![(1, Arc::new(schema))]), current_schema_id: 1, - partition_specs: HashMap::from_iter(vec![(1, partition_spec.into())]), - default_spec_id: 1, + partition_specs: HashMap::from_iter(vec![(0, partition_spec.into())]), + default_spec_id: 0, last_partition_id: 1000, default_sort_order_id: 0, - sort_orders: HashMap::from_iter(vec![]), + sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]), snapshots: HashMap::default(), current_snapshot_id: None, last_sequence_number: 1, @@ -1227,6 +1494,280 @@ mod tests { check_table_metadata_serde(data, expected); } + #[test] + fn test_current_snapshot_id_must_match_main_branch() { + let data = r#" + { + "format-version" : 2, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "schemas": [ + { + "schema-id" : 1, + "type" : "struct", + "fields" :[ + { + "id": 1, + "name": "struct_name", + "required": true, + "type": "fixed[1]" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 0, + "last-partition-id": 1000, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "default-sort-order-id": 0, + "current-snapshot-id" : 1, + "refs" : { + "main" : { + "snapshot-id" : 2, + "type" : "branch" + } + }, + "snapshots" : [ { + "snapshot-id" : 1, + "timestamp-ms" : 1662532818843, + "sequence-number" : 0, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1662532784305", + "added-data-files" : "4", + "added-records" : "4", + "added-files-size" : "6001" + }, + "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro", + "schema-id" : 0 + }, + { + "snapshot-id" : 2, + "timestamp-ms" : 1662532818844, + "sequence-number" : 0, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1662532784305", + "added-data-files" : "4", + "added-records" : "4", + "added-files-size" : "6001" + }, + "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro", + "schema-id" : 0 + } ] + } + "#; + + let err = serde_json::from_str::(data).unwrap_err(); + assert!(err + .to_string() + .contains("Current snapshot id does not match main branch")); + } + + #[test] + fn test_main_without_current() { + let data = r#" + { + "format-version" : 2, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "schemas": [ + { + "schema-id" : 1, + "type" : "struct", + "fields" :[ + { + "id": 1, + "name": "struct_name", + "required": true, + "type": "fixed[1]" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 0, + "last-partition-id": 1000, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "default-sort-order-id": 0, + "refs" : { + "main" : { + "snapshot-id" : 1, + "type" : "branch" + } + }, + "snapshots" : [ { + "snapshot-id" : 1, + "timestamp-ms" : 1662532818843, + "sequence-number" : 0, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1662532784305", + "added-data-files" : "4", + "added-records" : "4", + "added-files-size" : "6001" + }, + "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro", + "schema-id" : 0 + } ] + } + "#; + + let err = serde_json::from_str::(data).unwrap_err(); + assert!(err + .to_string() + .contains("Current snapshot is not set, but main branch exists")); + } + + #[test] + fn test_branch_snapshot_missing() { + let data = r#" + { + "format-version" : 2, + "table-uuid": "fb072c92-a02b-11e9-ae9c-1bb7bc9eca94", + "location": "s3://b/wh/data.db/table", + "last-sequence-number" : 1, + "last-updated-ms": 1515100955770, + "last-column-id": 1, + "schemas": [ + { + "schema-id" : 1, + "type" : "struct", + "fields" :[ + { + "id": 1, + "name": "struct_name", + "required": true, + "type": "fixed[1]" + } + ] + } + ], + "current-schema-id" : 1, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "source-id": 4, + "field-id": 1000, + "name": "ts_day", + "transform": "day" + } + ] + } + ], + "default-spec-id": 0, + "last-partition-id": 1000, + "properties": { + "commit.retry.num-retries": "1" + }, + "metadata-log": [ + { + "metadata-file": "s3://bucket/.../v1.json", + "timestamp-ms": 1515100 + } + ], + "sort-orders": [ + { + "order-id": 0, + "fields": [] + } + ], + "default-sort-order-id": 0, + "refs" : { + "main" : { + "snapshot-id" : 1, + "type" : "branch" + }, + "foo" : { + "snapshot-id" : 2, + "type" : "branch" + } + }, + "snapshots" : [ { + "snapshot-id" : 1, + "timestamp-ms" : 1662532818843, + "sequence-number" : 0, + "summary" : { + "operation" : "append", + "spark.app.id" : "local-1662532784305", + "added-data-files" : "4", + "added-records" : "4", + "added-files-size" : "6001" + }, + "manifest-list" : "/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro", + "schema-id" : 0 + } ] + } + "#; + + let err = serde_json::from_str::(data).unwrap_err(); + assert!(err + .to_string() + .contains("Snapshot for reference foo does not exist in the existing snapshots list")); + } + #[test] fn test_invalid_table_uuid() -> Result<()> { let data = r#" @@ -1514,21 +2055,15 @@ mod tests { default_spec_id: 0, last_partition_id: 0, default_sort_order_id: 0, - sort_orders: HashMap::new(), + // Sort order is added during deserialization for V2 compatibility + sort_orders: HashMap::from_iter(vec![(0, SortOrder::unsorted_order().into())]), snapshots: HashMap::new(), current_snapshot_id: None, last_sequence_number: 0, properties: HashMap::new(), snapshot_log: vec![], metadata_log: Vec::new(), - refs: HashMap::from_iter(vec![("main".to_string(), SnapshotReference { - snapshot_id: -1, - retention: SnapshotRetention::Branch { - min_snapshots_to_keep: None, - max_snapshot_age_ms: None, - max_ref_age_ms: None, - }, - })]), + refs: HashMap::new(), }; check_table_metadata_serde(&metadata, expected); @@ -1637,7 +2172,9 @@ mod tests { assert_eq!( table_meta_data.default_partition_spec(), - table_meta_data.partition_spec_by_id(default_spec_id) + table_meta_data + .partition_spec_by_id(default_spec_id) + .unwrap() ); } #[test] @@ -1651,7 +2188,10 @@ mod tests { assert_eq!( table_meta_data.default_sort_order(), - table_meta_data.sort_orders.get(&default_sort_order_id) + table_meta_data + .sort_orders + .get(&default_sort_order_id) + .unwrap() ) } @@ -1703,17 +2243,4 @@ mod tests { )]) ); } - - #[test] - fn test_table_builder_from_table_metadata() { - let table_metadata = get_test_table_metadata("TableMetadataV2Valid.json"); - let table_metadata_builder = TableMetadataBuilder::new(table_metadata); - let uuid = Uuid::new_v4(); - let table_metadata = table_metadata_builder - .assign_uuid(uuid) - .unwrap() - .build() - .unwrap(); - assert_eq!(table_metadata.uuid(), uuid); - } } diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index d416383d7..db7c3f28f 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -157,16 +157,7 @@ impl<'a> ReplaceSortOrderAction<'a> { current_schema_id: self.tx.table.metadata().current_schema().schema_id() as i64, }, TableRequirement::DefaultSortOrderIdMatch { - default_sort_order_id: self - .tx - .table - .metadata() - .default_sort_order() - .ok_or(Error::new( - ErrorKind::Unexpected, - "default sort order impossible to be none", - ))? - .order_id, + default_sort_order_id: self.tx.table.metadata().default_sort_order().order_id, }, ];