Skip to content

Commit

Permalink
Add tests for structs in filter pushdown (apache#5)
Browse files Browse the repository at this point in the history
* add tests for structs

* add comp operator in test

* fix debug change
  • Loading branch information
stefankandic authored Feb 8, 2024
1 parent 835be0f commit 3f8bf87
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.json4s.jackson.Serialization
import org.apache.spark.{SparkException, SparkUpgradeException}
import org.apache.spark.sql.{SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, Expression, ExpressionSet, IsNotNull, IsNull, PredicateHelper}
import org.apache.spark.sql.catalyst.util.RebaseDateTime
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
Expand Down Expand Up @@ -287,18 +287,17 @@ object DataSourceUtils extends PredicateHelper {
* @return A boolean indicating whether the filter should be pushed down or not.
*/
def shouldPushFilter(expression: Expression): Boolean = {
def shouldPushFilterRecursive(expression: Expression): Boolean = expression match {
case attr: AttributeReference =>
attr.dataType match {
// walks the expression tree and returns false as soon as a collated string is found
def checkRecursive(expression: Expression): Boolean = expression match {
// null checks are accepted unconditionally; their operand is not inspected
case _: IsNull | _: IsNotNull => true
case _ =>
expression.dataType match {
// don't push down filters for string columns with non-default collation
// as it could lead to incorrect results
case st: StringType => st.isDefaultCollation
case _ => true
// NOTE(review): a non-string expression is judged only by its children, so a
// childless expression whose type merely nests a collated string (e.g. inside a
// struct) is accepted here -- confirm this is intended
case _ => expression.children.forall(checkRecursive)
}

case _ => expression.children.forall(shouldPushFilterRecursive)
}

expression.deterministic && shouldPushFilterRecursive(expression)
// `&&` short-circuits: a non-deterministic expression is rejected without walking its tree
expression.deterministic && checkRecursive(expression)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2210,10 +2210,16 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
}

test("disable filter pushdown for collated strings") {
// Asserts that the extended explain output of `df` contains `filterString`
// (for example an expected "PushedFilters: [...]" fragment).
def containsFilters(df: DataFrame, filterString: String): Unit = {
  val explain = df.queryExecution.explainString(ExplainMode.fromString("extended"))
  // Attach the expected fragment and the full explain output as the clue, so a
  // mismatch can be diagnosed from the test log without re-running the query.
  assert(
    explain.contains(filterString),
    s"Expected explain output to contain '$filterString', but got:\n$explain")
}

withTempPath { path =>
val collation = "'SR_CI_AI'"
val df = sql(
s""" SELECT collate(c, $collation) as c
s""" SELECT collate(c, $collation) as c1,
|named_struct('f1', named_struct('f2', collate(c, $collation), 'f3', 1)) as ns
|FROM VALUES ('aaa'), ('AAA'), ('bbb')
|as data(c)
|""".stripMargin)
Expand All @@ -2231,11 +2237,24 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared

filters.foreach { filter =>
val readback = spark.read.parquet(path.getAbsolutePath)
.where(s"c ${filter._1} collate('aaa', $collation)")
val explain = readback.queryExecution.explainString(ExplainMode.fromString("extended"))
assert(explain.contains("PushedFilters: []"))
.where(s"c1 ${filter._1} collate('aaa', $collation)")
.where(s"ns.f1.f2 ${filter._1} collate('aaa', $collation)")
.where(s"ns ${filter._1} " +
s"named_struct('f1', named_struct('f2', collate('aaa', $collation), 'f3', 1))")
.select("c1")

containsFilters(readback,
"PushedFilters: [IsNotNull(c1), IsNotNull(ns.f1.f2), IsNotNull(ns)]")
checkAnswer(readback, filter._2)
}

// should still push down the filter for the nested column which is not collated
val readback = spark.read.parquet(path.getAbsolutePath)
.where(s"ns.f1.f3 == 1")
.select("c1")

containsFilters(readback, "PushedFilters: [IsNotNull(ns.f1.f3), EqualTo(ns.f1.f3,1)]")
checkAnswer(readback, Seq(Row("aaa"), Row("AAA"), Row("bbb")))
}
}
}
Expand Down

0 comments on commit 3f8bf87

Please sign in to comment.