diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java index 050d1728f7c37..5f5060795219c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java @@ -28,6 +28,7 @@ import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.table.types.logical.MapType; @@ -149,8 +150,12 @@ public static DataType convertToDataType(Schema schema) { // logical timestamp type if (schema.getLogicalType() == LogicalTypes.timestampMillis()) { return DataTypes.TIMESTAMP(3).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) { + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(); } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) { return DataTypes.TIMESTAMP(6).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) { + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull(); } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) { return DataTypes.TIME(3).notNull(); } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) { @@ -242,19 +247,36 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) { // use long to represents Timestamp final TimestampType timestampType = (TimestampType) logicalType; precision = timestampType.getPrecision(); - org.apache.avro.LogicalType avroLogicalType; + org.apache.avro.LogicalType timestampLogicalType; if (precision <= 3) { - avroLogicalType = LogicalTypes.timestampMillis(); + timestampLogicalType = LogicalTypes.timestampMillis(); } else if (precision <= 6) { - avroLogicalType = LogicalTypes.timestampMicros(); + timestampLogicalType = LogicalTypes.timestampMicros(); } else { throw new IllegalArgumentException( "Avro does not support TIMESTAMP type with precision: " + precision + ", it only supports precision less than 6."); } - Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType()); + Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType()); return nullable ? nullableSchema(timestamp) : timestamp; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + // use long to represents LocalZonedTimestampType + final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType; + precision = localZonedTimestampType.getPrecision(); + org.apache.avro.LogicalType localZonedTimestampLogicalType; + if (precision <= 3) { + localZonedTimestampLogicalType = LogicalTypes.localTimestampMillis(); + } else if (precision <= 6) { + localZonedTimestampLogicalType = LogicalTypes.localTimestampMicros(); + } else { + throw new IllegalArgumentException( + "Avro does not support LOCAL TIMESTAMP type with precision: " + + precision + + ", it only supports precision less than 6."); + } + Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType()); + return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp; case DATE: // use int to represents Date Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()); @@ -319,7 +341,6 @@ public static Schema convertToSchema(LogicalType logicalType, String rowName) { .items(convertToSchema(arrayType.getElementType(), rowName)); return nullable ? nullableSchema(array) : array; case RAW: - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: default: throw new UnsupportedOperationException( "Unsupported to derive Schema for type: " + logicalType); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java index 558bb41f90490..5c9988dc0b2ed 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java @@ -32,6 +32,7 @@ import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.TimestampType; @@ -127,6 +128,8 @@ public static AvroToRowDataConverter createConverter(LogicalType type) { return AvroToRowDataConverters::convertToDate; case TIME_WITHOUT_TIME_ZONE: return AvroToRowDataConverters::convertToTime; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + return createTimestampConverter(((LocalZonedTimestampType) type).getPrecision()); case TIMESTAMP_WITHOUT_TIME_ZONE: return createTimestampConverter(((TimestampType) type).getPrecision()); case CHAR: diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java index d63cd7689b0ef..35737f8aeab75 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java @@ -18,9 +18,8 @@ package org.apache.hudi.util; -import org.apache.hudi.common.util.ValidationUtils; - import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.table.types.logical.LogicalTypeRoot; @@ -47,9 +46,13 @@ public static boolean isTimestampType(DataType type) { * Returns the precision of the given TIMESTAMP type. */ public static int precision(LogicalType logicalType) { - ValidationUtils.checkArgument(logicalType instanceof TimestampType); - TimestampType timestampType = (TimestampType) logicalType; - return timestampType.getPrecision(); + if (logicalType instanceof TimestampType) { + return ((TimestampType) logicalType).getPrecision(); + } else if (logicalType instanceof LocalZonedTimestampType) { + return ((LocalZonedTimestampType) logicalType).getPrecision(); + } else { + throw new AssertionError("Unexpected type: " + logicalType); + } } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java index 446a6d04178ce..ecebd1adcdbc8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java @@ -32,7 +32,6 @@ import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.TimestampType; import java.io.Serializable; import java.math.BigDecimal; @@ -157,8 +156,9 @@ public Object convert(Schema schema, Object object) { } }; break; + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: - final int precision = ((TimestampType) type).getPrecision(); + final int precision = DataTypeUtils.precision(type); if (precision <= 3) { converter = new RowDataToAvroConverter() { @@ -231,7 +231,7 @@ public Object convert(Schema schema, Object object) { actualSchema = types.get(1); } else { throw new IllegalArgumentException( - "The Avro schema is not a nullable type: " + schema.toString()); + "The Avro schema is not a nullable type: " + schema); } } else { actualSchema = schema; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index e59a393fd3033..9b2a47e73893b 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -1438,6 +1438,36 @@ void testWriteReadWithComputedColumns() { assertRowsEquals(result2, "[+I[3]]"); } + @ParameterizedTest + @EnumSource(value = HoodieTableType.class) + void testWriteReadWithLocalTimestamp(HoodieTableType tableType) { + TableEnvironment tableEnv = batchTableEnv; + String createTable = sql("t1") + .field("f0 int") + .field("f1 varchar(10)") + .field("f2 TIMESTAMP_LTZ(3)") + .field("f4 TIMESTAMP_LTZ(6)") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.PRECOMBINE_FIELD, "f1") + .option(FlinkOptions.TABLE_TYPE, tableType) + .pkField("f0") + .noPartition() + .end(); + tableEnv.executeSql(createTable); + + String insertInto = "insert into t1 values\n" + + "(1, 'abc', TIMESTAMP '1970-01-01 08:00:01', TIMESTAMP '1970-01-01 08:00:02'),\n" + + "(2, 'def', TIMESTAMP '1970-01-01 08:00:03', TIMESTAMP '1970-01-01 08:00:04')"; + execInsertSql(tableEnv, insertInto); + + List result = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + final String expected = "[" + + "+I[1, abc, 1970-01-01T00:00:01Z, 1970-01-01T00:00:02Z], " + + "+I[2, def, 1970-01-01T00:00:03Z, 1970-01-01T00:00:04Z]]"; + assertRowsEquals(result, expected); + } + // ------------------------------------------------------------------------- // Utilities // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java index e56541ed57cb3..b297b627ba3ab 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java @@ -23,6 +23,7 @@ import org.apache.hudi.util.AvroSchemaConverter; import org.apache.avro.Schema; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; import org.junit.jupiter.api.Test; @@ -50,4 +51,41 @@ void testUnionSchemaWithMultipleRecordTypes() { + "`isDeleted` BOOLEAN NOT NULL>"; assertThat(dataType.getChildren().get(pos).toString(), is(expected)); } + + @Test + void testLocalTimestampType() { + DataType dataType = DataTypes.ROW( + DataTypes.FIELD("f_localtimestamp_millis", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), + DataTypes.FIELD("f_localtimestamp_micros", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)) + ); + // convert to avro schema + Schema schema = AvroSchemaConverter.convertToSchema(dataType.getLogicalType()); + final String expectedSchema = "" + + "[ \"null\", {\n" + + " \"type\" : \"record\",\n" + + " \"name\" : \"record\",\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"f_localtimestamp_millis\",\n" + + " \"type\" : [ \"null\", {\n" + + " \"type\" : \"long\",\n" + + " \"logicalType\" : \"local-timestamp-millis\"\n" + + " } ],\n" + + " \"default\" : null\n" + + " }, {\n" + + " \"name\" : \"f_localtimestamp_micros\",\n" + + " \"type\" : [ \"null\", {\n" + + " \"type\" : \"long\",\n" + + " \"logicalType\" : \"local-timestamp-micros\"\n" + + " } ],\n" + + " \"default\" : null\n" + + " } ]\n" + + "} ]"; + assertThat(schema.toString(true), is(expectedSchema)); + // convert it back + DataType convertedDataType = AvroSchemaConverter.convertToDataType(schema); + final String expectedDataType = "ROW<" + + "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), " + + "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>"; + assertThat(convertedDataType.toString(), is(expectedDataType)); + } } diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java index aa63856040e01..75cf3272d6611 100644 --- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java +++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java @@ -59,6 +59,7 @@ import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; @@ -334,7 +335,10 @@ private static ColumnReader createColumnReader( case TIMESTAMP_WITH_LOCAL_TIME_ZONE: switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) { case INT64: - return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision()); + int precision = fieldType instanceof TimestampType + ? ((TimestampType) fieldType).getPrecision() + : ((LocalZonedTimestampType) fieldType).getPrecision(); + return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision); case INT96: return new TimestampColumnReader(utcTimestamp, descriptor, pageReader); default: diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java index c636b36100fea..dc59abe460212 100644 --- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java +++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java @@ -59,6 +59,7 @@ import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; @@ -334,7 +335,10 @@ private static ColumnReader createColumnReader( case TIMESTAMP_WITH_LOCAL_TIME_ZONE: switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) { case INT64: - return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision()); + int precision = fieldType instanceof TimestampType + ? ((TimestampType) fieldType).getPrecision() + : ((LocalZonedTimestampType) fieldType).getPrecision(); + return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision); case INT96: return new TimestampColumnReader(utcTimestamp, descriptor, pageReader); default: diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java index ae79966e881a0..5eeb42514a2cc 100644 --- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java +++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java @@ -59,6 +59,7 @@ import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; @@ -334,7 +335,10 @@ private static ColumnReader createColumnReader( case TIMESTAMP_WITH_LOCAL_TIME_ZONE: switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) { case INT64: - return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, ((TimestampType) fieldType).getPrecision()); + int precision = fieldType instanceof TimestampType + ? ((TimestampType) fieldType).getPrecision() + : ((LocalZonedTimestampType) fieldType).getPrecision(); + return new Int64TimestampColumnReader(utcTimestamp, descriptor, pageReader, precision); case INT96: return new TimestampColumnReader(utcTimestamp, descriptor, pageReader); default: