Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] [CC Refactor #5] AbstractCommitCoordinatorBuilderSuite #3821

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3f26bef
Create TableIdentifier.java
scottsand-db Oct 22, 2024
b67adc5
Update Table.java
scottsand-db Oct 22, 2024
9048bef
Update TableImpl.java
scottsand-db Oct 22, 2024
1be607b
Update InCommitTimestampSuite.scala
scottsand-db Oct 22, 2024
5dfd248
Create Nullable.java
scottsand-db Oct 23, 2024
e3822e3
Create TableDescriptor.java
scottsand-db Oct 23, 2024
e80f431
Create CommitCoordinatorClient.java
scottsand-db Oct 23, 2024
6348853
Update Engine.java
scottsand-db Oct 23, 2024
e9bb733
Rename getUnbackfilledCommits to getCommits; we can re-do this later
scottsand-db Oct 25, 2024
9aacdf2
Create ConfigurationProvider.java
scottsand-db Oct 23, 2024
ffca476
Create AbstractCommitCoordinatorBuilder.java
scottsand-db Oct 23, 2024
9fa355f
Create CommitCoordinatorUtils and update DeltaErrors
scottsand-db Oct 23, 2024
ced60b5
javafmt
scottsand-db Oct 25, 2024
a322bf7
Update FileSystemClient.java
scottsand-db Oct 24, 2024
0335b82
Update CommitCoordinatorUtils
scottsand-db Oct 24, 2024
9063549
Create InMemoryCommitCoordinatorClient.scala
scottsand-db Oct 24, 2024
8b805ec
Fix javastyle
scottsand-db Oct 24, 2024
90c6f55
Minor fixes
scottsand-db Oct 24, 2024
4919b65
Add default getOptional ConfigurationProvider implementation
scottsand-db Oct 28, 2024
7334934
Move some utils to AbstractCommitCoordinatorBuilder
scottsand-db Oct 28, 2024
78bc12b
Delete CommitCoordinatorProviderSuite.scala
scottsand-db Oct 28, 2024
fcd2991
Create MapBasedConfigurationProvider
scottsand-db Oct 29, 2024
938ebd1
Create AbstractCommitCoordinatorBuilderSuite.scala
scottsand-db Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.TableImpl;
import java.io.IOException;
import java.util.Optional;

/**
* Represents the Delta Lake table for a given path.
Expand Down Expand Up @@ -57,6 +58,10 @@ static Table forPath(Engine engine, String path) {
return TableImpl.forPath(engine, path);
}

static Table forPathWithTableId(Engine engine, String path, TableIdentifier tableId) {
return TableImpl.forPathWithTableId(engine, path, tableId);
}

/**
* The fully qualified path of this {@link Table} instance.
*
Expand All @@ -66,6 +71,15 @@ static Table forPath(Engine engine, String path) {
*/
String getPath(Engine engine);

/**
* The catalog identifier of this {@link Table} instance.
*
* @param engine {@link Engine} instance.
* @return the table identifier, or {@link Optional#empty()} if none is set.
* @since 3.3.0
*/
Optional<TableIdentifier> getTableIdentifier(Engine engine);

/**
* Get the latest snapshot of the table.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel;

import static java.util.Objects.requireNonNull;

import io.delta.kernel.annotation.Evolving;

/**
* Identifier for a table. e.g. $catalog / $schema / $tableName
*
* @since 3.3
*/
@Evolving
public class TableIdentifier {
/** The namespace of the table. */
private final String[] namespace;

/** The name of the table. */
private final String name;

public TableIdentifier(String[] namespace, String name) {
this.namespace = requireNonNull(namespace, "namespace cannot be null");
if (namespace.length == 0) {
throw new IllegalArgumentException("namespace cannot be empty");
}
this.name = requireNonNull(name, "name cannot be null");
}

/** @return the namespace of the table. e.g. $catalog / $schema */
public String[] getNamespace() {
return namespace;
}

/** @return the name of the table. */
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.annotation;

import java.lang.annotation.*;

/**
* Annotation to indicate that a field, method parameter, method return value, or local variable may
* be null.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.LOCAL_VARIABLE})
public @interface Nullable {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.config;

import io.delta.kernel.annotation.Evolving;
import java.util.NoSuchElementException;
import java.util.Optional;

/**
* A generic interface for retrieving configuration values.
*
* @since 3.3.0
*/
@Evolving
public interface ConfigurationProvider {
/**
* Retrieves the configuration value for the given key.
*
* @param key the configuration key
* @return the configuration value associated with the key, if it exists
* @throws NoSuchElementException if the key is not found
*/
String get(String key) throws NoSuchElementException;

/**
* Retrieves the configuration value for the given key. If it doesn't exist, returns {@link
* Optional#empty}.
*
* @param key the configuration key
* @return the configuration value associated with the key, if it exists
*/
default Optional<String> getOptional(String key) {
try {
final String value = get(key);
return Optional.of(value);
} catch (NoSuchElementException ex) {
return Optional.empty();
}
}

/**
* Retrieves the configuration value for the given key, ensuring that the value is not null.
*
* @param key the configuration key
* @return the configuration value associated with the key, guaranteed to be non-null
* @throws NoSuchElementException if the key is not found in the configuration
* @throws IllegalStateException if the key exists but its value is null
*/
default String getNonNull(String key) throws NoSuchElementException, IllegalStateException {
final String value = get(key);
if (value == null) throw new IllegalStateException(String.format("%s is null", key));
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.coordinatedcommits;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.config.ConfigurationProvider;
import io.delta.kernel.internal.DeltaErrors;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.NoSuchElementException;

/**
* A builder class to create a {@link CommitCoordinatorClient} (CCC).
*
* <p>Subclasses of this class must provide a no-argument constructor.
*
* <p>Note: These builder classed are optional. The {@link
* io.delta.kernel.engine.Engine#getCommitCoordinatorClient} API does not prescribe how to create
* the underlying CCC and does not require a builder.
*
* <p>The benefit of implementing a builder for your CCC is that your Engine may then invoke {@link
* #buildCommitCoordinatorClient} to (1) instantiate your builder, and then (2) build a new CCC via
* reflection.
*
* <p>In short, this builder provides the ability for users to specify at runtime which builder
* implementation (e.g. x.y.z.FooCCBuilder) to use for a given commit coordinator name (e.g. "foo"),
* without imposing any restrictions on how to construct CCCs (i.e. these builders are optional).
*/
@Evolving
public abstract class AbstractCommitCoordinatorBuilder {
/** Subclasses of this class must provide a no-argument constructor. */
public AbstractCommitCoordinatorBuilder() {}

/** @return the commit coordinator name */
public abstract String getName();

/**
* Build the {@link CommitCoordinatorClient}. This may be a new instance or a cached one.
*
* @param sessionConfig The session-level configuration that may represent environment-specific
* configurations such as {@code HadoopConf} or {@code SparkConf}. This sessionConfig can be
* used by builders to lookup per-session configuration values unique to that builder.
* @param commitCoordinatorConf the parsed value of the Delta table property {@link
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF} and represents the
* configuration properties for describing the Delta table to the commit-coordinator.
* @return the {@link CommitCoordinatorClient}
*/
public abstract CommitCoordinatorClient build(
ConfigurationProvider sessionConfig, Map<String, String> commitCoordinatorConf);

///////////////////////////
// Static Helper Methods //
///////////////////////////

/**
* Builds the underlying {@link CommitCoordinatorClient} associated with the given commit
* coordinator name.
*
* <ul>
* <li>Determines the specific builder configuration lookup key.
* <li>Grabs the corresponding builder className for that key from the provided sessionConfig.
* <li>Instantiates a new instance of that {@link AbstractCommitCoordinatorBuilder}.
* <li>Invokes the builder's build method, passing along the sessionConfig and commit
* coordinator config map.
* </ul>
*
* @param commitCoordinatorName the commit coordinator name
* @param sessionConfig The session-level configuration that may represent environment-specific
* configurations such as {@code HadoopConf} or {@code SparkConf}. This sessionConfig is used
* to look up the right builder className to instantiate for the given commit coordinator
* name. It can also be used by builders to lookup per-session configuration values unique to
* that builder.
* @param commitCoordinatorConf the parsed value of the Delta table property {@link
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_TABLE_CONF} and represents the
* configuration properties for describing the Delta table to the commit-coordinator.
* @return the {@link CommitCoordinatorClient} corresponding to the given commit coordinator name
*/
public static CommitCoordinatorClient buildCommitCoordinatorClient(
String commitCoordinatorName,
ConfigurationProvider sessionConfig,
Map<String, String> commitCoordinatorConf) {
final String builderConfKey = getCommitCoordinatorBuilderConfKey(commitCoordinatorName);

final String builderClassName;
try {
builderClassName = sessionConfig.getNonNull(builderConfKey);
} catch (NoSuchElementException | IllegalStateException ex) {
throw DeltaErrors.unknownCommitCoordinator(commitCoordinatorName, builderConfKey);
}

try {
return Class.forName(builderClassName)
.asSubclass(AbstractCommitCoordinatorBuilder.class)
.getConstructor()
.newInstance()
.build(sessionConfig, commitCoordinatorConf);
} catch (ClassNotFoundException
| ClassCastException
| NoSuchMethodException
| InstantiationException
| IllegalAccessException
| InvocationTargetException e) {
throw DeltaErrors.couldNotInstantiateCommitCoordinatorClient(
commitCoordinatorName, builderClassName, e);
}
}

/** Returns the builder configuration key for the given commit coordinator name */
public static String getCommitCoordinatorBuilderConfKey(String ccName) {
return String.format("io.delta.kernel.commitCoordinatorBuilder.%s.impl", ccName);
}
}
Loading
Loading