[Spark] DV Reads Stability Improvement in Delta by removing Broadcasting DV Information #2888
Conversation
Resolved review thread (outdated): spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala
Resolved review threads (outdated): spark/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala
Resolved review thread (outdated): spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Very nice, thank you! Looking forward to seeing how much speed-up we can get with this change :)
Force-pushed from 9f1cb83 to 8c988ab.
Resolved review thread (outdated): spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Force-pushed from 648ddde to e049778.
  super.metadataSchemaFields ++ rowTrackingFields
}

val isDVsEnabled = DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(metadata)
metadata shows the current status of the table, i.e., whether DV is enabled in the current version, doesn't it? What about old versions that contain DVs, where the user has since decided to disable the feature? I think we should look at the table protocol instead: if the DV table feature is turned on, we should assume the table contains DVs.
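A minimal sketch of that suggestion, assuming Delta's Protocol.isFeatureSupported and the DeletionVectorsTableFeature object (names from my reading of the codebase; the exact check may differ):

// Gate on the protocol: a table whose protocol supports the DV feature may still
// contain DVs even after delta.enableDeletionVectors is set back to false.
val mayContainDVs = protocol.isFeatureSupported(DeletionVectorsTableFeature)
// The current check only says whether new DVs can be created at this version:
val isDVsEnabled = DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(metadata)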
@xupefei Quick question: can an existing table add a new table feature? E.g., if a table does not have DV in its properties from the beginning, can DVs be added at some later point in time?
@xupefei I think you can do this using ALTER TABLE, right? But once it is added, you cannot drop it at the moment.
After looking at the code, I feel that we need a test case with the following logic:
- Create a table without DV. Insert some values.
- Turn on DV (delta.enableDeletionVectors=true). Delete some values.
- Turn off DV (delta.enableDeletionVectors=false). Delete some values.

Check:
- Version 1 does not contain the DV metadata column.
- Version 2 does not contain the DV metadata column (because there are no DVs in this table yet - correct me if I'm wrong).
- Version 3 does contain the DV metadata columns.
- Version 4 does contain the DV metadata columns.
- Version 5 does contain the DV metadata columns.
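A minimal sketch of such a test, assuming a suite with withTempDir and a Spark session; the helper dvMetadataColumnsExist is hypothetical, standing in for whatever assertion detects the DV metadata columns, and the version numbers follow the steps above (actual commit versions may be off by one):

test("DV metadata columns across enabling and disabling deletion vectors") {
  withTempDir { dir =>
    val path = dir.getAbsolutePath
    // Step 1: create a table without DVs and insert some values.
    spark.range(0, 100).write.format("delta").save(path)
    // Step 2: turn on DVs.
    sql(s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')")
    // Step 3: delete some values; this should produce DVs.
    sql(s"DELETE FROM delta.`$path` WHERE id < 10")
    // Step 4: turn off DVs; DVs written earlier remain in the table.
    sql(s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'false')")
    // Step 5: delete some more values.
    sql(s"DELETE FROM delta.`$path` WHERE id < 20")

    // Hypothetical helper: does the scan of the given version expose DV metadata columns?
    assert(!dvMetadataColumnsExist(path, version = 1))
    assert(!dvMetadataColumnsExist(path, version = 2))
    assert(dvMetadataColumnsExist(path, version = 3))
    assert(dvMetadataColumnsExist(path, version = 4))
    assert(dvMetadataColumnsExist(path, version = 5))
  }
}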
@xupefei I addressed your comments =)). Regarding the new test case:
Force-pushed from e049778 to 9618b0e.
Resolved review threads (outdated): spark/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala
A few improvements to the tests, then we're good to go. Cc @larsk-db
Resolved review thread: spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
@@ -104,7 +102,7 @@ case class DeltaParquetFileFormat(
  override def isSplitable(
      sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = isSplittable

- def hasDeletionVectorMap: Boolean = broadcastDvMap.isDefined && broadcastHadoopConf.isDefined
+ def hasBroadcastHadoopConf: Boolean = broadcastHadoopConf.isDefined
I think you can get rid of broadcastHadoopConf and hasBroadcastHadoopConf altogether here. The only use we have for it now is to pass it to rowIndexFilter.createInstance when creating the DV filter, but we also have hadoopConf: Configuration around (passed as an argument to buildReaderWithPartitionValues), which I believe should be sufficient here: we don't need this configuration to be broadcast first for the purpose of loading DVs from storage.
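A sketch of that suggestion (dvDescriptor and tablePath are illustrative names, and the createInstance arguments are assumptions; note the follow-up below reports this change did not pass the tests):

// Inside buildReaderWithPartitionValues(..., hadoopConf: Configuration), create the
// DV filter from the hadoopConf argument directly:
val filter = rowIndexFilter.createInstance(dvDescriptor, hadoopConf, tablePath)
// ...instead of going through the broadcast copy, roughly:
// val filter = rowIndexFilter.createInstance(dvDescriptor, broadcastHadoopConf.get.value.value, tablePath)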
@johanl-db No clue why we had it back in the day, hmm.
@johanl-db Removing broadcastHadoopConf doesn't seem to work; I haven't investigated yet, but changing back to using the broadcastHadoopConf makes the tests pass.
Resolved review threads (outdated): spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
Resolved review thread (outdated): spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala
Resolved review thread (outdated): spark/src/test/scala/org/apache/spark/sql/delta/DeltaParquetFileFormatSuite.scala
I took a quick look, but it seems you and @johanl-db are on top of this. Ping me if you have something specific that needs my input.
Force-pushed from 3c52eb0 to 479ef5d.
Resolved review thread (outdated): spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
Resolved review thread (outdated): spark/src/main/scala/org/apache/spark/sql/delta/files/TahoeFileIndex.scala
Force-pushed from 479ef5d to 63e9527.
I highly approve this change :D
Thank you for doing this! This will significantly improve stability on large tables with lots of DVs.
Which Delta project/connector is this regarding?
Spark
Description
Previously, we relied on an expensive broadcast of the DV files to pass them to the associated Parquet files. With the ability to attach custom metadata to files introduced in Spark 3.5, we can now pass the DV through the custom file metadata field instead. This is expected to improve the stability of DV reads in Delta.
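As a rough sketch of the mechanism, assuming Spark's FileSourceConstantMetadataStructField helper and an illustrative field name (the exact fields in the PR differ): a FileFormat can declare constant per-file metadata fields whose values the file index supplies for each file, so the DV descriptor travels with the file entry rather than through a broadcast variable.

// In DeltaParquetFileFormat (sketch): declare an extra constant file-metadata field
// carrying the serialized DV descriptor whenever the table may contain DVs.
override def metadataSchemaFields: Seq[StructField] = {
  val dvFields = if (mayContainDVs) { // illustrative flag
    Seq(FileSourceConstantMetadataStructField("__dv_descriptor", StringType, nullable = true))
  } else Nil
  super.metadataSchemaFields ++ dvFields
}

The per-file values are then attached when the file index (TahoeFileIndex here) lists the files, which is why that class is also touched in this PR.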
How was this patch tested?
Adjusted the existing UTs that cover our changes.
Does this PR introduce any user-facing changes?
No.