Skip to content

Commit

Permalink
[HUDI-4782] Support TIMESTAMP_LTZ type for flink (#6607)
Browse files Browse the repository at this point in the history
  • Loading branch information
danny0405 authored Sep 7, 2022
1 parent d2d1cb8 commit 8ffcb2f
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.MapType;
Expand Down Expand Up @@ -149,8 +150,12 @@ public static DataType convertToDataType(Schema schema) {
// logical timestamp type
if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
return DataTypes.TIMESTAMP(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) {
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
return DataTypes.TIMESTAMP(6).notNull();
} else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
return DataTypes.TIME(3).notNull();
} else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
Expand Down Expand Up @@ -242,19 +247,36 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
// use long to represents Timestamp
final TimestampType timestampType = (TimestampType) logicalType;
precision = timestampType.getPrecision();
org.apache.avro.LogicalType avroLogicalType;
org.apache.avro.LogicalType timestampLogicalType;
if (precision <= 3) {
avroLogicalType = LogicalTypes.timestampMillis();
timestampLogicalType = LogicalTypes.timestampMillis();
} else if (precision <= 6) {
avroLogicalType = LogicalTypes.timestampMicros();
timestampLogicalType = LogicalTypes.timestampMicros();
} else {
throw new IllegalArgumentException(
"Avro does not support TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
}
Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(timestamp) : timestamp;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
// use long to represents LocalZonedTimestampType
final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType;
precision = localZonedTimestampType.getPrecision();
org.apache.avro.LogicalType localZonedTimestampLogicalType;
if (precision <= 3) {
localZonedTimestampLogicalType = LogicalTypes.localTimestampMillis();
} else if (precision <= 6) {
localZonedTimestampLogicalType = LogicalTypes.localTimestampMicros();
} else {
throw new IllegalArgumentException(
"Avro does not support LOCAL TIMESTAMP type with precision: "
+ precision
+ ", it only supports precision less than 6.");
}
Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp;
case DATE:
// use int to represents Date
Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
Expand Down Expand Up @@ -319,7 +341,6 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) {
.items(convertToSchema(arrayType.getElementType(), rowName));
return nullable ? nullableSchema(array) : array;
case RAW:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
default:
throw new UnsupportedOperationException(
"Unsupported to derive Schema for type: " + logicalType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
Expand Down Expand Up @@ -127,6 +128,8 @@ public static AvroToRowDataConverter createConverter(LogicalType type) {
return AvroToRowDataConverters::convertToDate;
case TIME_WITHOUT_TIME_ZONE:
return AvroToRowDataConverters::convertToTime;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return createTimestampConverter(((LocalZonedTimestampType) type).getPrecision());
case TIMESTAMP_WITHOUT_TIME_ZONE:
return createTimestampConverter(((TimestampType) type).getPrecision());
case CHAR:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@

package org.apache.hudi.util;

import org.apache.hudi.common.util.ValidationUtils;

import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
Expand All @@ -47,9 +46,13 @@ public static boolean isTimestampType(DataType type) {
* Returns the precision of the given TIMESTAMP type.
*/
public static int precision(LogicalType logicalType) {
ValidationUtils.checkArgument(logicalType instanceof TimestampType);
TimestampType timestampType = (TimestampType) logicalType;
return timestampType.getPrecision();
if (logicalType instanceof TimestampType) {
return ((TimestampType) logicalType).getPrecision();
} else if (logicalType instanceof LocalZonedTimestampType) {
return ((LocalZonedTimestampType) logicalType).getPrecision();
} else {
throw new AssertionError("Unexpected type: " + logicalType);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

import java.io.Serializable;
import java.math.BigDecimal;
Expand Down Expand Up @@ -157,8 +156,9 @@ public Object convert(Schema schema, Object object) {
}
};
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
final int precision = ((TimestampType) type).getPrecision();
final int precision = DataTypeUtils.precision(type);
if (precision <= 3) {
converter =
new RowDataToAvroConverter() {
Expand Down Expand Up @@ -231,7 +231,7 @@ public Object convert(Schema schema, Object object) {
actualSchema = types.get(1);
} else {
throw new IllegalArgumentException(
"The Avro schema is not a nullable type: " + schema.toString());
"The Avro schema is not a nullable type: " + schema);
}
} else {
actualSchema = schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.junit.jupiter.params.provider.ValueSource;

import java.io.File;
import java.time.ZoneId;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -1438,6 +1439,37 @@ void testWriteReadWithComputedColumns() {
assertRowsEquals(result2, "[+I[3]]");
}

@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testWriteReadWithLocalTimestamp(HoodieTableType tableType) {
TableEnvironment tableEnv = batchTableEnv;
tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
String createTable = sql("t1")
.field("f0 int")
.field("f1 varchar(10)")
.field("f2 TIMESTAMP_LTZ(3)")
.field("f4 TIMESTAMP_LTZ(6)")
.option(FlinkOptions.PATH, tempFile.getAbsolutePath())
.option(FlinkOptions.PRECOMBINE_FIELD, "f1")
.option(FlinkOptions.TABLE_TYPE, tableType)
.pkField("f0")
.noPartition()
.end();
tableEnv.executeSql(createTable);

String insertInto = "insert into t1 values\n"
+ "(1, 'abc', TIMESTAMP '1970-01-01 08:00:01', TIMESTAMP '1970-01-01 08:00:02'),\n"
+ "(2, 'def', TIMESTAMP '1970-01-01 08:00:03', TIMESTAMP '1970-01-01 08:00:04')";
execInsertSql(tableEnv, insertInto);

List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
final String expected = "["
+ "+I[1, abc, 1970-01-01T00:00:01Z, 1970-01-01T00:00:02Z], "
+ "+I[2, def, 1970-01-01T00:00:03Z, 1970-01-01T00:00:04Z]]";
assertRowsEquals(result, expected);
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hudi.util.AvroSchemaConverter;

import org.apache.avro.Schema;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -50,4 +51,41 @@ void testUnionSchemaWithMultipleRecordTypes() {
+ "`isDeleted` BOOLEAN NOT NULL>";
assertThat(dataType.getChildren().get(pos).toString(), is(expected));
}

@Test
void testLocalTimestampType() {
DataType dataType = DataTypes.ROW(
DataTypes.FIELD("f_localtimestamp_millis", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
DataTypes.FIELD("f_localtimestamp_micros", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
);
// convert to avro schema
Schema schema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType());
final String expectedSchema = ""
+ "[ \"null\", {\n"
+ " \"type\" : \"record\",\n"
+ " \"name\" : \"record\",\n"
+ " \"fields\" : [ {\n"
+ " \"name\" : \"f_localtimestamp_millis\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"long\",\n"
+ " \"logicalType\" : \"local-timestamp-millis\"\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " }, {\n"
+ " \"name\" : \"f_localtimestamp_micros\",\n"
+ " \"type\" : [ \"null\", {\n"
+ " \"type\" : \"long\",\n"
+ " \"logicalType\" : \"local-timestamp-micros\"\n"
+ " } ],\n"
+ " \"default\" : null\n"
+ " } ]\n"
+ "} ]";
assertThat(schema.toString(true), is(expectedSchema));
// convert it back
DataType convertedDataType = AvroSchemaConverter.convertToDataType(schema);
final String expectedDataType = "ROW<"
+ "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), "
+ "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>";
assertThat(convertedDataType.toString(), is(expectedDataType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -334,7 +335,10 @@ private static ColumnReader createColumnReader(
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
int precision = fieldType instanceof TimestampType
? ((TimestampType) fieldType).getPrecision()
: ((LocalZonedTimestampType) fieldType).getPrecision();
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -334,7 +335,10 @@ private static ColumnReader createColumnReader(
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
int precision = fieldType instanceof TimestampType
? ((TimestampType) fieldType).getPrecision()
: ((LocalZonedTimestampType) fieldType).getPrecision();
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -334,7 +335,10 @@ private static ColumnReader createColumnReader(
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
case INT64:
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision());
int precision = fieldType instanceof TimestampType
? ((TimestampType) fieldType).getPrecision()
: ((LocalZonedTimestampType) fieldType).getPrecision();
return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision);
case INT96:
return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
default:
Expand Down

0 comments on commit 8ffcb2f

Please sign in to comment.