[Kernel][Writes] Initial insert APIs and implementation #2944
(Split from larger PR delta-io#2944) Add `toRow` for each of the action objects. Also add any missing actions such as `CommitInfo`.
…cts (#2976) (Split from larger PR #2944) Add `toRow` for each of the action objects. Also add any missing actions such as `CommitInfo`. Also:
* Rename `READ_SCHEMA` to `FULL_SCHEMA` when all columns in an action are present. `READ_SCHEMA` is the term for reading just a subset of the columns, but in some actions it also represents the full schema.
* Add a utility method to create a single-action `Row` object from a specific action. E.g. `createMetadataSingleAction(Metadata metadata)` returns a `Row` of the single-action schema with the `metaData` column representing the given `metadata` object.
…rings (Split from larger PR delta-io#2944) Adds a utility method to convert the partition value literal to Delta protocol-compliant string value. UTs
…ctory (Split from larger PR delta-io#2944) Utility method to construct the partition data directory. UTs
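The two partition-related commits above (serializing a partition value to a string, and building the partition data directory) can be illustrated with a minimal sketch. This is not the Kernel's implementation: the class and method names are hypothetical, only a handful of value types are handled, no escaping of special characters is done, and the `__HIVE_DEFAULT_PARTITION__` directory name for null values is an assumption borrowed from the Hive-style layout.

```java
import java.time.LocalDate;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration; not the utility class added in the PR.
final class PartitionPathSketch {
    // Assumption: Hive-style placeholder directory name for a null partition value.
    static final String NULL_PARTITION_DIR = "__HIVE_DEFAULT_PARTITION__";

    /** Serializes a partition value to its string form; a null value stays null. */
    static String serialize(Object value) {
        if (value == null) {
            return null;
        }
        // For these types toString() already gives a plain form:
        // decimal integers, true/false, and ISO yyyy-MM-dd dates.
        if (value instanceof String || value instanceof Integer || value instanceof Long
                || value instanceof Boolean || value instanceof LocalDate) {
            return value.toString();
        }
        throw new IllegalArgumentException(
            "Unsupported partition value type: " + value.getClass());
    }

    /** Builds "root/col1=v1/col2=v2", following the iteration order of the map. */
    static String partitionDirectory(String tableRoot, Map<String, Object> partitionValues) {
        StringJoiner dir = new StringJoiner("/");
        // Drop a trailing slash so the joiner does not produce "//".
        dir.add(tableRoot.endsWith("/")
            ? tableRoot.substring(0, tableRoot.length() - 1)
            : tableRoot);
        for (Map.Entry<String, Object> entry : partitionValues.entrySet()) {
            String serialized = serialize(entry.getValue());
            dir.add(entry.getKey() + "=" + (serialized == null ? NULL_PARTITION_DIR : serialized));
        }
        return dir.toString();
    }
}
```

Passing an insertion-ordered map (for example a `LinkedHashMap`) keeps the directory levels in partition-column order. A real implementation would also need to escape characters that are unsafe in file paths.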
Looks great! Thanks in advance for answering all my questions :)
Can't this go under some exceptions folder? IMO this shouldn't be next to Table.java
kernel/kernel-api/src/main/java/io/delta/kernel/ConcurrentWriteException.java
kernel/kernel-api/src/main/java/io/delta/kernel/DataWriteContext.java
kernel/kernel-api/src/main/java/io/delta/kernel/Transaction.java
List<Row> metadataActions = new ArrayList<>();
metadataActions.add(createCommitInfoSingleAction(generateCommitAction()));
metadataActions.add(createMetadataSingleAction(metadata.toRow()));
metadataActions.add(createProtocolSingleAction(protocol.toRow()));
why do you write the metadata and protocol action every time? performance improvement?
// Create a new CloseableIterator that will return the commit action, metadata action, and
// the stagedData
CloseableIterator<Row> stagedDataWithCommitInfo = new CloseableIterator<Row>() {
are you basically just unioning two iterators? is there a simpler util method we can use to do that instead?
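The kind of utility the question above asks about could look roughly like the following sketch. It is not code from the PR: `CloseableIter`, `concat` and `of` are hypothetical names, and the interface is a local stand-in for the Kernel's `CloseableIterator` so that the example compiles on its own.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;

// Local stand-in for the Kernel's CloseableIterator (hypothetical, for illustration).
interface CloseableIter<T> extends Iterator<T>, Closeable {}

final class IteratorConcat {
    /** Returns an iterator that drains `first`, then `second`; close() closes both. */
    static <T> CloseableIter<T> concat(CloseableIter<T> first, CloseableIter<T> second) {
        return new CloseableIter<T>() {
            @Override
            public boolean hasNext() {
                return first.hasNext() || second.hasNext();
            }

            @Override
            public T next() {
                // Serve from the first iterator until it is exhausted.
                return first.hasNext() ? first.next() : second.next();
            }

            @Override
            public void close() throws IOException {
                // Close the second iterator even if closing the first one fails.
                try {
                    first.close();
                } finally {
                    second.close();
                }
            }
        };
    }

    /** Wraps an in-memory collection (e.g. the metadata actions); close() is a no-op. */
    static <T> CloseableIter<T> of(Iterable<T> items) {
        Iterator<T> it = items.iterator();
        return new CloseableIter<T>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public T next() { return it.next(); }
            @Override public void close() {}
        };
    }
}
```

With a helper like this, the anonymous iterator in the diff could shrink to something like `concat(of(metadataActions), stagedData)`, assuming the staged data is itself a closeable iterator of rows.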
// Create a new CloseableIterator that will return the commit action, metadata action, and
// the stagedData
CloseableIterator<Row> stagedDataWithCommitInfo = new CloseableIterator<Row>() {
nit: it's more than just commitInfo right? Since above it is also protocol and metadata?
}

private Row generateCommitAction() {
  return new CommitInfo(System.currentTimeMillis(), engineInfo, operation).toRow();
Do we insert the kernel-specific engine info? Or do we rely on the connector to do that?
 * child paths.
 * @return
 */
public static Path tryRelativizePath(Path child, URI root) {
nit: just relativizePath? "try" makes me think that it may or may not return a relativized path, i.e. that it is best effort. But this method always returns a relativized path (it just might not have to do any work if the path is already relative, and that's okay).
changed to relativizePath
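For readers without the diff at hand, the behaviour discussed in this thread can be sketched with `java.net.URI` alone. This is an illustration, not the method from the PR: the real signature takes a `Path` child, the class name here is hypothetical, and rejecting a child that lies outside the root is an assumption.

```java
import java.net.URI;

// Hypothetical illustration; the PR's method works on the Kernel's Path type.
final class PathSketch {
    /**
     * Returns `child` relative to `root`. A child that is already relative is
     * returned unchanged. Assumption: a child outside `root` is rejected.
     */
    static URI relativizePath(URI child, URI root) {
        if (!child.isAbsolute()) {
            return child; // no scheme, so treated as already relative
        }
        URI relative = root.relativize(child);
        // URI.relativize hands back the child unchanged when root is not a prefix of it.
        if (relative.isAbsolute()) {
            throw new IllegalArgumentException(child + " is not under " + root);
        }
        return relative;
    }
}
```

Under these assumptions the method never returns an absolute path, which is the point made above about dropping the "try" prefix.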
## Description
(Split from larger PR #2944) This API allows the creation of an initial delta log directory when the table is created.
## Description
(Split from the larger PR #2944) These are utilities to make sure the schema given when creating the table is valid (has no duplicate column names or invalid chars). The code/logic is similar to Delta-Spark/Standalone.
## How was this patch tested?
Unit tests
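The schema checks described above (no duplicate column names, no invalid characters) can be sketched as follows. The class name is hypothetical, the schema is reduced to a flat list of column names, and both the case-insensitive duplicate check and the exact set of rejected characters are assumptions modelled on Delta-Spark's behaviour rather than taken from this PR.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical illustration of the validation idea; not the PR's utility.
final class SchemaValidationSketch {
    // Assumption: characters rejected in column names; the PR's list may differ.
    private static final String INVALID_CHARS = " ,;{}()\n\t=";

    /** Throws IllegalArgumentException on a duplicate or invalid column name. */
    static void validate(List<String> columnNames) {
        Set<String> seen = new HashSet<>();
        for (String name : columnNames) {
            // Assumption: names differing only by case count as duplicates.
            if (!seen.add(name.toLowerCase(Locale.ROOT))) {
                throw new IllegalArgumentException("Duplicate column name: " + name);
            }
            for (char c : name.toCharArray()) {
                if (INVALID_CHARS.indexOf(c) >= 0) {
                    throw new IllegalArgumentException(
                        "Invalid character in column name: " + name);
                }
            }
        }
    }
}
```

A real implementation would also walk nested struct, array and map fields, which this flat sketch leaves out.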
## Description
(Split from #2944) APIs and implementation for creating partitioned or unpartitioned tables. No data insertion yet; that will come in the next PR.
## How was this patch tested?
Test suite
## Description
(Split from #2944) Adds support for inserting data into the table.
## How was this patch tested?
Tests for inserting into partitioned and unpartitioned tables with various combinations of types, partition values, etc. Also tests that the checkpoint is ready to be created.
## Description
(Split from #2944) Adds an API on `TransactionBuilder` to take the transaction identifier for idempotent writes.
```
/**
 * Set the transaction identifier for idempotent writes. Incremental processing systems (e.g.,
 * streaming systems) that track progress using their own application-specific versions need to
 * record what progress has been made, in order to avoid duplicating data in the face of
 * failures and retries during writes. By setting the transaction identifier, the Delta table
 * can ensure that the data with same identifier is not written multiple times. For more
 * information refer to the Delta protocol section <a
 * href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#transaction-identifiers">
 * Transaction Identifiers</a>.
 *
 * @param engine {@link Engine} instance to use.
 * @param applicationId The application ID that is writing to the table.
 * @param transactionVersion The version of the transaction. This should be monotonically
 *                           increasing with each write for the same application ID.
 * @return updated {@link TransactionBuilder} instance.
 */
TransactionBuilder withTransactionId(
    Engine engine, String applicationId, long transactionVersion);
```
During the transaction build, check the latest txn version of the given AppId. If the version is not monotonically increasing, throw `ConcurrentTransactionException`.
## How was this patch tested?
Added to `DeltaTableWriteSuite.scala`
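The monotonicity check described above can be sketched in isolation as follows. This is a simplified model, not the Kernel's code: the class name is hypothetical, an in-memory map stands in for the latest `txn` version read from the Delta log, the exception is a local stand-in for the `ConcurrentTransactionException` named in the description, and the check and the recording of the new version are collapsed into one step.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the idempotent-write check; not the PR's implementation.
final class TxnIdSketch {
    /** Local stand-in for the exception named in the description. */
    static final class ConcurrentTransactionException extends RuntimeException {
        ConcurrentTransactionException(String message) {
            super(message);
        }
    }

    // appId -> last committed transaction version (stands in for the table's log).
    private final Map<String, Long> lastCommitted = new HashMap<>();

    /** Accepts a version only if it is greater than the last one seen for the appId. */
    void checkAndRecord(String applicationId, long transactionVersion) {
        Long last = lastCommitted.get(applicationId);
        if (last != null && transactionVersion <= last) {
            throw new ConcurrentTransactionException(
                "Version " + transactionVersion + " for " + applicationId
                    + " is not greater than last committed version " + last);
        }
        lastCommitted.put(applicationId, transactionVersion);
    }
}
```

A retried write that reuses an already committed version is rejected, which is what lets a streaming writer replay a batch after a failure without duplicating data.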
Description
Implements interfaces described in #2920. Currently supports inserting into partitioned/unpartitioned tables with protocol version (1, 2). It also has transaction identifier support for idempotent writes.
Conflict resolution is not yet available. It will come in follow-up PRs.
How was this patch tested?
Integration tests. More are on the way.