Skip to content

Commit

Permalink
Implement TableMetadataBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
c-thiel committed Aug 30, 2024
1 parent 7aa8bdd commit 4193131
Show file tree
Hide file tree
Showing 19 changed files with 2,950 additions and 235 deletions.
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,9 @@ impl Catalog for GlueCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;

Ok(metadata)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/glue/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,9 @@ mod tests {
.location("my_location".to_string())
.schema(schema)
.build();
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;

Ok(metadata)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,9 @@ impl Catalog for HmsCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location = create_metadata_location(&location, 0)?;

self.file_io
Expand Down
6 changes: 4 additions & 2 deletions crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,9 @@ impl Catalog for MemoryCatalog {
}
};

let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;
let metadata_location = format!(
"{}/metadata/{}-{}.metadata.json",
&location,
Expand Down Expand Up @@ -371,7 +373,7 @@ mod tests {
let expected_sorted_order = SortOrder::builder()
.with_order_id(0)
.with_fields(vec![])
.build(expected_schema.clone())
.build(expected_schema)
.unwrap();

assert_eq!(
Expand Down
8 changes: 2 additions & 6 deletions crates/catalog/rest/tests/rest_catalog_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,8 @@ async fn test_create_table() {
assert_eq!(table.metadata().format_version(), FormatVersion::V2);
assert!(table.metadata().current_snapshot().is_none());
assert!(table.metadata().history().is_empty());
assert!(table.metadata().default_sort_order().unwrap().is_unsorted());
assert!(table
.metadata()
.default_partition_spec()
.unwrap()
.is_unpartitioned());
assert!(table.metadata().default_sort_order().is_unsorted());
assert!(table.metadata().default_partition_spec().is_unpartitioned());
}

#[tokio::test]
Expand Down
53 changes: 48 additions & 5 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,46 @@ impl TableUpdate {
/// Applies the update to the table metadata builder.
///
/// Consumes both the update and the builder, forwards the payload of the
/// matched variant to the corresponding builder method and hands the builder
/// back so further updates can be chained.
///
/// # Errors
///
/// * `ErrorKind::DataInvalid` when an `AddSchema` update carries a
///   `last_column_id` that is lower than the builder's current last column ID.
/// * Any error returned by the fallible builder methods invoked below.
pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
    match self {
        // Previous implementation (the two lines removed by this diff hunk):
        TableUpdate::AssignUuid { uuid } => builder.assign_uuid(uuid),
        _ => unimplemented!(),
        // Replacement arms (lines added by this diff hunk):
        TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
        TableUpdate::AddSchema {
            schema,
            last_column_id,
        } => {
            if let Some(last_column_id) = last_column_id {
                // The last column ID must never move backwards. The comparison
                // matches the error message: fail only when the requested ID is
                // lower than the one the builder already tracks.
                if last_column_id < builder.last_column_id() {
                    return Err(Error::new(
                        ErrorKind::DataInvalid,
                        format!(
                            "Invalid last column ID: {last_column_id} < {} (previous last column ID)",
                            builder.last_column_id()
                        ),
                    ));
                }
            }
            Ok(builder.add_schema(schema))
        }
        TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
        TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
        TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
        TableUpdate::AddSortOrder { sort_order } => builder.add_sort_order(sort_order),
        TableUpdate::SetDefaultSortOrder { sort_order_id } => {
            builder.set_default_sort_order(sort_order_id)
        }
        TableUpdate::AddSnapshot { snapshot } => builder.add_snapshot(snapshot),
        TableUpdate::SetSnapshotRef {
            ref_name,
            reference,
        } => builder.set_ref(&ref_name, reference),
        TableUpdate::RemoveSnapshots { snapshot_ids } => {
            Ok(builder.remove_snapshots(&snapshot_ids))
        }
        TableUpdate::RemoveSnapshotRef { ref_name } => Ok(builder.remove_ref(&ref_name)),
        TableUpdate::SetLocation { location } => Ok(builder.set_location(location)),
        TableUpdate::SetProperties { updates } => builder.set_properties(updates),
        TableUpdate::RemoveProperties { removals } => Ok(builder.remove_properties(&removals)),
        TableUpdate::UpgradeFormatVersion { format_version } => {
            builder.upgrade_format_version(format_version)
        }
    }
}
}
Expand Down Expand Up @@ -1102,16 +1140,21 @@ mod tests {
let table_metadata = TableMetadataBuilder::from_table_creation(table_creation)
.unwrap()
.build()
.unwrap();
let table_metadata_builder = TableMetadataBuilder::new(table_metadata);
.unwrap()
.metadata;
let table_metadata_builder = TableMetadataBuilder::new_from_metadata(
table_metadata,
"s3://db/table/metadata/metadata1.gz.json",
);

let uuid = uuid::Uuid::new_v4();
let update = TableUpdate::AssignUuid { uuid };
let updated_metadata = update
.apply(table_metadata_builder)
.unwrap()
.build()
.unwrap();
.unwrap()
.metadata;
assert_eq!(updated_metadata.uuid(), uuid);
}
}
6 changes: 6 additions & 0 deletions crates/iceberg/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ pub enum ErrorKind {
///
/// This error is returned when given iceberg feature is not supported.
FeatureUnsupported,
/// Validation failed.
///
/// This error is returned when Table or View Metadata is manipulated
/// in forbidden ways that would produce corrupt metadata.
ValidationFailed,
}

impl ErrorKind {
Expand All @@ -59,6 +64,7 @@ impl From<ErrorKind> for &'static str {
ErrorKind::Unexpected => "Unexpected",
ErrorKind::DataInvalid => "DataInvalid",
ErrorKind::FeatureUnsupported => "FeatureUnsupported",
ErrorKind::ValidationFailed => "ValidationFailed",
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ mod tests {
async fn setup_manifest_files(&mut self) {
let current_snapshot = self.table.metadata().current_snapshot().unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec().unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec();

// Write data files
let data_file_manifest = ManifestWriter::new(
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ mod tests {
.parent_snapshot(self.table.metadata())
.unwrap();
let current_schema = current_snapshot.schema(self.table.metadata()).unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec().unwrap();
let current_partition_spec = self.table.metadata().default_partition_spec();

// Write data files
let data_file_manifest = ManifestWriter::new(
Expand Down
6 changes: 6 additions & 0 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,12 @@ impl NestedField {
self.write_default = Some(value);
self
}

/// Set the id of the field.
pub(crate) fn with_id(mut self, id: i32) -> Self {
self.id = id;
self
}
}

impl fmt::Display for NestedField {
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/spec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod schema;
mod snapshot;
mod sort;
mod table_metadata;
mod table_metadata_builder;
mod transform;
mod values;
mod view_metadata;
Expand Down
67 changes: 66 additions & 1 deletion crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,63 @@ impl PartitionSpec {
/// Turn this partition spec into an unbound partition spec.
///
/// Consumes `self` and delegates to the `Into<UnboundPartitionSpec>` conversion
/// of this type.
///
/// The `field_id` is retained as `partition_id` in the unbound partition spec.
pub fn to_unbound(self) -> UnboundPartitionSpec {
pub fn into_unbound(self) -> UnboundPartitionSpec {
self.into()
}

/// Check whether this partition spec is compatible with an unbound partition spec.
///
/// Two specs are compatible when they hold the same number of fields and,
/// position by position, each pair of fields agrees on:
/// * the source column id
/// * the transform
/// * the field name
///
/// Partition field ids and the spec id take no part in the comparison.
pub fn compatible_with(&self, other: &UnboundPartitionSpec) -> bool {
    self.fields.len() == other.fields.len()
        && self
            .fields
            .iter()
            .zip(&other.fields)
            .all(|(mine, theirs)| {
                mine.source_id == theirs.source_id
                    && mine.transform == theirs.transform
                    && mine.name == theirs.name
            })
}

/// Check whether the partition field ids of this spec are sequential.
///
/// The field at position `i` is expected to carry the id
/// `UNPARTITIONED_LAST_ASSIGNED_ID + 1 + i` (i.e. ids start from 1000 and
/// increment by 1 for each field). An empty spec is trivially sequential.
/// This is required for spec version 1.
pub fn has_sequential_ids(&self) -> bool {
    // First id a partitioned spec may use; `None` only if the addition overflows.
    let first_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64).checked_add(1);

    self.fields.iter().enumerate().all(|(position, field)| {
        let wanted = first_id
            .and_then(|id| id.checked_add(position as i64))
            // On overflow fall back to a value no i32 field id can equal.
            .unwrap_or(i64::MAX);

        field.field_id as i64 == wanted
    })
}

/// Get the highest partition field id used by this spec.
///
/// For an unpartitioned spec (no fields) this returns
/// `UNPARTITIONED_LAST_ASSIGNED_ID` (999).
pub fn highest_field_id(&self) -> i32 {
    let largest = self.fields.iter().map(|field| field.field_id).max();

    match largest {
        Some(id) => id,
        None => UNPARTITIONED_LAST_ASSIGNED_ID,
    }
}
}

/// Reference to [`UnboundPartitionSpec`].
Expand Down Expand Up @@ -171,6 +225,14 @@ impl UnboundPartitionSpec {
pub fn fields(&self) -> &[UnboundPartitionField] {
&self.fields
}

/// Returns this unbound partition spec with its spec id set to `spec_id`.
///
/// Consumes `self`; all other parts of the spec are left untouched.
pub fn with_spec_id(mut self, spec_id: i32) -> Self {
    self.spec_id = Some(spec_id);
    self
}
}

impl From<PartitionField> for UnboundPartitionField {
Expand Down Expand Up @@ -1263,4 +1325,7 @@ mod tests {
}]
});
}

// TODO(review): placeholder test — the body is empty, so it passes without
// asserting anything about `has_sequential_ids`.
#[test]
fn test_has_sequential_ids() {}
}
Loading

0 comments on commit 4193131

Please sign in to comment.