diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 9197b8b5637eb..a0abfe458d498 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -17,15 +17,17 @@ package org.apache.spark.sql.execution.datasources.parquet -import org.apache.parquet.filter2.predicate.Operators._ +import org.apache.parquet.filter2.predicate.FilterApi._ +import org.apache.parquet.filter2.predicate.Operators.{Column => _, _} import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators} -import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation} import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.types._ /** * A test suite that tests Parquet filter2 API based filter pushdown optimization. @@ -382,6 +384,42 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } + test("SPARK-12218 Converting conjunctions into Parquet filter predicates") { + val schema = StructType(Seq( + StructField("a", IntegerType, nullable = false), + StructField("b", StringType, nullable = true), + StructField("c", DoubleType, nullable = true) + )) + + assertResult(Some(and( + lt(intColumn("a"), 10: Integer), + gt(doubleColumn("c"), 1.5: java.lang.Double))) + ) { + ParquetFilters.createFilter( + schema, + sources.And( + sources.LessThan("a", 10), + sources.GreaterThan("c", 1.5D))) + } + + assertResult(None) { + ParquetFilters.createFilter( + schema, + sources.And( + sources.LessThan("a", 10), + sources.StringContains("b", "prefix"))) + } + + assertResult(None) { + ParquetFilters.createFilter( + schema, + sources.Not( + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")))) + } + } + test("SPARK-11164: test the parquet filter in") { import testImplicits._ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index ebfb1759b8b96..165210f9ff301 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -26,15 +26,47 @@ import org.apache.spark.Logging import org.apache.spark.sql.sources._ /** - * It may be optimized by push down partial filters. But we are conservative here. - * Because if some filters fail to be parsed, the tree may be corrupted, - * and cannot be used anymore. + * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down. + * + * Due to limitation of ORC `SearchArgument` builder, we had to end up with a pretty weird double- + * checking pattern when converting `And`/`Or`/`Not` filters. + * + * An ORC `SearchArgument` must be built in one pass using a single builder. For example, you can't + * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite + * different from the cases in Spark SQL or Parquet, where complex filters can be easily built using + * existing simpler ones. + * + * The annoying part is that, `SearchArgument` builder methods like `startAnd()`, `startOr()`, and + * `startNot()` mutate internal state of the builder instance. This forces us to translate all + * convertible filters with a single builder instance. However, before actually converting a filter, + * we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible filter is + * found, we may already end up with a builder whose internal state is inconsistent. + * + * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and then + * try to convert its children. Say we convert `left` child successfully, but find that `right` + * child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is inconsistent + * now. + * + * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their + * children with brand new builders, and only do the actual conversion with the right builder + * instance when the children are proven to be convertible. + * + * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage of + * builder methods mentioned above can only be found in test code, where all tested filters are + * known to be convertible. */ private[orc] object OrcFilters extends Logging { def createFilter(filters: Array[Filter]): Option[SearchArgument] = { + // First, tries to convert each filter individually to see whether it's convertible, and then + // collect all convertible ones to build the final `SearchArgument`. + val convertibleFilters = for { + filter <- filters + _ <- buildSearchArgument(filter, SearchArgumentFactory.newBuilder()) + } yield filter + for { - // Combines all filters with `And`s to produce a single conjunction predicate - conjunction <- filters.reduceOption(And) + // Combines all convertible filters using `And` to produce a single conjunction + conjunction <- convertibleFilters.reduceOption(And) // Then tries to build a single ORC `SearchArgument` for the conjunction predicate builder <- buildSearchArgument(conjunction, SearchArgumentFactory.newBuilder()) } yield builder.build() @@ -50,28 +82,6 @@ private[orc] object OrcFilters extends Logging { case _ => false } - // lian: I probably missed something here, and had to end up with a pretty weird double-checking - // pattern when converting `And`/`Or`/`Not` filters. - // - // The annoying part is that, `SearchArgument` builder methods like `startAnd()` `startOr()`, - // and `startNot()` mutate internal state of the builder instance. This forces us to translate - // all convertible filters with a single builder instance. However, before actually converting a - // filter, we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible - // filter is found, we may already end up with a builder whose internal state is inconsistent. - // - // For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and - // then try to convert its children. Say we convert `left` child successfully, but find that - // `right` child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is - // inconsistent now. - // - // The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their - // children with brand new builders, and only do the actual conversion with the right builder - // instance when the children are proven to be convertible. - // - // P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. - // Usage of builder methods mentioned above can only be found in test code, where all tested - // filters are known to be convertible. - expression match { case And(left, right) => // At here, it is not safe to just convert one side if we do not understand the @@ -102,6 +112,10 @@ private[orc] object OrcFilters extends Logging { negate <- buildSearchArgument(child, builder.startNot()) } yield negate.end() + // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` + // call is mandatory. ORC `SearchArgument` builder requires that all leaf predicates must be + // wrapped by a "parent" predicate (`And`, `Or`, or `Not`). + case EqualTo(attribute, value) if isSearchableLiteral(value) => Some(builder.startAnd().equals(attribute, value).end()) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 7a34cf731b4c5..47e73b4006fa5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -21,8 +21,9 @@ import java.io.File import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.{QueryTest, Row} import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.{QueryTest, Row} case class OrcData(intField: Int, stringField: String) @@ -174,4 +175,33 @@ class OrcSourceSuite extends OrcSuite { |) """.stripMargin) } + + test("SPARK-12218 Converting conjunctions into ORC SearchArguments") { + // The `LessThan` should be converted while the `StringContains` shouldn't + assertResult( + """leaf-0 = (LESS_THAN a 10) + |expr = leaf-0 + """.stripMargin.trim + ) { + OrcFilters.createFilter(Array( + LessThan("a", 10), + StringContains("b", "prefix") + )).get.toString + } + + // The `LessThan` should be converted while the whole inner `And` shouldn't + assertResult( + """leaf-0 = (LESS_THAN a 10) + |expr = leaf-0 + """.stripMargin.trim + ) { + OrcFilters.createFilter(Array( + LessThan("a", 10), + Not(And( + GreaterThan("a", 1), + StringContains("b", "prefix") + )) + )).get.toString + } + } }