diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java index 162167360f6f..b219439fa2b9 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java @@ -23,8 +23,6 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.BaseMetastoreCatalog; @@ -35,41 +33,20 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.common.DynMethods; -import org.apache.iceberg.exceptions.AlreadyExistsException; -import org.apache.iceberg.exceptions.CommitFailedException; -import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; -import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.util.Tasks; import org.projectnessie.client.NessieClientBuilder; import org.projectnessie.client.NessieConfigConstants; -import org.projectnessie.client.api.CommitMultipleOperationsBuilder; import org.projectnessie.client.api.NessieApiV1; import org.projectnessie.client.http.HttpClientBuilder; -import org.projectnessie.client.http.HttpClientException; -import org.projectnessie.error.BaseNessieClientServerException; -import org.projectnessie.error.NessieConflictException; -import org.projectnessie.error.NessieNamespaceAlreadyExistsException; -import org.projectnessie.error.NessieNamespaceNotEmptyException; -import org.projectnessie.error.NessieNamespaceNotFoundException; -import org.projectnessie.error.NessieNotFoundException; -import org.projectnessie.error.NessieReferenceNotFoundException; -import org.projectnessie.model.Branch; -import org.projectnessie.model.Content; import org.projectnessie.model.ContentKey; -import org.projectnessie.model.GetNamespacesResponse; -import org.projectnessie.model.IcebergTable; -import org.projectnessie.model.Operation; -import org.projectnessie.model.Reference; import org.projectnessie.model.TableReference; -import org.projectnessie.model.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,10 +63,9 @@ public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable private static final Logger LOG = LoggerFactory.getLogger(NessieCatalog.class); private static final Joiner SLASH = Joiner.on("/"); - private NessieApiV1 api; + private NessieIcebergClient client; private String warehouseLocation; private Configuration config; - private UpdateableReference reference; private String name; private FileIO fileIO; private Map catalogOptions; @@ -106,10 +82,6 @@ public void initialize(String inputName, Map options) { // remove nessie prefix final Function removePrefix = x -> x.replace(NessieUtil.NESSIE_CONFIG_PREFIX, ""); - this.api = createNessieClientBuilder(options.get(NessieConfigConstants.CONF_NESSIE_CLIENT_BUILDER_IMPL)) - .fromConfig(x -> options.get(removePrefix.apply(x))) - .build(NessieApiV1.class); - this.warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION); if (warehouseLocation == null) { // Explicitly log a warning, otherwise the thrown exception can get list in the "silent-ish catch" @@ -133,7 +105,10 @@ public void initialize(String inputName, Map options) { throw new IllegalStateException("Parameter 'warehouse' not set, Nessie can't store data."); } final String requestedRef = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF)); - this.reference = loadReference(requestedRef, null); + NessieApiV1 api = createNessieClientBuilder(options.get(NessieConfigConstants.CONF_NESSIE_CLIENT_BUILDER_IMPL)) + .fromConfig(x -> options.get(removePrefix.apply(x))) + .build(NessieApiV1.class); + this.client = new NessieIcebergClient(api, requestedRef, null, catalogOptions); } private static NessieClientBuilder createNessieClientBuilder(String customBuilder) { @@ -152,7 +127,7 @@ private static NessieClientBuilder createNessieClientBuilder(String customBui @Override public void close() { - api.close(); + client.close(); } @Override @@ -165,14 +140,9 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) { TableReference tr = TableReference.parse(tableIdentifier.name()); Preconditions.checkArgument(!tr.hasTimestamp(), "Invalid table name: # is only allowed for hashes (reference by " + "timestamp is not supported)"); - UpdateableReference newReference = this.reference; - if (tr.getReference() != null) { - newReference = loadReference(tr.getReference(), tr.getHash()); - } return new NessieTableOperations( ContentKey.of(org.projectnessie.model.Namespace.of(tableIdentifier.namespace().levels()), tr.getName()), - newReference, - api, + client.withReference(tr.getReference(), tr.getHash()), fileIO, catalogOptions); } @@ -187,136 +157,27 @@ protected String defaultWarehouseLocation(TableIdentifier table) { @Override public List listTables(Namespace namespace) { - return tableStream(namespace).collect(Collectors.toList()); + return client.listTables(namespace); } @Override public boolean dropTable(TableIdentifier identifier, boolean purge) { - reference.checkMutable(); - - IcebergTable existingTable = table(identifier); - if (existingTable == null) { - return false; - } - - if (purge) { - LOG.info("Purging data for table {} was set to true but is ignored", identifier.toString()); - } - - CommitMultipleOperationsBuilder commitBuilderBase = api.commitMultipleOperations() - .commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg delete table %s", identifier), - catalogOptions)) - .operation(Operation.Delete.of(NessieUtil.toKey(identifier))); - - // We try to drop the table. Simple retry after ref update. - boolean threw = true; - try { - Tasks.foreach(commitBuilderBase) - .retry(5) - .stopRetryOn(NessieNotFoundException.class) - .throwFailureWhenFinished() - .onFailure((o, exception) -> refresh()) - .run(commitBuilder -> { - Branch branch = commitBuilder - .branch(reference.getAsBranch()) - .commit(); - reference.updateReference(branch); - }, BaseNessieClientServerException.class); - threw = false; - } catch (NessieConflictException e) { - LOG.error("Cannot drop table: failed after retry (update ref and retry)", e); - } catch (NessieNotFoundException e) { - LOG.error("Cannot drop table: ref is no longer valid.", e); - } catch (BaseNessieClientServerException e) { - LOG.error("Cannot drop table: unknown error", e); - } - return !threw; + return client.dropTable(identifier, purge); } @Override - public void renameTable(TableIdentifier from, TableIdentifier toOriginal) { - reference.checkMutable(); - - TableIdentifier to = NessieUtil.removeCatalogName(toOriginal, name()); - - IcebergTable existingFromTable = table(from); - if (existingFromTable == null) { - throw new NoSuchTableException("table %s doesn't exists", from.name()); - } - IcebergTable existingToTable = table(to); - if (existingToTable != null) { - throw new AlreadyExistsException("table %s already exists", to.name()); - } - - CommitMultipleOperationsBuilder operations = api.commitMultipleOperations() - .commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg rename table from '%s' to '%s'", - from, to), catalogOptions)) - .operation(Operation.Put.of(NessieUtil.toKey(to), existingFromTable, existingFromTable)) - .operation(Operation.Delete.of(NessieUtil.toKey(from))); - - try { - Tasks.foreach(operations) - .retry(5) - .stopRetryOn(NessieNotFoundException.class) - .throwFailureWhenFinished() - .onFailure((o, exception) -> refresh()) - .run(ops -> { - Branch branch = ops - .branch(reference.getAsBranch()) - .commit(); - reference.updateReference(branch); - }, BaseNessieClientServerException.class); - } catch (NessieNotFoundException e) { - // important note: the NotFoundException refers to the ref only. If a table was not found it would imply that the - // another commit has deleted the table from underneath us. This would arise as a Conflict exception as opposed to - // a not found exception. This is analogous to a merge conflict in git when a table has been changed by one user - // and removed by another. - throw new RuntimeException("Failed to drop table as ref is no longer valid.", e); - } catch (BaseNessieClientServerException e) { - throw new CommitFailedException(e, "Failed to rename table: the current reference is not up to date."); - } catch (HttpClientException ex) { - // Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant - // to catch all kinds of network errors (e.g. connection reset). Network code implementation - // details and all kinds of network devices can induce unexpected behavior. So better be - // safe than sorry. - throw new CommitStateUnknownException(ex); - } - // Intentionally just "throw through" Nessie's HttpClientException here and do not "special case" - // just the "timeout" variant to propagate all kinds of network errors (e.g. connection reset). - // Network code implementation details and all kinds of network devices can induce unexpected - // behavior. So better be safe than sorry. + public void renameTable(TableIdentifier from, TableIdentifier to) { + client.renameTable(from, NessieUtil.removeCatalogName(to, name())); } @Override public void createNamespace(Namespace namespace, Map metadata) { - try { - reference.checkMutable(); - api.createNamespace() - .reference(reference.getReference()) - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .create(); - refresh(); - } catch (NessieNamespaceAlreadyExistsException e) { - throw new AlreadyExistsException(e, "Namespace '%s' already exists.", namespace); - } catch (NessieNotFoundException e) { - throw new RuntimeException(String.format("Cannot create Namespace '%s': ref is no longer valid.", namespace), e); - } + client.createNamespace(namespace, metadata); } @Override public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { - try { - GetNamespacesResponse response = api.getMultipleNamespaces() - .reference(reference.getReference()) - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .get(); - return response.getNamespaces().stream() - .map(ns -> Namespace.of(ns.getElements().toArray(new String[0]))) - .collect(Collectors.toList()); - } catch (NessieReferenceNotFoundException e) { - throw new RuntimeException( - String.format("Cannot list Namespaces starting from '%s': ref is no longer valid.", namespace), e); - } + return client.listNamespaces(namespace); } /** @@ -328,37 +189,12 @@ public List listNamespaces(Namespace namespace) throws NoSuchNamespac */ @Override public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { - try { - api.getNamespace() - .reference(reference.getReference()) - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .get(); - } catch (NessieNamespaceNotFoundException e) { - throw new NoSuchNamespaceException(e, "Namespace '%s' does not exist.", namespace); - } catch (NessieReferenceNotFoundException e) { - throw new RuntimeException(String.format("Cannot load Namespace '%s': ref is no longer valid.", namespace), e); - } - return ImmutableMap.of(); + return client.loadNamespaceMetadata(namespace); } @Override public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { - try { - reference.checkMutable(); - api.deleteNamespace() - .reference(reference.getReference()) - .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) - .delete(); - refresh(); - return true; - } catch (NessieNamespaceNotFoundException e) { - return false; - } catch (NessieNotFoundException e) { - LOG.error("Cannot drop Namespace '{}': ref is no longer valid.", namespace, e); - return false; - } catch (NessieNamespaceNotEmptyException e) { - throw new NamespaceNotEmptyException(e, "Namespace '%s' is not empty. One or more tables exist.", namespace); - } + return client.dropNamespace(namespace); } @Override @@ -385,71 +221,17 @@ public Configuration getConf() { return config; } - public void refresh() throws NessieNotFoundException { - reference.refresh(api); - } - public String currentHash() { - return reference.getHash(); + return client.getRef().getHash(); } @VisibleForTesting String currentRefName() { - return reference.getName(); + return client.getRef().getName(); } @VisibleForTesting FileIO fileIO() { return fileIO; } - - private IcebergTable table(TableIdentifier tableIdentifier) { - try { - ContentKey key = NessieUtil.toKey(tableIdentifier); - Content table = api.getContent().key(key).reference(reference.getReference()).get().get(key); - return table != null ? table.unwrap(IcebergTable.class).orElse(null) : null; - } catch (NessieNotFoundException e) { - return null; - } - } - - private UpdateableReference loadReference(String requestedRef, String hash) { - try { - Reference ref = requestedRef == null ? api.getDefaultBranch() - : api.getReference().refName(requestedRef).get(); - if (hash != null) { - if (ref instanceof Branch) { - ref = Branch.of(ref.getName(), hash); - } else { - ref = Tag.of(ref.getName(), hash); - } - } - return new UpdateableReference(ref, hash != null); - } catch (NessieNotFoundException ex) { - if (requestedRef != null) { - throw new IllegalArgumentException(String.format( - "Nessie ref '%s' does not exist. This ref must exist before creating a NessieCatalog.", - requestedRef), ex); - } - - throw new IllegalArgumentException(String.format( - "Nessie does not have an existing default branch." + - "Either configure an alternative ref via %s or create the default branch on the server.", - NessieConfigConstants.CONF_NESSIE_REF), ex); - } - } - - private Stream tableStream(Namespace namespace) { - try { - return api.getEntries() - .reference(reference.getReference()) - .get() - .getEntries() - .stream() - .filter(NessieUtil.namespacePredicate(namespace)) - .map(NessieUtil::toIdentifier); - } catch (NessieNotFoundException ex) { - throw new NoSuchNamespaceException(ex, "Unable to list tables due to missing ref. %s", reference.getName()); - } - } } diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java new file mode 100644 index 000000000000..f0a4fe73fe09 --- /dev/null +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.nessie; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.CommitStateUnknownException; +import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.util.Tasks; +import org.projectnessie.client.NessieConfigConstants; +import org.projectnessie.client.api.CommitMultipleOperationsBuilder; +import org.projectnessie.client.api.NessieApiV1; +import org.projectnessie.client.http.HttpClientException; +import org.projectnessie.error.BaseNessieClientServerException; +import org.projectnessie.error.NessieConflictException; +import org.projectnessie.error.NessieNamespaceAlreadyExistsException; +import org.projectnessie.error.NessieNamespaceNotEmptyException; +import org.projectnessie.error.NessieNamespaceNotFoundException; +import org.projectnessie.error.NessieNotFoundException; +import org.projectnessie.error.NessieReferenceNotFoundException; +import org.projectnessie.model.Branch; +import org.projectnessie.model.Content; +import org.projectnessie.model.ContentKey; +import org.projectnessie.model.EntriesResponse; +import org.projectnessie.model.GetNamespacesResponse; +import org.projectnessie.model.IcebergTable; +import org.projectnessie.model.Operation; +import org.projectnessie.model.Reference; +import org.projectnessie.model.Tag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NessieIcebergClient implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(NessieIcebergClient.class); + + private final NessieApiV1 api; + private final Supplier reference; + private final Map catalogOptions; + + public NessieIcebergClient( + NessieApiV1 api, String requestedRef, String requestedHash, Map catalogOptions) { + this.api = api; + this.catalogOptions = catalogOptions; + this.reference = () -> loadReference(requestedRef, requestedHash); + } + + public NessieApiV1 getApi() { + return api; + } + + public UpdateableReference getRef() { + return reference.get(); + } + + public void refresh() throws NessieNotFoundException { + getRef().refresh(api); + } + + public NessieIcebergClient withReference(String requestedRef, String hash) { + if (null == requestedRef) { + return this; + } + return new NessieIcebergClient(getApi(), requestedRef, hash, catalogOptions); + } + + private UpdateableReference loadReference(String requestedRef, String hash) { + try { + Reference ref = + requestedRef == null ? api.getDefaultBranch() : api.getReference().refName(requestedRef).get(); + if (hash != null) { + if (ref instanceof Branch) { + ref = Branch.of(ref.getName(), hash); + } else { + ref = Tag.of(ref.getName(), hash); + } + } + return new UpdateableReference(ref, hash != null); + } catch (NessieNotFoundException ex) { + if (requestedRef != null) { + throw new IllegalArgumentException(String.format("Nessie ref '%s' does not exist", requestedRef), ex); + } + + throw new IllegalArgumentException(String.format("Nessie does not have an existing default branch. " + + "Either configure an alternative ref via '%s' or create the default branch on the server.", + NessieConfigConstants.CONF_NESSIE_REF), ex); + } + } + + public List listTables(Namespace namespace) { + try { + return api.getEntries() + .reference(getRef().getReference()) + .get() + .getEntries() + .stream() + .filter(namespacePredicate(namespace)) + .filter(e -> Content.Type.ICEBERG_TABLE == e.getType()) + .map(this::toIdentifier) + .collect(Collectors.toList()); + } catch (NessieNotFoundException ex) { + throw new NoSuchNamespaceException(ex, "Unable to list tables due to missing ref '%s'", getRef().getName()); + } + } + + private Predicate namespacePredicate(Namespace ns) { + if (ns == null) { + return e -> true; + } + + final List namespace = Arrays.asList(ns.levels()); + return e -> { + List names = e.getName().getElements(); + + if (names.size() <= namespace.size()) { + return false; + } + + return namespace.equals(names.subList(0, namespace.size())); + }; + } + + private TableIdentifier toIdentifier(EntriesResponse.Entry entry) { + List elements = entry.getName().getElements(); + return TableIdentifier.of(elements.toArray(new String[elements.size()])); + } + + public IcebergTable table(TableIdentifier tableIdentifier) { + try { + ContentKey key = NessieUtil.toKey(tableIdentifier); + Content table = api.getContent().key(key).reference(getRef().getReference()).get().get(key); + return table != null ? table.unwrap(IcebergTable.class).orElse(null) : null; + } catch (NessieNotFoundException e) { + return null; + } + } + + public void createNamespace(Namespace namespace, Map metadata) { + try { + getRef().checkMutable(); + getApi().createNamespace() + .reference(getRef().getReference()) + .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) + .create(); + refresh(); + } catch (NessieNamespaceAlreadyExistsException e) { + throw new AlreadyExistsException(e, "Namespace already exists: %s", namespace); + } catch (NessieNotFoundException e) { + throw new RuntimeException(String.format("Cannot create Namespace '%s': " + + "ref '%s' is no longer valid.", namespace, getRef().getName()), e); + } + } + + public List listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + try { + GetNamespacesResponse response = getApi().getMultipleNamespaces() + .reference(getRef().getReference()) + .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) + .get(); + return response.getNamespaces().stream() + .map(ns -> Namespace.of(ns.getElements().toArray(new String[0]))) + .collect(Collectors.toList()); + } catch (NessieReferenceNotFoundException e) { + throw new RuntimeException( + String.format("Cannot list Namespaces starting from '%s': " + + "ref '%s' is no longer valid.", namespace, getRef().getName()), e); + } + } + + public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException { + try { + getRef().checkMutable(); + getApi().deleteNamespace() + .reference(getRef().getReference()) + .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) + .delete(); + refresh(); + return true; + } catch (NessieNamespaceNotFoundException e) { + return false; + } catch (NessieNotFoundException e) { + LOG.error("Cannot drop Namespace '{}': ref '{}' is no longer valid.", namespace, getRef().getName(), e); + return false; + } catch (NessieNamespaceNotEmptyException e) { + throw new NamespaceNotEmptyException(e, "Namespace '%s' is not empty. One or more tables exist.", namespace); + } + } + + public Map loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException { + try { + getApi().getNamespace() + .reference(getRef().getReference()) + .namespace(org.projectnessie.model.Namespace.of(namespace.levels())) + .get(); + } catch (NessieNamespaceNotFoundException e) { + throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", namespace); + } catch (NessieReferenceNotFoundException e) { + throw new RuntimeException(String.format("Cannot load Namespace '%s': " + + "ref '%s' is no longer valid.", namespace, getRef().getName()), e); + } + return ImmutableMap.of(); + } + + public void renameTable(TableIdentifier from, TableIdentifier to) { + getRef().checkMutable(); + + IcebergTable existingFromTable = table(from); + if (existingFromTable == null) { + throw new NoSuchTableException("Table does not exist: %s", from.name()); + } + IcebergTable existingToTable = table(to); + if (existingToTable != null) { + throw new AlreadyExistsException("Table already exists: %s", to.name()); + } + + CommitMultipleOperationsBuilder operations = getApi().commitMultipleOperations() + .commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg rename table from '%s' to '%s'", + from, to), catalogOptions)) + .operation(Operation.Put.of(NessieUtil.toKey(to), existingFromTable, existingFromTable)) + .operation(Operation.Delete.of(NessieUtil.toKey(from))); + + try { + Tasks.foreach(operations) + .retry(5) + .stopRetryOn(NessieNotFoundException.class) + .throwFailureWhenFinished() + .onFailure((o, exception) -> refresh()) + .run(ops -> { + Branch branch = ops + .branch(getRef().getAsBranch()) + .commit(); + getRef().updateReference(branch); + }, BaseNessieClientServerException.class); + } catch (NessieNotFoundException e) { + // important note: the NotFoundException refers to the ref only. If a table was not found it would imply that the + // another commit has deleted the table from underneath us. This would arise as a Conflict exception as opposed to + // a not found exception. This is analogous to a merge conflict in git when a table has been changed by one user + // and removed by another. + throw new RuntimeException(String.format("Cannot rename table '%s' to '%s': " + + "ref '%s' no longer exists.", from.name(), to.name(), getRef().getName()), e); + } catch (BaseNessieClientServerException e) { + throw new CommitFailedException(e, "Cannot rename table '%s' to '%s': " + + "the current reference is not up to date.", from.name(), to.name()); + } catch (HttpClientException ex) { + // Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant + // to catch all kinds of network errors (e.g. connection reset). Network code implementation + // details and all kinds of network devices can induce unexpected behavior. So better be + // safe than sorry. + throw new CommitStateUnknownException(ex); + } + // Intentionally just "throw through" Nessie's HttpClientException here and do not "special case" + // just the "timeout" variant to propagate all kinds of network errors (e.g. connection reset). + // Network code implementation details and all kinds of network devices can induce unexpected + // behavior. So better be safe than sorry. + } + + public boolean dropTable(TableIdentifier identifier, boolean purge) { + getRef().checkMutable(); + + IcebergTable existingTable = table(identifier); + if (existingTable == null) { + return false; + } + + if (purge) { + LOG.info("Purging data for table {} was set to true but is ignored", identifier.toString()); + } + + CommitMultipleOperationsBuilder commitBuilderBase = getApi().commitMultipleOperations() + .commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg delete table %s", identifier), + catalogOptions)) + .operation(Operation.Delete.of(NessieUtil.toKey(identifier))); + + // We try to drop the table. Simple retry after ref update. + boolean threw = true; + try { + Tasks.foreach(commitBuilderBase) + .retry(5) + .stopRetryOn(NessieNotFoundException.class) + .throwFailureWhenFinished() + .onFailure((o, exception) -> refresh()) + .run(commitBuilder -> { + Branch branch = commitBuilder + .branch(getRef().getAsBranch()) + .commit(); + getRef().updateReference(branch); + }, BaseNessieClientServerException.class); + threw = false; + } catch (NessieConflictException e) { + LOG.error("Cannot drop table: failed after retry (update ref '{}' and retry)", getRef().getName(), e); + } catch (NessieNotFoundException e) { + LOG.error("Cannot drop table: ref '{}' is no longer valid.", getRef().getName(), e); + } catch (BaseNessieClientServerException e) { + LOG.error("Cannot drop table: unknown error", e); + } + return !threw; + } + + @Override + public void close() { + if (null != api) { + api.close(); + } + } +} diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java index ab781b16591f..ddb7d9ad39df 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java @@ -30,7 +30,6 @@ import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileIO; -import org.projectnessie.client.api.NessieApiV1; import org.projectnessie.client.http.HttpClientException; import org.projectnessie.error.NessieConflictException; import org.projectnessie.error.NessieNotFoundException; @@ -51,9 +50,8 @@ public class NessieTableOperations extends BaseMetastoreTableOperations { private static final Logger LOG = LoggerFactory.getLogger(NessieTableOperations.class); - private final NessieApiV1 api; + private final NessieIcebergClient client; private final ContentKey key; - private final UpdateableReference reference; private IcebergTable table; private final FileIO fileIO; private final Map catalogOptions; @@ -63,13 +61,11 @@ public class NessieTableOperations extends BaseMetastoreTableOperations { */ NessieTableOperations( ContentKey key, - UpdateableReference reference, - NessieApiV1 api, + NessieIcebergClient client, FileIO fileIO, Map catalogOptions) { this.key = key; - this.reference = reference; - this.api = api; + this.client = client; this.fileIO = fileIO; this.catalogOptions = catalogOptions; } @@ -101,29 +97,29 @@ private TableMetadata loadTableMetadata(String metadataLocation) { @Override protected void doRefresh() { try { - reference.refresh(api); + client.refresh(); } catch (NessieNotFoundException e) { - throw new RuntimeException("Failed to refresh as ref is no longer valid.", e); + throw new RuntimeException(String.format("Failed to refresh as ref '%s' " + + "is no longer valid.", client.getRef().getName()), e); } String metadataLocation = null; try { - Content content = api.getContent().key(key).reference(reference.getReference()).get() + Content content = client.getApi().getContent().key(key).reference(client.getRef().getReference()).get() .get(key); - LOG.debug("Content '{}' at '{}': {}", key, reference.getReference(), content); + LOG.debug("Content '{}' at '{}': {}", key, client.getRef().getReference(), content); if (content == null) { if (currentMetadataLocation() != null) { - throw new NoSuchTableException("No such table %s in %s", key, reference.getReference()); + throw new NoSuchTableException("No such table '%s' in '%s'", key, client.getRef().getReference()); } } else { this.table = content.unwrap(IcebergTable.class) - .orElseThrow(() -> - new IllegalStateException("Cannot refresh iceberg table: " + - String.format("Nessie points to a non-Iceberg object for path: %s.", key))); + .orElseThrow(() -> new IllegalStateException(String.format("Cannot refresh iceberg table: " + + "Nessie points to a non-Iceberg object for path: %s.", key))); metadataLocation = table.getMetadataLocation(); } } catch (NessieNotFoundException ex) { if (currentMetadataLocation() != null) { - throw new NoSuchTableException(ex, "No such table %s", key); + throw new NoSuchTableException(ex, "No such table '%s'", key); } } refreshFromMetadataLocation(metadataLocation, 2); @@ -131,7 +127,7 @@ protected void doRefresh() { @Override protected void doCommit(TableMetadata base, TableMetadata metadata) { - reference.checkMutable(); + client.getRef().checkMutable(); String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); @@ -151,23 +147,23 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { .metadataLocation(newMetadataLocation) .build(); - LOG.debug("Committing '{}' against '{}': {}", key, reference.getReference(), newTable); + LOG.debug("Committing '{}' against '{}': {}", key, client.getRef().getReference(), newTable); ImmutableCommitMeta.Builder builder = ImmutableCommitMeta.builder(); builder.message(buildCommitMsg(base, metadata)); if (isSnapshotOperation(base, metadata)) { builder.putProperties("iceberg.operation", snapshot.operation()); } - Branch branch = api.commitMultipleOperations() + Branch branch = client.getApi().commitMultipleOperations() .operation(Operation.Put.of(key, newTable, table)) .commitMeta(NessieUtil.catalogOptions(builder, catalogOptions).build()) - .branch(reference.getAsBranch()) + .branch(client.getRef().getAsBranch()) .commit(); - reference.updateReference(branch); + client.getRef().updateReference(branch); delete = false; } catch (NessieConflictException ex) { throw new CommitFailedException(ex, "Cannot commit: Reference hash is out of date. " + - "Update the reference %s and try again", reference.getName()); + "Update the reference '%s' and try again", client.getRef().getName()); } catch (HttpClientException ex) { // Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant // to catch all kinds of network errors (e.g. connection reset). Network code implementation @@ -177,7 +173,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { throw new CommitStateUnknownException(ex); } catch (NessieNotFoundException ex) { throw new RuntimeException( - String.format("Cannot commit: Reference %s no longer exists", reference.getName()), ex); + String.format("Cannot commit: Reference '%s' no longer exists", client.getRef().getName()), ex); } finally { if (delete) { io().deleteFile(newMetadataLocation); diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java index 9dc964175895..1bb18875d054 100644 --- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java +++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Predicate; import javax.annotation.Nullable; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.catalog.Namespace; @@ -32,7 +31,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.projectnessie.model.CommitMeta; import org.projectnessie.model.ContentKey; -import org.projectnessie.model.EntriesResponse; import org.projectnessie.model.ImmutableCommitMeta; public final class NessieUtil { @@ -43,30 +41,6 @@ public final class NessieUtil { private NessieUtil() { } - static Predicate namespacePredicate(Namespace ns) { - // TODO: filter to just iceberg tables. - if (ns == null) { - return e -> true; - } - - final List namespace = Arrays.asList(ns.levels()); - Predicate predicate = e -> { - List names = e.getName().getElements(); - - if (names.size() <= namespace.size()) { - return false; - } - - return namespace.equals(names.subList(0, namespace.size())); - }; - return predicate; - } - - static TableIdentifier toIdentifier(EntriesResponse.Entry entry) { - List elements = entry.getName().getElements(); - return TableIdentifier.of(elements.toArray(new String[elements.size()])); - } - static TableIdentifier removeCatalogName(TableIdentifier to, String name) { String[] levels = to.namespace().levels(); diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java index 3a8b40d7b2b6..4177a9071c05 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestBranchVisibility.java @@ -63,7 +63,6 @@ public TestBranchVisibility() { public void before() throws NessieNotFoundException, NessieConflictException { createTable(tableIdentifier1, 1); // table 1 createTable(tableIdentifier2, 1); // table 2 - catalog.refresh(); createBranch("test", catalog.currentHash()); testCatalog = initCatalog("test"); } @@ -72,7 +71,6 @@ public void before() throws NessieNotFoundException, NessieConflictException { public void after() throws NessieNotFoundException, NessieConflictException { catalog.dropTable(tableIdentifier1); catalog.dropTable(tableIdentifier2); - catalog.refresh(); for (Reference reference : api.getAllReferences().get().getReferences()) { if (!reference.getName().equals("main")) { api.deleteBranch().branch((Branch) reference).delete(); @@ -147,7 +145,6 @@ public void testSchemaSnapshot() throws Exception { String metadataOnTest = addRow(catalog, tableIdentifier1, "initial-data", ImmutableMap.of("id0", 4L)); long snapshotIdOnTest = snapshotIdFromMetadata(catalog, metadataOnTest); - catalog.refresh(); String hashOnTest = catalog.currentHash(); createBranch(branch1, hashOnTest, branchTest); diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java index 20d86ea37e40..73c0409b54c6 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java @@ -38,7 +38,6 @@ import org.apache.iceberg.TableOperations; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.assertj.core.api.Assertions; @@ -92,7 +91,6 @@ public void afterEach() throws Exception { // drop the table data if (tableLocation != null) { tableLocation.getFileSystem(hadoopConfig).delete(tableLocation, true); - catalog.refresh(); catalog.dropTable(TABLE_IDENTIFIER, false); } @@ -323,22 +321,31 @@ public void testExistingTableUpdate() { } @Test - public void testFailure() throws NessieNotFoundException, NessieConflictException { + public void testCommitsOutsideOfCatalogApi() throws NessieNotFoundException, NessieConflictException { Table icebergTable = catalog.loadTable(TABLE_IDENTIFIER); Branch branch = (Branch) api.getReference().refName(BRANCH).get(); - - IcebergTable table = getTable(BRANCH, KEY); + Assertions.assertThat(api.getCommitLog().refName(BRANCH).get().getLogEntries()) + .extracting(e -> e.getCommitMeta().getHash()) + .hasSize(1) + .containsExactly(branch.getHash()); IcebergTable value = IcebergTable.of("dummytable.metadata.json", 42, 42, 42, 42, "cid"); - api.commitMultipleOperations().branch(branch) + // we do a separate manual commit outside of the catalog API + Branch commit = api.commitMultipleOperations().branch(branch) .operation(Operation.Put.of(KEY, value)) .commitMeta(CommitMeta.fromMessage("")) .commit(); + Assertions.assertThat(api.getCommitLog().refName(BRANCH).get().getLogEntries()) + .extracting(e -> e.getCommitMeta().getHash()) + .hasSize(2) + .containsExactly(commit.getHash(), branch.getHash()); - Assertions.assertThatThrownBy(() -> icebergTable.updateSchema().addColumn("data", Types.LongType.get()).commit()) - .isInstanceOf(CommitFailedException.class) - .hasMessage( - "Cannot commit: Reference hash is out of date. Update the reference iceberg-table-test and try again"); + icebergTable.updateSchema().addColumn("data", Types.LongType.get()).commit(); + Branch latest = (Branch) api.getReference().refName(BRANCH).get(); + Assertions.assertThat(api.getCommitLog().refName(BRANCH).get().getLogEntries()) + .extracting(e -> e.getCommitMeta().getHash()) + .hasSize(3) + .containsExactly(latest.getHash(), commit.getHash(), branch.getHash()); } @Test