Skip to content

Commit

Permalink
[SPARK-30842][SQL] Adjust abstraction structure for join operators
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Currently the join operators are not well abstracted, since there are lot of common logic. A trait can be created for easier pattern matching and other future handiness. This is a follow-up PR based on comment
apache#27509 (comment) .

This PR refined from the following aspects:
1. Refined structure of all physical join operators
2. Add missing joinType field for CartesianProductExec operator
3. Refined codes related to Explain Formatted

The EXPLAIN FORMATTED changes are
1. Converge all join operator `verboseStringWithOperatorId` implementations to `BaseJoinExec`. Join condition displayed, and join keys displayed if it’s not empty.
2. `#1` will add Join condition to `BroadcastNestedLoopJoinExec`.
3. `#1` will **NOT** affect `CartesianProductExec`,`SortMergeJoin` and `HashJoin`s, since they already got there override implementation before.
4. Converge all join operator `simpleStringWithNodeId` to `BaseJoinExec`, which will enhance the one line description for `CartesianProductExec` with `JoinType` added.
5. Override `simpleStringWithNodeId` in `BroadcastNestedLoopJoinExec` to show `BuildSide`, which was only done for `HashJoin`s before.

### Why are the changes needed?
Make the code consistent with other operators and for future handiness of join operators.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing tests

Closes apache#27595 from Eric5553/RefineJoin.

Authored-by: Eric Wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
Eric5553 authored and cloud-fan committed Feb 28, 2020
1 parent 14bb639 commit eba2076
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 60 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.joins

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.execution.{BinaryExecNode, ExplainUtils}

/**
* Holds common logic for join operators
*/
trait BaseJoinExec extends BinaryExecNode {
def joinType: JoinType
def condition: Option[Expression]
def leftKeys: Seq[Expression]
def rightKeys: Seq[Expression]

override def simpleStringWithNodeId(): String = {
val opId = ExplainUtils.getOpId(this)
s"$nodeName $joinType ($opId)".trim
}

override def verboseStringWithOperatorId(): String = {
val joinCondStr = if (condition.isDefined) {
s"${condition.get}"
} else "None"
if (leftKeys.nonEmpty || rightKeys.nonEmpty) {
s"""
|(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
|${ExplainUtils.generateFieldString("Left keys", leftKeys)}
|${ExplainUtils.generateFieldString("Right keys", rightKeys)}
|${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
""".stripMargin
} else {
s"""
|(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
|${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
""".stripMargin
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Distribution, UnspecifiedDistribution}
import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, SparkPlan}
import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.{BooleanType, LongType}

Expand All @@ -44,7 +44,7 @@ case class BroadcastHashJoinExec(
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan)
extends BinaryExecNode with HashJoin with CodegenSupport {
extends HashJoin with CodegenSupport {

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.{ExplainUtils, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.collection.{BitSet, CompactBuffer}

Expand All @@ -32,7 +32,10 @@ case class BroadcastNestedLoopJoinExec(
right: SparkPlan,
buildSide: BuildSide,
joinType: JoinType,
condition: Option[Expression]) extends BinaryExecNode {
condition: Option[Expression]) extends BaseJoinExec {

override def leftKeys: Seq[Expression] = Nil
override def rightKeys: Seq[Expression] = Nil

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
Expand All @@ -43,6 +46,11 @@ case class BroadcastNestedLoopJoinExec(
case BuildLeft => (right, left)
}

override def simpleStringWithNodeId(): String = {
val opId = ExplainUtils.getOpId(this)
s"$nodeName $joinType ${buildSide} ($opId)".trim
}

override def requiredChildDistribution: Seq[Distribution] = buildSide match {
case BuildLeft =>
BroadcastDistribution(IdentityBroadcastMode) :: UnspecifiedDistribution :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import org.apache.spark.rdd.{CartesianPartition, CartesianRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, Predicate, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
import org.apache.spark.sql.execution.{BinaryExecNode, ExplainUtils, ExternalAppendOnlyUnsafeRowArray, SparkPlan}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.execution.{ExternalAppendOnlyUnsafeRowArray, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.CompletionIterator

Expand Down Expand Up @@ -60,23 +61,17 @@ class UnsafeCartesianRDD(
case class CartesianProductExec(
left: SparkPlan,
right: SparkPlan,
condition: Option[Expression]) extends BinaryExecNode {
condition: Option[Expression]) extends BaseJoinExec {

override def joinType: JoinType = Inner
override def leftKeys: Seq[Expression] = Nil
override def rightKeys: Seq[Expression] = Nil

override def output: Seq[Attribute] = left.output ++ right.output

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

override def verboseStringWithOperatorId(): String = {
val joinCondStr = if (condition.isDefined) {
s"${condition.get}"
} else "None"

s"""
|(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
|${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
""".stripMargin
}

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,39 +22,18 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{ExplainUtils, RowIterator, SparkPlan}
import org.apache.spark.sql.execution.{ExplainUtils, RowIterator}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.{IntegralType, LongType}

trait HashJoin {
self: SparkPlan =>

def leftKeys: Seq[Expression]
def rightKeys: Seq[Expression]
def joinType: JoinType
trait HashJoin extends BaseJoinExec {
def buildSide: BuildSide
def condition: Option[Expression]
def left: SparkPlan
def right: SparkPlan

override def simpleStringWithNodeId(): String = {
val opId = ExplainUtils.getOpId(this)
s"$nodeName $joinType ${buildSide} ($opId)".trim
}

override def verboseStringWithOperatorId(): String = {
val joinCondStr = if (condition.isDefined) {
s"${condition.get}"
} else "None"

s"""
|(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
|${ExplainUtils.generateFieldString("Left keys", leftKeys)}
|${ExplainUtils.generateFieldString("Right keys", rightKeys)}
|${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
""".stripMargin
}

override def output: Seq[Attribute] = {
joinType match {
case _: InnerLike =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.SQLMetrics

/**
Expand All @@ -39,7 +39,7 @@ case class ShuffledHashJoinExec(
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan)
extends BinaryExecNode with HashJoin {
extends HashJoin {

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ case class SortMergeJoinExec(
condition: Option[Expression],
left: SparkPlan,
right: SparkPlan,
isSkewJoin: Boolean = false) extends BinaryExecNode with CodegenSupport {
isSkewJoin: Boolean = false) extends BaseJoinExec with CodegenSupport {

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
Expand All @@ -52,23 +52,6 @@ case class SortMergeJoinExec(

override def stringArgs: Iterator[Any] = super.stringArgs.toSeq.dropRight(1).iterator

override def simpleStringWithNodeId(): String = {
val opId = ExplainUtils.getOpId(this)
s"$nodeName $joinType ($opId)".trim
}

override def verboseStringWithOperatorId(): String = {
val joinCondStr = if (condition.isDefined) {
s"${condition.get}"
} else "None"
s"""
|(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
|${ExplainUtils.generateFieldString("Left keys", leftKeys)}
|${ExplainUtils.generateFieldString("Right keys", rightKeys)}
|${ExplainUtils.generateFieldString("Join condition", joinCondStr)}
""".stripMargin
}

override def output: Seq[Attribute] = {
joinType match {
case _: InnerLike =>
Expand Down

0 comments on commit eba2076

Please sign in to comment.