From f3c9e0c5a62f6d4ed980f69c54f7e0ce80ddc59b Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Tue, 2 May 2023 16:05:56 +0200 Subject: [PATCH] Core: Add REST API for committing changes against multiple tables --- .../org/apache/iceberg/BaseTransaction.java | 4 + .../apache/iceberg/catalog/TableCommit.java | 52 +++++++ .../apache/iceberg/rest/CatalogHandlers.java | 2 +- .../org/apache/iceberg/rest/RESTCatalog.java | 13 ++ .../iceberg/rest/RESTSessionCatalog.java | 20 +++ .../rest/requests/UpdateTableRequest.java | 2 +- .../iceberg/rest/RESTCatalogAdapter.java | 48 ++++++- .../apache/iceberg/rest/TestRESTCatalog.java | 131 ++++++++++++++++++ 8 files changed, 269 insertions(+), 3 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/catalog/TableCommit.java diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index cef487931b0e..61da776f4c44 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -107,6 +107,10 @@ public String tableName() { } public TableMetadata startMetadata() { + return base; + } + + public TableMetadata currentMetadata() { return current; } diff --git a/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java b/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java new file mode 100644 index 000000000000..a90df56c84f4 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/catalog/TableCommit.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.catalog; + +import java.util.List; +import org.apache.iceberg.MetadataUpdate; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.UpdateRequirement; +import org.apache.iceberg.UpdateRequirements; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.immutables.value.Value; + +@Value.Immutable +public interface TableCommit { + TableIdentifier identifier(); + + List requirements(); + + List updates(); + + static TableCommit create(TableIdentifier identifier, TableMetadata base, TableMetadata updated) { + Preconditions.checkArgument(null != identifier, "Invalid table identifier: null"); + Preconditions.checkArgument(null != base && null != updated, "Invalid table metadata: null"); + Preconditions.checkArgument( + base.uuid().equals(updated.uuid()), + "UUID of base (%s) and updated (%s) table metadata does not match", + base.uuid(), + updated.uuid()); + + return ImmutableTableCommit.builder() + .identifier(identifier) + .requirements(UpdateRequirements.forUpdateTable(base, updated.changes())) + .updates(updated.changes()) + .build(); + } +} diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index eef6204fffdb..6fddb0875721 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -314,7 +314,7 @@ private static TableMetadata create(TableOperations ops, UpdateTableRequest requ return ops.current(); } - private static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { + static TableMetadata commit(TableOperations ops, UpdateTableRequest request) { AtomicBoolean isRetry = new AtomicBoolean(false); try { Tasks.foreach(ops) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java index 71195b9585ef..63b660c46aa3 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java @@ -33,16 +33,19 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable, Closeable { private final RESTSessionCatalog sessionCatalog; private final Catalog delegate; private final SupportsNamespaces nsDelegate; + private final SessionCatalog.SessionContext context; public RESTCatalog() { this( @@ -60,6 +63,7 @@ public RESTCatalog( this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null); this.delegate = sessionCatalog.asCatalog(context); this.nsDelegate = (SupportsNamespaces) delegate; + this.context = context; } @Override @@ -248,4 +252,13 @@ public void setConf(Object conf) { public void close() throws IOException { sessionCatalog.close(); } + + public void commitTransaction(List commits) { + sessionCatalog.commitTransaction(context, commits); + } + + public void commitTransaction(TableCommit... commits) { + sessionCatalog.commitTransaction( + context, ImmutableList.builder().add(commits).build()); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index b83794acca9c..9da4e6e0fbaf 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -52,6 +52,7 @@ import org.apache.iceberg.catalog.BaseSessionCatalog; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -63,13 +64,16 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.apache.iceberg.rest.auth.OAuth2Util; import org.apache.iceberg.rest.auth.OAuth2Util.AuthSession; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; import org.apache.iceberg.rest.requests.UpdateNamespacePropertiesRequest; +import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.CreateNamespaceResponse; import org.apache.iceberg.rest.responses.GetNamespaceResponse; @@ -916,4 +920,20 @@ private Cache newFileIOCloser() { }) .build(); } + + public void commitTransaction(SessionContext context, List commits) { + List tableChanges = Lists.newArrayListWithCapacity(commits.size()); + + for (TableCommit commit : commits) { + tableChanges.add( + new UpdateTableRequest(commit.identifier(), commit.requirements(), commit.updates())); + } + + client.post( + paths.commitTransaction(), + new CommitTransactionRequest(tableChanges), + null, + headers(context), + ErrorHandlers.tableCommitHandler()); + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java b/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java index 3e87be8e9933..3249acfc8f51 100644 --- a/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java +++ b/core/src/main/java/org/apache/iceberg/rest/requests/UpdateTableRequest.java @@ -48,7 +48,7 @@ public UpdateTableRequest( this.updates = updates; } - UpdateTableRequest( + public UpdateTableRequest( TableIdentifier identifier, List requirements, List updates) { diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java index c6d41818441c..2990fcca7899 100644 --- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java +++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java @@ -22,6 +22,11 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.BaseTransaction; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.Transactions; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -40,6 +45,8 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.rest.requests.CommitTransactionRequest; import org.apache.iceberg.rest.requests.CreateNamespaceRequest; import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.RenameTableRequest; @@ -130,7 +137,9 @@ enum Route { HTTPMethod.POST, "v1/namespaces/{namespace}/tables/{table}/metrics", ReportMetricsRequest.class, - null); + null), + COMMIT_TRANSACTION( + HTTPMethod.POST, "v1/transactions/commit", CommitTransactionRequest.class, null); private final HTTPMethod method; private final int requiredLength; @@ -357,12 +366,49 @@ public T handleRequest( return null; } + case COMMIT_TRANSACTION: + { + CommitTransactionRequest request = castRequest(CommitTransactionRequest.class, body); + commitTransaction(catalog, request); + return null; + } + default: } return null; } + /** + * This is a very simplistic approach that only validates the requirements for each table and does + * not do any other conflict detection. Therefore, it does not guarantee true transactional + * atomicity, which is left to the implementation details of a REST server. + */ + private static void commitTransaction(Catalog catalog, CommitTransactionRequest request) { + List transactions = Lists.newArrayList(); + + for (UpdateTableRequest tableChange : request.tableChanges()) { + Table table = catalog.loadTable(tableChange.identifier()); + if (table instanceof BaseTable) { + Transaction transaction = + Transactions.newTransaction( + tableChange.identifier().toString(), ((BaseTable) table).operations()); + transactions.add(transaction); + + BaseTransaction.TransactionTable txTable = + (BaseTransaction.TransactionTable) transaction.table(); + + // this performs validations and makes temporary commits that are in-memory + CatalogHandlers.commit(txTable.operations(), tableChange); + } else { + throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable"); + } + } + + // only commit if validations passed previously + transactions.forEach(Transaction::commitTransaction); + } + public T execute( HTTPMethod method, String path, diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index 02468f3d9b8a..6a5edf6a28c7 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -20,6 +20,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; @@ -38,6 +39,7 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTransaction; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileScanTask; @@ -46,9 +48,16 @@ import org.apache.iceberg.SnapshotRef; import org.apache.iceberg.Table; import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdatePartitionSpec; +import org.apache.iceberg.UpdateSchema; import org.apache.iceberg.catalog.CatalogTests; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SessionCatalog; +import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.jdbc.JdbcCatalog; import org.apache.iceberg.metrics.MetricsReport; @@ -1742,4 +1751,126 @@ public void testCatalogTokenRefreshDisabledWithCredential() { eq(catalogHeaders), any()); } + + @Test + public void diffAgainstSingleTable() { + Namespace namespace = Namespace.of("namespace"); + TableIdentifier identifier = TableIdentifier.of(namespace, "multipleDiffsAgainstSingleTable"); + + Table table = catalog().buildTable(identifier, SCHEMA).create(); + Transaction transaction = table.newTransaction(); + + UpdateSchema updateSchema = + transaction.updateSchema().addColumn("new_col", Types.LongType.get()); + Schema expectedSchema = updateSchema.apply(); + updateSchema.commit(); + + UpdatePartitionSpec updateSpec = + transaction.updateSpec().addField("shard", Expressions.bucket("id", 16)); + PartitionSpec expectedSpec = updateSpec.apply(); + updateSpec.commit(); + + TableCommit tableCommit = + TableCommit.create( + identifier, + ((BaseTransaction) transaction).startMetadata(), + ((BaseTransaction) transaction).currentMetadata()); + + restCatalog.commitTransaction(tableCommit); + + Table loaded = catalog().loadTable(identifier); + assertThat(loaded.schema().asStruct()).isEqualTo(expectedSchema.asStruct()); + assertThat(loaded.spec().fields()).isEqualTo(expectedSpec.fields()); + } + + @Test + public void multipleDiffsAgainstMultipleTables() { + Namespace namespace = Namespace.of("multiDiffNamespace"); + TableIdentifier identifier1 = TableIdentifier.of(namespace, "multiDiffTable1"); + TableIdentifier identifier2 = TableIdentifier.of(namespace, "multiDiffTable2"); + + Table table1 = catalog().buildTable(identifier1, SCHEMA).create(); + Table table2 = catalog().buildTable(identifier2, SCHEMA).create(); + Transaction t1Transaction = table1.newTransaction(); + Transaction t2Transaction = table2.newTransaction(); + + UpdateSchema updateSchema = + t1Transaction.updateSchema().addColumn("new_col", Types.LongType.get()); + Schema expectedSchema = updateSchema.apply(); + updateSchema.commit(); + + UpdateSchema updateSchema2 = + t2Transaction.updateSchema().addColumn("new_col2", Types.LongType.get()); + Schema expectedSchema2 = updateSchema2.apply(); + updateSchema2.commit(); + + TableCommit tableCommit1 = + TableCommit.create( + identifier1, + ((BaseTransaction) t1Transaction).startMetadata(), + ((BaseTransaction) t1Transaction).currentMetadata()); + + TableCommit tableCommit2 = + TableCommit.create( + identifier2, + ((BaseTransaction) t2Transaction).startMetadata(), + ((BaseTransaction) t2Transaction).currentMetadata()); + + restCatalog.commitTransaction(tableCommit1, tableCommit2); + + assertThat(catalog().loadTable(identifier1).schema().asStruct()) + .isEqualTo(expectedSchema.asStruct()); + + assertThat(catalog().loadTable(identifier2).schema().asStruct()) + .isEqualTo(expectedSchema2.asStruct()); + } + + @Test + public void multipleDiffsAgainstMultipleTablesLastFails() { + Namespace namespace = Namespace.of("multiDiffNamespace"); + TableIdentifier identifier1 = TableIdentifier.of(namespace, "multiDiffTable1"); + TableIdentifier identifier2 = TableIdentifier.of(namespace, "multiDiffTable2"); + + catalog().createTable(identifier1, SCHEMA); + catalog().createTable(identifier2, SCHEMA); + + Table table1 = catalog().loadTable(identifier1); + Table table2 = catalog().loadTable(identifier2); + Schema originalSchemaOne = table1.schema(); + + Transaction t1Transaction = catalog().loadTable(identifier1).newTransaction(); + t1Transaction.updateSchema().addColumn("new_col1", Types.LongType.get()).commit(); + + Transaction t2Transaction = catalog().loadTable(identifier2).newTransaction(); + t2Transaction.updateSchema().renameColumn("data", "new-column").commit(); + + // delete the colum that is being renamed in the above TX to cause a conflict + table2.updateSchema().deleteColumn("data").commit(); + Schema updatedSchemaTwo = table2.schema(); + + TableCommit tableCommit1 = + TableCommit.create( + identifier1, + ((BaseTransaction) t1Transaction).startMetadata(), + ((BaseTransaction) t1Transaction).currentMetadata()); + + TableCommit tableCommit2 = + TableCommit.create( + identifier2, + ((BaseTransaction) t2Transaction).startMetadata(), + ((BaseTransaction) t2Transaction).currentMetadata()); + + assertThatThrownBy(() -> restCatalog.commitTransaction(tableCommit1, tableCommit2)) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1"); + + Schema schema1 = catalog().loadTable(identifier1).schema(); + assertThat(schema1.asStruct()).isEqualTo(originalSchemaOne.asStruct()); + + Schema schema2 = catalog().loadTable(identifier2).schema(); + assertThat(schema2.asStruct()).isEqualTo(updatedSchemaTwo.asStruct()); + assertThat(schema2.findField("data")).isNull(); + assertThat(schema2.findField("new-column")).isNull(); + assertThat(schema2.columns()).hasSize(1); + } }