diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/config/ConfigurationProvider.java b/kernel/kernel-api/src/main/java/io/delta/kernel/config/ConfigurationProvider.java new file mode 100644 index 0000000000..b462b9fb7d --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/config/ConfigurationProvider.java @@ -0,0 +1,61 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.config; + +import io.delta.kernel.annotation.Evolving; +import java.util.NoSuchElementException; +import java.util.Optional; + +/** + * A generic interface for retrieving configuration values. + * + * @since 3.3.0 + */ +@Evolving +public interface ConfigurationProvider { + /** + * Retrieves the configuration value for the given key. + * + * @param key the configuration key + * @return the configuration value associated with the key, if it exists + * @throws NoSuchElementException if the key is not found + */ + String get(String key) throws NoSuchElementException; + + /** + * Retrieves the configuration value for the given key. If it doesn't exist, returns {@link + * Optional#empty}. + * + * @param key the configuration key + * @return the configuration value associated with the key, if it exists + */ + Optional getOptional(String key); + + /** + * Retrieves the configuration value for the given key, ensuring that the value is not null. + * + * @param key the configuration key + * @return the configuration value associated with the key, guaranteed to be non-null + * @throws NoSuchElementException if the key is not found in the configuration + * @throws IllegalStateException if the key exists but its value is null + */ + default String getNonNull(String key) throws NoSuchElementException, IllegalStateException { + final String value = get(key); + if (value == null) throw new IllegalStateException(String.format("%s is null", key)); + return value; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/AbstractCommitCoordinatorBuilder.java b/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/AbstractCommitCoordinatorBuilder.java new file mode 100644 index 0000000000..fb05a8fe6f --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/AbstractCommitCoordinatorBuilder.java @@ -0,0 +1,143 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.coordinatedcommits; + +import io.delta.kernel.annotation.Evolving; +import io.delta.kernel.config.ConfigurationProvider; +import io.delta.kernel.internal.DeltaErrors; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +import java.util.NoSuchElementException; + +/** + * A builder class to create a {@link CommitCoordinatorClient} (CCC). Engines may choose not to use + * this builder and to instantiate CCCs directly. + * + *

Subclasses of this class must provide a no-argument constructor. + * + *

Note: These builder classes are optional. The {@link + * io.delta.kernel.engine.Engine#getCommitCoordinatorClient} API does not prescribe how to create + * the underlying CCC and does not require a builder. + * + *

The benefit of implementing a builder for your CCC is that your Engine may then invoke {@link + * #buildCommitCoordinatorClient} to (1) instantiate your builder, and then (2) build a new CCC via + * reflection. + * + *

From a user perspective, this means users just need to: + * + *

    + *
  1. specify the commit coordinator name to commit coordinator builder mapping in their Engine's + * configuration (e.g. "foo" to "x.y.z.FooCCBuilder"). Note that Engine is then expected to + * include that mapping in the session-level configuration that is passed in to the {@link + * #buildCommitCoordinatorClient} API call. + *
  2. include the x.y.z.FooCCBuilder and respective client implementation on their classpath + *
+ * + * and then the client can be instantiated at runtime using reflection. + * + * @since 3.3.0 + */ +@Evolving +public abstract class AbstractCommitCoordinatorBuilder { + + /////////////////////////// + // Static Helper Methods // + /////////////////////////// + + /** + * Builds the underlying {@link CommitCoordinatorClient} associated with the given commit + * coordinator name. + * + * + * + * @param commitCoordinatorName the commit coordinator name + * @param sessionConfig The session-level configuration that may represent environment-specific + * configurations such as {@code HadoopConf} or {@code SparkConf}. This sessionConfig is used + * to look up the right builder className to instantiate for the given commit coordinator + * name. It can also be used by builders to lookup per-session configuration values unique to + * that builder. + * @param commitCoordinatorConf the parsed value of the Delta table property {@link + * io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_COORDINATOR_CONF} which represents + * the configuration properties the commit coordinator client needs to be correctly + * instantiated and to communicate to the commit coordinator. + * @return the {@link CommitCoordinatorClient} corresponding to the given commit coordinator name + */ + public static CommitCoordinatorClient buildCommitCoordinatorClient( + String commitCoordinatorName, + ConfigurationProvider sessionConfig, + Map commitCoordinatorConf) { + final String builderConfKey = getCommitCoordinatorBuilderConfKey(commitCoordinatorName); + + final String builderClassName; + try { + builderClassName = sessionConfig.getNonNull(builderConfKey); + } catch (NoSuchElementException | IllegalStateException ex) { + throw DeltaErrors.unknownCommitCoordinator(commitCoordinatorName, builderConfKey); + } + + try { + return Class.forName(builderClassName) + .asSubclass(AbstractCommitCoordinatorBuilder.class) + .getConstructor() + .newInstance() + .build(sessionConfig, commitCoordinatorConf); + } catch (ClassNotFoundException + | NoSuchMethodException + | InstantiationException + | IllegalAccessException + | InvocationTargetException e) { + throw DeltaErrors.couldNotInstantiateCommitCoordinatorClient( + commitCoordinatorName, builderClassName, e); + } + } + + /** Returns the builder configuration key for the given commit coordinator name */ + public static String getCommitCoordinatorBuilderConfKey(String ccName) { + return String.format("io.delta.kernel.commitCoordinatorBuilder.%s.impl", ccName); + } + + //////////////////// + // Member Methods // + //////////////////// + + /** Subclasses of this class must provide a no-argument constructor. */ + public AbstractCommitCoordinatorBuilder() {} + + /** @return the commit coordinator name */ + public abstract String getName(); + + /** + * Build the {@link CommitCoordinatorClient}. This may be a new instance or a cached one. + * + * @param sessionConfig The session-level configuration that may represent environment-specific + * configurations such as {@code HadoopConf} or {@code SparkConf}. This sessionConfig can be + * used by builders to lookup per-session configuration values unique to that builder. + * @param commitCoordinatorConf the parsed value of the Delta table property {@link + * io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_COORDINATOR_CONF} which represents + * the configuration properties the commit coordinator client needs to be correctly + * instantiated and to communicate to the commit coordinator. + * @return the {@link CommitCoordinatorClient} + */ + public abstract CommitCoordinatorClient build( + ConfigurationProvider sessionConfig, Map commitCoordinatorConf); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/CommitCoordinatorClient.java b/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/CommitCoordinatorClient.java index 5870f570f8..40eebfde2c 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/CommitCoordinatorClient.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/CommitCoordinatorClient.java @@ -149,10 +149,11 @@ GetCommitsResponse getCommits( * * @param engine The {@link Engine} instance to use, if needed. * @param tableDescriptor The descriptor for the table. - * @param version The version until which the commit coordinator client should backfill. - * @param lastKnownBackfilledVersion The last known version that was backfilled before this API - * was called. If it is {@link Optional#empty()}, then the commit coordinator client should - * backfill from the beginning of the table. + * @param version The version (inclusive) until which the commit coordinator client should + * backfill. + * @param lastKnownBackfilledVersion The last known version (inclusive) that was backfilled before + * this API was called. If it is {@link Optional#empty()}, then the commit coordinator client + * should backfill from the beginning of the table. * @throws IOException if there is an IO error while backfilling the commits. */ void backfillToVersion( diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java b/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java index 827b31db91..468b0f6e43 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/coordinatedcommits/TableDescriptor.java @@ -28,6 +28,8 @@ * The complete descriptor of a Coordinated Commits (CC) Delta table, including its logPath, table * identifier, and table CC configuration. * + *

The table identifier is not required for path-based tables. + * * @since 3.3.0 */ @Evolving diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index 6a3dc23fd4..e86f61ba71 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -138,7 +138,26 @@ public static KernelException invalidVersionRange(long startVersion, long endVer return new KernelException(message); } - /* ------------------------ PROTOCOL EXCEPTIONS ----------------------------- */ + /* ------------------------ COORDINATED COMMITS EXCEPTIONS - START ------------------------ */ + + public static KernelException unknownCommitCoordinator(String ccName, String ccBuilderConfKey) { + return new KernelException( + String.format( + "Unknown commit coordinator: '%s'. Please ensure that session config '%s' is set.", + ccName, ccBuilderConfKey)); + } + + public static KernelException couldNotInstantiateCommitCoordinatorClient( + String ccName, String ccBuilderClassName, Exception ex) { + return new KernelException( + String.format( + "Could not instantiate Commit Coordinator Client for '%s' using builder class '%s'.", + ccName, ccBuilderClassName), + ex); + } + + /* ------------------------ COORDINATED COMMITS EXCEPTIONS - END ------------------------ */ + /* ------------------------ PROTOCOL EXCEPTIONS - START ----------------------------- */ public static KernelException unsupportedReaderProtocol( String tablePath, int tableReaderVersion) { @@ -179,6 +198,8 @@ public static KernelException unsupportedWriterFeature(String tablePath, String return new KernelException(message); } + /* ------------------------ PROTOCOL EXCEPTIONS - END ----------------------------- */ + public static KernelException columnInvariantsNotSupported() { String message = "This version of Delta Kernel does not support writing to tables with "