-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Kernel] [CC Refactor #3] Add ConfigurationProvider
and AbstractCommitCoordinatorBuilder
APIs
#3798
base: master
Are you sure you want to change the base?
Changes from all commits
0114cdc
fb35cd2
8d4a24f
8e9f0e7
40f39ce
81ddff4
f49a7f3
67b18db
ee51604
3d99aaa
a2e7768
2e89c29
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.delta.kernel.config; | ||
|
||
import io.delta.kernel.annotation.Evolving; | ||
import java.util.NoSuchElementException; | ||
import java.util.Optional; | ||
|
||
/** | ||
* A generic interface for retrieving configuration values. | ||
* | ||
* @since 3.3.0 | ||
*/ | ||
@Evolving | ||
public interface ConfigurationProvider { | ||
/** | ||
* Retrieves the configuration value for the given key. | ||
* | ||
* @param key the configuration key | ||
* @return the configuration value associated with the key, if it exists | ||
* @throws NoSuchElementException if the key is not found | ||
*/ | ||
String get(String key) throws NoSuchElementException; | ||
scottsand-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/** | ||
* Retrieves the configuration value for the given key. If it doesn't exist, returns {@link | ||
* Optional#empty}. | ||
* | ||
* @param key the configuration key | ||
* @return the configuration value associated with the key, if it exists | ||
*/ | ||
Optional<String> getOptional(String key); | ||
|
||
/** | ||
* Retrieves the configuration value for the given key, ensuring that the value is not null. | ||
* | ||
* @param key the configuration key | ||
* @return the configuration value associated with the key, guaranteed to be non-null | ||
* @throws NoSuchElementException if the key is not found in the configuration | ||
* @throws IllegalStateException if the key exists but its value is null | ||
*/ | ||
default String getNonNull(String key) throws NoSuchElementException, IllegalStateException { | ||
final String value = get(key); | ||
if (value == null) throw new IllegalStateException(String.format("%s is null", key)); | ||
return value; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.delta.kernel.coordinatedcommits; | ||
|
||
import io.delta.kernel.annotation.Evolving; | ||
import io.delta.kernel.config.ConfigurationProvider; | ||
import io.delta.kernel.internal.DeltaErrors; | ||
import java.lang.reflect.InvocationTargetException; | ||
import java.util.Map; | ||
import java.util.NoSuchElementException; | ||
|
||
/** | ||
* A builder class to create a {@link CommitCoordinatorClient} (CCC). Engines may choose not to use | ||
* this builder and to instantiate CCCs directly. | ||
* | ||
* <p>Subclasses of this class must provide a no-argument constructor. | ||
* | ||
* <p>Note: These builder classes are optional. The {@link | ||
* io.delta.kernel.engine.Engine#getCommitCoordinatorClient} API does not prescribe how to create | ||
* the underlying CCC and does not require a builder. | ||
* | ||
* <p>The benefit of implementing a builder for your CCC is that your Engine may then invoke {@link | ||
scottsand-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* #buildCommitCoordinatorClient} to (1) instantiate your builder, and then (2) build a new CCC via | ||
* reflection. | ||
* | ||
* <p>From a user perspective, this means users just need to: | ||
* | ||
* <ol> | ||
* <li>specify the commit coordinator name to commit coordinator builder mapping in their Engine's | ||
* configuration (e.g. "foo" to "x.y.z.FooCCBuilder"). Note that Engine is then expected to | ||
* include that mapping in the session-level configuration that is passed in to the {@link | ||
* #buildCommitCoordinatorClient} API call. | ||
* <li>include the x.y.z.FooCCBuilder and respective client implementation on their classpath | ||
* </ol> | ||
* | ||
* and then the client can be instantiated at runtime using reflection. | ||
* | ||
* @since 3.3.0 | ||
*/ | ||
@Evolving | ||
public abstract class AbstractCommitCoordinatorBuilder { | ||
|
||
/////////////////////////// | ||
// Static Helper Methods // | ||
/////////////////////////// | ||
|
||
/** | ||
* Builds the underlying {@link CommitCoordinatorClient} associated with the given commit | ||
* coordinator name. | ||
* | ||
* <ul> | ||
* <li>Determines the specific builder configuration lookup key. | ||
* <li>Grabs the corresponding builder className for that key from the provided sessionConfig. | ||
* <li>Instantiates a new instance of that {@link AbstractCommitCoordinatorBuilder}. | ||
* <li>Invokes the builder's build method, passing along the sessionConfig and commit | ||
* coordinator config map. | ||
* </ul> | ||
* | ||
* @param commitCoordinatorName the commit coordinator name | ||
* @param sessionConfig The session-level configuration that may represent environment-specific | ||
* configurations such as {@code HadoopConf} or {@code SparkConf}. This sessionConfig is used | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to make them implement There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They need to pass in something that is a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if you need both hadoopConf and sparkConf? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great question. You can union them to implement the When we go to implement the UC CCC using this API, we can adjust methods as needed. |
||
* to look up the right builder className to instantiate for the given commit coordinator | ||
* name. It can also be used by builders to lookup per-session configuration values unique to | ||
* that builder. | ||
* @param commitCoordinatorConf the parsed value of the Delta table property {@link | ||
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_COORDINATOR_CONF} which represents | ||
* the configuration properties the commit coordinator client needs to be correctly | ||
* instantiated and to communicate to the commit coordinator. | ||
* @return the {@link CommitCoordinatorClient} corresponding to the given commit coordinator name | ||
*/ | ||
public static CommitCoordinatorClient buildCommitCoordinatorClient( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand correctly, the dev experience for using this method will be:
Somehow, it doesn't feel intuitive that this code will look up the builder in the conf and do stuff There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How's this method different than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is the old code that will be deleted when this is refactored. it uses HadoopConf and io.delta.storage APIs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Really? I quite like it. Feel free to let me know what doesn't feel right about this. This is what we agreed on in the sync on Tuesday Oct 22 @ 1:00pm There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Further, this API feels very similar to the previous API of
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was hoping we could prevent patterns like the following by moving
|
||
String commitCoordinatorName, | ||
ConfigurationProvider sessionConfig, | ||
Map<String, String> commitCoordinatorConf) { | ||
final String builderConfKey = getCommitCoordinatorBuilderConfKey(commitCoordinatorName); | ||
|
||
final String builderClassName; | ||
try { | ||
builderClassName = sessionConfig.getNonNull(builderConfKey); | ||
} catch (NoSuchElementException | IllegalStateException ex) { | ||
throw DeltaErrors.unknownCommitCoordinator(commitCoordinatorName, builderConfKey); | ||
} | ||
|
||
try { | ||
return Class.forName(builderClassName) | ||
.asSubclass(AbstractCommitCoordinatorBuilder.class) | ||
.getConstructor() | ||
.newInstance() | ||
.build(sessionConfig, commitCoordinatorConf); | ||
} catch (ClassNotFoundException | ||
| NoSuchMethodException | ||
| InstantiationException | ||
| IllegalAccessException | ||
| InvocationTargetException e) { | ||
throw DeltaErrors.couldNotInstantiateCommitCoordinatorClient( | ||
commitCoordinatorName, builderClassName, e); | ||
} | ||
} | ||
|
||
/** Returns the builder configuration key for the given commit coordinator name */ | ||
public static String getCommitCoordinatorBuilderConfKey(String ccName) { | ||
return String.format("io.delta.kernel.commitCoordinatorBuilder.%s.impl", ccName); | ||
} | ||
|
||
//////////////////// | ||
// Member Methods // | ||
//////////////////// | ||
|
||
/** Subclasses of this class must provide a no-argument constructor. */ | ||
public AbstractCommitCoordinatorBuilder() {} | ||
|
||
/** @return the commit coordinator name */ | ||
public abstract String getName(); | ||
|
||
/** | ||
* Build the {@link CommitCoordinatorClient}. This may be a new instance or a cached one. | ||
* | ||
* @param sessionConfig The session-level configuration that may represent environment-specific | ||
* configurations such as {@code HadoopConf} or {@code SparkConf}. This sessionConfig can be | ||
* used by builders to lookup per-session configuration values unique to that builder. | ||
* @param commitCoordinatorConf the parsed value of the Delta table property {@link | ||
* io.delta.kernel.internal.TableConfig#COORDINATED_COMMITS_COORDINATOR_CONF} which represents | ||
* the configuration properties the commit coordinator client needs to be correctly | ||
* instantiated and to communicate to the commit coordinator. | ||
* @return the {@link CommitCoordinatorClient} | ||
*/ | ||
public abstract CommitCoordinatorClient build( | ||
ConfigurationProvider sessionConfig, Map<String, String> commitCoordinatorConf); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@scottsand-db This class doesn't provide any method to iterate all configs. Does that work for UC Commit Coordinator? Don't we need to iterate over all configs in that? Will let @sumeet-db confirm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@prakharjain09 -- Yup, great callout. We can update APIs when we get there. IMO there's no rush or requirement to add it now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will need a
getAll
/getAllByPrefix
API for UC Commit Coordinator Builder when we get there.