[SPARK-12218][SQL] Invalid splitting of nested AND expressions in Data Source filter API

JIRA: https://issues.apache.org/jira/browse/SPARK-12218

When creating filters for Parquet/ORC, we should not push nested AND expressions partially.
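Why partial conversion is unsafe under NOT, as a minimal standalone Scala sketch (the sample rows and names are illustrative, not taken from the patch):

    // Intended filter: NOT(a = 2 AND b IN ('1')) over rows of (a, b).
    val rows = Seq((1, "1"), (2, "0"), (3, "1"))
    val intended = rows.filterNot { case (a, b) => a == 2 && Set("1")(b) }
    // If b IN ('1') cannot be converted and NOT(a = 2) is pushed alone:
    val broken = rows.filterNot { case (a, _) => a == 2 }

    assert(intended == Seq((1, "1"), (2, "0"), (3, "1")))  // the AND is false at (2, "0")
    assert(broken == Seq((1, "1"), (3, "1")))              // (2, "0") is wrongly dropped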

Author: Yin Huai <[email protected]>

Closes #10362 from yhuai/SPARK-12218.

(cherry picked from commit 41ee7c5)
Signed-off-by: Yin Huai <[email protected]>

Conflicts:
	sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
yhuai committed Dec 18, 2015
1 parent d2f71c2 commit afffe24
Showing 4 changed files with 60 additions and 13 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -264,7 +264,17 @@ private[sql] object ParquetFilters {
       makeGtEq.lift(dataTypeOf(name)).map(_(name, value))

     case sources.And(lhs, rhs) =>
-      (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
+      // Here it is not safe to convert just one side if we do not understand the
+      // other side. An example explains why:
+      // say we have NOT(a = 2 AND b in ('1')) and we do not know how to
+      // convert b in ('1'). If we convert only a = 2, we end up with the filter
+      // NOT(a = 2), which produces wrong results.
+      // Pushing one side of an AND down is only safe at the top level; see
+      // ParquetRelation's initializeLocalJobFunc method for an example.
+      for {
+        lhsFilter <- createFilter(schema, lhs)
+        rhsFilter <- createFilter(schema, rhs)
+      } yield FilterApi.and(lhsFilter, rhsFilter)

     case sources.Or(lhs, rhs) =>
       for {
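The shape of the fix is visible in the for-comprehension: with Option, both conversions must succeed or nothing is pushed, whereas the old `Option ++ Option` plus `reduceOption` silently kept whichever side converted. A standalone sketch of the two behaviors (plain strings stand in for Parquet FilterPredicates; the `and` helper is illustrative):

    def and(l: String, r: String): String = s"and($l, $r)"

    val lhs: Option[String] = Some("eq(a, 2)")
    val rhs: Option[String] = None  // the side we cannot convert

    // Old behavior: flattening keeps the convertible side alone -- unsafe under NOT.
    val old = (lhs ++ rhs).reduceOption(and)                 // Some("eq(a, 2)")
    // New behavior: all-or-nothing, so a nested AND is never split.
    val fixed = for { l <- lhs; r <- rhs } yield and(l, r)   // None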
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -336,4 +336,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/table1"
+        (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.parquet(path)
+
+        checkAnswer(
+          sqlContext.read.parquet(path).where("not (a = 2) or not(b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+
+        checkAnswer(
+          sqlContext.read.parquet(path).where("not (a = 2 and b in ('1'))"),
+          (1 to 5).map(i => Row(i, (i % 2).toString)))
+      }
+    }
+  }
 }
sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -74,22 +74,20 @@ private[orc] object OrcFilters extends Logging {

     expression match {
       case And(left, right) =>
-        val tryLeft = buildSearchArgument(left, newBuilder)
-        val tryRight = buildSearchArgument(right, newBuilder)
-
-        val conjunction = for {
-          _ <- tryLeft
-          _ <- tryRight
+        // Here it is not safe to convert just one side if we do not understand the
+        // other side. An example explains why:
+        // say we have NOT(a = 2 AND b in ('1')) and we do not know how to
+        // convert b in ('1'). If we convert only a = 2, we end up with the filter
+        // NOT(a = 2), which produces wrong results.
+        // Pushing one side of an AND down is only safe at the top level; see
+        // ParquetRelation's initializeLocalJobFunc method for an example.
+        for {
+          _ <- buildSearchArgument(left, newBuilder)
+          _ <- buildSearchArgument(right, newBuilder)
           lhs <- buildSearchArgument(left, builder.startAnd())
           rhs <- buildSearchArgument(right, lhs)
         } yield rhs.end()

-        // For filter `left AND right`, we can still push down `left` even if `right` is not
-        // convertible, and vice versa.
-        conjunction
-          .orElse(tryLeft.flatMap(_ => buildSearchArgument(left, builder)))
-          .orElse(tryRight.flatMap(_ => buildSearchArgument(right, builder)))
-
       case Or(left, right) =>
         for {
           _ <- buildSearchArgument(left, newBuilder)
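For contrast with the comment above: at the top level, the filters handed to a data source are implicitly ANDed, and Spark re-evaluates the complete predicate on the rows a scan returns, so a pushed filter only has to be no more selective than the original. A sketch of that reasoning (strings again stand in for converted predicates; this is not the ParquetRelation code itself):

    // Top-level conjuncts, one of which cannot be converted.
    val converted: Seq[Option[String]] = Seq(Some("eq(a, 2)"), None)
    // Dropping the unconvertible conjunct merely widens the pushed filter;
    // post-scan evaluation of the full predicate restores exact results.
    val pushedDown: Seq[String] = converted.flatten  // Seq("eq(a, 2)")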
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.orc
 import org.apache.hadoop.fs.Path

 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{Row, SQLConf}
 import org.apache.spark.sql.sources.HadoopFsRelationTest
 import org.apache.spark.sql.types._

@@ -61,4 +62,23 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
"dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load())
}
}

test("SPARK-12218: 'Not' is included in ORC filter pushdown") {
import testImplicits._

withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
(1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path)

checkAnswer(
sqlContext.read.orc(path).where("not (a = 2) or not(b in ('1'))"),
(1 to 5).map(i => Row(i, (i % 2).toString)))

checkAnswer(
sqlContext.read.orc(path).where("not (a = 2 and b in ('1'))"),
(1 to 5).map(i => Row(i, (i % 2).toString)))
}
}
}
}
