[HUDI-3457] Refactored Spark DataSource Relations to avoid code duplication #4877
Conversation
Force-pushed af2c977 → 2eb193a
...spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
Force-pushed 2eb193a → d875e41
Force-pushed d875e41 → 2940f46
...-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala
...ark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
}
this one was false
Correct. There's no reason to disable vectorization.
Confirmed this with @YannByron
Yep, enableVectorizedReader was false. But as discussed with @alexeykudinkin before, we need to enable it to speed things up.
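For reference, the three settings under discussion can be applied to any active SparkSession; a minimal sketch (the `spark` value is assumed to be an existing session, not something defined in this PR):

```scala
// Sketch: enable Parquet filter pushdown, record-level filtering,
// and the vectorized reader, as the snippet above does via sqlContext.
// `spark` is assumed to be an active org.apache.spark.sql.SparkSession.
spark.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
spark.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
spark.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")

// Equivalent via the public runtime-config API:
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
```

Note these are session-level runtime confs, so setting them inside a relation's constructor (as here) affects every subsequent read in the same session.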
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
Force-pushed 2940f46 → c71cfab
...spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
...ark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
blockLocations: Array[SerializableBlockLocation])

/** Checks if we should filter out this path name. */
def shouldFilterOutPathName(pathName: String): Boolean = {
This is the only thing that changed as compared to Spark's HadoopFsUtils
@hudi-bot run azure
Force-pushed be26058 → ec7e1b3
Moved `buildScan` impl into `HoodieBaseRelation`
Tidying up
…ity, avoid duplication
Tidying up;
…uld be shared w/ COW impl
…ils` to override default behavior of InMemoryFileIndex filtering out all files stated w/ "."
… to be able to appropriately invoke super
Force-pushed ec7e1b3 → 40e5a85
LGTM direction-wise. The extracted BaseRelation logic is a bit hard to examine line by line; testing will be a more effective way to verify. If the results are ok, it's good to land this.
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
val exclude = (pathName.startsWith("_") && !pathName.contains("=")) || pathName.endsWith("._COPYING_")
val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
exclude && !include
Should some utils from the MDT be the source of truth for these rules instead? The Spark side does not own them, and it would also avoid copying them across different Spark versions.
Right now this is mostly about filtering out Spark-specific files. We can replace it with our own utils when there's a need for it, but for now the goal of borrowing this class was to override its behavior of filtering out all files starting with ".".
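The filtering rule in the snippet above is pure string logic, so it can be exercised in isolation. A minimal self-contained sketch mirroring it (the `PathNameFilter` object is introduced here for illustration and is not part of the Hudi source):

```scala
object PathNameFilter {
  /** Mirrors the reviewed snippet: exclude files starting with "_"
    * (unless the name contains "=", i.e. looks like a partition dir)
    * and in-flight "._COPYING_" files, but always keep the Parquet
    * summary files _metadata and _common_metadata. */
  def shouldFilterOutPathName(pathName: String): Boolean = {
    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
      pathName.endsWith("._COPYING_")
    val include = pathName.startsWith("_common_metadata") ||
      pathName.startsWith("_metadata")
    exclude && !include
  }
}
```

So, for example, `_SUCCESS` is filtered out while `_metadata` and regular `part-*.parquet` files are kept. Note this variant intentionally does not filter names starting with "." — which is exactly the behavior change compared to Spark's original utility discussed above.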
...source/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
...atasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
hadoopConf = hadoopConf,
filter = new PathFilterWrapper(filter),
ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
// NOTE: We're disabling fetching Block Info to speed up file listing
We may need a special token here to indicate changed parts in Hudi's codebase, for easier maintenance. // NOTE: is not special enough; what about // HUDI NOTE:? This could apply to any other incoming code variation.
Not sure I understand your point here: what do you suggest this token be used for?
I meant that when we want to understand which parts of the code are modified in Hudi, we can search for a special token and find the relevant code. NOTE: might come from the original code base, so I wanted to make it special.
Gotcha. It's going to be tough to identify all such places with markers; instead, I'm referencing the respective Spark release version this is borrowed from, so that we can simply diff against it and see what has changed.
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
// TODO(HUDI-3639) vectorized reader has to be disabled to make sure MORIncrementalRelation is working properly
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
}
spark.sql.parquet.enableVectorizedReader = true to enable vectorization acceleration?
Please take a look at the TODO note I've added: we can't do that, because MOR Incremental Relation relies on Parquet record-level filtering, which doesn't work with the vectorized reader.
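To make the trade-off concrete, a hedged sketch of the incremental read path implied above (`spark` is assumed to be an active SparkSession; the table path and instant time are hypothetical placeholders):

```scala
// The MOR incremental read path relies on Parquet record-level filtering,
// which the vectorized reader does not support, hence it is turned off.
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "true")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

// Hypothetical incremental read; options follow Hudi's DataSource read options.
val df = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20220301000000") // placeholder instant
  .load("/tmp/hudi_table") // placeholder path
```

This illustrates why HUDI-3639 is tracked separately: snapshot reads can keep vectorization on, while the incremental relation has to give it up until the filtering dependency is removed.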
+1 LGTM
Manually tested this patch in Spark 3.2.1 using the quickstart examples, and it passed. Landing this.
…cation (apache#4877) Refactoring Spark DataSource Relations to avoid code duplication. The following Relations were in scope: - BaseFileOnlyViewRelation - MergeOnReadSnapshotRelation - MergeOnReadIncrementalRelation
What is the purpose of the pull request
NOTE: This PR is stacked on top of #4818
Refactoring Spark DataSource Relations to avoid code duplication. The following Relations were in scope:
BaseFileOnlyViewRelation
MergeOnReadSnapshotRelation
MergeOnReadIncrementalRelation
Brief change log
See above
Verify this pull request
This pull request is already covered by existing tests, such as (please describe tests).
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.