Skip to content

Commit

Permalink
Nessie: Extract Catalog client code to NessieClient for Trino Consump…
Browse files Browse the repository at this point in the history
…tion (#4491)

* Nessie: simplify code in Nessie catalog

This adds a new `NessieIcebergClient` class that encapsulates some of
the features that have been used across the `NessieCatalog`.
It also slightly improves the error messages to include the reference or
the table name when something fails.
  • Loading branch information
nastra authored Apr 19, 2022
1 parent 5ded91e commit 9618147
Show file tree
Hide file tree
Showing 6 changed files with 386 additions and 297 deletions.
252 changes: 17 additions & 235 deletions nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseMetastoreCatalog;
Expand All @@ -35,41 +33,20 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.Tasks;
import org.projectnessie.client.NessieClientBuilder;
import org.projectnessie.client.NessieConfigConstants;
import org.projectnessie.client.api.CommitMultipleOperationsBuilder;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.http.HttpClientBuilder;
import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.BaseNessieClientServerException;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNamespaceAlreadyExistsException;
import org.projectnessie.error.NessieNamespaceNotEmptyException;
import org.projectnessie.error.NessieNamespaceNotFoundException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.error.NessieReferenceNotFoundException;
import org.projectnessie.model.Branch;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.GetNamespacesResponse;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.Operation;
import org.projectnessie.model.Reference;
import org.projectnessie.model.TableReference;
import org.projectnessie.model.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -86,10 +63,9 @@ public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable

private static final Logger LOG = LoggerFactory.getLogger(NessieCatalog.class);
private static final Joiner SLASH = Joiner.on("/");
private NessieApiV1 api;
private NessieIcebergClient client;
private String warehouseLocation;
private Configuration config;
private UpdateableReference reference;
private String name;
private FileIO fileIO;
private Map<String, String> catalogOptions;
Expand All @@ -106,10 +82,6 @@ public void initialize(String inputName, Map<String, String> options) {
// remove nessie prefix
final Function<String, String> removePrefix = x -> x.replace(NessieUtil.NESSIE_CONFIG_PREFIX, "");

this.api = createNessieClientBuilder(options.get(NessieConfigConstants.CONF_NESSIE_CLIENT_BUILDER_IMPL))
.fromConfig(x -> options.get(removePrefix.apply(x)))
.build(NessieApiV1.class);

this.warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION);
if (warehouseLocation == null) {
// Explicitly log a warning, otherwise the thrown exception can get lost in the "silent-ish catch"
Expand All @@ -133,7 +105,10 @@ public void initialize(String inputName, Map<String, String> options) {
throw new IllegalStateException("Parameter 'warehouse' not set, Nessie can't store data.");
}
final String requestedRef = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF));
this.reference = loadReference(requestedRef, null);
NessieApiV1 api = createNessieClientBuilder(options.get(NessieConfigConstants.CONF_NESSIE_CLIENT_BUILDER_IMPL))
.fromConfig(x -> options.get(removePrefix.apply(x)))
.build(NessieApiV1.class);
this.client = new NessieIcebergClient(api, requestedRef, null, catalogOptions);
}

private static NessieClientBuilder<?> createNessieClientBuilder(String customBuilder) {
Expand All @@ -152,7 +127,7 @@ private static NessieClientBuilder<?> createNessieClientBuilder(String customBui

/**
 * Closes the Nessie handle held by this catalog.
 *
 * <p>NOTE(review): this listing looks like a diff rendered without +/- markers, so {@code api.close()} and
 * {@code client.close()} are presumably the before/after versions of one statement -- confirm against the file.
 */
@Override
public void close() {
api.close();
client.close();
}

@Override
Expand All @@ -165,14 +140,9 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
TableReference tr = TableReference.parse(tableIdentifier.name());
Preconditions.checkArgument(!tr.hasTimestamp(), "Invalid table name: # is only allowed for hashes (reference by " +
"timestamp is not supported)");
UpdateableReference newReference = this.reference;
if (tr.getReference() != null) {
newReference = loadReference(tr.getReference(), tr.getHash());
}
return new NessieTableOperations(
ContentKey.of(org.projectnessie.model.Namespace.of(tableIdentifier.namespace().levels()), tr.getName()),
newReference,
api,
client.withReference(tr.getReference(), tr.getHash()),
fileIO,
catalogOptions);
}
Expand All @@ -187,136 +157,27 @@ protected String defaultWarehouseLocation(TableIdentifier table) {

/**
 * Returns the identifiers of the tables found for the given namespace.
 *
 * <p>NOTE(review): this listing looks like a diff rendered without +/- markers; the {@code tableStream(...)} line and
 * the {@code client.listTables(...)} line are presumably the old and new bodies -- confirm against the file.
 *
 * @param namespace namespace whose tables are listed
 * @return the matching table identifiers
 */
@Override
public List<TableIdentifier> listTables(Namespace namespace) {
return tableStream(namespace).collect(Collectors.toList());
return client.listTables(namespace);
}

/**
 * Drops the given table by committing a delete operation for its key.
 *
 * <p>NOTE(review): this listing looks like a diff rendered without +/- markers; the retry-based body and the trailing
 * {@code client.dropTable(...)} delegation are presumably the old and new implementations -- confirm against the file.
 *
 * @param identifier table to drop
 * @param purge only logged by the inline implementation; no data is purged there
 * @return {@code false} if the table does not exist or one of the caught Nessie exceptions occurred, {@code true} if
 *     the delete commit completed
 */
@Override
public boolean dropTable(TableIdentifier identifier, boolean purge) {
// fails fast when the current reference cannot be written to (see UpdateableReference#checkMutable)
reference.checkMutable();

IcebergTable existingTable = table(identifier);
if (existingTable == null) {
return false;
}

if (purge) {
LOG.info("Purging data for table {} was set to true but is ignored", identifier.toString());
}

CommitMultipleOperationsBuilder commitBuilderBase = api.commitMultipleOperations()
.commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg delete table %s", identifier),
catalogOptions))
.operation(Operation.Delete.of(NessieUtil.toKey(identifier)));

// We try to drop the table. Simple retry after ref update.
// 'threw' stays true unless the commit task below finishes without an exception
boolean threw = true;
try {
Tasks.foreach(commitBuilderBase)
.retry(5)
.stopRetryOn(NessieNotFoundException.class)
.throwFailureWhenFinished()
.onFailure((o, exception) -> refresh())
.run(commitBuilder -> {
Branch branch = commitBuilder
.branch(reference.getAsBranch())
.commit();
// remember the branch returned by the commit as the current reference
reference.updateReference(branch);
}, BaseNessieClientServerException.class);
threw = false;
} catch (NessieConflictException e) {
LOG.error("Cannot drop table: failed after retry (update ref and retry)", e);
} catch (NessieNotFoundException e) {
LOG.error("Cannot drop table: ref is no longer valid.", e);
} catch (BaseNessieClientServerException e) {
LOG.error("Cannot drop table: unknown error", e);
}
return !threw;
return client.dropTable(identifier, purge);
}

/**
 * Renames a table by committing, in one commit, a put of the existing table content under the new key and a delete of
 * the old key.
 *
 * <p>NOTE(review): this listing looks like a diff rendered without +/- markers; it contains two method signatures
 * and both the inline implementation and the {@code client.renameTable(...)} delegation, which are presumably the old
 * and new versions -- confirm against the file.
 */
@Override
public void renameTable(TableIdentifier from, TableIdentifier toOriginal) {
reference.checkMutable();

// strip the catalog name from the target identifier before it is used as a key
TableIdentifier to = NessieUtil.removeCatalogName(toOriginal, name());

IcebergTable existingFromTable = table(from);
if (existingFromTable == null) {
// NOTE(review): message grammar ("doesn't exists") looks like a typo; left unchanged here
throw new NoSuchTableException("table %s doesn't exists", from.name());
}
IcebergTable existingToTable = table(to);
if (existingToTable != null) {
throw new AlreadyExistsException("table %s already exists", to.name());
}

CommitMultipleOperationsBuilder operations = api.commitMultipleOperations()
.commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg rename table from '%s' to '%s'",
from, to), catalogOptions))
.operation(Operation.Put.of(NessieUtil.toKey(to), existingFromTable, existingFromTable))
.operation(Operation.Delete.of(NessieUtil.toKey(from)));

try {
Tasks.foreach(operations)
.retry(5)
.stopRetryOn(NessieNotFoundException.class)
.throwFailureWhenFinished()
.onFailure((o, exception) -> refresh())
.run(ops -> {
Branch branch = ops
.branch(reference.getAsBranch())
.commit();
reference.updateReference(branch);
}, BaseNessieClientServerException.class);
} catch (NessieNotFoundException e) {
// Important note: the NotFoundException refers to the ref only. If a table was not found, it would imply that
// another commit has deleted the table from underneath us. This would arise as a Conflict exception as opposed to
// a not found exception. This is analogous to a merge conflict in git when a table has been changed by one user
// and removed by another.
// NOTE(review): the message below says "drop" although this is the rename path -- looks like a copy/paste slip
throw new RuntimeException("Failed to drop table as ref is no longer valid.", e);
} catch (BaseNessieClientServerException e) {
throw new CommitFailedException(e, "Failed to rename table: the current reference is not up to date.");
} catch (HttpClientException ex) {
// Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant
// to catch all kinds of network errors (e.g. connection reset). Network code implementation
// details and all kinds of network devices can induce unexpected behavior. So better be
// safe than sorry.
throw new CommitStateUnknownException(ex);
}
// Intentionally just "throw through" Nessie's HttpClientException here and do not "special case"
// just the "timeout" variant to propagate all kinds of network errors (e.g. connection reset).
// Network code implementation details and all kinds of network devices can induce unexpected
// behavior. So better be safe than sorry.
public void renameTable(TableIdentifier from, TableIdentifier to) {
client.renameTable(from, NessieUtil.removeCatalogName(to, name()));
}

/**
 * Creates the given namespace on the current reference.
 *
 * <p>NOTE(review): this listing looks like a diff rendered without +/- markers; the try/catch body and the trailing
 * {@code client.createNamespace(...)} delegation are presumably the old and new implementations -- confirm.
 *
 * @param namespace namespace to create
 * @param metadata not read by the inline implementation; only passed on by the delegating call
 * @throws AlreadyExistsException if Nessie reports that the namespace already exists
 */
@Override
public void createNamespace(Namespace namespace, Map<String, String> metadata) {
try {
reference.checkMutable();
api.createNamespace()
.reference(reference.getReference())
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
.create();
// re-read the reference after the namespace has been created
refresh();
} catch (NessieNamespaceAlreadyExistsException e) {
throw new AlreadyExistsException(e, "Namespace '%s' already exists.", namespace);
} catch (NessieNotFoundException e) {
throw new RuntimeException(String.format("Cannot create Namespace '%s': ref is no longer valid.", namespace), e);
}
client.createNamespace(namespace, metadata);
}

/**
 * Lists the namespaces that Nessie returns for the given namespace on the current reference.
 *
 * <p>NOTE(review): this listing looks like a diff rendered without +/- markers; the try/catch body and the trailing
 * {@code client.listNamespaces(...)} delegation are presumably the old and new implementations -- confirm.
 *
 * @param namespace namespace passed to the Nessie lookup
 * @return the returned Nessie namespaces converted to Iceberg {@link Namespace} instances
 */
@Override
public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
try {
GetNamespacesResponse response = api.getMultipleNamespaces()
.reference(reference.getReference())
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
.get();
// convert each Nessie namespace into an Iceberg namespace built from the same elements
return response.getNamespaces().stream()
.map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
.collect(Collectors.toList());
} catch (NessieReferenceNotFoundException e) {
throw new RuntimeException(
String.format("Cannot list Namespaces starting from '%s': ref is no longer valid.", namespace), e);
}
return client.listNamespaces(namespace);
}

/**
Expand All @@ -328,37 +189,12 @@ public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespac
*/
@Override
public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
// NOTE(review): this listing looks like a diff rendered without +/- markers; the try/catch body with its empty-map
// return and the trailing client delegation are presumably the old and new implementations -- confirm.
try {
// the lookup result is discarded; the call only matters for the exceptions it can raise
api.getNamespace()
.reference(reference.getReference())
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
.get();
} catch (NessieNamespaceNotFoundException e) {
throw new NoSuchNamespaceException(e, "Namespace '%s' does not exist.", namespace);
} catch (NessieReferenceNotFoundException e) {
throw new RuntimeException(String.format("Cannot load Namespace '%s': ref is no longer valid.", namespace), e);
}
// no properties are returned for a namespace that exists
return ImmutableMap.of();
return client.loadNamespaceMetadata(namespace);
}

/**
 * Deletes the given namespace from the current reference.
 *
 * <p>NOTE(review): this listing looks like a diff rendered without +/- markers; the try/catch body and the trailing
 * {@code client.dropNamespace(...)} delegation are presumably the old and new implementations -- confirm.
 *
 * @param namespace namespace to delete
 * @return {@code true} once the delete and the following refresh succeed; {@code false} if the namespace or the
 *     reference is not found
 * @throws NamespaceNotEmptyException if Nessie reports that the namespace is not empty
 */
@Override
public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
try {
reference.checkMutable();
api.deleteNamespace()
.reference(reference.getReference())
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
.delete();
refresh();
return true;
} catch (NessieNamespaceNotFoundException e) {
return false;
} catch (NessieNotFoundException e) {
// a missing reference is logged and reported as "nothing dropped" rather than rethrown
LOG.error("Cannot drop Namespace '{}': ref is no longer valid.", namespace, e);
return false;
} catch (NessieNamespaceNotEmptyException e) {
throw new NamespaceNotEmptyException(e, "Namespace '%s' is not empty. One or more tables exist.", namespace);
}
return client.dropNamespace(namespace);
}

@Override
Expand All @@ -385,71 +221,17 @@ public Configuration getConf() {
return config;
}

/**
 * Refreshes the catalog's current reference through the Nessie API.
 *
 * @throws NessieNotFoundException propagated from the reference refresh
 */
public void refresh() throws NessieNotFoundException {
reference.refresh(api);
}

/**
 * Returns the hash of the reference this catalog currently works against.
 *
 * <p>NOTE(review): this listing looks like a diff rendered without +/- markers; the two return statements are
 * presumably the old ({@code reference}) and new ({@code client.getRef()}) versions -- confirm against the file.
 */
public String currentHash() {
return reference.getHash();
return client.getRef().getHash();
}

/**
 * Returns the name of the reference this catalog currently works against; exposed for tests.
 *
 * <p>NOTE(review): this listing looks like a diff rendered without +/- markers; the two return statements are
 * presumably the old ({@code reference}) and new ({@code client.getRef()}) versions -- confirm against the file.
 */
@VisibleForTesting
String currentRefName() {
return reference.getName();
return client.getRef().getName();
}

/** Returns the {@link FileIO} instance held by this catalog; exposed for tests. */
@VisibleForTesting
FileIO fileIO() {
return fileIO;
}

/**
 * Looks up the content stored under the given identifier on the current reference.
 *
 * @param tableIdentifier identifier that is converted to a Nessie content key
 * @return the content unwrapped as an {@link IcebergTable}; {@code null} when nothing is stored under the key, when
 *     the content cannot be unwrapped as an Iceberg table, or when the lookup throws {@link NessieNotFoundException}
 */
private IcebergTable table(TableIdentifier tableIdentifier) {
  try {
    ContentKey contentKey = NessieUtil.toKey(tableIdentifier);
    Content content = api.getContent()
        .key(contentKey)
        .reference(reference.getReference())
        .get()
        .get(contentKey);
    if (content == null) {
      return null;
    }
    return content.unwrap(IcebergTable.class).orElse(null);
  } catch (NessieNotFoundException notFound) {
    // a failed lookup is reported to callers the same way as a missing table
    return null;
  }
}

/**
 * Resolves the Nessie reference to work against and wraps it in an {@link UpdateableReference}.
 *
 * @param requestedRef name of the reference to load, or {@code null} to use the server's default branch
 * @param hash optional hash; when non-null the loaded reference is replaced by a branch or tag of the same name
 *     pointing at this hash
 * @return the wrapped reference; the second constructor argument records whether an explicit hash was supplied
 * @throws IllegalArgumentException if the requested reference, or the default branch, does not exist
 */
private UpdateableReference loadReference(String requestedRef, String hash) {
  try {
    Reference ref = requestedRef == null ? api.getDefaultBranch()
        : api.getReference().refName(requestedRef).get();
    if (hash != null) {
      // keep the kind of the loaded reference (branch vs. tag) but point it at the requested hash
      if (ref instanceof Branch) {
        ref = Branch.of(ref.getName(), hash);
      } else {
        ref = Tag.of(ref.getName(), hash);
      }
    }
    return new UpdateableReference(ref, hash != null);
  } catch (NessieNotFoundException ex) {
    if (requestedRef != null) {
      throw new IllegalArgumentException(String.format(
          "Nessie ref '%s' does not exist. This ref must exist before creating a NessieCatalog.",
          requestedRef), ex);
    }

    // the trailing space after the first sentence keeps the two concatenated sentences from running together
    throw new IllegalArgumentException(String.format(
        "Nessie does not have an existing default branch. " +
            "Either configure an alternative ref via %s or create the default branch on the server.",
        NessieConfigConstants.CONF_NESSIE_REF), ex);
  }
}

/**
 * Streams the identifiers of the entries on the current reference that match the given namespace.
 *
 * @param namespace namespace used to build the entry filter
 * @return a stream of the matching entries converted to table identifiers
 * @throws NoSuchNamespaceException if fetching the entries throws {@link NessieNotFoundException}
 */
private Stream<TableIdentifier> tableStream(Namespace namespace) {
  try {
    return api.getEntries().reference(reference.getReference()).get().getEntries().stream()
        .filter(NessieUtil.namespacePredicate(namespace))
        .map(entry -> NessieUtil.toIdentifier(entry));
  } catch (NessieNotFoundException missingRef) {
    String refName = reference.getName();
    throw new NoSuchNamespaceException(missingRef, "Unable to list tables due to missing ref. %s", refName);
  }
}
}
Loading

0 comments on commit 9618147

Please sign in to comment.