Add DV table plan transformer trait to prune the deleted rows from scan output #1560
Conversation
Compare: 3b564c9 to c08a48c
 * It requires that the given plan has already gone through [[OptimizeSubqueries]] and that
 * the root node denoting a subquery is removed and optimized appropriately.
 */
def transformWithSubqueries(plan: LogicalPlan)
It's better to give it a different name, as Spark has a `transformWithSubqueries` method that processes the tree in a different way. We also need to document this behavior clearly, since it now becomes a utility method.
In `PrepareDeltaScan`, we want to scan all subqueries from the leaf nodes to the root (`transformUp`), but within each subquery we want to scan from the root to the leaf nodes (`transformDown`). Spark's `transformWithSubqueries` doesn't have this behavior; a sketch of the intended traversal is below.
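A minimal sketch of that traversal, assuming Catalyst's `transformUp`/`transformDown` and `SubqueryExpression.withNewPlan`; the method name here is hypothetical, not the one this PR ends up using:

```scala
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper: visit subqueries bottom-up (innermost plans first), but
// apply the rule top-down (root to leaves) within each subquery plan.
def transformSubqueriesUpPlanDown(plan: LogicalPlan)(
    rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
  // transformUp visits children before parents, so inner subqueries are
  // rewritten before the plans that contain them.
  val subqueriesRewritten = plan.transformUp {
    case node =>
      node.transformExpressionsUp {
        case sub: SubqueryExpression =>
          sub.withNewPlan(transformSubqueriesUpPlanDown(sub.plan)(rule))
      }
  }
  // Then apply the rule to this (sub)plan from the root toward the leaves.
  subqueriesRewritten.transformDown(rule)
}
```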
Updated the docs.
Resolved review threads:
core/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
core/src/main/scala/org/apache/spark/sql/delta/stats/PrepareDeltaScan.scala
LGTM
This PR is part of the feature: Support reading Delta tables with deletion vectors (more details at #1485)
This PR adds a trait (used by `PrepareDeltaScan` to modify its output) that rewrites scans of DV-enabled tables to prune the deleted rows from the scan output: a planner trait that injects a `Filter` just after the Delta Parquet scan. The transformer modifies the plan as follows (a sketch of the rewrite follows the list):

Before the rule:
`<Parent Node> -> Delta Scan (key, value)`
Here we are reading the `key` and `value` columns from the Delta table.

After the rule:
`<Parent Node> -> Project(key, value) -> Filter (udf(__skip_row == 0)) -> Delta Scan (key, value, __skip_row)`
- The scan now emits an extra column, `__skip_row`. It is populated by the Parquet reader using the DV corresponding to the Parquet file being read (refer to the change), and it contains `0` if we want to keep the row.
- To populate `__skip_row` we need to read the rows in a file consecutively in order to generate the row index. This is a drawback we have to pay until we upgrade to the latest Apache Spark, which contains Parquet reader changes that generate the row index automatically irrespective of file splitting and filter pushdowns.
- The `Filter` keeps only the rows where `__skip_row` equals `0`.
- The final `Project` keeps the plan node output the same as before the rule is applied.
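A minimal sketch of the rewrite, assuming the scan has already been rebuilt to emit `__skip_row`; the helper name and the plain equality predicate (standing in for the PR's UDF) are illustrative assumptions, not the PR's actual code:

```scala
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

// Hypothetical helper: keep DV-live rows and restore the original output schema.
def pruneDeletedRows(scanWithSkipRow: LogicalPlan): LogicalPlan = {
  // __skip_row is assumed to be populated by the Parquet reader from the
  // file's deletion vector (0 = keep the row).
  val skipRow = scanWithSkipRow.output.find(_.name == "__skip_row").get
  // Filter out the rows the DV marks as deleted ...
  val filtered = Filter(EqualTo(skipRow, Literal(0.toByte)), scanWithSkipRow)
  // ... and project __skip_row away so the node's output matches the pre-rule plan.
  Project(scanWithSkipRow.output.filterNot(_.name == "__skip_row"), filtered)
}
```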
In addition, this PR:
- Adds `deletionVector` to the Delta log protocol objects (`AddFile`, `RemoveFile`).
- Fixes `OptimizeMetadataOnlyDeltaQuery` to take the DVs into account when calculating the row count (see the sketch below).
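A minimal sketch of the adjusted count, with hypothetical case classes standing in for the real `AddFile` and DV descriptor types:

```scala
// Hypothetical stand-ins for the real action/DV types.
case class DeletionVectorDescriptor(cardinality: Long) // rows marked deleted
case class FileInfo(numPhysicalRecords: Long,
                    deletionVector: Option[DeletionVectorDescriptor])

// A metadata-only row count must subtract each file's DV cardinality,
// since those rows physically exist but are logically deleted.
def logicalRowCount(files: Seq[FileInfo]): Long =
  files.map { f =>
    f.numPhysicalRecords - f.deletionVector.map(_.cardinality).getOrElse(0L)
  }.sum
```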
Tests: added tests to `DeletionVectorsSuite`. Follow-up PRs will add more extensive tests.