Skip to content

Commit

Permalink
[SPARK-33385][SQL] Support bucket pruning for IsNaN
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This pr add support bucket pruning on `IsNaN` predicate.

### Why are the changes needed?

Improve query performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #30291 from wangyum/SPARK-33385.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
wangyum authored and cloud-fan committed Nov 9, 2020
1 parent 69799c5 commit 7a5647a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.sql.types.{DoubleType, FloatType}
import org.apache.spark.util.collection.BitSet

/**
Expand Down Expand Up @@ -93,6 +94,12 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
getBucketSetFromIterable(a, hset)
case expressions.IsNull(a: Attribute) if a.name == bucketColumnName =>
getBucketSetFromValue(a, null)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == FloatType =>
getBucketSetFromValue(a, Float.NaN)
case expressions.IsNaN(a: Attribute)
if a.name == bucketColumnName && a.dataType == DoubleType =>
getBucketSetFromValue(a, Double.NaN)
case expressions.And(left, right) =>
getExpressionBuckets(left, bucketColumnName, numBuckets) &
getExpressionBuckets(right, bucketColumnName, numBuckets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
// 2) Verify the final result is the same as the expected one
private def checkPrunedAnswers(
bucketSpec: BucketSpec,
bucketValues: Seq[Integer],
bucketValues: Seq[Any],
filterCondition: Column,
originalDataFrame: DataFrame): Unit = {
// This test verifies parts of the plan. Disable whole stage codegen.
Expand Down Expand Up @@ -245,6 +245,25 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
}
}

test("bucket pruning support IsNaN") {
withTable("bucketed_table") {
val numBuckets = NumBucketsForPruningNullDf
val bucketSpec = BucketSpec(numBuckets, Seq("j"), Nil)
val naNDF = nullDF.selectExpr("i", "cast(if(isnull(j), 'NaN', j) as double) as j", "k")
// json does not support predicate push-down, and thus json is used here
naNDF.write
.format("json")
.bucketBy(numBuckets, "j")
.saveAsTable("bucketed_table")

checkPrunedAnswers(
bucketSpec,
bucketValues = Double.NaN :: Nil,
filterCondition = $"j".isNaN,
naNDF)
}
}

test("read partitioning bucketed tables having composite filters") {
withTable("bucketed_table") {
val numBuckets = NumBucketsForPruningDF
Expand Down

0 comments on commit 7a5647a

Please sign in to comment.