Skip to content

Commit

Permalink
API,Core: Multi-Table transactions API and support for REST
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed May 11, 2023
1 parent 2a06bb5 commit fa0a20c
Show file tree
Hide file tree
Showing 20 changed files with 2,266 additions and 9 deletions.
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/BaseScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ protected Schema tableSchema() {
return schema;
}

protected TableScanContext context() {
public TableScanContext context() {
return context;
}

Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ public String tableName() {
}

/**
* Returns the table metadata stored in this transaction's {@code base} field.
*
* <p>NOTE(review): judging by the name, {@code base} is presumably the metadata the transaction
* started from; confirm against where the field is assigned.
*
* @return the value of the {@code base} field
*/
public TableMetadata startMetadata() {
return base;
}

/**
* Returns the table metadata stored in this transaction's {@code current} field.
*
* <p>NOTE(review): presumably this reflects the changes applied within the transaction so far;
* confirm against where the field is updated.
*
* @return the value of the {@code current} field
*/
public TableMetadata currentMetadata() {
return current;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.rest.RESTCatalogTransaction;
import org.apache.iceberg.rest.RESTSessionCatalog;

public abstract class BaseSessionCatalog implements SessionCatalog {
private final Cache<String, Catalog> catalogs =
Expand Down Expand Up @@ -62,7 +65,13 @@ public <T> T withContext(SessionContext context, Function<Catalog, T> task) {
return task.apply(asCatalog(context));
}

public class AsCatalog implements Catalog, SupportsNamespaces {
/**
 * Default implementation for session catalogs without multi-table commit support: it always
 * throws.
 *
 * @param context the session context the commit would run under (unused here)
 * @param commits the table commits that would be applied (unused here)
 * @throws UnsupportedOperationException always, naming this catalog via {@code name()}
 */
public void commitTransaction(SessionContext context, List<TableCommit> commits) {
  String message = "commitTransaction is not supported by catalog " + name();
  throw new UnsupportedOperationException(message);
}

public class AsCatalog implements Catalog, SupportsNamespaces, SupportsCatalogTransactions {

private final SessionContext context;

private AsCatalog(SessionContext context) {
Expand Down Expand Up @@ -159,5 +168,19 @@ public boolean removeProperties(Namespace namespace, Set<String> removals) {
public boolean namespaceExists(Namespace namespace) {
return BaseSessionCatalog.this.namespaceExists(context, namespace);
}

/**
 * Creates a {@link RESTCatalogTransaction} bound to this catalog view, the enclosing session
 * catalog and this view's session context.
 *
 * @param isolationLevel the isolation level handed to the new transaction
 * @return a new {@link RESTCatalogTransaction}
 * @throws IllegalStateException if the enclosing session catalog is not a {@link
 *     RESTSessionCatalog}
 */
@Override
public CatalogTransaction createTransaction(CatalogTransaction.IsolationLevel isolationLevel) {
  BaseSessionCatalog enclosing = BaseSessionCatalog.this;
  Preconditions.checkState(
      enclosing instanceof RESTSessionCatalog,
      "Only RESTSessionCatalog currently supports CatalogTransactions");

  // the cast is safe: the state check above rejected every other session catalog type
  RESTSessionCatalog restSessionCatalog = (RESTSessionCatalog) enclosing;
  return new RESTCatalogTransaction(this, restSessionCatalog, context, isolationLevel);
}

/**
* Commits the given table commits by delegating to the enclosing {@link BaseSessionCatalog},
* passing along the session context this catalog view was created with.
*
* @param commits the table commits to hand to the session catalog
*/
public void commitTransaction(List<TableCommit> commits) {
BaseSessionCatalog.this.commitTransaction(context, commits);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.catalog;

/**
* A transaction that can span multiple tables of a catalog. Actions performed through the {@link
* Catalog} returned by {@link #asCatalog()} participate in the transaction, and {@link
* #commitTransaction()} commits all pending changes together.
*/
public interface CatalogTransaction {

/** The isolation levels a {@link CatalogTransaction} can run under. */
enum IsolationLevel {

/**
* All reads see the last committed values that existed when the table was first loaded inside
* the catalog transaction. Changes made to a table outside the catalog transaction after the
* table was read are not visible, which prevents <b>read skew</b> (reading a table multiple
* times within the catalog transaction always returns the same results).
*
* <p>The transaction commits successfully only if the values it updated do not conflict with
* other concurrent updates.
*
* <p>Note that under SNAPSHOT isolation a <b>write skew anomaly</b> is acceptable and
* permitted. In a <b>write skew anomaly</b>, two transactions (T1 and T2) concurrently read an
* overlapping data set (e.g. values V1 and V2), concurrently make disjoint updates (e.g. T1
* updates V1, T2 updates V2), and finally concurrently commit, neither having seen the update
* performed by the other.
*/
SNAPSHOT,

/**
* All reads see the last committed values that existed when the table was first loaded inside
* the catalog transaction. Changes made to a table outside the catalog transaction after the
* table was read are not visible, which prevents <b>read skew</b>.
*
* <p>At commit time, every table participating in the transaction must still be in the same
* state it was in when it was first loaded within the catalog transaction.
*
* <p>Note that a <b>write skew anomaly</b> is not possible under SERIALIZABLE isolation. In a
* <b>write skew anomaly</b>, two transactions (T1 and T2) concurrently read an overlapping data
* set (e.g. values V1 and V2), concurrently make disjoint updates (e.g. T1 updates V1, T2
* updates V2), and both commit. Under SERIALIZABLE isolation either T1 or T2 would have to
* occur first and be visible to the other transaction.
*/
SERIALIZABLE;
}

/**
* Performs an atomic commit of all the pending changes across multiple tables. Engine-specific
* implementations must ensure that all pending changes are applied atomically.
*/
void commitTransaction();

/**
* Returns this catalog transaction as a {@link Catalog} API.
*
* @return This catalog transaction as a {@link Catalog} API. Any actions that are called through
* this API are participating in this catalog transaction.
*/
Catalog asCatalog();

/**
* Returns the {@link IsolationLevel} this transaction runs under.
*
* @return The {@link IsolationLevel} for this transaction.
*/
IsolationLevel isolationLevel();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.catalog;

import org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel;

/**
* Implemented by catalogs that can create {@link CatalogTransaction} instances.
*/
public interface SupportsCatalogTransactions {

/**
* Create a new {@link CatalogTransaction} with the given {@link IsolationLevel}.
*
* @param isolationLevel The isolation level to use.
* @return A new {@link CatalogTransaction}.
*/
CatalogTransaction createTransaction(IsolationLevel isolationLevel);
}
33 changes: 33 additions & 0 deletions core/src/main/java/org/apache/iceberg/catalog/TableCommit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.catalog;

import java.util.List;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.TableMetadata;
import org.immutables.value.Value;

/**
* Value type describing the changes to commit for a single table: the table's identifier, a base
* {@link TableMetadata} and a list of {@link MetadataUpdate}s.
*
* <p>The concrete implementation is generated by the Immutables annotation processor from this
* interface.
*/
@Value.Immutable
public interface TableCommit {
/** Returns the identifier of the table this commit applies to. */
TableIdentifier identifier();

/**
* Returns the base table metadata of this commit.
*
* <p>NOTE(review): presumably the metadata the {@link #changes()} were made against; confirm
* against the code that builds instances.
*/
TableMetadata base();

/** Returns the metadata updates that make up this commit. */
List<MetadataUpdate> changes();
}
36 changes: 36 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.Transactions;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
Expand All @@ -49,8 +50,10 @@
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.rest.requests.CommitTransactionRequest;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.requests.RenameTableRequest;
Expand Down Expand Up @@ -361,4 +364,37 @@ private static TableMetadata commit(TableOperations ops, UpdateTableRequest requ

return ops.current();
}

/**
 * Applies all table changes of a {@link CommitTransactionRequest} against the given catalog.
 *
 * <p>This is a deliberately simple approach: each table's requirements are validated and its
 * updates are staged in a per-table {@link Transaction}; only after every table passed are the
 * transactions committed, one after the other. No further conflict detection is performed, so
 * true transactional atomicity is not guaranteed and is left to the implementation details of a
 * REST server.
 *
 * @param catalog the catalog used to load each table named in the request
 * @param request the request holding the per-table requirements and updates
 * @throws IllegalStateException if the catalog returns a table that is not a {@link BaseTable}
 */
public static void commitTransaction(Catalog catalog, CommitTransactionRequest request) {
  List<Transaction> staged = Lists.newArrayList();

  for (CommitTransactionRequest.CommitTableRequest change : request.tableChanges()) {
    Table loaded = catalog.loadTable(change.identifier());
    if (!(loaded instanceof BaseTable)) {
      throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
    }

    UpdateTableRequest update = new UpdateTableRequest(change.requirements(), change.updates());

    String tableName = change.identifier().toString();
    Transaction tx = Transactions.newTransaction(tableName, ((BaseTable) loaded).operations());
    staged.add(tx);

    // validates the requirements and applies the updates to the transaction's in-memory state
    BaseTransaction.TransactionTable txTable = (BaseTransaction.TransactionTable) tx.table();
    commit(txTable.operations(), update);
  }

  // reached only when every table passed validation above
  for (Transaction tx : staged) {
    tx.commitTransaction();
  }
}
}
23 changes: 22 additions & 1 deletion core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,31 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.BaseSessionCatalog;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.CatalogTransaction;
import org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.SupportsCatalogTransactions;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class RESTCatalog implements Catalog, SupportsNamespaces, Configurable<Object>, Closeable {
public class RESTCatalog
implements Catalog,
SupportsNamespaces,
Configurable<Object>,
Closeable,
SupportsCatalogTransactions {
private final RESTSessionCatalog sessionCatalog;
private final Catalog delegate;
private final SupportsNamespaces nsDelegate;
private final SupportsCatalogTransactions catalogTxDelegate;

public RESTCatalog() {
this(
Expand All @@ -60,6 +71,7 @@ public RESTCatalog(
this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null);
this.delegate = sessionCatalog.asCatalog(context);
this.nsDelegate = (SupportsNamespaces) delegate;
this.catalogTxDelegate = (SupportsCatalogTransactions) delegate;
}

@Override
Expand Down Expand Up @@ -248,4 +260,13 @@ public void setConf(Object conf) {
public void close() throws IOException {
sessionCatalog.close();
}

/**
* Creates a new {@link CatalogTransaction} by delegating to {@code catalogTxDelegate}.
*
* @param isolationLevel the isolation level passed through to the delegate
* @return the {@link CatalogTransaction} created by the delegate
*/
@Override
public CatalogTransaction createTransaction(IsolationLevel isolationLevel) {
return catalogTxDelegate.createTransaction(isolationLevel);
}

/**
 * Commits the given table commits through the session-catalog view backing this catalog.
 *
 * @param commits the table commits to forward to the delegate
 * @throws ClassCastException if {@code delegate} is not a {@link BaseSessionCatalog.AsCatalog}
 */
public void commitTransaction(List<TableCommit> commits) {
  BaseSessionCatalog.AsCatalog sessionView = (BaseSessionCatalog.AsCatalog) delegate;
  sessionView.commitTransaction(commits);
}
}
Loading

0 comments on commit fa0a20c

Please sign in to comment.