Skip to content

Commit

Permalink
[SPARK-32268][SQL][FOLLOWUP] Add RewritePredicateSubquery below the I…
Browse files Browse the repository at this point in the history
…njectRuntimeFilter

### What changes were proposed in this pull request?

Add `RewritePredicateSubquery` below the `InjectRuntimeFilter` in `SparkOptimizer`.

### Why are the changes needed?

It seems that if the runtime filter uses an in-subquery to do the filtering, it won't be converted to a semi-join as the design intended.

This pr fixes the issue.

### Does this PR introduce _any_ user-facing change?

No, the affected feature has not been released yet.

### How was this patch tested?

Improved the test by adding a check that ensures a semi-join exists when the runtime filter takes the in-subquery code path.

Closes apache#35998 from ulysses-you/SPARK-32268-FOllOWUP.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
ulysses-you authored and cloud-fan committed Mar 29, 2022
1 parent 8fab597 commit c0c52dd
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class SparkOptimizer(
Batch("PartitionPruning", Once,
PartitionPruning) :+
Batch("InjectRuntimeFilter", FixedPoint(1),
InjectRuntimeFilter) :+
InjectRuntimeFilter,
RewritePredicateSubquery) :+
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
PushDownPredicates) :+
Batch("Cleanup filters that cannot be pushed down", Once,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.{Alias, BloomFilterMightContain, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, BloomFilterAggregate}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.LeftSemi
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Join, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
import org.apache.spark.sql.types.{IntegerType, StructType}
Expand Down Expand Up @@ -213,6 +214,15 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
super.afterAll()
}

// Asserts that the given optimized plan contains at least one left semi join,
// i.e. that the in-subquery runtime filter was rewritten into a semi-join.
private def ensureLeftSemiJoinExists(plan: LogicalPlan): Unit = {
  val hasLeftSemiJoin = plan.find {
    case join: Join => join.joinType == LeftSemi
    case _ => false
  }.isDefined
  assert(hasLeftSemiJoin)
}

def checkWithAndWithoutFeatureEnabled(query: String, testSemiJoin: Boolean,
shouldReplace: Boolean): Unit = {
var planDisabled: LogicalPlan = null
Expand All @@ -234,6 +244,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
if (shouldReplace) {
val normalizedEnabled = normalizePlan(normalizeExprIds(planEnabled))
val normalizedDisabled = normalizePlan(normalizeExprIds(planDisabled))
ensureLeftSemiJoinExists(planEnabled)
assert(normalizedEnabled != normalizedDisabled)
} else {
comparePlans(planDisabled, planEnabled)
Expand Down

0 comments on commit c0c52dd

Please sign in to comment.