[SPARK-32383][SQL] Preserve hash join (BHJ and SHJ) stream side ordering #29181

Closed
wants to merge 2 commits

Contributor

@c21 c21 commented Jul 22, 2020

What changes were proposed in this pull request?

Currently, BroadcastHashJoinExec and ShuffledHashJoinExec do not preserve their children's output ordering (they inherit SparkPlan.outputOrdering, which is Nil). This can add an unnecessary sort in complex queries involving multiple joins.

Example:

withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
  val df1 = spark.range(100).select($"id".as("k1"))
  val df2 = spark.range(100).select($"id".as("k2"))
  val df3 = spark.range(3).select($"id".as("k3"))
  val df4 = spark.range(100).select($"id".as("k4"))
  val plan = df1.join(df2, $"k1" === $"k2")
    .join(df3, $"k1" === $"k3")
    .join(df4, $"k1" === $"k4")
    .queryExecution
    .executedPlan
}

Current physical plan (extra sort on k1 before top sort merge join):

*(9) SortMergeJoin [k1#220L], [k4#232L], Inner
:- *(6) Sort [k1#220L ASC NULLS FIRST], false, 0
:  +- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
:     :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
:     :  :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
:     :  :  +- Exchange hashpartitioning(k1#220L, 5), true, [id=#128]
:     :  :     +- *(1) Project [id#218L AS k1#220L]
:     :  :        +- *(1) Range (0, 100, step=1, splits=2)
:     :  +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
:     :     +- Exchange hashpartitioning(k2#224L, 5), true, [id=#134]
:     :        +- *(3) Project [id#222L AS k2#224L]
:     :           +- *(3) Range (0, 100, step=1, splits=2)
:     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#141]
:        +- *(5) Project [id#226L AS k3#228L]
:           +- *(5) Range (0, 3, step=1, splits=2)
+- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(k4#232L, 5), true, [id=#148]
      +- *(7) Project [id#230L AS k4#232L]
         +- *(7) Range (0, 100, step=1, splits=2)

Ideal physical plan (no extra sort on k1 before top sort merge join):

*(9) SortMergeJoin [k1#220L], [k4#232L], Inner
:- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
:  :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
:  :  :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
:  :  :  +- Exchange hashpartitioning(k1#220L, 5), true, [id=#127]
:  :  :     +- *(1) Project [id#218L AS k1#220L]
:  :  :        +- *(1) Range (0, 100, step=1, splits=2)
:  :  +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
:  :     +- Exchange hashpartitioning(k2#224L, 5), true, [id=#133]
:  :        +- *(3) Project [id#222L AS k2#224L]
:  :           +- *(3) Range (0, 100, step=1, splits=2)
:  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#140]
:     +- *(5) Project [id#226L AS k3#228L]
:        +- *(5) Range (0, 3, step=1, splits=2)
+- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(k4#232L, 5), true, [id=#146]
      +- *(7) Project [id#230L AS k4#232L]
         +- *(7) Range (0, 100, step=1, splits=2)

Why are the changes needed?

To avoid an unnecessary sort in the query. The impact is largest when users read sorted bucketed tables.
Although the unnecessary sort operates on already sorted data, it has an obvious negative impact on IO and query run time if the data is large and external sorting happens.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added unit test in JoinSuite.

@c21
Contributor Author

c21 commented Jul 22, 2020

cc @cloud-fan and @sameeragarwal if you guys can help take a look. Thanks!

@SparkQA

SparkQA commented Jul 22, 2020

Test build #126288 has finished for PR 29181 at commit 5235604.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@bart-samwel

Can you double check that the ordering is correct if there are NULLs involved, or outer join conditions? The tricky cases I see:

  • RIGHT / FULL SHJ where the streaming ("probe") input is ordered by (some of) the join keys. After consuming the streaming input, the hash join will emit the build side rows that didn't have matches. Those rows may actually have values for the join keys, and those values will end up in the output, out of order.

  • RIGHT / FULL SHJ where the streaming ("probe") input is ordered by some non-join keys. After consuming the streaming input, the hash join will emit rows with NULL values for the streaming input's columns, which include the ordering keys. This may be correct if Spark's ordering property has "nulls last", but it may not be correct even then. For instance, if the input is ordered by (JOINKEY1, NONJOINKEY1) with NULLS LAST, then a final output ordering may look like:

(1, 'a')
(2, 'c')
(3, 'b')
(1, NULL)

But the correct ordering for NULLS LAST is

(1, 'a')
(1, NULL)
(2, 'c')
(3, 'b')
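
For illustration only, here is a minimal self-contained sketch of the check described above. It models rows as (JOINKEY1, NONJOINKEY1) pairs with `None` standing in for NULL. The names `NullsLastCheck`, `nullsLast`, and `isSorted` are hypothetical and not part of Spark.

```scala
object NullsLastCheck {
  // A row of (JOINKEY1, NONJOINKEY1); None models SQL NULL.
  type Row = (Int, Option[String])

  // Ascending on both columns, with NULLs sorting last in the second column.
  val nullsLast: Ordering[Row] = new Ordering[Row] {
    def compare(a: Row, b: Row): Int = {
      val byKey = Integer.compare(a._1, b._1)
      if (byKey != 0) byKey
      else (a._2, b._2) match {
        case (None, None)       => 0
        case (None, _)          => 1
        case (_, None)          => -1
        case (Some(x), Some(y)) => x.compareTo(y)
      }
    }
  }

  // True when every adjacent pair of rows respects the ordering.
  def isSorted(rows: Seq[Row]): Boolean =
    rows.zip(rows.drop(1)).forall { case (a, b) => nullsLast.lteq(a, b) }

  // Streamed rows first, then the unmatched build-side row appended at the end.
  val hashJoinOutput: Seq[Row] =
    Seq((1, Some("a")), (2, Some("c")), (3, Some("b")), (1, None))

  // What a consumer relying on the claimed ordering would expect instead.
  val expected: Seq[Row] = hashJoinOutput.sorted(nullsLast)
}
```

`NullsLastCheck.isSorted(NullsLastCheck.hashJoinOutput)` is false here, which is the point of the example: appending unmatched build-side rows after the streamed rows breaks the claimed ordering even under NULLS LAST.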

@c21
Contributor Author

c21 commented Jul 22, 2020

@bart-samwel - just to get us on the same page.

The current Spark Scala/Java implementation of hash join (broadcast hash join and shuffled hash join) has the following restriction:

Both of the cases you mentioned require a right outer join with the left side as the stream side. This will not happen.
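
A rough sketch of the kind of restriction meant here, written as a simplified stand-alone model rather than Spark's actual planner code. The rule assumed below (the build side is never the side whose unmatched rows must be preserved, and full outer join is not handled by hash join) reflects my understanding of the planner at the time of this PR. The object and its types are hypothetical stand-ins.

```scala
object BuildSideRules {
  // Simplified stand-ins for Spark's join types (assumption: not the real classes).
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  // The left side may be built only when the right side is streamed
  // and no unmatched left rows have to be emitted.
  def canBuildLeft(joinType: JoinType): Boolean = joinType match {
    case Inner | RightOuter => true
    case _                  => false
  }

  // The right side may be built only when the left side is streamed
  // and no unmatched right rows have to be emitted.
  def canBuildRight(joinType: JoinType): Boolean = joinType match {
    case Inner | LeftOuter | LeftSemi | LeftAnti => true
    case _                                       => false
  }
}
```

Under this model, a right outer join streamed from the left (that is, built on the right) is rejected, and full outer join cannot be built on either side.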

A separate topic: I think it would be interesting to explore supporting full outer join in shuffled hash join and broadcast hash join, which I discussed with @cloud-fan in another PR. I created a JIRA for this: https://issues.apache.org/jira/browse/SPARK-32399. This should help save a shuffle and a sort, since currently a full outer join always uses sort merge join regardless of table size. BTW, does Delta engine support full outer join in hash join? I would like to understand more here. Thanks.

@@ -54,6 +54,8 @@ trait HashJoin extends BaseJoinExec {

override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning

override def outputOrdering: Seq[SortOrder] = streamedPlan.outputOrdering
Contributor

To be future-proof, we should only do it if the join type allows us to do so. It's fragile to rely on what the join type can be in hash join.

Contributor Author

@cloud-fan - agree. Updated both outputPartitioning and outputOrdering to be based on join type and build side.
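
As a hedged illustration of what "based on join type and build side" can look like, the sketch below models orderings as plain column-name lists instead of `Seq[SortOrder]`. It is not the merged code: the types are hypothetical stand-ins, and returning `Nil` for unsupported combinations is an assumption made here to keep the example conservative.

```scala
object HashJoinOrdering {
  // Simplified stand-ins for Spark's join types and build sides (assumptions).
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Propagate the stream side's ordering only for combinations where the join
  // never emits build-side rows after the stream side has been consumed.
  def outputOrdering(
      joinType: JoinType,
      buildSide: BuildSide,
      leftOrdering: Seq[String],
      rightOrdering: Seq[String]): Seq[String] =
    (buildSide, joinType) match {
      case (BuildLeft, Inner | RightOuter)                       => rightOrdering
      case (BuildRight, Inner | LeftOuter | LeftSemi | LeftAnti) => leftOrdering
      case _                                                     => Nil // claim no ordering
    }
}
```

With this shape, a future full outer hash join would fall into the last case and claim no ordering, instead of silently inheriting the stream side's ordering.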

@bart-samwel

Doing full outer for SHJ is not that hard so we should have that. BHJ is harder because you have to merge the "probedness" of all tasks before figuring out which rows you need to emit. (Delta engine will indeed support full outer join in SHJ.)

Let's be future proof for these cases!

@c21
Contributor Author

c21 commented Jul 23, 2020

Delta engine will indeed support full outer join in SHJ.

@bart-samwel - sounds good. I will then work on supporting full outer join in SHJ on its current Java stack in https://issues.apache.org/jira/browse/SPARK-32399.

BHJ is harder because you have to merge the "probedness" of all tasks before figuring out which rows you need to emit.

For BHJ, every task gets a copy of the whole build side. So I am thinking that each task, after exhausting the stream side, iterates over all build side rows and emits only the rows in its own part (we can rely on a hash, e.g. task i only emits a build side row if hash(build_side_row_keys) % num_partitions_of_RDD == i).
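
A small sketch of the ownership rule only, assuming single `Long` join keys. The names `ownsKey` and `unmatchedRowsEmittedBy` are hypothetical. The set of matched keys is taken as given; how tasks would agree on it is exactly the "probedness" problem raised above, and this sketch does not address it.

```scala
object BroadcastFullOuterSketch {
  // Task `taskId` owns a build-side key when the key's hash, reduced to a
  // non-negative value modulo the number of tasks, equals the task id.
  def ownsKey(key: Long, taskId: Int, numTasks: Int): Boolean =
    java.lang.Math.floorMod(java.lang.Long.hashCode(key), numTasks) == taskId

  // After the stream side is exhausted, a task walks the whole broadcast build
  // side and emits only the unmatched rows that it owns.
  def unmatchedRowsEmittedBy(
      buildKeys: Seq[Long],
      matchedKeys: Set[Long], // assumption: known to the task, see the note above
      taskId: Int,
      numTasks: Int): Seq[Long] =
    buildKeys.filter(k => !matchedKeys.contains(k) && ownsKey(k, taskId, numTasks))
}
```

Because each key is owned by exactly one task, the union over all tasks contains every unmatched build-side row once, without duplicates.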

@SparkQA

SparkQA commented Jul 23, 2020

Test build #126427 has finished for PR 29181 at commit df8b32a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@c21
Contributor Author

c21 commented Jul 25, 2020

@cloud-fan gentle ping, could you help take another look? Thanks.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 01cf8a4 Jul 27, 2020
@c21
Contributor Author

c21 commented Jul 27, 2020

Thanks @cloud-fan and @bart-samwel for review and discussion!

@c21 c21 deleted the ordering branch July 27, 2020 05:20
4 participants