Commit

Add more tests: negative test cases, checking results against spark
johanl-db committed Aug 21, 2024
1 parent ea6dcb2 commit 4fad52e
Showing 2 changed files with 130 additions and 89 deletions.

This file was deleted.

@@ -16,10 +16,13 @@
package io.delta.kernel.defaults.internal.parquet

import java.math.BigDecimal
import io.delta.golden.GoldenTableUtils.goldenTableFile
import java.util.TimeZone

import io.delta.golden.GoldenTableUtils.{goldenTableFile, goldenTablePath}
import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow}
import io.delta.kernel.test.VectorTestUtils
import io.delta.kernel.types._
import org.apache.spark.sql.internal.SQLConf
import org.scalatest.funsuite.AnyFunSuite

class ParquetFileReaderSuite extends AnyFunSuite
@@ -88,6 +91,132 @@ class ParquetFileReaderSuite extends AnyFunSuite
}
}

/**
* Test case for reading a column using a given type.
* @param columnName Column to read from the file
* @param toType Read type to use. May differ from the actual Parquet type.
* @param expectedExpr Expression returning the expected value for each row in the file.
*/
case class TestCase(columnName: String, toType: DataType, expectedExpr: Int => Any)

private val supportedConversions: Seq[TestCase] = Seq(
// The 'ByteType' column was generated with overflowing values, so we need to call i.toByte to
// wrap around the same way here and generate the correct expected values.
TestCase("ByteType", ShortType.SHORT, i => if (i % 72 != 0) i.toByte.toShort else null),
TestCase("ByteType", IntegerType.INTEGER, i => if (i % 72 != 0) i.toByte.toInt else null),
TestCase("ByteType", LongType.LONG, i => if (i % 72 != 0) i.toByte.toLong else null),
TestCase("ByteType", DoubleType.DOUBLE, i => if (i % 72 != 0) i.toByte.toDouble else null),
TestCase("ShortType", IntegerType.INTEGER, i => if (i % 56 != 0) i else null),
TestCase("ShortType", LongType.LONG, i => if (i % 56 != 0) i.toLong else null),
TestCase("ShortType", DoubleType.DOUBLE, i => if (i % 56 != 0) i.toDouble else null),
TestCase("IntegerType", LongType.LONG, i => if (i % 23 != 0) i.toLong else null),
TestCase("IntegerType", DoubleType.DOUBLE, i => if (i % 23 != 0) i.toDouble else null),

TestCase("FloatType", DoubleType.DOUBLE,
i => if (i % 28 != 0) (i * 0.234).toFloat.toDouble else null),
TestCase("decimal", new DecimalType(12, 2),
i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 12352, 2) else null),
TestCase("decimal", new DecimalType(12, 4),
i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 1235200, 4) else null),
TestCase("decimal", new DecimalType(26, 10),
i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 12352, 2).setScale(10)
else null),
TestCase("IntegerType", new DecimalType(10, 0),
i => if (i % 23 != 0) new java.math.BigDecimal(i) else null),
TestCase("IntegerType", new DecimalType(16, 4),
i => if (i % 23 != 0) new java.math.BigDecimal(i).setScale(4) else null),
TestCase("LongType", new DecimalType(20, 0),
i => if (i % 25 != 0) new java.math.BigDecimal(i + 1) else null),
TestCase("LongType", new DecimalType(28, 6),
i => if (i % 25 != 0) new java.math.BigDecimal(i + 1).setScale(6) else null),

TestCase("BinaryType", StringType.STRING, i => if (i % 59 != 0) i.toString else null),
)
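
// Illustrative sketch (hypothetical helper, not in the original commit): the byte
// wrap-around that the 'ByteType' comment above relies on. Scala's toByte keeps only
// the low 8 bits, so values outside [-128, 127] wrap silently.
private def byteWrapAroundSketch(): Unit = {
  assert(200.toByte == -56) // 200 exceeds Byte.MaxValue (127) and wraps around
  assert(200.toByte.toShort == -56) // widening afterwards keeps the wrapped value
}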

// The following conversions are supported by Kernel but not by Spark with parquet-mr.
// TODO: We should properly reject these conversions; many of them produce wrong results.
// Collecting them here to document the current behavior.
private val kernelOnlyConversions: Seq[TestCase] = Seq(
// These conversions silently overflow.
TestCase("ShortType", ByteType.BYTE, i => if (i % 56 != 0) i.toByte else null),
TestCase("IntegerType", ByteType.BYTE, i => if (i % 23 != 0) i.toByte else null),
TestCase("IntegerType", ShortType.SHORT, i => if (i % 23 != 0) i.toShort else null),

// This reads the unscaled decimal value as a long, which is wrong.
TestCase("decimal", LongType.LONG, i => if (i % 67 != 0) i.toLong * 12352 else null),

// The following conversions seem legitimate, although Spark rejects them.
TestCase("ByteType", DateType.DATE, i => if (i % 72 != 0) i.toByte.toInt else null),
TestCase("ShortType", DateType.DATE, i => if (i % 56 != 0) i else null),
TestCase("IntegerType", DateType.DATE, i => if (i % 23 != 0) i else null),
TestCase("StringType", BinaryType.BINARY, i => if (i % 57 != 0) i.toString.getBytes else null)
)
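
// Illustrative sketch (hypothetical helper, not in the original commit): why reading
// the 'decimal' column as LongType is wrong. The column is Decimal(10, 2), typically
// stored in Parquet as an unscaled long, so a raw long read drops the scale.
private def unscaledDecimalSketch(): Unit = {
  val d = java.math.BigDecimal.valueOf(12352, 2) // 123.52
  assert(d.unscaledValue().longValue() == 12352L) // the raw unscaled value Kernel currently returns
  assert(d.longValue() == 123L) // the truncated value a correct long read would give
}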

for (testCase <- supportedConversions ++ kernelOnlyConversions)
test(s"parquet supported conversion - ${testCase.columnName} -> ${testCase.toType.toString}") {
val inputLocation = goldenTablePath("parquet-all-types")
val readSchema = new StructType().add(testCase.columnName, testCase.toType)
val result = readParquetFilesUsingKernel(inputLocation, readSchema)
val expected = (0 until 200)
.map { i => TestRow(testCase.expectedExpr(i)) }
checkAnswer(result, expected)

if (!kernelOnlyConversions.contains(testCase)) {
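// Disabling Spark's vectorized Parquet reader below makes Spark fall back to the
// row-based parquet-mr reader, which supports the widening conversions tested here.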
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val sparkResult = readParquetFilesUsingSpark(inputLocation, readSchema)
checkAnswer(result, sparkResult)
}
}
}

test (s"parquet supported conversion - date -> timestamp_ntz") {
val timezones =
Seq("UTC", "Iceland", "PST", "America/Los_Angeles", "Etc/GMT+9", "Asia/Beirut", "JST")
for (fromTimezone <- timezones; toTimezone <- timezones) {
val inputLocation = goldenTablePath(s"data-reader-date-types-$fromTimezone")
TimeZone.setDefault(TimeZone.getTimeZone(toTimezone))

val readSchema = new StructType().add("date", TimestampNTZType.TIMESTAMP_NTZ)
val result = readParquetFilesUsingKernel(inputLocation, readSchema)
// 1577836800000000L -> 2020-01-01 00:00:00 UTC
checkAnswer(result, Seq(TestRow(1577836800000000L)))
}
}
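
// Illustrative sketch (hypothetical helper, not in the original commit): how the
// expected microsecond value above can be derived with java.time.
private def epochMicrosSketch(): Unit = {
  val instant = java.time.LocalDateTime.of(2020, 1, 1, 0, 0)
    .toInstant(java.time.ZoneOffset.UTC)
  assert(instant.getEpochSecond * 1000000L == 1577836800000000L) // 2020-01-01 00:00:00 UTC
}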

for (column <- Seq("BooleanType", "ByteType", "ShortType", "IntegerType", "LongType",
"FloatType", "DoubleType", "StringType", "BinaryType")) {
test(s"parquet unsupported conversion from $column") {
val inputLocation = goldenTablePath("parquet-all-types")
val supportedTypes = (supportedConversions ++ kernelOnlyConversions)
.filter(_.columnName == column)
.map(_.toType)
val unsupportedTypes = ALL_TYPES
.filterNot(supportedTypes.contains)
.filterNot(_.getClass.getSimpleName == column)

for (toType <- unsupportedTypes) {
val readSchema = new StructType().add(column, toType)
withClue(s"Converting $column to $toType") {
assertThrows[Throwable](readParquetFilesUsingKernel(inputLocation, readSchema))
}
}
}
}

test(s"parquet unsupported conversion from decimal") {
val inputLocation = goldenTablePath("parquet-all-types")
// 'decimal' column is Decimal(10, 2) which fits into a long.
for (toType <- ALL_TYPES.filterNot(_ == LongType.LONG)) {
val readSchema = new StructType().add("decimal", toType)
withClue(s"Converting decimal to $toType") {
// We don't properly reject these conversions yet and the errors we get currently vary
// a lot, so we catch a generic Throwable.
// TODO: Reject unsupported conversions uniformly.
assertThrows[Throwable](readParquetFilesUsingKernel(inputLocation, readSchema))
}
}
}
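
// Illustrative sketch (hypothetical helper, not in the original commit): Decimal(10, 2)
// has at most 10 unscaled digits, which always fits in a signed 64-bit long; that is
// why LongType is the only type excluded from the rejection loop above.
private def decimalPrecisionSketch(): Unit = {
  val maxUnscaled = BigInt(10).pow(10) - 1 // largest unscaled value at precision 10
  assert(maxUnscaled.isValidLong)
}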

test("read subset of columns") {
val tablePath = goldenTableFile("parquet-all-types").getAbsolutePath
val readSchema = new StructType()
