From 7688367b2f0b099467b3b71826bccc77e851949c Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Mon, 4 Dec 2023 22:57:44 +0800 Subject: [PATCH] Support TIMESTAMP_WITH_LOCAL_TIME_ZONE --- .../sink/EventRecordSerializationSchema.java | 12 +++++++++++- .../starrocks/sink/StarRocksDataSink.java | 13 +++++++++++-- .../sink/StarRocksDataSinkFactory.java | 9 ++++++++- .../starrocks/sink/StarRocksUtils.java | 18 +++++++++++++----- .../EventRecordSerializationSchemaTest.java | 19 +++++++++++++++---- 5 files changed, 58 insertions(+), 13 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/EventRecordSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/EventRecordSerializationSchema.java index 6b55f1ae69..4a1b4a9798 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/EventRecordSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/EventRecordSerializationSchema.java @@ -34,6 +34,7 @@ import com.ververica.cdc.common.utils.Preconditions; import com.ververica.cdc.common.utils.SchemaUtils; +import java.time.ZoneId; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,12 +46,21 @@ public class EventRecordSerializationSchema implements RecordSerializationSchema private static final long serialVersionUID = 1L; + /** + * The local time zone used when converting from TIMESTAMP WITH LOCAL TIME ZONE. + */ + private final ZoneId zoneId; + /** keep the relationship of TableId and table information. */ private transient Map tableInfoMap; private transient DefaultStarRocksRowData reusableRowData; private transient JsonWrapper jsonWrapper; + public EventRecordSerializationSchema(ZoneId zoneId) { + this.zoneId = zoneId; + } + @Override public void open( SerializationSchema.InitializationContext context, StarRocksSinkContext sinkContext) { @@ -88,7 +98,7 @@ private void applySchemaChangeEvent(SchemaChangeEvent event) { tableInfo.fieldGetters = new RecordData.FieldGetter[newSchema.getColumnCount()]; for (int i = 0; i < newSchema.getColumnCount(); i++) { tableInfo.fieldGetters[i] = - createFieldGetter(newSchema.getColumns().get(i).getType(), i); + createFieldGetter(newSchema.getColumns().get(i).getType(), i, zoneId); } tableInfoMap.put(tableId, tableInfo); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSink.java index 96ba05eaa8..bf130c4e4c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSink.java @@ -27,6 +27,7 @@ import com.ververica.cdc.common.sink.MetadataApplier; import java.io.Serializable; +import java.time.ZoneId; /** A {@link DataSink} for StarRocks connector that supports schema evolution. */ public class StarRocksDataSink implements DataSink, Serializable { @@ -42,19 +43,27 @@ public class StarRocksDataSink implements DataSink, Serializable { /** Configurations for schema change. */ private final SchemaChangeConfig schemaChangeConfig; + /** + * The local time zone used when converting from TIMESTAMP WITH LOCAL TIME ZONE. + */ + private final ZoneId zoneId; + public StarRocksDataSink( StarRocksSinkOptions sinkOptions, TableCreateConfig tableCreateConfig, - SchemaChangeConfig schemaChangeConfig) { + SchemaChangeConfig schemaChangeConfig, + ZoneId zoneId) { this.sinkOptions = sinkOptions; this.tableCreateConfig = tableCreateConfig; this.schemaChangeConfig = schemaChangeConfig; + this.zoneId = zoneId; } @Override public EventSinkProvider getEventSinkProvider() { StarRocksSink starRocksSink = - SinkFunctionFactory.createSink(sinkOptions, new EventRecordSerializationSchema()); + SinkFunctionFactory.createSink( + sinkOptions, new EventRecordSerializationSchema(zoneId)); return FlinkSinkProvider.of(starRocksSink); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java index a221f08d78..eac903277c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksDataSinkFactory.java @@ -22,11 +22,13 @@ import com.ververica.cdc.common.factories.DataSinkFactory; import com.ververica.cdc.common.sink.DataSink; +import java.time.ZoneId; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE; import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL; import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL; import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD; @@ -57,7 +59,12 @@ public DataSink createDataSink(Context context) { TableCreateConfig.from(context.getFactoryConfiguration()); SchemaChangeConfig schemaChangeConfig = SchemaChangeConfig.from(context.getFactoryConfiguration()); - return new StarRocksDataSink(sinkOptions, tableCreateConfig, schemaChangeConfig); + String zoneStr = context.getFactoryConfiguration().get(PIPELINE_LOCAL_TIME_ZONE); + ZoneId zoneId = + PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zoneStr) + ? ZoneId.systemDefault() + : ZoneId.of(zoneStr); + return new StarRocksDataSink(sinkOptions, tableCreateConfig, schemaChangeConfig, zoneId); } private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksUtils.java index 665e3bcf2c..4f1394caae 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksUtils.java @@ -41,6 +41,8 @@ import java.sql.Date; import java.text.SimpleDateFormat; import java.time.LocalDate; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; @@ -122,8 +124,11 @@ public static void toStarRocksDataType(Column cdcColumn, StarRocksColumn.Builder * * @param fieldType the element type of the RecordData * @param fieldPos the element position of the RecordData + * @param zoneId the time zone used when converting from TIMESTAMP WITH LOCAL TIME ZONE + * */ - public static RecordData.FieldGetter createFieldGetter(DataType fieldType, int fieldPos) { + public static RecordData.FieldGetter createFieldGetter( + DataType fieldType, int fieldPos, ZoneId zoneId) { final RecordData.FieldGetter fieldGetter; // ordered by type root definition switch (fieldType.getTypeRoot()) { @@ -177,10 +182,13 @@ record -> case TIMESTAMP_WITH_LOCAL_TIME_ZONE: fieldGetter = record -> - DATETIME_FORMATTER.format( - record.getLocalZonedTimestampData( - fieldPos, getPrecision(fieldType)) - .toInstant()); + ZonedDateTime.ofInstant( + record.getLocalZonedTimestampData( + fieldPos, getPrecision(fieldType)) + .toInstant(), + zoneId) + .toLocalDateTime() + .format(DATETIME_FORMATTER); break; default: throw new UnsupportedOperationException( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java index 661c1bd24f..0616f64155 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java @@ -36,6 +36,7 @@ import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions; import com.starrocks.connector.flink.table.sink.v2.DefaultStarRocksSinkContext; import com.ververica.cdc.common.data.DecimalData; +import com.ververica.cdc.common.data.LocalZonedTimestampData; import com.ververica.cdc.common.data.TimestampData; import com.ververica.cdc.common.data.binary.BinaryStringData; import com.ververica.cdc.common.event.AddColumnEvent; @@ -51,6 +52,7 @@ import com.ververica.cdc.common.types.DecimalType; import com.ververica.cdc.common.types.FloatType; import com.ververica.cdc.common.types.IntType; +import com.ververica.cdc.common.types.LocalZonedTimestampType; import com.ververica.cdc.common.types.SmallIntType; import com.ververica.cdc.common.types.TimestampType; import com.ververica.cdc.common.types.VarCharType; @@ -64,6 +66,9 @@ import java.math.BigDecimal; import java.sql.Timestamp; import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.util.Arrays; import java.util.HashMap; import java.util.OptionalLong; @@ -81,7 +86,7 @@ public class EventRecordSerializationSchemaTest { @Before public void setup() { - this.serializer = new EventRecordSerializationSchema(); + this.serializer = new EventRecordSerializationSchema(ZoneId.of("+08")); this.serializer.open( new MockInitializationContext(), new DefaultStarRocksSinkContext( @@ -201,7 +206,10 @@ public void testMixedSchemaAndDataChanges() throws Exception { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("col4", new DecimalType(20, 5))), new AddColumnEvent.ColumnWithPosition( - Column.physicalColumn("col5", new SmallIntType())))); + Column.physicalColumn("col5", new SmallIntType())), + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn( + "col6", new LocalZonedTimestampType())))); Schema newSchema1 = SchemaUtils.applySchemaChangeEvent(schema1, addColumnEvent); BinaryRecordDataGenerator newGenerator1 = new BinaryRecordDataGenerator( @@ -218,11 +226,14 @@ public void testMixedSchemaAndDataChanges() throws Exception { TimestampData.fromTimestamp( Timestamp.valueOf("2023-11-27 21:00:00")), DecimalData.fromBigDecimal(new BigDecimal("83.23"), 20, 5), - (short) 9 + (short) 9, + LocalZonedTimestampData.fromInstant( + LocalDateTime.of(2023, 11, 27, 21, 0, 0) + .toInstant(ZoneOffset.of("+10"))) })); verifySerializeResult( table1, - "{\"col1\":4,\"col2\":true,\"col3\":\"2023-11-27 21:00:00\",\"col4\":83.23,\"col5\":9,\"__op\":1}", + "{\"col1\":4,\"col2\":true,\"col3\":\"2023-11-27 21:00:00\",\"col4\":83.23,\"col5\":9,\"col6\":\"2023-11-27 19:00:00\",\"__op\":1}", serializer.serialize(deleteEvent2)); // 4. drop columns from table2, and insert data