Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spark 2.4.1 版本UT 会卡住,jenkins上跑test 会OOM, 影响 kyspark-2.4.1.x kyspark-2.4.1.x-#19 kyspark-2.4.1.x-#23 kyspark-2.4.1.x-aron 分支 #28

Open
zheniantoushipashi opened this issue Jul 9, 2019 · 2 comments

Comments

@zheniantoushipashi
Copy link

image.png

@zheniantoushipashi zheniantoushipashi changed the title spark 2.4.1 版本UT 会卡住,jenkins上跑test 会OOM spark 2.4.1 版本UT 会卡住,jenkins上跑test 会OOM, 影响 kyspark-2.4.1.x kyspark-2.4.1.x-#19 kyspark-2.4.1.x-#23 kyspark-2.4.1.x-aron 分支 Jul 9, 2019
@zheniantoushipashi
Copy link
Author

经过分析是由

985213b#diff-3c00ff6ff579027007d0e60ba47fbf5e

提交的 accumulator performance 导致,这个不是需要的 UT, 先做删除处理

@zheniantoushipashi
Copy link
Author

#29

7mming7 pushed a commit that referenced this issue Nov 4, 2020
…or its output partitioning

### What changes were proposed in this pull request?

Currently, the `BroadcastHashJoinExec`'s `outputPartitioning` only uses the streamed side's `outputPartitioning`. However, if the join type of `BroadcastHashJoinExec` is an inner-like join, the build side's info (the join keys) can be added to `BroadcastHashJoinExec`'s `outputPartitioning`.

 For example,
```Scala
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "500")
val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")

// join1 is a sort merge join.
val join1 = t1.join(t2, t1("i1") === t2("i2"))

// join2 is a broadcast join where t3 is broadcasted.
val join2 = join1.join(t3, join1("i1") === t3("i3"))

// Join on the column from the broadcasted side (i3).
val join3 = join2.join(t4, join2("i3") === t4("i4"))

join3.explain
```
You see that `Exchange hashpartitioning(i2#103, 200)` is introduced because there is no output partitioning info from the build side.
```
== Physical Plan ==
*(6) SortMergeJoin [i3#29], [i4#40], Inner
:- *(4) Sort [i3#29 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i3#29, 200), true, [id=#55]
:     +- *(3) BroadcastHashJoin [i1#7], [i3#29], Inner, BuildRight
:        :- *(3) SortMergeJoin [i1#7], [i2#18], Inner
:        :  :- *(1) Sort [i1#7 ASC NULLS FIRST], false, 0
:        :  :  +- Exchange hashpartitioning(i1#7, 200), true, [id=#28]
:        :  :     +- LocalTableScan [i1#7, j1#8]
:        :  +- *(2) Sort [i2#18 ASC NULLS FIRST], false, 0
:        :     +- Exchange hashpartitioning(i2#18, 200), true, [id=#29]
:        :        +- LocalTableScan [i2#18, j2#19]
:        +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#34]
:           +- LocalTableScan [i3#29, j3#30]
+- *(5) Sort [i4#40 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i4#40, 200), true, [id=#39]
      +- LocalTableScan [i4#40, j4#41]
```
This PR proposes to introduce output partitioning for the build side for `BroadcastHashJoinExec` if the streamed side has a `HashPartitioning` or a collection of `HashPartitioning`s.

There is a new internal config `spark.sql.execution.broadcastHashJoin.outputPartitioningExpandLimit`, which can limit the number of partitioning a `HashPartitioning` can expand to. It can be set to "0" to disable this feature.

### Why are the changes needed?

To remove unnecessary shuffle.

### Does this PR introduce _any_ user-facing change?

Yes, now the shuffle in the above example can be eliminated:
```
== Physical Plan ==
*(5) SortMergeJoin [i3#108], [i4#119], Inner
:- *(3) Sort [i3#108 ASC NULLS FIRST], false, 0
:  +- *(3) BroadcastHashJoin [i1#86], [i3#108], Inner, BuildRight
:     :- *(3) SortMergeJoin [i1#86], [i2#97], Inner
:     :  :- *(1) Sort [i1#86 ASC NULLS FIRST], false, 0
:     :  :  +- Exchange hashpartitioning(i1#86, 200), true, [id=#120]
:     :  :     +- LocalTableScan [i1#86, j1#87]
:     :  +- *(2) Sort [i2#97 ASC NULLS FIRST], false, 0
:     :     +- Exchange hashpartitioning(i2#97, 200), true, [id=#121]
:     :        +- LocalTableScan [i2#97, j2#98]
:     +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#126]
:        +- LocalTableScan [i3#108, j3#109]
+- *(4) Sort [i4#119 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i4#119, 200), true, [id=#130]
      +- LocalTableScan [i4#119, j4#120]
```

### How was this patch tested?

Added new tests.

Closes apache#28676 from imback82/broadcast_join_output.

Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant