From 7a36dd397c6276594308529a9fd6ac2c0e81a5c6 Mon Sep 17 00:00:00 2001
From: HyukjinKwon
Date: Tue, 30 Jun 2020 19:50:33 +0900
Subject: [PATCH] Avoid changing test utils and minimise the diff

---
 .../parquet/ParquetFilterSuite.scala          | 789 +++++++++---------
 .../datasources/parquet/ParquetIOSuite.scala  |  20 +-
 .../datasources/parquet/ParquetTest.scala     |  14 +-
 3 files changed, 411 insertions(+), 412 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d20a07f420e87..8b922aaed4e68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -22,6 +22,9 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.time.{LocalDate, LocalDateTime, ZoneId}
 
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
 import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
@@ -106,10 +109,18 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
   }
 
   /**
-   * Takes single level `inputDF` dataframe to generate multi-level nested
-   * dataframes as new test data.
+   * Takes a sequence of products `data` to generate multi-level nested
+   * dataframes as new test data. It tests both non-nested and nested dataframes,
+   * which are written and read back with the Parquet datasource.
+
+   * This is different from [[ParquetTest.withParquetDataFrame]], which does not
+   * test nested cases.
*/ - private def withNestedDataFrame(inputDF: DataFrame) + private def withNestedParquetDataFrame[T <: Product: ClassTag: TypeTag](data: Seq[T]) + (runTest: (DataFrame, String, Any => Any) => Unit): Unit = + withNestedParquetDataFrame(spark.createDataFrame(data))(runTest) + + private def withNestedParquetDataFrame(inputDF: DataFrame) (runTest: (DataFrame, String, Any => Any) => Unit): Unit = { assert(inputDF.schema.fields.length == 1) assert(!inputDF.schema.fields.head.dataType.isInstanceOf[StructType]) @@ -138,8 +149,11 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared "`a.b`.`c.d`", // one level nesting with column names containing `dots` (x: Any) => Row(x) ) - ).foreach { case (df, colName, resultFun) => - runTest(df, colName, resultFun) + ).foreach { case (newDF, colName, resultFun) => + withTempPath { file => + newDF.write.format(dataSourceName).save(file.getCanonicalPath) + readParquetFile(file.getCanonicalPath) { df => runTest(df, colName, resultFun) } + } } } @@ -155,7 +169,9 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared import testImplicits._ val df = data.map(i => Tuple1(Timestamp.valueOf(i))).toDF() - withNestedDataFrame(df) { case (inputDF, colName, fun) => + withNestedParquetDataFrame(df) { case (parquetDF, colName, fun) => + implicit val df: DataFrame = parquetDF + def resultFun(tsStr: String): Any = { val parsed = if (java8Api) { LocalDateTime.parse(tsStr.replace(" ", "T")) @@ -166,36 +182,35 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } fun(parsed) } - withParquetDataFrame(inputDF) { implicit df => - val tsAttr = df(colName).expr - assert(df(colName).expr.dataType === TimestampType) - - checkFilterPredicate(tsAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(tsAttr.isNotNull, classOf[NotEq[_]], - data.map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(tsAttr === ts1.ts, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr <=> ts1.ts, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr =!= ts1.ts, classOf[NotEq[_]], - Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(tsAttr < ts2.ts, classOf[Lt[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr > ts1.ts, classOf[Gt[_]], - Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i)))) - checkFilterPredicate(tsAttr <= ts1.ts, classOf[LtEq[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr >= ts4.ts, classOf[GtEq[_]], resultFun(ts4)) - - checkFilterPredicate(Literal(ts1.ts) === tsAttr, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts1.ts) <=> tsAttr, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts2.ts) > tsAttr, classOf[Lt[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts3.ts) < tsAttr, classOf[Gt[_]], resultFun(ts4)) - checkFilterPredicate(Literal(ts1.ts) >= tsAttr, classOf[LtEq[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts4.ts) <= tsAttr, classOf[GtEq[_]], resultFun(ts4)) - - checkFilterPredicate(!(tsAttr < ts4.ts), classOf[GtEq[_]], resultFun(ts4)) - checkFilterPredicate(tsAttr < ts2.ts || tsAttr > ts3.ts, classOf[Operators.Or], - Seq(Row(resultFun(ts1)), Row(resultFun(ts4)))) - } + + val tsAttr = df(colName).expr + assert(df(colName).expr.dataType === TimestampType) + + checkFilterPredicate(tsAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(tsAttr.isNotNull, classOf[NotEq[_]], + data.map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(tsAttr === ts1.ts, classOf[Eq[_]], 
resultFun(ts1)) + checkFilterPredicate(tsAttr <=> ts1.ts, classOf[Eq[_]], resultFun(ts1)) + checkFilterPredicate(tsAttr =!= ts1.ts, classOf[NotEq[_]], + Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(tsAttr < ts2.ts, classOf[Lt[_]], resultFun(ts1)) + checkFilterPredicate(tsAttr > ts1.ts, classOf[Gt[_]], + Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i)))) + checkFilterPredicate(tsAttr <= ts1.ts, classOf[LtEq[_]], resultFun(ts1)) + checkFilterPredicate(tsAttr >= ts4.ts, classOf[GtEq[_]], resultFun(ts4)) + + checkFilterPredicate(Literal(ts1.ts) === tsAttr, classOf[Eq[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts1.ts) <=> tsAttr, classOf[Eq[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts2.ts) > tsAttr, classOf[Lt[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts3.ts) < tsAttr, classOf[Gt[_]], resultFun(ts4)) + checkFilterPredicate(Literal(ts1.ts) >= tsAttr, classOf[LtEq[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts4.ts) <= tsAttr, classOf[GtEq[_]], resultFun(ts4)) + + checkFilterPredicate(!(tsAttr < ts4.ts), classOf[GtEq[_]], resultFun(ts4)) + checkFilterPredicate(tsAttr < ts2.ts || tsAttr > ts3.ts, classOf[Operators.Or], + Seq(Row(resultFun(ts1)), Row(resultFun(ts4)))) } } @@ -226,272 +241,264 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared test("filter pushdown - boolean") { val data = (true :: false :: Nil).map(b => Tuple1.apply(Option(b))) - import testImplicits._ - withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) => - withParquetDataFrame(inputDF) { implicit df => - val booleanAttr = df(colName).expr - assert(df(colName).expr.dataType === BooleanType) - - checkFilterPredicate(booleanAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(booleanAttr.isNotNull, classOf[NotEq[_]], - Seq(Row(resultFun(true)), Row(resultFun(false)))) - - checkFilterPredicate(booleanAttr === true, classOf[Eq[_]], resultFun(true)) - checkFilterPredicate(booleanAttr <=> true, classOf[Eq[_]], resultFun(true)) - checkFilterPredicate(booleanAttr =!= true, classOf[NotEq[_]], resultFun(false)) - } + withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) => + implicit val df: DataFrame = inputDF + + val booleanAttr = df(colName).expr + assert(df(colName).expr.dataType === BooleanType) + + checkFilterPredicate(booleanAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(booleanAttr.isNotNull, classOf[NotEq[_]], + Seq(Row(resultFun(true)), Row(resultFun(false)))) + + checkFilterPredicate(booleanAttr === true, classOf[Eq[_]], resultFun(true)) + checkFilterPredicate(booleanAttr <=> true, classOf[Eq[_]], resultFun(true)) + checkFilterPredicate(booleanAttr =!= true, classOf[NotEq[_]], resultFun(false)) } } test("filter pushdown - tinyint") { val data = (1 to 4).map(i => Tuple1(Option(i.toByte))) - import testImplicits._ - withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) => - withParquetDataFrame(inputDF) { implicit df => - val tinyIntAttr = df(colName).expr - assert(df(colName).expr.dataType === ByteType) - - checkFilterPredicate(tinyIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(tinyIntAttr.isNotNull, classOf[NotEq[_]], - (1 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(tinyIntAttr === 1.toByte, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(tinyIntAttr <=> 1.toByte, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(tinyIntAttr =!= 1.toByte, classOf[NotEq[_]], - (2 to 4).map(i => 
Row.apply(resultFun(i)))) - - checkFilterPredicate(tinyIntAttr < 2.toByte, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(tinyIntAttr > 3.toByte, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(tinyIntAttr <= 1.toByte, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(tinyIntAttr >= 4.toByte, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(Literal(1.toByte) === tinyIntAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(1.toByte) <=> tinyIntAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(2.toByte) > tinyIntAttr, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(Literal(3.toByte) < tinyIntAttr, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(Literal(1.toByte) >= tinyIntAttr, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(Literal(4.toByte) <= tinyIntAttr, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(!(tinyIntAttr < 4.toByte), classOf[GtEq[_]], resultFun(4)) - checkFilterPredicate(tinyIntAttr < 2.toByte || tinyIntAttr > 3.toByte, - classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4)))) - } + withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) => + implicit val df: DataFrame = inputDF + + val tinyIntAttr = df(colName).expr + assert(df(colName).expr.dataType === ByteType) + + checkFilterPredicate(tinyIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(tinyIntAttr.isNotNull, classOf[NotEq[_]], + (1 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(tinyIntAttr === 1.toByte, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(tinyIntAttr <=> 1.toByte, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(tinyIntAttr =!= 1.toByte, classOf[NotEq[_]], + (2 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(tinyIntAttr < 2.toByte, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(tinyIntAttr > 3.toByte, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(tinyIntAttr <= 1.toByte, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(tinyIntAttr >= 4.toByte, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(Literal(1.toByte) === tinyIntAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(1.toByte) <=> tinyIntAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(2.toByte) > tinyIntAttr, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(Literal(3.toByte) < tinyIntAttr, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(Literal(1.toByte) >= tinyIntAttr, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(Literal(4.toByte) <= tinyIntAttr, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(!(tinyIntAttr < 4.toByte), classOf[GtEq[_]], resultFun(4)) + checkFilterPredicate(tinyIntAttr < 2.toByte || tinyIntAttr > 3.toByte, + classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4)))) } } test("filter pushdown - smallint") { val data = (1 to 4).map(i => Tuple1(Option(i.toShort))) - import testImplicits._ - withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) => - withParquetDataFrame(inputDF) { implicit df => - val smallIntAttr = df(colName).expr - assert(df(colName).expr.dataType === ShortType) - - checkFilterPredicate(smallIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(smallIntAttr.isNotNull, classOf[NotEq[_]], - (1 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(smallIntAttr === 1.toShort, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(smallIntAttr <=> 1.toShort, classOf[Eq[_]], resultFun(1)) - 
checkFilterPredicate(smallIntAttr =!= 1.toShort, classOf[NotEq[_]], - (2 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(smallIntAttr < 2.toShort, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(smallIntAttr > 3.toShort, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(smallIntAttr <= 1.toShort, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(smallIntAttr >= 4.toShort, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(Literal(1.toShort) === smallIntAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(1.toShort) <=> smallIntAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(2.toShort) > smallIntAttr, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(Literal(3.toShort) < smallIntAttr, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(Literal(1.toShort) >= smallIntAttr, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(Literal(4.toShort) <= smallIntAttr, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(!(smallIntAttr < 4.toShort), classOf[GtEq[_]], resultFun(4)) - checkFilterPredicate(smallIntAttr < 2.toShort || smallIntAttr > 3.toShort, - classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4)))) - } + withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) => + implicit val df: DataFrame = inputDF + + val smallIntAttr = df(colName).expr + assert(df(colName).expr.dataType === ShortType) + + checkFilterPredicate(smallIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(smallIntAttr.isNotNull, classOf[NotEq[_]], + (1 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(smallIntAttr === 1.toShort, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(smallIntAttr <=> 1.toShort, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(smallIntAttr =!= 1.toShort, classOf[NotEq[_]], + (2 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(smallIntAttr < 2.toShort, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(smallIntAttr > 3.toShort, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(smallIntAttr <= 1.toShort, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(smallIntAttr >= 4.toShort, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(Literal(1.toShort) === smallIntAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(1.toShort) <=> smallIntAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(2.toShort) > smallIntAttr, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(Literal(3.toShort) < smallIntAttr, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(Literal(1.toShort) >= smallIntAttr, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(Literal(4.toShort) <= smallIntAttr, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(!(smallIntAttr < 4.toShort), classOf[GtEq[_]], resultFun(4)) + checkFilterPredicate(smallIntAttr < 2.toShort || smallIntAttr > 3.toShort, + classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4)))) } } test("filter pushdown - integer") { val data = (1 to 4).map(i => Tuple1(Option(i))) - import testImplicits._ - withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) => - withParquetDataFrame(inputDF) { implicit df => - val intAttr = df(colName).expr - assert(df(colName).expr.dataType === IntegerType) - - checkFilterPredicate(intAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(intAttr.isNotNull, classOf[NotEq[_]], - (1 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(intAttr === 1, classOf[Eq[_]], resultFun(1)) 
- checkFilterPredicate(intAttr <=> 1, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(intAttr =!= 1, classOf[NotEq[_]], - (2 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(intAttr < 2, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(intAttr > 3, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(intAttr <= 1, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(intAttr >= 4, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(Literal(1) === intAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(1) <=> intAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(2) > intAttr, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(Literal(3) < intAttr, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(Literal(1) >= intAttr, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(Literal(4) <= intAttr, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(!(intAttr < 4), classOf[GtEq[_]], resultFun(4)) - checkFilterPredicate(intAttr < 2 || intAttr > 3, classOf[Operators.Or], - Seq(Row(resultFun(1)), Row(resultFun(4)))) - } + withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) => + implicit val df: DataFrame = inputDF + + val intAttr = df(colName).expr + assert(df(colName).expr.dataType === IntegerType) + + checkFilterPredicate(intAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(intAttr.isNotNull, classOf[NotEq[_]], + (1 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(intAttr === 1, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(intAttr <=> 1, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(intAttr =!= 1, classOf[NotEq[_]], + (2 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(intAttr < 2, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(intAttr > 3, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(intAttr <= 1, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(intAttr >= 4, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(Literal(1) === intAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(1) <=> intAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(2) > intAttr, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(Literal(3) < intAttr, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(Literal(1) >= intAttr, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(Literal(4) <= intAttr, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(!(intAttr < 4), classOf[GtEq[_]], resultFun(4)) + checkFilterPredicate(intAttr < 2 || intAttr > 3, classOf[Operators.Or], + Seq(Row(resultFun(1)), Row(resultFun(4)))) } } test("filter pushdown - long") { val data = (1 to 4).map(i => Tuple1(Option(i.toLong))) - import testImplicits._ - withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) => - withParquetDataFrame(inputDF) { implicit df => - val longAttr = df(colName).expr - assert(df(colName).expr.dataType === LongType) - - checkFilterPredicate(longAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(longAttr.isNotNull, classOf[NotEq[_]], - (1 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(longAttr === 1, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(longAttr <=> 1, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(longAttr =!= 1, classOf[NotEq[_]], - (2 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(longAttr < 2, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(longAttr > 3, classOf[Gt[_]], resultFun(4)) - 
checkFilterPredicate(longAttr <= 1, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(longAttr >= 4, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(Literal(1) === longAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(1) <=> longAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(2) > longAttr, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(Literal(3) < longAttr, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(Literal(1) >= longAttr, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(Literal(4) <= longAttr, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(!(longAttr < 4), classOf[GtEq[_]], resultFun(4)) - checkFilterPredicate(longAttr < 2 || longAttr > 3, classOf[Operators.Or], - Seq(Row(resultFun(1)), Row(resultFun(4)))) - } + withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) => + implicit val df: DataFrame = inputDF + + val longAttr = df(colName).expr + assert(df(colName).expr.dataType === LongType) + + checkFilterPredicate(longAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(longAttr.isNotNull, classOf[NotEq[_]], + (1 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(longAttr === 1, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(longAttr <=> 1, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(longAttr =!= 1, classOf[NotEq[_]], + (2 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(longAttr < 2, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(longAttr > 3, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(longAttr <= 1, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(longAttr >= 4, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(Literal(1) === longAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(1) <=> longAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(2) > longAttr, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(Literal(3) < longAttr, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(Literal(1) >= longAttr, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(Literal(4) <= longAttr, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(!(longAttr < 4), classOf[GtEq[_]], resultFun(4)) + checkFilterPredicate(longAttr < 2 || longAttr > 3, classOf[Operators.Or], + Seq(Row(resultFun(1)), Row(resultFun(4)))) } } test("filter pushdown - float") { val data = (1 to 4).map(i => Tuple1(Option(i.toFloat))) - import testImplicits._ - withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) => - withParquetDataFrame(inputDF) { implicit df => - val floatAttr = df(colName).expr - assert(df(colName).expr.dataType === FloatType) - - checkFilterPredicate(floatAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(floatAttr.isNotNull, classOf[NotEq[_]], - (1 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(floatAttr === 1, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(floatAttr <=> 1, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(floatAttr =!= 1, classOf[NotEq[_]], - (2 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(floatAttr < 2, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(floatAttr > 3, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(floatAttr <= 1, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(floatAttr >= 4, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(Literal(1) === floatAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(1) <=> floatAttr, classOf[Eq[_]], 
resultFun(1)) - checkFilterPredicate(Literal(2) > floatAttr, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(Literal(3) < floatAttr, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(Literal(1) >= floatAttr, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(Literal(4) <= floatAttr, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(!(floatAttr < 4), classOf[GtEq[_]], resultFun(4)) - checkFilterPredicate(floatAttr < 2 || floatAttr > 3, classOf[Operators.Or], - Seq(Row(resultFun(1)), Row(resultFun(4)))) - } + withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) => + implicit val df: DataFrame = inputDF + + val floatAttr = df(colName).expr + assert(df(colName).expr.dataType === FloatType) + + checkFilterPredicate(floatAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(floatAttr.isNotNull, classOf[NotEq[_]], + (1 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(floatAttr === 1, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(floatAttr <=> 1, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(floatAttr =!= 1, classOf[NotEq[_]], + (2 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(floatAttr < 2, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(floatAttr > 3, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(floatAttr <= 1, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(floatAttr >= 4, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(Literal(1) === floatAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(1) <=> floatAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(2) > floatAttr, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(Literal(3) < floatAttr, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(Literal(1) >= floatAttr, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(Literal(4) <= floatAttr, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(!(floatAttr < 4), classOf[GtEq[_]], resultFun(4)) + checkFilterPredicate(floatAttr < 2 || floatAttr > 3, classOf[Operators.Or], + Seq(Row(resultFun(1)), Row(resultFun(4)))) } } test("filter pushdown - double") { val data = (1 to 4).map(i => Tuple1(Option(i.toDouble))) - import testImplicits._ - withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) => - withParquetDataFrame(inputDF) { implicit df => - val doubleAttr = df(colName).expr - assert(df(colName).expr.dataType === DoubleType) - - checkFilterPredicate(doubleAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(doubleAttr.isNotNull, classOf[NotEq[_]], - (1 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(doubleAttr === 1, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(doubleAttr <=> 1, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(doubleAttr =!= 1, classOf[NotEq[_]], - (2 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(doubleAttr < 2, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(doubleAttr > 3, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(doubleAttr <= 1, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(doubleAttr >= 4, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(Literal(1) === doubleAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(1) <=> doubleAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(2) > doubleAttr, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(Literal(3) < doubleAttr, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(Literal(1) >= doubleAttr, classOf[LtEq[_]], 
resultFun(1)) - checkFilterPredicate(Literal(4) <= doubleAttr, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(!(doubleAttr < 4), classOf[GtEq[_]], resultFun(4)) - checkFilterPredicate(doubleAttr < 2 || doubleAttr > 3, classOf[Operators.Or], - Seq(Row(resultFun(1)), Row(resultFun(4)))) - } + withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) => + implicit val df: DataFrame = inputDF + + val doubleAttr = df(colName).expr + assert(df(colName).expr.dataType === DoubleType) + + checkFilterPredicate(doubleAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(doubleAttr.isNotNull, classOf[NotEq[_]], + (1 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(doubleAttr === 1, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(doubleAttr <=> 1, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(doubleAttr =!= 1, classOf[NotEq[_]], + (2 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(doubleAttr < 2, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(doubleAttr > 3, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(doubleAttr <= 1, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(doubleAttr >= 4, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(Literal(1) === doubleAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(1) <=> doubleAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(2) > doubleAttr, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(Literal(3) < doubleAttr, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(Literal(1) >= doubleAttr, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(Literal(4) <= doubleAttr, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(!(doubleAttr < 4), classOf[GtEq[_]], resultFun(4)) + checkFilterPredicate(doubleAttr < 2 || doubleAttr > 3, classOf[Operators.Or], + Seq(Row(resultFun(1)), Row(resultFun(4)))) } } test("filter pushdown - string") { val data = (1 to 4).map(i => Tuple1(Option(i.toString))) - import testImplicits._ - withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) => - withParquetDataFrame(inputDF) { implicit df => - val stringAttr = df(colName).expr - assert(df(colName).expr.dataType === StringType) - - checkFilterPredicate(stringAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(stringAttr.isNotNull, classOf[NotEq[_]], - (1 to 4).map(i => Row.apply(resultFun(i.toString)))) - - checkFilterPredicate(stringAttr === "1", classOf[Eq[_]], resultFun("1")) - checkFilterPredicate(stringAttr <=> "1", classOf[Eq[_]], resultFun("1")) - checkFilterPredicate(stringAttr =!= "1", classOf[NotEq[_]], - (2 to 4).map(i => Row.apply(resultFun(i.toString)))) - - checkFilterPredicate(stringAttr < "2", classOf[Lt[_]], resultFun("1")) - checkFilterPredicate(stringAttr > "3", classOf[Gt[_]], resultFun("4")) - checkFilterPredicate(stringAttr <= "1", classOf[LtEq[_]], resultFun("1")) - checkFilterPredicate(stringAttr >= "4", classOf[GtEq[_]], resultFun("4")) - - checkFilterPredicate(Literal("1") === stringAttr, classOf[Eq[_]], resultFun("1")) - checkFilterPredicate(Literal("1") <=> stringAttr, classOf[Eq[_]], resultFun("1")) - checkFilterPredicate(Literal("2") > stringAttr, classOf[Lt[_]], resultFun("1")) - checkFilterPredicate(Literal("3") < stringAttr, classOf[Gt[_]], resultFun("4")) - checkFilterPredicate(Literal("1") >= stringAttr, classOf[LtEq[_]], resultFun("1")) - checkFilterPredicate(Literal("4") <= stringAttr, classOf[GtEq[_]], resultFun("4")) - - checkFilterPredicate(!(stringAttr < "4"), 
classOf[GtEq[_]], resultFun("4")) - checkFilterPredicate(stringAttr < "2" || stringAttr > "3", classOf[Operators.Or], - Seq(Row(resultFun("1")), Row(resultFun("4")))) - } + withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) => + implicit val df: DataFrame = inputDF + + val stringAttr = df(colName).expr + assert(df(colName).expr.dataType === StringType) + + checkFilterPredicate(stringAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(stringAttr.isNotNull, classOf[NotEq[_]], + (1 to 4).map(i => Row.apply(resultFun(i.toString)))) + + checkFilterPredicate(stringAttr === "1", classOf[Eq[_]], resultFun("1")) + checkFilterPredicate(stringAttr <=> "1", classOf[Eq[_]], resultFun("1")) + checkFilterPredicate(stringAttr =!= "1", classOf[NotEq[_]], + (2 to 4).map(i => Row.apply(resultFun(i.toString)))) + + checkFilterPredicate(stringAttr < "2", classOf[Lt[_]], resultFun("1")) + checkFilterPredicate(stringAttr > "3", classOf[Gt[_]], resultFun("4")) + checkFilterPredicate(stringAttr <= "1", classOf[LtEq[_]], resultFun("1")) + checkFilterPredicate(stringAttr >= "4", classOf[GtEq[_]], resultFun("4")) + + checkFilterPredicate(Literal("1") === stringAttr, classOf[Eq[_]], resultFun("1")) + checkFilterPredicate(Literal("1") <=> stringAttr, classOf[Eq[_]], resultFun("1")) + checkFilterPredicate(Literal("2") > stringAttr, classOf[Lt[_]], resultFun("1")) + checkFilterPredicate(Literal("3") < stringAttr, classOf[Gt[_]], resultFun("4")) + checkFilterPredicate(Literal("1") >= stringAttr, classOf[LtEq[_]], resultFun("1")) + checkFilterPredicate(Literal("4") <= stringAttr, classOf[GtEq[_]], resultFun("4")) + + checkFilterPredicate(!(stringAttr < "4"), classOf[GtEq[_]], resultFun("4")) + checkFilterPredicate(stringAttr < "2" || stringAttr > "3", classOf[Operators.Or], + Seq(Row(resultFun("1")), Row(resultFun("4")))) } } @@ -501,38 +508,37 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } val data = (1 to 4).map(i => Tuple1(Option(i.b))) - import testImplicits._ - withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) => - withParquetDataFrame(inputDF) { implicit df => - val binaryAttr: Expression = df(colName).expr - assert(df(colName).expr.dataType === BinaryType) - - checkFilterPredicate(binaryAttr === 1.b, classOf[Eq[_]], resultFun(1.b)) - checkFilterPredicate(binaryAttr <=> 1.b, classOf[Eq[_]], resultFun(1.b)) - - checkFilterPredicate(binaryAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(binaryAttr.isNotNull, classOf[NotEq[_]], - (1 to 4).map(i => Row.apply(resultFun(i.b)))) - - checkFilterPredicate(binaryAttr =!= 1.b, classOf[NotEq[_]], - (2 to 4).map(i => Row.apply(resultFun(i.b)))) - - checkFilterPredicate(binaryAttr < 2.b, classOf[Lt[_]], resultFun(1.b)) - checkFilterPredicate(binaryAttr > 3.b, classOf[Gt[_]], resultFun(4.b)) - checkFilterPredicate(binaryAttr <= 1.b, classOf[LtEq[_]], resultFun(1.b)) - checkFilterPredicate(binaryAttr >= 4.b, classOf[GtEq[_]], resultFun(4.b)) - - checkFilterPredicate(Literal(1.b) === binaryAttr, classOf[Eq[_]], resultFun(1.b)) - checkFilterPredicate(Literal(1.b) <=> binaryAttr, classOf[Eq[_]], resultFun(1.b)) - checkFilterPredicate(Literal(2.b) > binaryAttr, classOf[Lt[_]], resultFun(1.b)) - checkFilterPredicate(Literal(3.b) < binaryAttr, classOf[Gt[_]], resultFun(4.b)) - checkFilterPredicate(Literal(1.b) >= binaryAttr, classOf[LtEq[_]], resultFun(1.b)) - checkFilterPredicate(Literal(4.b) <= binaryAttr, classOf[GtEq[_]], resultFun(4.b)) - - 
checkFilterPredicate(!(binaryAttr < 4.b), classOf[GtEq[_]], resultFun(4.b)) - checkFilterPredicate(binaryAttr < 2.b || binaryAttr > 3.b, classOf[Operators.Or], - Seq(Row(resultFun(1.b)), Row(resultFun(4.b)))) - } + withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) => + implicit val df: DataFrame = inputDF + + val binaryAttr: Expression = df(colName).expr + assert(df(colName).expr.dataType === BinaryType) + + checkFilterPredicate(binaryAttr === 1.b, classOf[Eq[_]], resultFun(1.b)) + checkFilterPredicate(binaryAttr <=> 1.b, classOf[Eq[_]], resultFun(1.b)) + + checkFilterPredicate(binaryAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(binaryAttr.isNotNull, classOf[NotEq[_]], + (1 to 4).map(i => Row.apply(resultFun(i.b)))) + + checkFilterPredicate(binaryAttr =!= 1.b, classOf[NotEq[_]], + (2 to 4).map(i => Row.apply(resultFun(i.b)))) + + checkFilterPredicate(binaryAttr < 2.b, classOf[Lt[_]], resultFun(1.b)) + checkFilterPredicate(binaryAttr > 3.b, classOf[Gt[_]], resultFun(4.b)) + checkFilterPredicate(binaryAttr <= 1.b, classOf[LtEq[_]], resultFun(1.b)) + checkFilterPredicate(binaryAttr >= 4.b, classOf[GtEq[_]], resultFun(4.b)) + + checkFilterPredicate(Literal(1.b) === binaryAttr, classOf[Eq[_]], resultFun(1.b)) + checkFilterPredicate(Literal(1.b) <=> binaryAttr, classOf[Eq[_]], resultFun(1.b)) + checkFilterPredicate(Literal(2.b) > binaryAttr, classOf[Lt[_]], resultFun(1.b)) + checkFilterPredicate(Literal(3.b) < binaryAttr, classOf[Gt[_]], resultFun(4.b)) + checkFilterPredicate(Literal(1.b) >= binaryAttr, classOf[LtEq[_]], resultFun(1.b)) + checkFilterPredicate(Literal(4.b) <= binaryAttr, classOf[GtEq[_]], resultFun(4.b)) + + checkFilterPredicate(!(binaryAttr < 4.b), classOf[GtEq[_]], resultFun(4.b)) + checkFilterPredicate(binaryAttr < 2.b || binaryAttr > 3.b, classOf[Operators.Or], + Seq(Row(resultFun(1.b)), Row(resultFun(4.b)))) } } @@ -546,56 +552,57 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared Seq(false, true).foreach { java8Api => withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) { - val df = data.map(i => Tuple1(Date.valueOf(i))).toDF() - withNestedDataFrame(df) { case (inputDF, colName, fun) => + val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF() + withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) => + implicit val df: DataFrame = inputDF + def resultFun(dateStr: String): Any = { val parsed = if (java8Api) LocalDate.parse(dateStr) else Date.valueOf(dateStr) fun(parsed) } - withParquetDataFrame(inputDF) { implicit df => - val dateAttr: Expression = df(colName).expr - assert(df(colName).expr.dataType === DateType) - - checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]], - data.map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]], - Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]], - resultFun("2018-03-21")) - checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(dateAttr >= 
"2018-03-21".date, classOf[GtEq[_]], - resultFun("2018-03-21")) - - checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]], - resultFun("2018-03-21")) - checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]], - resultFun("2018-03-18")) - checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]], - resultFun("2018-03-21")) - - checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]], - resultFun("2018-03-21")) - checkFilterPredicate( - dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date, - classOf[Operators.Or], - Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21")))) - } + + val dateAttr: Expression = df(colName).expr + assert(df(colName).expr.dataType === DateType) + + checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]], + data.map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]], + resultFun("2018-03-18")) + checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]], + resultFun("2018-03-18")) + checkFilterPredicate(dateAttr =!= "2018-03-18".date, classOf[NotEq[_]], + Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]], + resultFun("2018-03-18")) + checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]], + resultFun("2018-03-21")) + checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]], + resultFun("2018-03-18")) + checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]], + resultFun("2018-03-21")) + + checkFilterPredicate(Literal("2018-03-18".date) === dateAttr, classOf[Eq[_]], + resultFun("2018-03-18")) + checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr, classOf[Eq[_]], + resultFun("2018-03-18")) + checkFilterPredicate(Literal("2018-03-19".date) > dateAttr, classOf[Lt[_]], + resultFun("2018-03-18")) + checkFilterPredicate(Literal("2018-03-20".date) < dateAttr, classOf[Gt[_]], + resultFun("2018-03-21")) + checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr, classOf[LtEq[_]], + resultFun("2018-03-18")) + checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr, classOf[GtEq[_]], + resultFun("2018-03-21")) + + checkFilterPredicate(!(dateAttr < "2018-03-21".date), classOf[GtEq[_]], + resultFun("2018-03-21")) + checkFilterPredicate( + dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date, + classOf[Operators.Or], + Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21")))) } } } @@ -603,7 +610,9 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared test("filter pushdown - timestamp") { Seq(true, false).foreach { java8Api => - withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) { + withSQLConf( + SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString, + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") { // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS val millisData = Seq( "1000-06-14 08:28:53.123", @@ -630,11 +639,14 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with 
Shared withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> ParquetOutputTimestampType.INT96.toString) { import testImplicits._ - withParquetDataFrame( - millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF()) { implicit df => - val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) - assertResult(None) { - createParquetFilters(schema).createFilter(sources.IsNull("_1")) + withTempPath { file => + millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF + .write.format(dataSourceName).save(file.getCanonicalPath) + readParquetFile(file.getCanonicalPath) { df => + val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) + assertResult(None) { + createParquetFilters(schema).createFilter(sources.IsNull("_1")) + } } } } @@ -653,36 +665,36 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared val rdd = spark.sparkContext.parallelize((1 to 4).map(i => Row(new java.math.BigDecimal(i)))) val dataFrame = spark.createDataFrame(rdd, StructType.fromDDL(s"a decimal($precision, 2)")) - withNestedDataFrame(dataFrame) { case (inputDF, colName, resultFun) => - withParquetDataFrame(inputDF) { implicit df => - val decimalAttr: Expression = df(colName).expr - assert(df(colName).expr.dataType === DecimalType(precision, 2)) - - checkFilterPredicate(decimalAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) - checkFilterPredicate(decimalAttr.isNotNull, classOf[NotEq[_]], - (1 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(decimalAttr === 1, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(decimalAttr <=> 1, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(decimalAttr =!= 1, classOf[NotEq[_]], - (2 to 4).map(i => Row.apply(resultFun(i)))) - - checkFilterPredicate(decimalAttr < 2, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(decimalAttr > 3, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(decimalAttr <= 1, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(decimalAttr >= 4, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(Literal(1) === decimalAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(1) <=> decimalAttr, classOf[Eq[_]], resultFun(1)) - checkFilterPredicate(Literal(2) > decimalAttr, classOf[Lt[_]], resultFun(1)) - checkFilterPredicate(Literal(3) < decimalAttr, classOf[Gt[_]], resultFun(4)) - checkFilterPredicate(Literal(1) >= decimalAttr, classOf[LtEq[_]], resultFun(1)) - checkFilterPredicate(Literal(4) <= decimalAttr, classOf[GtEq[_]], resultFun(4)) - - checkFilterPredicate(!(decimalAttr < 4), classOf[GtEq[_]], resultFun(4)) - checkFilterPredicate(decimalAttr < 2 || decimalAttr > 3, classOf[Operators.Or], - Seq(Row(resultFun(1)), Row(resultFun(4)))) - } + withNestedParquetDataFrame(dataFrame) { case (inputDF, colName, resultFun) => + implicit val df: DataFrame = inputDF + + val decimalAttr: Expression = df(colName).expr + assert(df(colName).expr.dataType === DecimalType(precision, 2)) + + checkFilterPredicate(decimalAttr.isNull, classOf[Eq[_]], Seq.empty[Row]) + checkFilterPredicate(decimalAttr.isNotNull, classOf[NotEq[_]], + (1 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(decimalAttr === 1, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(decimalAttr <=> 1, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(decimalAttr =!= 1, classOf[NotEq[_]], + (2 to 4).map(i => Row.apply(resultFun(i)))) + + checkFilterPredicate(decimalAttr < 2, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(decimalAttr > 3, classOf[Gt[_]], resultFun(4)) + 
checkFilterPredicate(decimalAttr <= 1, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(decimalAttr >= 4, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(Literal(1) === decimalAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(1) <=> decimalAttr, classOf[Eq[_]], resultFun(1)) + checkFilterPredicate(Literal(2) > decimalAttr, classOf[Lt[_]], resultFun(1)) + checkFilterPredicate(Literal(3) < decimalAttr, classOf[Gt[_]], resultFun(4)) + checkFilterPredicate(Literal(1) >= decimalAttr, classOf[LtEq[_]], resultFun(1)) + checkFilterPredicate(Literal(4) <= decimalAttr, classOf[GtEq[_]], resultFun(4)) + + checkFilterPredicate(!(decimalAttr < 4), classOf[GtEq[_]], resultFun(4)) + checkFilterPredicate(decimalAttr < 2 || decimalAttr > 3, classOf[Operators.Or], + Seq(Row(resultFun(1)), Row(resultFun(4)))) } } } @@ -1195,8 +1207,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } test("SPARK-16371 Do not push down filters when inner name and outer name are the same") { - import testImplicits._ - withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i))).toDF()) { implicit df => + withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df => // Here the schema becomes as below: // // root @@ -1336,10 +1347,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } test("filter pushdown - StringStartsWith") { - withParquetDataFrame { - import testImplicits._ - (1 to 4).map(i => Tuple1(i + "str" + i)).toDF() - } { implicit df => + withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df => checkFilterPredicate( '_1.startsWith("").asInstanceOf[Predicate], classOf[UserDefinedByInstance[_, _]], @@ -1385,10 +1393,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } // SPARK-28371: make sure filter is null-safe. - withParquetDataFrame { - import testImplicits._ - Seq(Tuple1[String](null)).toDF() - } { implicit df => + withParquetDataFrame(Seq(Tuple1[String](null))) { implicit df => checkFilterPredicate( '_1.startsWith("blah").asInstanceOf[Predicate], classOf[UserDefinedByInstance[_, _]], @@ -1607,7 +1612,7 @@ class ParquetV1FilterSuite extends ParquetFilterSuite { expected: Seq[Row]): Unit = { val output = predicate.collect { case a: Attribute => a }.distinct - Seq(("parquet", true), ("", false)).map { case (pushdownDsList, nestedPredicatePushdown) => + Seq(("parquet", true), ("", false)).foreach { case (pushdownDsList, nestedPredicatePushdown) => withSQLConf( SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 79c32976f02ec..2dc8a062bb73d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -85,7 +85,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession * Writes `data` to a Parquet file, reads it back and check file contents. 
*/ protected def checkParquetFile[T <: Product : ClassTag: TypeTag](data: Seq[T]): Unit = { - withParquetDataFrame(data.toDF())(r => checkAnswer(r, data.map(Row.fromTuple))) + withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple))) } test("basic data types (without binary)") { @@ -97,7 +97,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession test("raw binary") { val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte))) - withParquetDataFrame(data.toDF()) { df => + withParquetDataFrame(data) { df => assertResult(data.map(_._1.mkString(",")).sorted) { df.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted } @@ -200,7 +200,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession testStandardAndLegacyModes("struct") { val data = (1 to 4).map(i => Tuple1((i, s"val_$i"))) - withParquetDataFrame(data.toDF()) { df => + withParquetDataFrame(data) { df => // Structs are converted to `Row`s checkAnswer(df, data.map { case Tuple1(struct) => Row(Row(struct.productIterator.toSeq: _*)) @@ -217,7 +217,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession ) ) } - withParquetDataFrame(data.toDF()) { df => + withParquetDataFrame(data) { df => // Structs are converted to `Row`s checkAnswer(df, data.map { case Tuple1(array) => Row(array.map(struct => Row(struct.productIterator.toSeq: _*))) @@ -236,7 +236,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession ) ) } - withParquetDataFrame(data.toDF()) { df => + withParquetDataFrame(data) { df => // Structs are converted to `Row`s checkAnswer(df, data.map { case Tuple1(array) => Row(array.map { case Tuple1(Tuple1(str)) => Row(Row(str))}) @@ -246,7 +246,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession testStandardAndLegacyModes("nested struct with array of array as field") { val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i"))))) - withParquetDataFrame(data.toDF()) { df => + withParquetDataFrame(data) { df => // Structs are converted to `Row`s checkAnswer(df, data.map { case Tuple1(struct) => Row(Row(struct.productIterator.toSeq: _*)) @@ -263,7 +263,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession ) ) } - withParquetDataFrame(data.toDF()) { df => + withParquetDataFrame(data) { df => // Structs are converted to `Row`s checkAnswer(df, data.map { case Tuple1(m) => Row(m.map { case (k, v) => Row(k.productIterator.toSeq: _*) -> v }) @@ -280,7 +280,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession ) ) } - withParquetDataFrame(data.toDF()) { df => + withParquetDataFrame(data) { df => // Structs are converted to `Row`s checkAnswer(df, data.map { case Tuple1(m) => Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*))) @@ -296,7 +296,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession null.asInstanceOf[java.lang.Float], null.asInstanceOf[java.lang.Double]) - withParquetDataFrame((allNulls :: Nil).toDF()) { df => + withParquetDataFrame(allNulls :: Nil) { df => val rows = df.collect() assert(rows.length === 1) assert(rows.head === Row(Seq.fill(5)(null): _*)) @@ -309,7 +309,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession None.asInstanceOf[Option[Long]], None.asInstanceOf[Option[String]]) - withParquetDataFrame((allNones :: Nil).toDF()) { df => + withParquetDataFrame(allNones :: Nil) { df => val rows = df.collect() assert(rows.length === 
1) assert(rows.head === Row(Seq.fill(3)(null): _*)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index 105f025adc0ad..db8ee724c01c1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -63,18 +63,12 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest { (f: String => Unit): Unit = withDataSourceFile(data)(f) /** - * Writes `df` dataframe to a Parquet file and reads it back as a [[DataFrame]], + * Writes `data` to a Parquet file and reads it back as a [[DataFrame]], * which is then passed to `f`. The Parquet file will be deleted after `f` returns. */ - protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean = true) - (f: DataFrame => Unit): Unit = { - withTempPath { file => - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") { - df.write.format(dataSourceName).save(file.getCanonicalPath) - } - readFile(file.getCanonicalPath, testVectorized)(f) - } - } + protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag] + (data: Seq[T], testVectorized: Boolean = true) + (f: DataFrame => Unit): Unit = withDataSourceDataFrame(data, testVectorized)(f) /** * Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a
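
A minimal usage sketch (not part of the patch above), showing how the two refactored helpers are meant to be called from a suite mixing in ParquetTest; it mirrors the call sites in the diff. The test name and sample data are hypothetical, and the imports (catalyst DSL, Parquet `Operators`) are those ParquetFilterSuite already uses.

  test("filter pushdown - usage sketch (hypothetical)") {
    val data = (1 to 4).map(i => Tuple1(Option(i)))

    // Non-nested path: `withParquetDataFrame` now takes a Seq of Products,
    // writes it to a temporary Parquet file, and passes the read-back
    // DataFrame to the given function.
    withParquetDataFrame(data) { df =>
      checkAnswer(df, data.map(Row.fromTuple))
    }

    // Nested path: `withNestedParquetDataFrame` additionally rewraps the single
    // column into nested variants (e.g. `a.b` and `a.b`.`c.d`), writes each
    // variant with the Parquet datasource, and hands back `resultFun` so the
    // expected value can be re-nested to match `colName`.
    withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
      implicit val df: DataFrame = inputDF
      checkFilterPredicate(df(colName).expr === 1, classOf[Eq[_]], resultFun(1))
    }
  }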