Generalize pattern for planning hash joins.
This will be helpful for [SPARK-1495](https://issues.apache.org/jira/browse/SPARK-1495) and other cases where we want to have custom hash join implementations but don't want to repeat the logic for finding the join keys.

Author: Michael Armbrust <[email protected]>

Closes apache#418 from marmbrus/hashFilter and squashes the following commits:

d5cc79b [Michael Armbrust] Address @rxin 's comments.
366b6d9 [Michael Armbrust] style fixes
14560eb [Michael Armbrust] Generalize pattern for planning hash joins.
f4809c1 [Michael Armbrust] Move common functions to PredicateHelper.
marmbrus authored and pdeyhim committed Jun 25, 2014
1 parent 48248a2 commit 8cc2307
Showing 3 changed files with 82 additions and 48 deletions.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

@@ -17,10 +17,11 @@

 package org.apache.spark.sql.catalyst.expressions

-import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
-import org.apache.spark.sql.catalyst.types.{BooleanType, StringType, TimestampType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst.types.BooleanType


 object InterpretedPredicate {
   def apply(expression: Expression): (Row => Boolean) = {
@@ -37,10 +38,26 @@ trait Predicate extends Expression {
 }

 trait PredicateHelper {
-  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = condition match {
-    case And(cond1, cond2) => splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
-    case other => other :: Nil
+  protected def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
+    condition match {
+      case And(cond1, cond2) =>
+        splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
+      case other => other :: Nil
+    }
   }
+
+  /**
+   * Returns true if `expr` can be evaluated using only the output of `plan`. This method
+   * can be used to determine when it is acceptable to move expression evaluation within a
+   * query plan.
+   *
+   * For example consider a join between two relations R(a, b) and S(c, d).
+   *
+   * `canEvaluate(Equals(a,b), R)` returns `true` whereas `canEvaluate(Equals(a,c), R)`
+   * returns `false`.
+   */
+  protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
+    expr.references.subsetOf(plan.outputSet)
 }

 abstract class BinaryPredicate extends BinaryExpression with Predicate {
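To make the two helpers added above concrete, here is a minimal, self-contained sketch in plain Scala. `Expr`, `Attr`, and `Eq` are toy stand-ins for Catalyst's `Expression`, `AttributeReference`, and `Equals`, and the `outputSet` parameter models `LogicalPlan.outputSet`; none of these toy names are from the commit itself.

```scala
// Toy stand-ins for Catalyst's Expression hierarchy; names are illustrative only.
sealed trait Expr { def references: Set[String] }
case class Attr(name: String) extends Expr { def references = Set(name) }
case class Eq(l: Expr, r: Expr) extends Expr { def references = l.references ++ r.references }
case class And(l: Expr, r: Expr) extends Expr { def references = l.references ++ r.references }

object PredicateHelperSketch extends App {
  // Flatten a tree of Ands into its conjuncts, as splitConjunctivePredicates does.
  def splitConjunctivePredicates(condition: Expr): Seq[Expr] = condition match {
    case And(c1, c2) => splitConjunctivePredicates(c1) ++ splitConjunctivePredicates(c2)
    case other => other :: Nil
  }

  // Model of canEvaluate: `outputSet` plays the role of LogicalPlan.outputSet,
  // i.e. the attributes a plan node produces.
  def canEvaluate(expr: Expr, outputSet: Set[String]): Boolean =
    expr.references.subsetOf(outputSet)

  // (a = b) AND (a = c) splits into two conjuncts; given R(a, b), only the first
  // can be evaluated from R's output alone, matching the scaladoc example.
  val parts = splitConjunctivePredicates(And(Eq(Attr("a"), Attr("b")), Eq(Attr("a"), Attr("c"))))
  val r = Set("a", "b") // output of relation R(a, b)
  assert(canEvaluate(parts(0), r))
  assert(!canEvaluate(parts(1), r))
  println(parts)
}
```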
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

@@ -19,7 +19,10 @@ package org.apache.spark.sql.catalyst.planning

 import scala.annotation.tailrec

+import org.apache.spark.sql.Logging
+
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._

 /**
@@ -101,6 +104,55 @@ object PhysicalOperation extends PredicateHelper {
   }
 }

+/**
+ * A pattern that finds joins with equality conditions that can be evaluated using hashing
+ * techniques. For inner joins, any filters on top of the join operator are also matched.
+ */
+object HashFilteredJoin extends Logging with PredicateHelper {
+  /** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
+  type ReturnType =
+    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
+    // For inner joins, all predicates can be evaluated (i.e., those in both the ON
+    // clause and the WHERE clause).
+    case FilteredOperation(predicates, join @ Join(left, right, Inner, condition)) =>
+      logger.debug(s"Considering hash inner join on: ${predicates ++ condition}")
+      splitPredicates(predicates ++ condition, join)
+    case join @ Join(left, right, joinType, condition) =>
+      logger.debug(s"Considering hash join on: $condition")
+      splitPredicates(condition.toSeq, join)
+    case _ => None
+  }
+
+  // Find equi-join predicates that can be evaluated before the join, and thus can be used
+  // as join keys.
+  def splitPredicates(allPredicates: Seq[Expression], join: Join): Option[ReturnType] = {
+    val Join(left, right, joinType, _) = join
+    val (joinPredicates, otherPredicates) = allPredicates.partition {
+      case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
+        (canEvaluate(l, right) && canEvaluate(r, left)) => true
+      case _ => false
+    }
+
+    val joinKeys = joinPredicates.map {
+      case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
+      case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
+    }
+
+    // Do not consider this strategy if there are no join keys.
+    if (joinKeys.nonEmpty) {
+      val leftKeys = joinKeys.map(_._1)
+      val rightKeys = joinKeys.map(_._2)
+
+      Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
+    } else {
+      logger.debug(s"Avoiding hash join with no join keys.")
+      None
+    }
+  }
+}

 /**
  * A pattern that collects all adjacent unions and returns their children as a Seq.
  */
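The heart of the HashFilteredJoin pattern above is normalizing equi-join predicates into (leftKey, rightKey) pairs. Below is a standalone sketch of that step, reusing the toy `Expr`/`Attr`/`Eq` model from the earlier sketch (again illustrative stand-ins, not Catalyst's classes); `leftOutput` and `rightOutput` model the `outputSet` of each join child.

```scala
// Standalone model of HashFilteredJoin.splitPredicates' key extraction.
def canEvaluate(expr: Expr, outputSet: Set[String]): Boolean =
  expr.references.subsetOf(outputSet)

def extractJoinKeys(
    predicates: Seq[Expr],
    leftOutput: Set[String],
    rightOutput: Set[String]): (Seq[(Expr, Expr)], Seq[Expr]) = {
  // Keep only equalities whose two sides come entirely from opposite children.
  val (joinPredicates, otherPredicates) = predicates.partition {
    case Eq(l, r) =>
      (canEvaluate(l, leftOutput) && canEvaluate(r, rightOutput)) ||
        (canEvaluate(l, rightOutput) && canEvaluate(r, leftOutput))
    case _ => false
  }
  // Normalize each pair so the left child's key comes first, swapping when the
  // predicate was written right-to-left (e.g., S.c = R.a).
  val joinKeys = joinPredicates.map {
    case Eq(l, r) if canEvaluate(l, leftOutput) => (l, r)
    case Eq(l, r) => (r, l)
  }
  (joinKeys, otherPredicates)
}
```

For R(a, b) joined with S(c, d) on `c = a`, the predicate survives the partition (each side is evaluable on one child) and is swapped into left-first order:

```scala
val (keys, residual) = extractJoinKeys(
  Seq(Eq(Attr("c"), Attr("a"))), leftOutput = Set("a", "b"), rightOutput = Set("c", "d"))
// keys == Seq((Attr("a"), Attr("c"))); residual is empty
```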
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

@@ -28,51 +28,16 @@ import org.apache.spark.sql.parquet._
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SQLContext#SparkPlanner =>

-  object HashJoin extends Strategy {
+  object HashJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case FilteredOperation(predicates, logical.Join(left, right, Inner, condition)) =>
-        logger.debug(s"Considering join: ${predicates ++ condition}")
-        // Find equi-join predicates that can be evaluated before the join, and thus can be
-        // used as join keys. Note we can only mix in the conditions with other predicates
-        // because the match above ensures that this is an Inner join.
-        val (joinPredicates, otherPredicates) = (predicates ++ condition).partition {
-          case Equals(l, r) if (canEvaluate(l, left) && canEvaluate(r, right)) ||
-            (canEvaluate(l, right) && canEvaluate(r, left)) => true
-          case _ => false
-        }
-
-        val joinKeys = joinPredicates.map {
-          case Equals(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => (l, r)
-          case Equals(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => (r, l)
-        }
-
-        // Do not consider this strategy if there are no join keys.
-        if (joinKeys.nonEmpty) {
-          val leftKeys = joinKeys.map(_._1)
-          val rightKeys = joinKeys.map(_._2)
-
-          val joinOp = execution.HashJoin(
-            leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
-
-          // Make sure other conditions are met if present.
-          if (otherPredicates.nonEmpty) {
-            execution.Filter(combineConjunctivePredicates(otherPredicates), joinOp) :: Nil
-          } else {
-            joinOp :: Nil
-          }
-        } else {
-          logger.debug(s"Avoiding spark join with no join keys.")
-          Nil
-        }
+      // Find inner joins where at least some predicates can be evaluated by matching hash
+      // keys using the HashFilteredJoin pattern.
+      case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
+        val hashJoin =
+          execution.HashJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
+        condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
       case _ => Nil
     }
-
-    private def combineConjunctivePredicates(predicates: Seq[Expression]) =
-      predicates.reduceLeft(And)
-
-    /** Returns true if `expr` can be evaluated using only the output of `plan`. */
-    protected def canEvaluate(expr: Expression, plan: LogicalPlan): Boolean =
-      expr.references subsetOf plan.outputSet
   }

   object PartialAggregation extends Strategy {
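The payoff described in the commit message is visible here: any future strategy that needs hash-join keys can now pattern match on HashFilteredJoin instead of re-deriving them. As a hedged sketch, a custom strategy of the kind SPARK-1495 calls for might look like the following; `execution.BroadcastHashJoin` is a hypothetical physical operator used only for illustration and is not part of this commit.

```scala
// Hypothetical strategy sketch. HashFilteredJoin, Strategy, planLater, Filter, and
// BuildRight are from the surrounding code; execution.BroadcastHashJoin is an assumed
// operator that does not exist in this commit.
object BroadcastHashJoinExample extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case HashFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
      // All of the join-key discovery is done by the extractor; the strategy only
      // decides which physical operator to build.
      val join = execution.BroadcastHashJoin(
        leftKeys, rightKeys, BuildRight, planLater(left), planLater(right))
      condition.map(Filter(_, join)).getOrElse(join) :: Nil
    case _ => Nil
  }
}
```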
