Update DeltaParquetFileFormat to add isRowDeleted column populated from DV #1542

Closed
wants to merge 7 commits

Conversation

vkorukanti
Collaborator

This PR is part of the feature: Support reading Delta tables with deletion vectors (more details at #1485)

It modifies `DeltaParquetFileFormat` to append an additional column called `__delta_internal_skip_row__`. This column is populated by reading the DV associated with the Parquet file, and it assumes the rows are returned in the order they appear in the file. To guarantee that order, we disable file splitting and filter pushdown to the Parquet reader. This carries a performance penalty for Delta tables with deletion vectors until we upgrade Delta to Spark 3.4, whose Parquet reader can generate row indexes correctly even with file splitting and filter pushdown.

For now a single test is added; follow-up end-to-end tests will cover the code more thoroughly.
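For illustration, a minimal sketch of how the per-file column could be derived, assuming the DV is exposed as a set of deleted row indexes (the real reader materializes it from the serialized deletion vector; the column name and value encoding follow the description above):

```scala
// Sketch: build the skip-row column for one Parquet file.
// A row gets 1 when its row index appears in the deletion vector (the row is
// deleted) and 0 when the row should be kept, matching the description above.
def skipRowColumn(numRowsInFile: Int, deletedRowIndexes: Set[Long]): Array[Byte] =
  Array.tabulate(numRowsInFile) { rowIndex =>
    if (deletedRowIndexes.contains(rowIndex.toLong)) 1.toByte else 0.toByte
  }

// Example: a 5-row file whose DV marks rows 1 and 3 as deleted.
// skipRowColumn(5, Set(1L, 3L)) returns Array(0, 1, 0, 1, 0)
```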

GitOrigin-RevId: 2067958ffc770a89df15fd165c9999d49b2dd1c4

GitOrigin-RevId: 93382ac54f836fb4c14f23f97b28eea6e663d0be
GitOrigin-RevId: 0a3e11e1e50e852f9b5e601b46ff491bf8fe060d
GitOrigin-RevId: e03192bbb6a6bc0ce0397e7ef1f4ad4958a20f47
GitOrigin-RevId: 5724f81bb88d8e4c725aeafd49db5d5433860fd4
GitOrigin-RevId: b19019efc5b3c1beb9ce464bf7ac087f0bd01182
vkorukanti changed the title from "Update DeltaParquetFileFormat to add skip row flag column for files with DVs" to "Update DeltaParquetFileFormat to add isRowDeleted column populated from DV" on Jan 6, 2023
val broadcastHadoopConf: Option[Broadcast[SerializableConfiguration]] = None)
extends ParquetFileFormat {
// Validate that either all arguments for a DV-enabled read are provided or none of them are.
require(!(broadcastHadoopConf.isDefined ^ broadcastDvMap.isDefined ^ tablePath .isDefined ^
Collaborator

nit: `tablePath .isDefined` has a space there; is that valid syntax?

Collaborator Author

Will remove the extra space. It is valid from the compiler's perspective.
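As an aside, a minimal Scala sketch of the all-or-none intent behind the check above. The option names mirror the snippet, but the actual (truncated) expression in the PR may differ; counting the defined options avoids the parity-only semantics of chaining XOR over more than two booleans.

```scala
// Sketch: require that either every DV-read argument is provided or none is.
def validateDvReadArguments(dvReadArgs: Seq[Option[_]]): Unit = {
  val definedCount = dvReadArgs.count(_.isDefined)
  require(definedCount == 0 || definedCount == dvReadArgs.size,
    "Provide either all of the DV-read arguments or none of them")
}

// Usage with the fields from the snippet:
// validateDvReadArguments(Seq(broadcastHadoopConf, broadcastDvMap, tablePath))
```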

GitOrigin-RevId: 6730b4211535bbfac0bc410d5dbe78d568dd6e50
GitOrigin-RevId: d6204d5afcedd72f2f05da684cd001374e76b5dd
vkorukanti added a commit that referenced this pull request Jan 26, 2023
…an output

This PR is part of the feature: Support reading Delta tables with deletion vectors (more details at #1485)

Add a trait (used by `PrepareDeltaScan` to modify its output) that rewrites scans of DV-enabled tables to prune the deleted rows from the scan output.

This is a planner trait that injects a Filter just after the Delta Parquet scan. The transformer modifies the plan as follows (a minimal sketch of the resulting shape appears after this list):
 * Before rule: `<Parent Node> -> Delta Scan (key, value)`
   * Here we are reading the `key` and `value` columns from the Delta table.
 * After rule: `<Parent Node> -> Project(key, value) -> Filter (udf(__skip_row == 0)) -> Delta Scan (key, value, __skip_row)`
   * Here we insert a new column `__skip_row` into the Delta scan. The Parquet reader populates it using the DV corresponding to the Parquet file being read (refer [to the change](#1542)); it contains `0` if we want to keep the row.
   * The created scan also disables Parquet file splitting and filter pushdown, because generating `__skip_row` requires reading the rows of a file consecutively to derive the row index. This is a cost we pay until we upgrade to the latest Apache Spark, whose Parquet reader generates the row index correctly irrespective of file splitting and filter pushdown.
   * The created scan also carries a broadcast variable holding a Parquet file -> DV file map. The created Parquet reader uses this map to find the DV file corresponding to the Parquet file it is reading.
   * The created Filter keeps only the rows whose `__skip_row` equals `0`, i.e. it drops the rows marked deleted in the DV.
   * Finally, a `Project` keeps the plan node output the same as before the rule is applied.
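To illustrate the shape of the rewrite, here is a minimal sketch of the same Filter-plus-Project structure expressed with the DataFrame API (the actual trait rewrites the logical plan directly; `__skip_row` stands in for the internal column name used above):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// The scan already appends `__skip_row`; keep only the rows the DV did not
// delete, then drop the helper column so the output matches the pre-rule plan.
def pruneDeletedRows(scanWithSkipRow: DataFrame): DataFrame =
  scanWithSkipRow
    .filter(col("__skip_row") === 0)
    .drop("__skip_row")
```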

In addition, this PR:
* adds the `deletionVector` field to the DeltaLog protocol objects (`AddFile`, `RemoveFile`)
* updates `OptimizeMetadataOnlyDeltaQuery` to take the DVs into account when calculating the row count (a sketch of the adjusted count follows below)
* adds end-to-end integration of reading Delta tables with DVs in `DeletionVectorsSuite`

Follow-up PRs will add extensive tests.
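As a sketch of the metadata-only row count adjustment mentioned above (with assumed field names, not the actual Delta classes): each file's logical row count is its physical record count minus the cardinality of its deletion vector.

```scala
// Hypothetical per-file statistics; the real code derives these from AddFile
// and its deletionVector descriptor.
case class FileRowStats(numPhysicalRecords: Long, dvCardinality: Long)

// Metadata-only row count: subtract each file's deleted rows, then sum.
def logicalRowCount(files: Seq[FileRowStats]): Long =
  files.map(f => f.numPhysicalRecords - f.dvCardinality).sum
```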

Close #1560

GitOrigin-RevId: 3d67b6240865d880493f1d15a80b00cb079dacdc
tdas pushed a commit that referenced this pull request Apr 23, 2024
…sting DV Information (#2888)


#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Previously, we relied on an [expensive broadcast of DV files](#1542) to pass the DV files to the associated Parquet files. With [custom metadata for files](apache/spark#40677) introduced in Spark 3.5, we can now pass the DV through the custom metadata field instead, which is expected to improve the performance of DV reads in Delta.
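For context, a minimal sketch of the broadcast-based lookup this PR replaces (hypothetical types; the real descriptor and map live in Delta's DV read path):

```scala
import org.apache.spark.broadcast.Broadcast

// Hypothetical descriptor standing in for Delta's real DV descriptor type.
case class DvDescriptor(dvFilePath: String, cardinality: Long)

// Earlier approach: the driver broadcasts a Parquet-file -> DV map and each
// reader task looks up the DV for the file it is reading. With Spark 3.5's
// per-file custom metadata, the descriptor can travel with the file instead.
def dvForFile(
    broadcastDvMap: Broadcast[Map[String, DvDescriptor]],
    parquetFilePath: String): Option[DvDescriptor] =
  broadcastDvMap.value.get(parquetFilePath)
```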
## How was this patch tested?

Adjusted the existing UTs that cover our changes.
## Does this PR introduce _any_ user-facing changes?
No.
vkorukanti deleted the dv8 branch May 9, 2024 02:41