Skip to content
This repository has been archived by the owner on Jun 14, 2024. It is now read-only.

Commit

Permalink
Apply JoinIndexRule only for SortMergeJoin (#502)
Browse files Browse the repository at this point in the history
  • Loading branch information
sezruby authored and paryoja committed Nov 4, 2021
1 parent 82fa8d2 commit 4d93770
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hyperspace.shim

import org.apache.spark.sql.{ExperimentalMethods, SparkSession}
import org.apache.spark.sql.execution.SparkPlanner

class SparkPlannerShim(spark: SparkSession)
extends SparkPlanner(spark.sparkContext, spark.sessionState.conf, new ExperimentalMethods) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hyperspace.shim

import org.apache.spark.sql.{ExperimentalMethods, SparkSession}
import org.apache.spark.sql.execution.SparkPlanner

class SparkPlannerShim(spark: SparkSession)
extends SparkPlanner(spark, spark.sessionState.conf, new ExperimentalMethods) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (2021) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hyperspace.shim

import org.apache.spark.sql.{ExperimentalMethods, SparkSession}
import org.apache.spark.sql.execution.SparkPlanner

class SparkPlannerShim(spark: SparkSession)
extends SparkPlanner(spark, new ExperimentalMethods) {}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.util.Try
import org.apache.spark.sql.catalyst.analysis.CleanupAliases
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.hyperspace.shim.SparkPlannerShim

import com.microsoft.hyperspace.Hyperspace
import com.microsoft.hyperspace.index.{IndexLogEntry, IndexLogEntryTags}
Expand Down Expand Up @@ -70,13 +72,21 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
isJoinConditionSupported(condition)
}

val sortMergeJoinCond = withFilterReasonTag(
plan,
leftAndRightIndexes,
FilterReasons.NotEligibleJoin("Not SortMergeJoin")) {
isSortMergeJoin(plan)
}

val leftPlanLinearCond =
withFilterReasonTag(
plan,
leftAndRightIndexes,
FilterReasons.NotEligibleJoin("Non linear left child plan")) {
isPlanLinear(l)
}

val rightPlanLinearCond =
withFilterReasonTag(
plan,
Expand All @@ -85,7 +95,7 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
isPlanLinear(r)
}

if (joinConditionCond && leftPlanLinearCond && rightPlanLinearCond) {
if (sortMergeJoinCond && joinConditionCond && leftPlanLinearCond && rightPlanLinearCond) {
// Set join query context.
JoinIndexRule.leftRelation.set(left.get)
JoinIndexRule.rightRelation.set(right.get)
Expand All @@ -109,6 +119,11 @@ object JoinPlanNodeFilter extends QueryPlanIndexFilter {
}
}

private def isSortMergeJoin(join: LogicalPlan): Boolean = {
val execJoin = new SparkPlannerShim(spark).JoinSelection(join)
execJoin.head.isInstanceOf[SortMergeJoinExec]
}

/**
* Checks whether a logical plan is linear. Linear means starting at the top, each node in the
* plan has at most one child.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper {
*/
override def beforeAll(): Unit = {
super.beforeAll()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val t1Location =
new InMemoryFileIndex(spark, Seq(new Path("t1")), Map.empty, Some(t1Schema), NoopCache)
Expand Down Expand Up @@ -127,6 +128,23 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper {
verifyUpdatedIndex(originalPlan, updatedPlan, indexPaths)
}

test("Join rule doesn't update plan if it's broadcast join.") {
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "10241024") {
val joinCondition = EqualTo(t1c1, t2c1)
val originalPlan =
Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition))
val allIndexes = IndexCollectionManager(spark).getIndexes(Seq(Constants.States.ACTIVE))
val (updatedPlan, _) = applyJoinIndexRuleHelper(originalPlan, allIndexes)
assert(updatedPlan.equals(originalPlan))
allIndexes.foreach { index =>
val reasons = index.getTagValue(originalPlan, IndexLogEntryTags.FILTER_REASONS)
assert(reasons.isDefined)
val msg = reasons.get.map(_.verboseStr)
assert(msg.exists(_.contains("Join condition is not eligible. Reason: Not SortMergeJoin")))
}
}
}

test("Join rule works if indexes exist for case insensitive index and query.") {
val t1c1Caps = t1c1.withName("T1C1")

Expand All @@ -142,8 +160,7 @@ class JoinIndexRuleTest extends HyperspaceRuleSuite with SQLHelper {
test("Join rule does not update plan if index location is not set.") {
withSQLConf(IndexConstants.INDEX_SYSTEM_PATH -> "") {
val joinCondition = EqualTo(t1c1, t2c1)
val originalPlan =
Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition))
val originalPlan = Join(t1ProjectNode, t2ProjectNode, JoinType("inner"), Some(joinCondition))

try {
applyJoinIndexRuleHelper(originalPlan)
Expand Down

0 comments on commit 4d93770

Please sign in to comment.