Skip to content

Commit

Permalink
[SPARK-33427][SQL][FOLLOWUP] Put key and value into IdentityHashMap s…
Browse files Browse the repository at this point in the history
…equantially

### What changes were proposed in this pull request?

This follow-up fixes an issue when inserting key/value pairs into `IdentityHashMap` in `SubExprEvaluationRuntime`.

### Why are the changes needed?

The last commits to #30341 follows review comment to use `IdentityHashMap`. Because we leverage `IdentityHashMap` to compare keys in reference, we should not convert expression pairs to Scala map before inserting. Scala map compares keys by equality so we will loss keys with different references.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Run benchmark to verify.

Closes #30459 from viirya/SPARK-33427-map.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
  • Loading branch information
viirya authored and HyukjinKwon committed Nov 23, 2020
1 parent a459238 commit aa78c05
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package org.apache.spark.sql.catalyst.expressions

import java.util.IdentityHashMap

import scala.collection.JavaConverters._

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}

Expand Down Expand Up @@ -98,7 +96,12 @@ class SubExprEvaluationRuntime(cacheMaxEntries: Int) {
val proxy = ExpressionProxy(expr, proxyExpressionCurrentId, this)
proxyExpressionCurrentId += 1

proxyMap.putAll(e.map(_ -> proxy).toMap.asJava)
// We leverage `IdentityHashMap` so we compare expression keys by reference here.
// So for example if there are one group of common exprs like Seq(common expr 1,
// common expr2, ..., common expr n), we will insert into `proxyMap` some key/value
// pairs like Map(common expr 1 -> proxy(common expr 1), ...,
// common expr n -> proxy(common expr 1)).
e.map(proxyMap.put(_, proxy))
}

// Only adding proxy if we find subexpressions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,26 @@ class SubExprEvaluationRuntimeSuite extends SparkFunSuite {
})
assert(proxys.isEmpty)
}

test("SubExprEvaluationRuntime should wrap semantically equal exprs") {
val runtime = new SubExprEvaluationRuntime(1)

val one = Literal(1)
val two = Literal(2)
def mul: (Literal, Literal) => Expression =
(left: Literal, right: Literal) => Multiply(left, right)

val mul2_1 = Multiply(mul(one, two), mul(one, two))
val mul2_2 = Multiply(mul(one, two), mul(one, two))

val sqrt = Sqrt(mul2_1)
val sum = Add(mul2_2, sqrt)
val proxyExpressions = runtime.proxyExpressions(Seq(sum))
val proxys = proxyExpressions.flatMap(_.collect {
case p: ExpressionProxy => p
})
// ( (one * two) * (one * two) )
assert(proxys.size == 2)
assert(proxys.forall(_.child.semanticEquals(mul2_1)))
}
}

0 comments on commit aa78c05

Please sign in to comment.