Skip to content

Commit

Permalink
Explicitly match join type and build side for partitioning and ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
c21 committed Jul 23, 2020
1 parent 387feee commit df8b32a
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,41 @@ trait HashJoin extends BaseJoinExec {
}
}

override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
/**
 * Partitioning reported for this join's output.
 *
 * The value is taken from the child on the opposite side of the build side:
 * `right.outputPartitioning` when building left, `left.outputPartitioning`
 * when building right. Each build side accepts only the join types listed in
 * its case below; every other combination is rejected.
 *
 * @return the output partitioning of the non-build child
 * @throws IllegalArgumentException if `joinType` is not accepted for the current `buildSide`
 */
override def outputPartitioning: Partitioning = (buildSide, joinType) match {
  // Building left: only inner-like and right outer joins are accepted.
  case (BuildLeft, _: InnerLike | RightOuter) =>
    right.outputPartitioning
  case (BuildLeft, x) =>
    throw new IllegalArgumentException(
      s"HashJoin should not take $x as the JoinType with building left side")

  // Building right: inner-like, left outer, left semi, left anti and existence joins.
  case (BuildRight, _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin) =>
    left.outputPartitioning
  case (BuildRight, x) =>
    throw new IllegalArgumentException(
      s"HashJoin should not take $x as the JoinType with building right side")
}

override def outputOrdering: Seq[SortOrder] = streamedPlan.outputOrdering
/**
 * Sort order reported for this join's output.
 *
 * The value is taken from the child on the opposite side of the build side:
 * `right.outputOrdering` when building left, `left.outputOrdering` when
 * building right. Each build side accepts only the join types listed in its
 * case below; every other combination is rejected.
 *
 * @return the output ordering of the non-build child
 * @throws IllegalArgumentException if `joinType` is not accepted for the current `buildSide`
 */
override def outputOrdering: Seq[SortOrder] = (buildSide, joinType) match {
  // Building left: only inner-like and right outer joins are accepted.
  case (BuildLeft, _: InnerLike | RightOuter) =>
    right.outputOrdering
  case (BuildLeft, x) =>
    throw new IllegalArgumentException(
      s"HashJoin should not take $x as the JoinType with building left side")

  // Building right: inner-like, left outer, left semi, left anti and existence joins.
  case (BuildRight, _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin) =>
    left.outputOrdering
  case (BuildRight, x) =>
    throw new IllegalArgumentException(
      s"HashJoin should not take $x as the JoinType with building right side")
}

protected lazy val (buildPlan, streamedPlan) = buildSide match {
case BuildLeft => (left, right)
Expand Down
41 changes: 23 additions & 18 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1114,15 +1114,17 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
// Test broadcast hash join
withSQLConf(
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
val plan = df1.join(df2, $"k1" === $"k2")
.join(df3, $"k1" === $"k3")
.join(df4, $"k1" === $"k4")
.queryExecution
.executedPlan
assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2)
assert(plan.collect { case _: BroadcastHashJoinExec => true }.size === 1)
// No extra sort before last sort merge join
assert(plan.collect { case _: SortExec => true }.size === 3)
Seq("inner", "left_outer").foreach(joinType => {
val plan = df1.join(df2, $"k1" === $"k2", joinType)
.join(df3, $"k1" === $"k3", joinType)
.join(df4, $"k1" === $"k4", joinType)
.queryExecution
.executedPlan
assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2)
assert(plan.collect { case _: BroadcastHashJoinExec => true }.size === 1)
// No extra sort before last sort merge join
assert(plan.collect { case _: SortExec => true }.size === 3)
})
}

// Test shuffled hash join
Expand All @@ -1131,15 +1133,18 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
SQLConf.SHUFFLE_PARTITIONS.key -> "2",
SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
val df3 = spark.range(10).select($"id".as("k3"))
val plan = df1.join(df2, $"k1" === $"k2")
.join(df3, $"k1" === $"k3")
.join(df4, $"k1" === $"k4")
.queryExecution
.executedPlan
assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2)
assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
// No extra sort before last sort merge join
assert(plan.collect { case _: SortExec => true }.size === 3)

Seq("inner", "left_outer").foreach(joinType => {
val plan = df1.join(df2, $"k1" === $"k2", joinType)
.join(df3, $"k1" === $"k3", joinType)
.join(df4, $"k1" === $"k4", joinType)
.queryExecution
.executedPlan
assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2)
assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
// No extra sort before last sort merge join
assert(plan.collect { case _: SortExec => true }.size === 3)
})
}
}
}

0 comments on commit df8b32a

Please sign in to comment.