forked from apache/spark
-
Notifications
You must be signed in to change notification settings - Fork 52
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#129, define spark default db #130
Merged
fishcus
merged 1 commit into
Kyligence:kyspark-2.4.1.x-4.x
from
fishcus:KE-spark-default-db-4.x
Jul 8, 2020
Merged
#129, define spark default db #130
fishcus
merged 1 commit into
Kyligence:kyspark-2.4.1.x-4.x
from
fishcus:KE-spark-default-db-4.x
Jul 8, 2020
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
eventd
previously approved these changes
Jun 24, 2020
fishcus
force-pushed
the
KE-spark-default-db-4.x
branch
from
July 8, 2020 06:30
385588b
to
8945487
Compare
eventd
previously approved these changes
Jul 8, 2020
fishcus
force-pushed
the
KE-spark-default-db-4.x
branch
3 times, most recently
from
July 8, 2020 12:44
b2a29fa
to
0d0173d
Compare
fishcus
force-pushed
the
KE-spark-default-db-4.x
branch
from
July 8, 2020 12:48
0d0173d
to
4b1fa92
Compare
zheniantoushipashi
approved these changes
Jul 8, 2020
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
eventd
approved these changes
Jul 8, 2020
7mming7
pushed a commit
that referenced
this pull request
Nov 4, 2020
…or its output partitioning ### What changes were proposed in this pull request? Currently, the `BroadcastHashJoinExec`'s `outputPartitioning` only uses the streamed side's `outputPartitioning`. However, if the join type of `BroadcastHashJoinExec` is an inner-like join, the build side's info (the join keys) can be added to `BroadcastHashJoinExec`'s `outputPartitioning`. For example, ```Scala spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500") val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1") val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2") val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3") val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4") // join1 is a sort merge join. val join1 = t1.join(t2, t1("i1") === t2("i2")) // join2 is a broadcast join where t3 is broadcasted. val join2 = join1.join(t3, join1("i1") === t3("i3")) // Join on the column from the broadcasted side (i3). val join3 = join2.join(t4, join2("i3") === t4("i4")) join3.explain ``` You see that `Exchange hashpartitioning(i2#103, 200)` is introduced because there is no output partitioning info from the build side. ``` == Physical Plan == *(6) SortMergeJoin [i3#29], [i4#40], Inner :- *(4) Sort [i3#29 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i3#29, 200), true, [id=#55] : +- *(3) BroadcastHashJoin [i1#7], [i3#29], Inner, BuildRight : :- *(3) SortMergeJoin [i1#7], [i2#18], Inner : : :- *(1) Sort [i1#7 ASC NULLS FIRST], false, 0 : : : +- Exchange hashpartitioning(i1#7, 200), true, [id=#28] : : : +- LocalTableScan [i1#7, j1#8] : : +- *(2) Sort [i2#18 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(i2#18, 200), true, [id=#29] : : +- LocalTableScan [i2#18, j2#19] : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#34] : +- LocalTableScan [i3#29, j3#30] +- *(5) Sort [i4#40 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(i4#40, 200), true, [id=#39] +- LocalTableScan [i4#40, j4#41] ``` This PR proposes to introduce output partitioning for the build side for `BroadcastHashJoinExec` if the streamed side has a `HashPartitioning` or a collection of `HashPartitioning`s. There is a new internal config `spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit`, which can limit the number of partitioning a `HashPartitioning` can expand to. It can be set to "0" to disable this feature. ### Why are the changes needed? To remove unnecessary shuffle. ### Does this PR introduce _any_ user-facing change? Yes, now the shuffle in the above example can be eliminated: ``` == Physical Plan == *(5) SortMergeJoin [i3#108], [i4#119], Inner :- *(3) Sort [i3#108 ASC NULLS FIRST], false, 0 : +- *(3) BroadcastHashJoin [i1#86], [i3#108], Inner, BuildRight : :- *(3) SortMergeJoin [i1#86], [i2#97], Inner : : :- *(1) Sort [i1#86 ASC NULLS FIRST], false, 0 : : : +- Exchange hashpartitioning(i1#86, 200), true, [id=#120] : : : +- LocalTableScan [i1#86, j1#87] : : +- *(2) Sort [i2#97 ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(i2#97, 200), true, [id=#121] : : +- LocalTableScan [i2#97, j2#98] : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#126] : +- LocalTableScan [i3#108, j3#109] +- *(4) Sort [i4#119 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(i4#119, 200), true, [id=#130] +- LocalTableScan [i4#119, j4#120] ``` ### How was this patch tested? Added new tests. Closes apache#28676 from imback82/broadcast_join_output. Authored-by: Terry Kim <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Please review http://spark.apache.org/contributing.html before opening a pull request.