Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nessie: Extract Catalog client code to NessieClient for Trino Consumption #4491

Merged
merged 3 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
252 changes: 17 additions & 235 deletions nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseMetastoreCatalog;
Expand All @@ -35,41 +33,20 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.Tasks;
import org.projectnessie.client.NessieClientBuilder;
import org.projectnessie.client.NessieConfigConstants;
import org.projectnessie.client.api.CommitMultipleOperationsBuilder;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.http.HttpClientBuilder;
import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.BaseNessieClientServerException;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNamespaceAlreadyExistsException;
import org.projectnessie.error.NessieNamespaceNotEmptyException;
import org.projectnessie.error.NessieNamespaceNotFoundException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.error.NessieReferenceNotFoundException;
import org.projectnessie.model.Branch;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.GetNamespacesResponse;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.Operation;
import org.projectnessie.model.Reference;
import org.projectnessie.model.TableReference;
import org.projectnessie.model.Tag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -86,10 +63,9 @@ public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable

private static final Logger LOG = LoggerFactory.getLogger(NessieCatalog.class);
private static final Joiner SLASH = Joiner.on("/");
private NessieApiV1 api;
private NessieIcebergClient client;
private String warehouseLocation;
private Configuration config;
private UpdateableReference reference;
private String name;
private FileIO fileIO;
private Map<String, String> catalogOptions;
Expand All @@ -106,10 +82,6 @@ public void initialize(String inputName, Map<String, String> options) {
// remove nessie prefix
final Function<String, String> removePrefix = x -> x.replace(NessieUtil.NESSIE_CONFIG_PREFIX, "");

this.api = createNessieClientBuilder(options.get(NessieConfigConstants.CONF_NESSIE_CLIENT_BUILDER_IMPL))
.fromConfig(x -> options.get(removePrefix.apply(x)))
.build(NessieApiV1.class);

this.warehouseLocation = options.get(CatalogProperties.WAREHOUSE_LOCATION);
if (warehouseLocation == null) {
// Explicitly log a warning, otherwise the thrown exception can get list in the "silent-ish catch"
Expand All @@ -133,7 +105,10 @@ public void initialize(String inputName, Map<String, String> options) {
throw new IllegalStateException("Parameter 'warehouse' not set, Nessie can't store data.");
}
final String requestedRef = options.get(removePrefix.apply(NessieConfigConstants.CONF_NESSIE_REF));
this.reference = loadReference(requestedRef, null);
NessieApiV1 api = createNessieClientBuilder(options.get(NessieConfigConstants.CONF_NESSIE_CLIENT_BUILDER_IMPL))
nastra marked this conversation as resolved.
Show resolved Hide resolved
.fromConfig(x -> options.get(removePrefix.apply(x)))
.build(NessieApiV1.class);
this.client = new NessieIcebergClient(api, requestedRef, null, catalogOptions);
}

private static NessieClientBuilder<?> createNessieClientBuilder(String customBuilder) {
Expand All @@ -152,7 +127,7 @@ private static NessieClientBuilder<?> createNessieClientBuilder(String customBui

@Override
public void close() {
api.close();
client.close();
}

@Override
Expand All @@ -165,14 +140,9 @@ protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
TableReference tr = TableReference.parse(tableIdentifier.name());
Preconditions.checkArgument(!tr.hasTimestamp(), "Invalid table name: # is only allowed for hashes (reference by " +
"timestamp is not supported)");
UpdateableReference newReference = this.reference;
if (tr.getReference() != null) {
newReference = loadReference(tr.getReference(), tr.getHash());
}
return new NessieTableOperations(
ContentKey.of(org.projectnessie.model.Namespace.of(tableIdentifier.namespace().levels()), tr.getName()),
newReference,
api,
client.withReference(tr.getReference(), tr.getHash()),
fileIO,
catalogOptions);
}
Expand All @@ -187,136 +157,27 @@ protected String defaultWarehouseLocation(TableIdentifier table) {

@Override
public List<TableIdentifier> listTables(Namespace namespace) {
return tableStream(namespace).collect(Collectors.toList());
return client.listTables(namespace);
}

@Override
public boolean dropTable(TableIdentifier identifier, boolean purge) {
reference.checkMutable();

IcebergTable existingTable = table(identifier);
if (existingTable == null) {
return false;
}

if (purge) {
LOG.info("Purging data for table {} was set to true but is ignored", identifier.toString());
}

CommitMultipleOperationsBuilder commitBuilderBase = api.commitMultipleOperations()
.commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg delete table %s", identifier),
catalogOptions))
.operation(Operation.Delete.of(NessieUtil.toKey(identifier)));

// We try to drop the table. Simple retry after ref update.
boolean threw = true;
try {
Tasks.foreach(commitBuilderBase)
.retry(5)
.stopRetryOn(NessieNotFoundException.class)
.throwFailureWhenFinished()
.onFailure((o, exception) -> refresh())
.run(commitBuilder -> {
Branch branch = commitBuilder
.branch(reference.getAsBranch())
.commit();
reference.updateReference(branch);
}, BaseNessieClientServerException.class);
threw = false;
} catch (NessieConflictException e) {
LOG.error("Cannot drop table: failed after retry (update ref and retry)", e);
} catch (NessieNotFoundException e) {
LOG.error("Cannot drop table: ref is no longer valid.", e);
} catch (BaseNessieClientServerException e) {
LOG.error("Cannot drop table: unknown error", e);
}
return !threw;
return client.dropTable(identifier, purge);
}

@Override
public void renameTable(TableIdentifier from, TableIdentifier toOriginal) {
reference.checkMutable();

TableIdentifier to = NessieUtil.removeCatalogName(toOriginal, name());

IcebergTable existingFromTable = table(from);
if (existingFromTable == null) {
throw new NoSuchTableException("table %s doesn't exists", from.name());
}
IcebergTable existingToTable = table(to);
if (existingToTable != null) {
throw new AlreadyExistsException("table %s already exists", to.name());
}

CommitMultipleOperationsBuilder operations = api.commitMultipleOperations()
.commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg rename table from '%s' to '%s'",
from, to), catalogOptions))
.operation(Operation.Put.of(NessieUtil.toKey(to), existingFromTable, existingFromTable))
.operation(Operation.Delete.of(NessieUtil.toKey(from)));

try {
Tasks.foreach(operations)
.retry(5)
.stopRetryOn(NessieNotFoundException.class)
.throwFailureWhenFinished()
.onFailure((o, exception) -> refresh())
.run(ops -> {
Branch branch = ops
.branch(reference.getAsBranch())
.commit();
reference.updateReference(branch);
}, BaseNessieClientServerException.class);
} catch (NessieNotFoundException e) {
// important note: the NotFoundException refers to the ref only. If a table was not found it would imply that the
// another commit has deleted the table from underneath us. This would arise as a Conflict exception as opposed to
// a not found exception. This is analogous to a merge conflict in git when a table has been changed by one user
// and removed by another.
throw new RuntimeException("Failed to drop table as ref is no longer valid.", e);
} catch (BaseNessieClientServerException e) {
throw new CommitFailedException(e, "Failed to rename table: the current reference is not up to date.");
} catch (HttpClientException ex) {
// Intentionally catch all nessie-client-exceptions here and not just the "timeout" variant
// to catch all kinds of network errors (e.g. connection reset). Network code implementation
// details and all kinds of network devices can induce unexpected behavior. So better be
// safe than sorry.
throw new CommitStateUnknownException(ex);
}
// Intentionally just "throw through" Nessie's HttpClientException here and do not "special case"
// just the "timeout" variant to propagate all kinds of network errors (e.g. connection reset).
// Network code implementation details and all kinds of network devices can induce unexpected
// behavior. So better be safe than sorry.
public void renameTable(TableIdentifier from, TableIdentifier to) {
client.renameTable(from, NessieUtil.removeCatalogName(to, name()));
}

@Override
public void createNamespace(Namespace namespace, Map<String, String> metadata) {
try {
reference.checkMutable();
api.createNamespace()
.reference(reference.getReference())
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
.create();
refresh();
} catch (NessieNamespaceAlreadyExistsException e) {
throw new AlreadyExistsException(e, "Namespace '%s' already exists.", namespace);
} catch (NessieNotFoundException e) {
throw new RuntimeException(String.format("Cannot create Namespace '%s': ref is no longer valid.", namespace), e);
}
client.createNamespace(namespace, metadata);
}

@Override
public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
try {
GetNamespacesResponse response = api.getMultipleNamespaces()
.reference(reference.getReference())
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
.get();
return response.getNamespaces().stream()
.map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
.collect(Collectors.toList());
} catch (NessieReferenceNotFoundException e) {
throw new RuntimeException(
String.format("Cannot list Namespaces starting from '%s': ref is no longer valid.", namespace), e);
}
return client.listNamespaces(namespace);
}

/**
Expand All @@ -328,37 +189,12 @@ public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespac
*/
@Override
public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws NoSuchNamespaceException {
try {
api.getNamespace()
.reference(reference.getReference())
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
.get();
} catch (NessieNamespaceNotFoundException e) {
throw new NoSuchNamespaceException(e, "Namespace '%s' does not exist.", namespace);
} catch (NessieReferenceNotFoundException e) {
throw new RuntimeException(String.format("Cannot load Namespace '%s': ref is no longer valid.", namespace), e);
}
return ImmutableMap.of();
return client.loadNamespaceMetadata(namespace);
}

@Override
public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
try {
reference.checkMutable();
api.deleteNamespace()
.reference(reference.getReference())
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
.delete();
refresh();
return true;
} catch (NessieNamespaceNotFoundException e) {
return false;
} catch (NessieNotFoundException e) {
LOG.error("Cannot drop Namespace '{}': ref is no longer valid.", namespace, e);
return false;
} catch (NessieNamespaceNotEmptyException e) {
throw new NamespaceNotEmptyException(e, "Namespace '%s' is not empty. One or more tables exist.", namespace);
}
return client.dropNamespace(namespace);
}

@Override
Expand All @@ -385,71 +221,17 @@ public Configuration getConf() {
return config;
}

public void refresh() throws NessieNotFoundException {
reference.refresh(api);
}

public String currentHash() {
return reference.getHash();
return client.getRef().getHash();
}

@VisibleForTesting
String currentRefName() {
return reference.getName();
return client.getRef().getName();
}

@VisibleForTesting
FileIO fileIO() {
return fileIO;
}

private IcebergTable table(TableIdentifier tableIdentifier) {
try {
ContentKey key = NessieUtil.toKey(tableIdentifier);
Content table = api.getContent().key(key).reference(reference.getReference()).get().get(key);
return table != null ? table.unwrap(IcebergTable.class).orElse(null) : null;
} catch (NessieNotFoundException e) {
return null;
}
}

private UpdateableReference loadReference(String requestedRef, String hash) {
try {
Reference ref = requestedRef == null ? api.getDefaultBranch()
: api.getReference().refName(requestedRef).get();
if (hash != null) {
if (ref instanceof Branch) {
ref = Branch.of(ref.getName(), hash);
} else {
ref = Tag.of(ref.getName(), hash);
}
}
return new UpdateableReference(ref, hash != null);
} catch (NessieNotFoundException ex) {
if (requestedRef != null) {
throw new IllegalArgumentException(String.format(
"Nessie ref '%s' does not exist. This ref must exist before creating a NessieCatalog.",
requestedRef), ex);
}

throw new IllegalArgumentException(String.format(
"Nessie does not have an existing default branch." +
"Either configure an alternative ref via %s or create the default branch on the server.",
NessieConfigConstants.CONF_NESSIE_REF), ex);
}
}

private Stream<TableIdentifier> tableStream(Namespace namespace) {
try {
return api.getEntries()
.reference(reference.getReference())
.get()
.getEntries()
.stream()
.filter(NessieUtil.namespacePredicate(namespace))
.map(NessieUtil::toIdentifier);
} catch (NessieNotFoundException ex) {
throw new NoSuchNamespaceException(ex, "Unable to list tables due to missing ref. %s", reference.getName());
}
}
}
Loading