[SPARK-12218] Fixes ORC conjunction predicate push down #10377

Closed
ParquetFilterSuite.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import org.apache.parquet.filter2.predicate.Operators._
+import org.apache.parquet.filter2.predicate.FilterApi._
+import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
 import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
 
-import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation}
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 
 /**
  * A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -382,6 +384,42 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("SPARK-12218 Converting conjunctions into Parquet filter predicates") {
+    val schema = StructType(Seq(
+      StructField("a", IntegerType, nullable = false),
+      StructField("b", StringType, nullable = true),
+      StructField("c", DoubleType, nullable = true)
+    ))
+
+    assertResult(Some(and(
+      lt(intColumn("a"), 10: Integer),
+      gt(doubleColumn("c"), 1.5: java.lang.Double)))
+    ) {
+      ParquetFilters.createFilter(
+        schema,
+        sources.And(
+          sources.LessThan("a", 10),
+          sources.GreaterThan("c", 1.5D)))
+    }
+
+    assertResult(None) {
+      ParquetFilters.createFilter(
+        schema,
+        sources.And(
+          sources.LessThan("a", 10),
+          sources.StringContains("b", "prefix")))
+    }
+
+    assertResult(None) {
+      ParquetFilters.createFilter(
+        schema,
+        sources.Not(
+          sources.And(
+            sources.GreaterThan("a", 1),
+            sources.StringContains("b", "prefix"))))
+    }
+  }
+
   test("SPARK-11164: test the parquet filter in") {
     import testImplicits._
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
OrcFilters.scala
@@ -26,15 +26,47 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.sources._
 
 /**
- * It may be optimized by push down partial filters. But we are conservative here.
- * Because if some filters fail to be parsed, the tree may be corrupted,
- * and cannot be used anymore.
+ * Helper object for building ORC `SearchArgument`s, which are used for ORC predicate push-down.
+ *
+ * Due to a limitation of the ORC `SearchArgument` builder, we had to end up with a pretty weird
+ * double-checking pattern when converting `And`/`Or`/`Not` filters.
+ *
+ * An ORC `SearchArgument` must be built in one pass using a single builder. For example, you can't
+ * build `a = 1` and `b = 2` first, and then combine them into `a = 1 AND b = 2`. This is quite
+ * different from the cases in Spark SQL or Parquet, where complex filters can be easily built
+ * using existing simpler ones.
+ *
+ * The annoying part is that `SearchArgument` builder methods like `startAnd()`, `startOr()`, and
+ * `startNot()` mutate the internal state of the builder instance. This forces us to translate all
+ * convertible filters with a single builder instance. However, before actually converting a
+ * filter, we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible
+ * filter is found, we may already end up with a builder whose internal state is inconsistent.
+ *
+ * For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and
+ * then try to convert its children. Say we convert the `left` child successfully, but find that
+ * the `right` child is inconvertible. Alas, the `b.startAnd()` call can't be rolled back, and
+ * `b` is now inconsistent.
+ *
+ * The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their
Contributor:
Do we need to mention this for `And`/`Or`/`Not`? createFilter is dealing with the top-level filters (they will be connected by AND), right? I think it is important to emphasize that createFilter is for top-level filters.

Contributor Author:
buildSearchArgument is recursive, so nested And/Or/Not within top-level filters are also covered (see the sketch after this file's diff).
+ * children with brand-new builders, and only do the actual conversion with the right builder
+ * instance when the children are proven to be convertible.
+ *
+ * P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only. Usage
+ * of the builder methods mentioned above can only be found in test code, where all tested
+ * filters are known to be convertible.
  */
 private[orc] object OrcFilters extends Logging {
   def createFilter(filters: Array[Filter]): Option[SearchArgument] = {
+    // First, tries to convert each filter individually to see whether it's convertible, and
+    // then collects all convertible ones to build the final `SearchArgument`.
+    val convertibleFilters = for {
+      filter <- filters
+      _ <- buildSearchArgument(filter, SearchArgumentFactory.newBuilder())
+    } yield filter
+
     for {
-      // Combines all filters with `And`s to produce a single conjunction predicate
-      conjunction <- filters.reduceOption(And)
+      // Combines all convertible filters using `And` to produce a single conjunction
+      conjunction <- convertibleFilters.reduceOption(And)
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
       builder <- buildSearchArgument(conjunction, SearchArgumentFactory.newBuilder())
     } yield builder.build()
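
To make the double-checking workaround above concrete, here is a minimal, self-contained Scala sketch. It is not the PR's code: the tiny Filter ADT, the mutate-only Builder, and convert are all invented stand-ins, and only the probe-with-throwaway-builders control flow mirrors what buildSearchArgument does with ORC's real builder.

// Toy model of the probe-then-convert ("double-checking") pattern. Every name
// below is invented for illustration; none of this is Spark's or ORC's API.
object DoubleCheckSketch {
  sealed trait Filter
  case class LessThan(attr: String, value: Int) extends Filter
  case class StringContains(attr: String, value: String) extends Filter
  case class And(left: Filter, right: Filter) extends Filter

  // Like the ORC builder, every call mutates internal state and can't be undone.
  final class Builder {
    private val sb = new StringBuilder
    def startAnd(): Builder = { sb.append("(and "); this }
    def lessThan(attr: String, value: Int): Builder = { sb.append(s"($attr < $value) "); this }
    def end(): Builder = { sb.append(") "); this }
    override def toString: String = sb.toString.trim
  }

  def convert(filter: Filter, builder: Builder): Option[Builder] = filter match {
    case And(left, right) =>
      for {
        // Probe both children with throwaway builders first, so that an
        // inconvertible child cannot leave `builder` half-mutated.
        _ <- convert(left, new Builder)
        _ <- convert(right, new Builder)
        // Both children are known convertible; now it is safe to mutate `builder`.
        lhs <- convert(left, builder.startAnd())
        rhs <- convert(right, lhs)
      } yield rhs.end()
    case LessThan(attr, value) =>
      // Leaf predicates must be wrapped by a "parent" predicate, hence startAnd().
      Some(builder.startAnd().lessThan(attr, value).end())
    case StringContains(_, _) =>
      None // unsupported, like string predicates in ORC's SearchArgument
  }

  def main(args: Array[String]): Unit = {
    val b = new Builder
    // Right child is inconvertible: the result is None and `b` was never touched.
    println(convert(And(LessThan("a", 10), StringContains("b", "x")), b))
    // Both children convert, so the And is built in a single pass.
    println(convert(And(LessThan("a", 10), LessThan("a", 20)), new Builder))
  }
}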
@@ -50,28 +82,6 @@ private[orc] object OrcFilters extends Logging {
       case _ => false
     }
 
-    // lian: I probably missed something here, and had to end up with a pretty weird double-checking
-    // pattern when converting `And`/`Or`/`Not` filters.
-    //
-    // The annoying part is that, `SearchArgument` builder methods like `startAnd()` `startOr()`,
-    // and `startNot()` mutate internal state of the builder instance. This forces us to translate
-    // all convertible filters with a single builder instance. However, before actually converting a
-    // filter, we've no idea whether it can be recognized by ORC or not. Thus, when an inconvertible
-    // filter is found, we may already end up with a builder whose internal state is inconsistent.
-    //
-    // For example, to convert an `And` filter with builder `b`, we call `b.startAnd()` first, and
-    // then try to convert its children. Say we convert `left` child successfully, but find that
-    // `right` child is inconvertible. Alas, `b.startAnd()` call can't be rolled back, and `b` is
-    // inconsistent now.
-    //
-    // The workaround employed here is that, for `And`/`Or`/`Not`, we first try to convert their
-    // children with brand new builders, and only do the actual conversion with the right builder
-    // instance when the children are proven to be convertible.
-    //
-    // P.S.: Hive seems to use `SearchArgument` together with `ExprNodeGenericFuncDesc` only.
-    // Usage of builder methods mentioned above can only be found in test code, where all tested
-    // filters are known to be convertible.
-
     expression match {
       case And(left, right) =>
         // At here, it is not safe to just convert one side if we do not understand the
@@ -102,6 +112,10 @@ private[orc] object OrcFilters extends Logging {
           negate <- buildSearchArgument(child, builder.startNot())
         } yield negate.end()
 
+      // NOTE: For all case branches dealing with leaf predicates below, the additional
+      // `startAnd()` call is mandatory. The ORC `SearchArgument` builder requires that all leaf
+      // predicates be wrapped by a "parent" predicate (`And`, `Or`, or `Not`).
+
       case EqualTo(attribute, value) if isSearchableLiteral(value) =>
         Some(builder.startAnd().equals(attribute, value).end())

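As the review exchange above notes, buildSearchArgument recurses into nested connectives, so the double-checking applies below the top level too. A hypothetical usage sketch (these predicates appear nowhere in the PR, and since OrcFilters is private[orc] such a call would have to live in same-package test code, as OrcSourceSuite's does):

import org.apache.spark.sql.sources._

// Hypothetical filter: the inner And is nested under Or rather than being a
// top-level filter, yet it is still probed with fresh builders before the
// actual conversion mutates the shared builder.
val maybeSarg = OrcFilters.createFilter(Array(
  Or(
    And(LessThan("a", 10), GreaterThan("a", 0)),
    EqualTo("a", 42))))

// Had any nested child been inconvertible (e.g. a StringContains, which ORC's
// SearchArgument cannot express), the whole Or filter would simply be dropped
// (yielding None here) instead of corrupting the builder.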
OrcSourceSuite.scala
@@ -21,8 +21,9 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.{QueryTest, Row}
 
 case class OrcData(intField: Int, stringField: String)

@@ -174,4 +175,33 @@ class OrcSourceSuite extends OrcSuite {
         |)
       """.stripMargin)
   }
+
+  test("SPARK-12218 Converting conjunctions into ORC SearchArguments") {
+    // The `LessThan` should be converted while the `StringContains` shouldn't
+    assertResult(
+      """leaf-0 = (LESS_THAN a 10)
+        |expr = leaf-0
+      """.stripMargin.trim
+    ) {
+      OrcFilters.createFilter(Array(
+        LessThan("a", 10),
+        StringContains("b", "prefix")
Contributor:
StringContains is not supported?

Contributor Author:
No, ORC SearchArgument doesn't support string operations.
+      )).get.toString
+    }
+
+    // The `LessThan` should be converted while the whole inner `And` shouldn't
+    assertResult(
+      """leaf-0 = (LESS_THAN a 10)
+        |expr = leaf-0
+      """.stripMargin.trim
+    ) {
+      OrcFilters.createFilter(Array(
+        LessThan("a", 10),
+        Not(And(
+          GreaterThan("a", 1),
+          StringContains("b", "prefix")
+        ))
+      )).get.toString
Contributor:
Actually, can we have a similar test for Parquet? Right now, we only have a query test that will not trigger this anymore.

Contributor Author:
OK, will open a follow-up PR for this.

Contributor Author:
Actually added the Parquet test in this PR.

+    }
+  }
 }