Add widening type conversions to Kernel default parquet handler
johanl-db committed Aug 15, 2024
1 parent e763fe9 commit 58f8c86
Showing 5 changed files with 134 additions and 5 deletions.
File 1 of 5: DefaultKernelUtils.java
@@ -21,6 +21,7 @@
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.StructType;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
@@ -71,6 +72,15 @@ public static long millisToMicros(long millis) {
return Math.multiplyExact(millis, DateTimeConstants.MICROS_PER_MILLIS);
}

/**
 * Converts a number of days since the epoch (1970-01-01 00:00:00 UTC) to the number of
 * microseconds between the epoch and the start of that day in the given timezone.
 */
public static long daysToMicros(int days, ZoneOffset timezone) {
long seconds = LocalDate.ofEpochDay(days).atStartOfDay(timezone).toEpochSecond();
return seconds * DateTimeConstants.MICROS_PER_SECOND;
}

/**
* Parses a TimestampNTZ string in UTC format, supporting milliseconds and microseconds, to
* microseconds since the Unix epoch.
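As a quick sanity check on the new daysToMicros method (a standalone sketch, not part of this commit): 2020-01-01 is epoch day 18262, and converting it at UTC yields the same constant the new test suite below expects.

import java.time.LocalDate;
import java.time.ZoneOffset;

public class DaysToMicrosExample {
  private static final long MICROS_PER_SECOND = 1_000_000L;

  // Same logic as DefaultKernelUtils.daysToMicros, inlined for illustration.
  static long daysToMicros(int days, ZoneOffset timezone) {
    long seconds = LocalDate.ofEpochDay(days).atStartOfDay(timezone).toEpochSecond();
    return seconds * MICROS_PER_SECOND;
  }

  public static void main(String[] args) {
    // 2020-01-01 is 18262 days after 1970-01-01.
    System.out.println(daysToMicros(18262, ZoneOffset.UTC)); // 1577836800000000
  }
}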
File 2 of 5: decimal column readers
@@ -28,6 +28,7 @@
import io.delta.kernel.types.DecimalType;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.util.Arrays;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.io.api.Binary;
@@ -101,20 +102,23 @@ public abstract static class BaseDecimalColumnReader extends BasePrimitiveColumnReader {
// working state
private BigDecimal[] values;

- private final DataType dataType;
+ private final DecimalType decimalType;

private final int scale;
protected BigDecimal[] expandedDictionary;

BaseDecimalColumnReader(DataType dataType, int precision, int scale, int initialBatchSize) {
super(initialBatchSize);
DecimalType decimalType = (DecimalType) dataType;
int scaleIncrease = decimalType.getScale() - scale;
int precisionIncrease = decimalType.getPrecision() - precision;
checkArgument(
- decimalType.getPrecision() == precision && decimalType.getScale() == scale,
+ scaleIncrease >= 0 && precisionIncrease >= scaleIncrease,
String.format(
"Found Delta type %s but Parquet type has precision=%s and scale=%s",
decimalType, precision, scale));
this.scale = scale;
- this.dataType = dataType;
+ this.decimalType = decimalType;
this.values = new BigDecimal[initialBatchSize];
}

@@ -130,6 +134,9 @@ public boolean hasDictionarySupport() {
protected void addDecimal(BigDecimal value) {
resizeIfNeeded();
this.nullability[currentRowIndex] = false;
if (decimalType.getScale() != scale) {
value = value.setScale(decimalType.getScale(), RoundingMode.UNNECESSARY);
}
this.values[currentRowIndex] = value;
}

@@ -140,7 +147,7 @@ public void addValueFromDictionary(int dictionaryId) {

@Override
public ColumnVector getDataColumnVector(int batchSize) {
- ColumnVector vector = new DefaultDecimalVector(dataType, batchSize, values);
+ ColumnVector vector = new DefaultDecimalVector(decimalType, batchSize, values);
// re-initialize the working space
this.nullability = ParquetColumnReaders.initNullabilityVector(nullability.length);
this.values = new BigDecimal[values.length];
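The relaxed checkArgument above accepts any widening of the decimal type: the read scale may grow as long as precision grows at least as much, so no integer digits can be lost (for example decimal(12, 2) -> decimal(26, 10) gives scaleIncrease = 8 and precisionIncrease = 14). addDecimal then pads values with trailing zeros via setScale, and RoundingMode.UNNECESSARY makes the rescale throw instead of silently rounding if that invariant were ever violated. A standalone illustration (not part of this commit):

import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalWideningExample {
  public static void main(String[] args) {
    // Widening decimal(12, 2) -> decimal(12, 4): padding with zeros is always exact.
    BigDecimal fromFile = new BigDecimal("123.45");
    System.out.println(fromFile.setScale(4, RoundingMode.UNNECESSARY)); // 123.4500

    // UNNECESSARY refuses to round, so a lossy rescale fails loudly.
    try {
      fromFile.setScale(1, RoundingMode.UNNECESSARY);
    } catch (ArithmeticException e) {
      System.out.println("rounding required: " + e.getMessage());
    }
  }
}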
File 3 of 5: ParquetColumnReaders.java
@@ -267,7 +267,7 @@ public static class IntColumnReader extends BasePrimitiveColumnReader {

IntColumnReader(DataType dataType, int initialBatchSize) {
super(initialBatchSize);
- checkArgument(dataType instanceof IntegerType || dataType instanceof DataType);
+ checkArgument(dataType instanceof IntegerType || dataType instanceof DateType);
this.dataType = dataType;
this.values = new int[initialBatchSize];
}
@@ -315,6 +315,13 @@ public static class LongColumnReader extends BasePrimitiveColumnReader {
this.values = new long[initialBatchSize];
}

@Override
public void addInt(int value) {
resizeIfNeeded();
this.nullability[currentRowIndex] = false;
this.values[currentRowIndex] = value;
}

@Override
public void addLong(long value) {
resizeIfNeeded();
@@ -388,6 +395,20 @@ public static class DoubleColumnReader extends BasePrimitiveColumnReader {
this.values = new double[initialBatchSize];
}

@Override
public void addInt(int value) {
resizeIfNeeded();
this.nullability[currentRowIndex] = false;
this.values[currentRowIndex] = value;
}

@Override
public void addFloat(float value) {
resizeIfNeeded();
this.nullability[currentRowIndex] = false;
this.values[currentRowIndex] = value;
}

@Override
public void addDouble(double value) {
resizeIfNeeded();
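The new addInt and addFloat overrides rely on Java's widening primitive conversions when storing into the long[] and double[] working arrays: int -> long and float -> double are always value-preserving, and int -> double is exact too, since a double's 52-bit mantissa covers any 31-bit int. A standalone illustration (not part of this commit):

public class PrimitiveWideningExample {
  public static void main(String[] args) {
    int i = Integer.MAX_VALUE;
    long asLong = i;       // exact: every int fits in a long
    double asDouble = i;   // exact: 52-bit mantissa covers 31-bit ints

    float f = 0.234f;
    double fAsDouble = f;  // exact: the double holds the float's value

    System.out.println(asLong);     // 2147483647
    System.out.println(asDouble);   // 2.147483647E9
    System.out.println(fAsDouble);  // 0.23399999737739563
  }
}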
File 4 of 5: Parquet timestamp converters
@@ -16,13 +16,15 @@
package io.delta.kernel.defaults.internal.parquet;

import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;

import io.delta.kernel.defaults.internal.DefaultKernelUtils;
import io.delta.kernel.types.*;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.ZoneOffset;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.schema.*;
@@ -75,6 +77,10 @@ public static Converter createTimestampConverter(
throw new UnsupportedOperationException(
String.format("Unsupported Parquet TimeType unit=%s", timestamp.getUnit()));
}
} else if (typeFromClient == TimestampNTZType.TIMESTAMP_NTZ
&& primType.getPrimitiveTypeName() == INT32
&& typeAnnotation instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
return new DateToTimestampNTZConverter(typeFromClient, initialBatchSize);
} else {
throw new RuntimeException(
String.format("Unsupported timestamp column with Parquet type %s.", typeFromFile));
@@ -126,6 +132,18 @@ public void addLong(long value) {
}
}

public static class DateToTimestampNTZConverter extends ParquetColumnReaders.LongColumnReader {

DateToTimestampNTZConverter(DataType dataType, int initialBatchSize) {
super(validTimestampType(dataType), initialBatchSize);
}

@Override
public void addInt(int value) {
super.addLong(DefaultKernelUtils.daysToMicros(value, ZoneOffset.UTC));
}
}

private static DataType validTimestampType(DataType dataType) {
checkArgument(dataType instanceof TimestampType || dataType instanceof TimestampNTZType);
return dataType;
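Since TIMESTAMP_NTZ carries no timezone, DateToTimestampNTZConverter pins the date's midnight to UTC, so the resulting microsecond value does not depend on the JVM default timezone; the cross-timezone loop in the new test suite below exercises exactly that invariant. A standalone sketch (not part of this commit):

import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.TimeZone;

public class DateToTimestampNtzExample {
  // Mirrors DefaultKernelUtils.daysToMicros(days, ZoneOffset.UTC).
  static long daysToMicrosUtc(int days) {
    return LocalDate.ofEpochDay(days).atStartOfDay(ZoneOffset.UTC).toEpochSecond() * 1_000_000L;
  }

  public static void main(String[] args) {
    // Changing the JVM default timezone has no effect on the result.
    TimeZone.setDefault(TimeZone.getTimeZone("Asia/Beirut"));
    System.out.println(daysToMicrosUtc(18262)); // 1577836800000000
    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"));
    System.out.println(daysToMicrosUtc(18262)); // 1577836800000000
  }
}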
File 5 of 5: ParquetColumnReaderSuite.scala (new file)
@@ -0,0 +1,73 @@
package io.delta.kernel.defaults.internal.parquet

import java.util.TimeZone

import io.delta.golden.GoldenTableUtils.goldenTablePath
import io.delta.kernel.defaults.utils.TestRow
import io.delta.kernel.types._
import org.scalatest.funsuite.AnyFunSuite

/**
* Suite covering reading Parquet columns with different types.
*/
class ParquetColumnReaderSuite extends AnyFunSuite with ParquetSuiteBase {

/**
* Defines a test case for this suite.
* @param columnName Column to read from the file
* @param toType Read type to use. May be different from the actual Parquet type.
* @param expectedExpr Expression returning the expected value for each row in the file.
*/
case class TestCase(columnName: String, toType: DataType, expectedExpr: Int => Any)

private val wideningTestCases: Seq[TestCase] = Seq(
TestCase("ByteType", ShortType.SHORT, i => if (i % 72 != 0) i.toByte.toShort else null),
TestCase("ByteType", IntegerType.INTEGER, i => if (i % 72 != 0) i.toByte.toInt else null),
TestCase("ByteType", LongType.LONG, i => if (i % 72 != 0) i.toByte.toLong else null),
TestCase("ShortType", IntegerType.INTEGER, i => if (i % 56 != 0) i else null),
TestCase("ShortType", LongType.LONG, i => if (i % 56 != 0) i.toLong else null),
TestCase("IntegerType", LongType.LONG, i => if (i % 23 != 0) i.toLong else null),
TestCase("IntegerType", DoubleType.DOUBLE, i => if (i % 23 != 0) i.toDouble else null),
TestCase("FloatType", DoubleType.DOUBLE,
i => if (i % 28 != 0) (i * 0.234).toFloat.toDouble else null),
TestCase("decimal", new DecimalType(12, 2),
i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 12352, 2) else null),
TestCase("decimal", new DecimalType(12, 4),
i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 1235200, 4) else null),
TestCase("decimal", new DecimalType(26, 10),
i => if (i % 67 != 0) java.math.BigDecimal.valueOf(i * 12352, 2).setScale(10)
else null),
TestCase("IntegerType", new DecimalType(10, 0),
i => if (i % 23 != 0) new java.math.BigDecimal(i) else null),
TestCase("IntegerType", new DecimalType(16, 4),
i => if (i % 23 != 0) new java.math.BigDecimal(i).setScale(4) else null),
TestCase("LongType", new DecimalType(20, 0),
i => if (i % 25 != 0) new java.math.BigDecimal(i + 1) else null),
TestCase("LongType", new DecimalType(28, 6),
i => if (i % 25 != 0) new java.math.BigDecimal(i + 1).setScale(6) else null)
)

for (testCase <- wideningTestCases)
test(s"parquet widening conversion - ${testCase.columnName} -> ${testCase.toType.toString}") {
val inputLocation = goldenTablePath("parquet-all-types")
val readSchema = new StructType().add(testCase.columnName, testCase.toType)
val result = readParquetFilesUsingKernel(inputLocation, readSchema)
val expected = (0 until 200)
.map { i => TestRow(testCase.expectedExpr(i))}
checkAnswer(result, expected)
}

test (s"parquet widening conversion - date -> timestamp_ntz") {
val timezones =
Seq("UTC", "Iceland", "PST", "America/Los_Angeles", "Etc/GMT+9", "Asia/Beirut", "JST")
for (fromTimezone <- timezones; toTimezone <- timezones) {
val inputLocation = goldenTablePath(s"data-reader-date-types-$fromTimezone")
TimeZone.setDefault(TimeZone.getTimeZone(toTimezone))

val readSchema = new StructType().add("date", TimestampNTZType.TIMESTAMP_NTZ)
val result = readParquetFilesUsingKernel(inputLocation, readSchema)
// 1577836800000000L -> 2020-01-01 00:00:00 UTC
checkAnswer(result, Seq(TestRow(1577836800000000L)))
}
}
}
