-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Kernel] Added Domain Metadata support to Delta Kernel #3835
base: master
Are you sure you want to change the base?
[Kernel] Added Domain Metadata support to Delta Kernel #3835
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work, this is looking pretty good
kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java
Outdated
Show resolved
Hide resolved
@@ -221,6 +222,10 @@ private TransactionCommitResult doCommit( | |||
} | |||
setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow()))); | |||
|
|||
// If dataActions contains any domainMetadata actions, we check if the feature is supported | |||
// and if there are duplicates. | |||
DomainMetadataUtils.validateSupportedAndNoDuplicate(protocol, dataActions, FULL_SCHEMA); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will consume the dataActions
iterator, reading all actions at once to validate that there aren't duplicates, whereas the previous code wouldn't read from the iterator and just pass it to writeJsonFileAtomically
That means in particular that we read all actions from the iterator twice, which may not necessarily be safe and will likely impact performance.
As an alternative, you could try implementing the check as an iterator itself that will wrap around the dataActions
iterator:
ValidateDomainMetadata implements Iterator {
CloseableIterator<Row> actions;
Row next() {
action = actions.next()
// Do checks, keep track of DomainMetadata actions seen
return action;
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good idea! I’ve implemented this in my latest commits - please see ValidateDomainMetadataIterator
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadataUtils.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
} catch (IOException ex) { | ||
throw new RuntimeException("Could not close iterator", ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we just log an error and keep moving instead? Not sure what we do in the rest of kernel in such a case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what we do in the loadTableProtocolAndMetadata()
function above.
In some cases, we also throw new UncheckedIOException(...)
(e.g., in TransactionImpl::doCommit
).
However, it seems we always thrown an exception in cases like this.
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
Outdated
Show resolved
Hide resolved
kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataSuite.scala
Show resolved
Hide resolved
…Errors.java Co-authored-by: Johan Lasperas <[email protected]>
Co-authored-by: Johan Lasperas <[email protected]>
Co-authored-by: Johan Lasperas <[email protected]>
…uring conflict resolution
2da16e9
to
7d7032e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Thanks for this PR!
Left a few comments :p Mostly style. A few important ones about logic/functionality.
I did not require the tests, will do so after changes are made.
kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java
Outdated
Show resolved
Hide resolved
public static final String FEATURE_NAME = "domainMetadata"; | ||
|
||
/** The minimum writer version required to support domain metadata. */ | ||
public static final int MIN_WRITER_VERSION_REQUIRED = 7; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @vkorukanti -- I feel like we need to come up with a cleaner template for Kernel Delta Table Features for them to all include common information, like the min reader writer version required, the feature name, etc.
* @param removed If it is true it serves as a tombstone to logically delete a {@link | ||
* DomainMetadata} action. | ||
*/ | ||
public DomainMetadata(String domain, String configuration, boolean removed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't we want to parse the configuration
to a Map<String, String>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you suggest we should also parse and validate the configuration
string as a Map<String, String>
at construction, in addition to storing a raw string? Currently I only use the raw string since this is how it is write to/read from log files and this is how it is implemented in DBR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- On read path, it can be lazy.
- On write path, we should be sure we only write valid domain metadata. Btw, where in the spec does it say the configuration has to be a
Map<String, String>
? - Shouldn't we validate on the write path that the user hasn't written to a
System-controlled metadata domain
?
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ConflictChecker.java
Outdated
Show resolved
Hide resolved
...l/kernel-api/src/main/java/io/delta/kernel/internal/util/ValidateDomainMetadataIterator.java
Outdated
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/actions/DomainMetadata.java
Show resolved
Hide resolved
...l/kernel-api/src/main/java/io/delta/kernel/internal/util/ValidateDomainMetadataIterator.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/CreateCheckpointIterator.java
Show resolved
Hide resolved
kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java
Outdated
Show resolved
Hide resolved
/** | ||
* Retrieves a map of domainName to {@link DomainMetadata} from the log files. | ||
* | ||
* <p>Now loading domain metadata requires an additional round of log replay so this is done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should do this during the P & M log replay
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to our discussion, we will address this in a separate PR.
@@ -221,7 +223,8 @@ private TransactionCommitResult doCommit( | |||
} | |||
setTxnOpt.ifPresent(setTxn -> metadataActions.add(createTxnSingleAction(setTxn.toRow()))); | |||
|
|||
try (CloseableIterator<Row> stageDataIter = dataActions.iterator()) { | |||
try (CloseableIterator<Row> stageDataIter = | |||
new ValidateDomainMetadataIterator(protocol, dataActions.iterator(), FULL_SCHEMA)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. So this is just a thin wrapper over the actions-to-be-committed and validates the DomainMetadata on write.
@vkorukanti -- would we ever want to validate other things on write?
@qiyuandong-db -- I wonder if we should not have a ValidateDomainMetadataIterator
but rather a ValidationIterator
. This validation iterator lets you pass in different validationFunctions that take a row and decide to or not to throw an error. This would be a more extensible solution. It would let us avoid iterater wrappers on top of iterator wrappers on top of iterator wrappers ...
For example, I'd be fine removing the validation from this PR and coding that ^ up in a followup PR ... let's see what @vkorukanti thinks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Requested some minor changes. Had one pretty big question about the validator iterator. The tests look great! Will stamp after this 🫡
Which Delta project/connector is this regarding?
Description
This PR adds support for Domain Metadata to Delta Kernel as described in the Delta Protocal.
In particular, it enables Delta Kernel to handle domain metadata across transaction commit, checkpointing, log replay, and conflict resolution.
How was this patch tested?
Added tests covering all operations involving DomainMetadata in
DomainMetadataSuite
Does this PR introduce any user-facing changes?
No.