Skip to content

Commit

Permalink
Support TIMESTAMP_WITH_LOCAL_TIME_ZONE
Browse files Browse the repository at this point in the history
  • Loading branch information
banmoy committed Dec 4, 2023
1 parent 4db2da4 commit 7688367
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.ververica.cdc.common.utils.Preconditions;
import com.ververica.cdc.common.utils.SchemaUtils;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -45,12 +46,21 @@ public class EventRecordSerializationSchema implements RecordSerializationSchema

private static final long serialVersionUID = 1L;

/**
* The local time zone used when converting from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>.
*/
private final ZoneId zoneId;

/** keep the relationship of TableId and table information. */
private transient Map<TableId, TableInfo> tableInfoMap;

private transient DefaultStarRocksRowData reusableRowData;
private transient JsonWrapper jsonWrapper;

public EventRecordSerializationSchema(ZoneId zoneId) {
this.zoneId = zoneId;
}

@Override
public void open(
SerializationSchema.InitializationContext context, StarRocksSinkContext sinkContext) {
Expand Down Expand Up @@ -88,7 +98,7 @@ private void applySchemaChangeEvent(SchemaChangeEvent event) {
tableInfo.fieldGetters = new RecordData.FieldGetter[newSchema.getColumnCount()];
for (int i = 0; i < newSchema.getColumnCount(); i++) {
tableInfo.fieldGetters[i] =
createFieldGetter(newSchema.getColumns().get(i).getType(), i);
createFieldGetter(newSchema.getColumns().get(i).getType(), i, zoneId);
}
tableInfoMap.put(tableId, tableInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.ververica.cdc.common.sink.MetadataApplier;

import java.io.Serializable;
import java.time.ZoneId;

/** A {@link DataSink} for StarRocks connector that supports schema evolution. */
public class StarRocksDataSink implements DataSink, Serializable {
Expand All @@ -42,19 +43,27 @@ public class StarRocksDataSink implements DataSink, Serializable {
/** Configurations for schema change. */
private final SchemaChangeConfig schemaChangeConfig;

/**
* The local time zone used when converting from <code>TIMESTAMP WITH LOCAL TIME ZONE</code>.
*/
private final ZoneId zoneId;

public StarRocksDataSink(
StarRocksSinkOptions sinkOptions,
TableCreateConfig tableCreateConfig,
SchemaChangeConfig schemaChangeConfig) {
SchemaChangeConfig schemaChangeConfig,
ZoneId zoneId) {
this.sinkOptions = sinkOptions;
this.tableCreateConfig = tableCreateConfig;
this.schemaChangeConfig = schemaChangeConfig;
this.zoneId = zoneId;
}

@Override
public EventSinkProvider getEventSinkProvider() {
StarRocksSink<Event> starRocksSink =
SinkFunctionFactory.createSink(sinkOptions, new EventRecordSerializationSchema());
SinkFunctionFactory.createSink(
sinkOptions, new EventRecordSerializationSchema(zoneId));
return FlinkSinkProvider.of(starRocksSink);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import com.ververica.cdc.common.factories.DataSinkFactory;
import com.ververica.cdc.common.sink.DataSink;

import java.time.ZoneId;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static com.ververica.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.JDBC_URL;
import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.LOAD_URL;
import static com.ververica.cdc.connectors.starrocks.sink.StarRocksDataSinkOptions.PASSWORD;
Expand Down Expand Up @@ -57,7 +59,12 @@ public DataSink createDataSink(Context context) {
TableCreateConfig.from(context.getFactoryConfiguration());
SchemaChangeConfig schemaChangeConfig =
SchemaChangeConfig.from(context.getFactoryConfiguration());
return new StarRocksDataSink(sinkOptions, tableCreateConfig, schemaChangeConfig);
String zoneStr = context.getFactoryConfiguration().get(PIPELINE_LOCAL_TIME_ZONE);
ZoneId zoneId =
PIPELINE_LOCAL_TIME_ZONE.defaultValue().equals(zoneStr)
? ZoneId.systemDefault()
: ZoneId.of(zoneStr);
return new StarRocksDataSink(sinkOptions, tableCreateConfig, schemaChangeConfig, zoneId);
}

private StarRocksSinkOptions buildSinkConnectorOptions(Configuration cdcConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -122,8 +124,11 @@ public static void toStarRocksDataType(Column cdcColumn, StarRocksColumn.Builder
*
* @param fieldType the element type of the RecordData
* @param fieldPos the element position of the RecordData
* @param zoneId the time zone used when converting from <code>TIMESTAMP WITH LOCAL TIME ZONE
* </code>
*/
public static RecordData.FieldGetter createFieldGetter(DataType fieldType, int fieldPos) {
public static RecordData.FieldGetter createFieldGetter(
DataType fieldType, int fieldPos, ZoneId zoneId) {
final RecordData.FieldGetter fieldGetter;
// ordered by type root definition
switch (fieldType.getTypeRoot()) {
Expand Down Expand Up @@ -177,10 +182,13 @@ record ->
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
fieldGetter =
record ->
DATETIME_FORMATTER.format(
record.getLocalZonedTimestampData(
fieldPos, getPrecision(fieldType))
.toInstant());
ZonedDateTime.ofInstant(
record.getLocalZonedTimestampData(
fieldPos, getPrecision(fieldType))
.toInstant(),
zoneId)
.toLocalDateTime()
.format(DATETIME_FORMATTER);
break;
default:
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.sink.v2.DefaultStarRocksSinkContext;
import com.ververica.cdc.common.data.DecimalData;
import com.ververica.cdc.common.data.LocalZonedTimestampData;
import com.ververica.cdc.common.data.TimestampData;
import com.ververica.cdc.common.data.binary.BinaryStringData;
import com.ververica.cdc.common.event.AddColumnEvent;
Expand All @@ -51,6 +52,7 @@
import com.ververica.cdc.common.types.DecimalType;
import com.ververica.cdc.common.types.FloatType;
import com.ververica.cdc.common.types.IntType;
import com.ververica.cdc.common.types.LocalZonedTimestampType;
import com.ververica.cdc.common.types.SmallIntType;
import com.ververica.cdc.common.types.TimestampType;
import com.ververica.cdc.common.types.VarCharType;
Expand All @@ -64,6 +66,9 @@
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.OptionalLong;
Expand All @@ -81,7 +86,7 @@ public class EventRecordSerializationSchemaTest {

@Before
public void setup() {
this.serializer = new EventRecordSerializationSchema();
this.serializer = new EventRecordSerializationSchema(ZoneId.of("+08"));
this.serializer.open(
new MockInitializationContext(),
new DefaultStarRocksSinkContext(
Expand Down Expand Up @@ -201,7 +206,10 @@ public void testMixedSchemaAndDataChanges() throws Exception {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col4", new DecimalType(20, 5))),
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col5", new SmallIntType()))));
Column.physicalColumn("col5", new SmallIntType())),
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"col6", new LocalZonedTimestampType()))));
Schema newSchema1 = SchemaUtils.applySchemaChangeEvent(schema1, addColumnEvent);
BinaryRecordDataGenerator newGenerator1 =
new BinaryRecordDataGenerator(
Expand All @@ -218,11 +226,14 @@ public void testMixedSchemaAndDataChanges() throws Exception {
TimestampData.fromTimestamp(
Timestamp.valueOf("2023-11-27 21:00:00")),
DecimalData.fromBigDecimal(new BigDecimal("83.23"), 20, 5),
(short) 9
(short) 9,
LocalZonedTimestampData.fromInstant(
LocalDateTime.of(2023, 11, 27, 21, 0, 0)
.toInstant(ZoneOffset.of("+10")))
}));
verifySerializeResult(
table1,
"{\"col1\":4,\"col2\":true,\"col3\":\"2023-11-27 21:00:00\",\"col4\":83.23,\"col5\":9,\"__op\":1}",
"{\"col1\":4,\"col2\":true,\"col3\":\"2023-11-27 21:00:00\",\"col4\":83.23,\"col5\":9,\"col6\":\"2023-11-27 19:00:00\",\"__op\":1}",
serializer.serialize(deleteEvent2));

// 4. drop columns from table2, and insert data
Expand Down

0 comments on commit 7688367

Please sign in to comment.