[Kernel] Add widening type conversions to Kernel default parquet reader (#3541)

## Description
Add a set of conversions to the default Parquet reader provided by
Kernel to allow reading columns using a wider type than the actual type
in the Parquet file (see the sketch after the list of conversions below).
This will support the type widening table feature; see
https://github.com/delta-io/delta/blob/master/protocol_rfcs/type-widening.md.

Conversions added:
- INT32 -> long
- FLOAT -> double
- decimal precision/scale increase
- DATE -> timestamp_ntz
- INT32 -> double
- integers -> decimal
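
To make the idea concrete, here is a minimal, self-contained sketch of the INT32 -> long case (hypothetical class, not the Kernel reader itself): Parquet delivers the value through an `addInt` callback and the reader stores it in a wider `long[]` buffer, which is essentially what the `addInt` override added to `LongColumnReader` in this PR does.

```java
import java.util.Arrays;

// Illustrative only: a stripped-down widening reader, not the Kernel implementation.
public class WideningIntToLongReader {
  private long[] values = new long[8]; // working buffer, grown on demand
  private int currentRowIndex = 0;

  // Parquet reports an INT32 value; assigning it to a long widens it losslessly.
  public void addInt(int value) {
    if (currentRowIndex == values.length) {
      values = Arrays.copyOf(values, values.length * 2);
    }
    values[currentRowIndex++] = value;
  }

  public long[] batch() {
    return Arrays.copyOf(values, currentRowIndex);
  }

  public static void main(String[] args) {
    WideningIntToLongReader reader = new WideningIntToLongReader();
    reader.addInt(42);
    reader.addInt(Integer.MAX_VALUE);
    System.out.println(Arrays.toString(reader.batch())); // [42, 2147483647]
  }
}
```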

## How was this patch tested?
Added tests covering all conversions in `ParquetFileReaderSuite`.

## Does this PR introduce _any_ user-facing changes?
This change alone doesn't allow reading Delta tables that use the type
widening table feature; that feature is still unsupported.
It does allow reading Delta tables whose Parquet files contain types that
differ from the table schema, but that should never happen for tables that
don't use type widening.
johanl-db authored Aug 26, 2024
1 parent b5e9aeb commit dcf9ea9
Showing 5 changed files with 208 additions and 6 deletions.
@@ -21,6 +21,7 @@
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
@@ -71,6 +72,15 @@ public static long millisToMicros(long millis) {
return Math.multiplyExact(millis, DateTimeConstants.MICROS_PER_MILLIS);
}

/**
* Converts a number of days since epoch (1970-01-01 00:00:00 UTC) to microseconds between epoch
* and start of the day in the given timezone.
*/
public static long daysToMicros(int days, ZoneOffset timezone) {
long seconds = LocalDate.ofEpochDay(days).atStartOfDay(timezone).toEpochSecond();
return seconds * DateTimeConstants.MICROS_PER_SECOND;
}
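
Illustrative usage of the new helper (assuming the kernel-defaults classes are on the classpath); the expected value matches the one used by the date -> timestamp_ntz test later in this commit:

```java
import io.delta.kernel.defaults.internal.DefaultKernelUtils;
import java.time.ZoneOffset;

public class DaysToMicrosExample {
  public static void main(String[] args) {
    // Epoch day 18262 is 2020-01-01; its start of day in UTC is 1_577_836_800 seconds,
    // i.e. 1_577_836_800_000_000 microseconds since the epoch.
    long micros = DefaultKernelUtils.daysToMicros(18262, ZoneOffset.UTC);
    System.out.println(micros == 1_577_836_800_000_000L); // true
  }
}
```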

/**
* Parses a TimestampNTZ string in UTC format, supporting milliseconds and microseconds, to
* microseconds since the Unix epoch.
@@ -28,6 +28,7 @@
import io.delta.kernel.types.DecimalType;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.util.Arrays;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.api.Binary;
@@ -101,20 +102,23 @@ public abstract static class BaseDecimalColumnReader extends BasePrimitiveColumnReader {
// working state
private BigDecimal[] values;

private final DataType dataType;
private final DecimalType decimalType;

private final int scale;
protected BigDecimal[] expandedDictionary;

BaseDecimalColumnReader(DataType dataType, int precision, int scale, int initialBatchSize) {
super(initialBatchSize);
DecimalType decimalType = (DecimalType) dataType;
int scaleIncrease = decimalType.getScale() - scale;
int precisionIncrease = decimalType.getPrecision() - precision;
checkArgument(
decimalType.getPrecision() == precision && decimalType.getScale() == scale,
scaleIncrease >= 0 && precisionIncrease >= scaleIncrease,
String.format(
"Found Delta type %s but Parquet type has precision=%s and scale=%s",
decimalType, precision, scale));
this.scale = scale;
this.dataType = dataType;
this.decimalType = decimalType;
this.values = new BigDecimal[initialBatchSize];
}

@@ -130,6 +134,9 @@ public boolean hasDictionarySupport() {
protected void addDecimal(BigDecimal value) {
resizeIfNeeded();
this.nullability[currentRowIndex] = false;
if (decimalType.getScale() != scale) {
value = value.setScale(decimalType.getScale(), RoundingMode.UNNECESSARY);
}
this.values[currentRowIndex] = value;
}
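
For reference, a small standalone example of the rescaling above (plain `java.math.BigDecimal`, not Kernel code): widening decimal(10, 2) to decimal(12, 4) only appends zeros, so `RoundingMode.UNNECESSARY` never throws for a valid widening.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalWideningExample {
  public static void main(String[] args) {
    BigDecimal fromParquet = new BigDecimal("123.45"); // read as decimal(10, 2)
    // Rescale to the wider Delta type decimal(12, 4); the value is unchanged.
    BigDecimal widened = fromParquet.setScale(4, RoundingMode.UNNECESSARY);
    System.out.println(widened); // 123.4500
  }
}
```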

@@ -140,7 +147,7 @@ public void addValueFromDictionary(int dictionaryId) {

@Override
public ColumnVector getDataColumnVector(int batchSize) {
ColumnVector vector = new DefaultDecimalVector(dataType, batchSize, values);
ColumnVector vector = new DefaultDecimalVector(decimalType, batchSize, values);
// re-initialize the working space
this.nullability = ParquetColumnReaders.initNullabilityVector(nullability.length);
this.values = new BigDecimal[values.length];
@@ -267,7 +267,7 @@ public static class IntColumnReader extends BasePrimitiveColumnReader {

IntColumnReader(DataType dataType, int initialBatchSize) {
super(initialBatchSize);
checkArgument(dataType instanceof IntegerType || dataType instanceof DataType);
checkArgument(dataType instanceof IntegerType || dataType instanceof DateType);
this.dataType = dataType;
this.values = new int[initialBatchSize];
}
@@ -315,6 +315,13 @@ public static class LongColumnReader extends BasePrimitiveColumnReader {
this.values = new long[initialBatchSize];
}

@Override
public void addInt(int value) {
resizeIfNeeded();
this.nullability[currentRowIndex] = false;
this.values[currentRowIndex] = value;
}

@Override
public void addLong(long value) {
resizeIfNeeded();
@@ -388,6 +395,20 @@ public static class DoubleColumnReader extends BasePrimitiveColumnReader {
this.values = new double[initialBatchSize];
}

@Override
public void addInt(int value) {
resizeIfNeeded();
this.nullability[currentRowIndex] = false;
this.values[currentRowIndex] = value;
}

@Override
public void addFloat(float value) {
resizeIfNeeded();
this.nullability[currentRowIndex] = false;
this.values[currentRowIndex] = value;
}
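
A small aside on the float -> double case (standalone example, not Kernel code): the widened double is the float's exact binary value, not the decimal literal it was parsed from, which is why the test below builds its expected values with `(i * 0.234).toFloat.toDouble`.

```java
public class FloatWideningExample {
  public static void main(String[] args) {
    float fromParquet = 0.234f;
    double widened = fromParquet; // implicit, lossless widening
    System.out.println(widened);  // 0.23399999737739563, not 0.234
  }
}
```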

@Override
public void addDouble(double value) {
resizeIfNeeded();
@@ -16,13 +16,15 @@
package io.delta.kernel.defaults.internal.parquet;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;

import io.delta.kernel.defaults.internal.DefaultKernelUtils;
import io.delta.kernel.types.*;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.ZoneOffset;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.schema.*;
@@ -75,6 +77,10 @@ public static Converter createTimestampConverter(
throw new UnsupportedOperationException(
String.format("Unsupported Parquet TimeType unit=%s", timestamp.getUnit()));
}
} else if (typeFromClient == TimestampNTZType.TIMESTAMP_NTZ
&& primType.getPrimitiveTypeName() == INT32
&& typeAnnotation instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
return new DateToTimestampNTZConverter(typeFromClient, initialBatchSize);
} else {
throw new RuntimeException(
String.format("Unsupported timestamp column with Parquet type %s.", typeFromFile));
@@ -126,6 +132,18 @@ public void addLong(long value) {
}
}

public static class DateToTimestampNTZConverter extends ParquetColumnReaders.LongColumnReader {

DateToTimestampNTZConverter(DataType dataType, int initialBatchSize) {
super(validTimestampType(dataType), initialBatchSize);
}

@Override
public void addInt(int value) {
super.addLong(DefaultKernelUtils.daysToMicros(value, ZoneOffset.UTC));
}
}

private static DataType validTimestampType(DataType dataType) {
checkArgument(dataType instanceof TimestampType || dataType instanceof TimestampNTZType);
return dataType;
@@ -16,11 +16,15 @@
package io.delta.kernel.defaults.internal.parquet

import java.math.BigDecimal
import io.delta.golden.GoldenTableUtils.goldenTableFile
import java.util.TimeZone

import io.delta.golden.GoldenTableUtils.{goldenTableFile, goldenTablePath}
import io.delta.kernel.defaults.utils.{ExpressionTestUtils, TestRow}
import io.delta.kernel.test.VectorTestUtils
import io.delta.kernel.types._
import org.apache.spark.sql.internal.SQLConf
import org.scalatest.funsuite.AnyFunSuite
import org.apache.parquet.io.ParquetDecodingException

class ParquetFileReaderSuite extends AnyFunSuite
with ParquetSuiteBase with VectorTestUtils with ExpressionTestUtils {
@@ -88,6 +92,148 @@ class ParquetFileReaderSuite extends AnyFunSuite
}
}

/////////////////////////////////////////////////////////////////////////////////////////////////
// Tests covering reading parquet values into a wider column type //
/////////////////////////////////////////////////////////////////////////////////////////////////
/**
* Test case for reading a column using a given type.
* @param columnName Column to read from the file
* @param toType Read type to use. May be different from the actual Parquet type.
* @param expectedExpr Expression returning the expected value for each row in the file.
*/
case class TestCase(columnName: String, toType: DataType, expectedExpr: Int => Any)

private val supportedConversions: Seq[TestCase] = Seq(
// The 'ByteType' column was generated with overflowing values, so we need to call i.toByte to
// also wrap around here and generate the correct expected values.
TestCase("ByteType", ShortType.SHORT, i => if (i % 72 != 0) i.toByte.toShort else null),
TestCase("ByteType", IntegerType.INTEGER, i => if (i % 72 != 0) i.toByte.toInt else null),
TestCase("ByteType", LongType.LONG, i => if (i % 72 != 0) i.toByte.toLong else null),
TestCase("ByteType", DoubleType.DOUBLE, i => if (i % 72 != 0) i.toByte.toDouble else null),
TestCase("ShortType", IntegerType.INTEGER, i => if (i % 56 != 0) i else null),
TestCase("ShortType", LongType.LONG, i => if (i % 56 != 0) i.toLong else null),
TestCase("ShortType", DoubleType.DOUBLE, i => if (i % 56 != 0) i.toDouble else null),
TestCase("IntegerType", LongType.LONG, i => if (i % 23 != 0) i.toLong else null),
TestCase("IntegerType", DoubleType.DOUBLE, i => if (i % 23 != 0) i.toDouble else null),

TestCase("FloatType", DoubleType.DOUBLE,
i => if (i % 28 != 0) (i * 0.234).toFloat.toDouble else null),
TestCase("decimal", new DecimalType(12, 2),
i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 12352, 2) else null),
TestCase("decimal", new DecimalType(12, 4),
i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 1235200, 4) else null),
TestCase("decimal", new DecimalType(26, 10),
i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 12352, 2).setScale(10)
else null),
TestCase("IntegerType", new DecimalType(10, 0),
i => if (i % 23 != 0) new java.math.BigDecimal(i) else null),
TestCase("IntegerType", new DecimalType(16, 4),
i => if (i % 23 != 0) new java.math.BigDecimal(i).setScale(4) else null),
TestCase("LongType", new DecimalType(20, 0),
i => if (i % 25 != 0) new java.math.BigDecimal(i + 1) else null),
TestCase("LongType", new DecimalType(28, 6),
i => if (i % 25 != 0) new java.math.BigDecimal(i + 1).setScale(6) else null),

TestCase("BinaryType", StringType.STRING, i => if (i % 59 != 0) i.toString else null)
)

// The following conversions are supported by Kernel but not by Spark with parquet-mr.
// TODO: We should properly reject these conversions; many of them produce wrong results.
// Collecting them here to document the current behavior.
private val kernelOnlyConversions: Seq[TestCase] = Seq(
// These conversions will silently overflow.
TestCase("ShortType", ByteType.BYTE, i => if (i % 56 != 0) i.toByte else null),
TestCase("IntegerType", ByteType.BYTE, i => if (i % 23 != 0) i.toByte else null),
TestCase("IntegerType", ShortType.SHORT, i => if (i % 23 != 0) i.toShort else null),

// This reads the unscaled decimal value as a long, which is wrong.
TestCase("decimal", LongType.LONG, i => if (i % 67 != 0) i.toLong * 12352 else null),

// The following conversions seem legit, although Spark rejects them.
TestCase("ByteType", DateType.DATE, i => if (i % 72 != 0) i.toByte.toInt else null),
TestCase("ShortType", DateType.DATE, i => if (i % 56 != 0) i else null),
TestCase("IntegerType", DateType.DATE, i => if (i % 23 != 0) i else null),
TestCase("StringType", BinaryType.BINARY, i => if (i % 57 != 0) i.toString.getBytes else null)
)

for (testCase <- supportedConversions ++ kernelOnlyConversions)
test(s"parquet supported conversion - ${testCase.columnName} -> ${testCase.toType.toString}") {
val inputLocation = goldenTablePath("parquet-all-types")
val readSchema = new StructType().add(testCase.columnName, testCase.toType)
val result = readParquetFilesUsingKernel(inputLocation, readSchema)
val expected = (0 until 200)
.map { i => TestRow(testCase.expectedExpr(i))}
checkAnswer(result, expected)

if (!kernelOnlyConversions.contains(testCase)) {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val sparkResult = readParquetFilesUsingSpark(inputLocation, readSchema)
checkAnswer(result, sparkResult)
}
}
}

test (s"parquet supported conversion - date -> timestamp_ntz") {
val timezones =
Seq("UTC", "Iceland", "PST", "America/Los_Angeles", "Etc/GMT+9", "Asia/Beirut", "JST")
for (fromTimezone <- timezones; toTimezone <- timezones) {
val inputLocation = goldenTablePath(s"data-reader-date-types-$fromTimezone")
TimeZone.setDefault(TimeZone.getTimeZone(toTimezone))

val readSchema = new StructType().add("date", TimestampNTZType.TIMESTAMP_NTZ)
val result = readParquetFilesUsingKernel(inputLocation, readSchema)
// 1577836800000000L -> 2020-01-01 00:00:00 UTC
checkAnswer(result, Seq(TestRow(1577836800000000L)))
}
}

def checkParquetReadError(inputLocation: String, readSchema: StructType): Unit = {
val ex = intercept[Throwable] {
readParquetFilesUsingKernel(inputLocation, readSchema)
}
// We don't properly reject conversions and the errors we get vary a lot; this checks the
// various error messages we may get as a result.
// TODO: Uniformize rejecting unsupported conversions.
assert(
ex.getMessage.contains("Can not read value") ||
ex.getMessage.contains("column with Parquet type") ||
ex.getMessage.contains("Unable to create Parquet converter for") ||
ex.getMessage.contains("Found Delta type Decimal") ||
ex.getMessage.contains("cannot be cast to")
)
}

for(column <- Seq("BooleanType", "ByteType", "ShortType", "IntegerType", "LongType",
"FloatType", "DoubleType", "StringType", "BinaryType")) {
test(s"parquet unsupported conversion from $column") {
val inputLocation = goldenTablePath("parquet-all-types")
val supportedTypes = (supportedConversions ++ kernelOnlyConversions)
.filter(_.columnName == column)
.map(_.toType)
val unsupportedTypes = ALL_TYPES
.filterNot(supportedTypes.contains)
.filterNot(_.getClass.getSimpleName == column)

for (toType <- unsupportedTypes) {
val readSchema = new StructType().add(column, toType)
withClue(s"Converting $column to $toType") {
checkParquetReadError(inputLocation, readSchema)
}
}
}
}

test(s"parquet unsupported conversion from decimal") {
val inputLocation = goldenTablePath("parquet-all-types")
// 'decimal' column is Decimal(10, 2) which fits into a long.
for (toType <- ALL_TYPES.filterNot(_ == LongType.LONG)) {
val readSchema = new StructType().add("decimal", toType)
withClue(s"Converting decimal to $toType") {
checkParquetReadError(inputLocation, readSchema)
}
}
}

test("read subset of columns") {
val tablePath = goldenTableFile("parquet-all-types").getAbsolutePath
val readSchema = new StructType()