diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index feb5121bc0..0949db372c 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -33,6 +33,10 @@
 import org.apache.flink.cdc.common.types.DataTypeRoot;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.DecimalType;
+import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
+import org.apache.flink.cdc.common.types.TimeType;
+import org.apache.flink.cdc.common.types.TimestampType;
+import org.apache.flink.cdc.common.types.ZonedTimestampType;
 
 import javax.annotation.Nullable;
 
@@ -176,6 +180,27 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
         if (lType.equals(rType)) {
             // identical type
             mergedType = rType;
+        } else if (lType instanceof TimestampType && rType instanceof TimestampType) {
+            return DataTypes.TIMESTAMP(
+                    Math.max(
+                            ((TimestampType) lType).getPrecision(),
+                            ((TimestampType) rType).getPrecision()));
+        } else if (lType instanceof ZonedTimestampType && rType instanceof ZonedTimestampType) {
+            return DataTypes.TIMESTAMP_TZ(
+                    Math.max(
+                            ((ZonedTimestampType) lType).getPrecision(),
+                            ((ZonedTimestampType) rType).getPrecision()));
+        } else if (lType instanceof LocalZonedTimestampType
+                && rType instanceof LocalZonedTimestampType) {
+            return DataTypes.TIMESTAMP_LTZ(
+                    Math.max(
+                            ((LocalZonedTimestampType) lType).getPrecision(),
+                            ((LocalZonedTimestampType) rType).getPrecision()));
+        } else if (lType.is(DataTypeFamily.TIMESTAMP) && rType.is(DataTypeFamily.TIMESTAMP)) {
+            return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
+        } else if (lType instanceof TimeType && rType instanceof TimeType) {
+            return DataTypes.TIME(
+                    Math.max(((TimeType) lType).getPrecision(), ((TimeType) rType).getPrecision()));
         } else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
                 && rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
             mergedType = DataTypes.BIGINT();
@@ -185,7 +210,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
         } else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
                 && rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
             mergedType = DataTypes.DOUBLE();
-        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
+        } else if (lType instanceof DecimalType && rType instanceof DecimalType) {
             // Merge two decimal types
             DecimalType lhsDecimal = (DecimalType) lType;
             DecimalType rhsDecimal = (DecimalType) rType;
@@ -195,7 +220,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
                             rhsDecimal.getPrecision() - rhsDecimal.getScale());
             int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
             mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
-        } else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
+        } else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
             DecimalType lhsDecimal = (DecimalType) lType;
             mergedType =
@@ -204,7 +229,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
                                     lhsDecimal.getPrecision(),
                                     lhsDecimal.getScale() + getNumericPrecision(rType)),
                             lhsDecimal.getScale());
-        } else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
+        } else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
             // Merge decimal and int
             DecimalType rhsDecimal = (DecimalType) rType;
             mergedType =
diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
index 11fa5d8149..79f508ac98 100644
--- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
+++ b/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/utils/SchemaUtilsTest.java
@@ -291,6 +291,38 @@ public void testInferWiderType() {
                                 DataTypes.INT().nullable(), DataTypes.INT().nullable()))
                 .isEqualTo(DataTypes.INT().nullable());
 
+        // Test merging temporal types
+        Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.TIME(1), DataTypes.TIME(7)))
+                .isEqualTo(DataTypes.TIME(7));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(9), DataTypes.TIMESTAMP(6)))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(7)))
+                .isEqualTo(DataTypes.TIMESTAMP_TZ(7));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(2), DataTypes.TIMESTAMP_LTZ(1)))
+                .isEqualTo(DataTypes.TIMESTAMP_LTZ(2));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_TZ(), DataTypes.TIMESTAMP()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
+        Assertions.assertThat(
+                        SchemaUtils.inferWiderType(
+                                DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_TZ()))
+                .isEqualTo(DataTypes.TIMESTAMP(9));
+
         // incompatible type merges test
         Assertions.assertThatThrownBy(
                         () -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DOUBLE()))
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index b3f17b2e52..114035fe94 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -88,17 +89,19 @@ private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking
 
     @Override
     public PipelineExecution compose(PipelineDef pipelineDef) {
-        int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
+        Configuration pipelineDefConfig = pipelineDef.getConfig();
+
+        int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
         env.getConfig().setParallelism(parallelism);
 
         SchemaChangeBehavior schemaChangeBehavior =
-                pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
+                pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
 
         // Build Source Operator
         DataSourceTranslator sourceTranslator = new DataSourceTranslator();
         DataStream<Event> stream =
                 sourceTranslator.translate(
-                        pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
+                        pipelineDef.getSource(), env, pipelineDefConfig, parallelism);
 
         // Build PreTransformOperator for processing Schema Event
         TransformTranslator transformTranslator = new TransformTranslator();
@@ -110,10 +113,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
         SchemaOperatorTranslator schemaOperatorTranslator =
                 new SchemaOperatorTranslator(
                         schemaChangeBehavior,
-                        pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
-                        pipelineDef
-                                .getConfig()
-                                .get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
 
         OperatorIDGenerator schemaOperatorIDGenerator =
                 new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
@@ -122,13 +124,13 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
                 transformTranslator.translatePostTransform(
                         stream,
                         pipelineDef.getTransforms(),
-                        pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
+                        pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
                         pipelineDef.getUdfs());
 
         // Build DataSink in advance as schema operator requires MetadataApplier
         DataSinkTranslator sinkTranslator = new DataSinkTranslator();
         DataSink dataSink =
-                sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);
+                sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);
 
         stream =
                 schemaOperatorTranslator.translate(
@@ -157,7 +159,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
         addFrameworkJars();
 
         return new FlinkPipelineExecution(
-                env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
+                env, pipelineDefConfig.get(PipelineOptions.PIPELINE_NAME), isBlocking);
     }
 
     private void addFrameworkJars() {
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
index c965c88a62..c5cadcd1e4 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
@@ -19,7 +19,6 @@
 
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -39,16 +38,18 @@ public class SchemaOperatorTranslator {
     private final SchemaChangeBehavior schemaChangeBehavior;
     private final String schemaOperatorUid;
     private final Duration rpcTimeOut;
+    private final String timezone;
 
     public SchemaOperatorTranslator(
             SchemaChangeBehavior schemaChangeBehavior,
             String schemaOperatorUid,
-            Duration rpcTimeOut) {
+            Duration rpcTimeOut,
+            String timezone) {
         this.schemaChangeBehavior = schemaChangeBehavior;
         this.schemaOperatorUid = schemaOperatorUid;
         this.rpcTimeOut = rpcTimeOut;
+        this.timezone = timezone;
     }
 
     public DataStream<Event> translate(
@@ -56,7 +57,8 @@ public DataStream<Event> translate(
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes) {
-        return addSchemaOperator(input, parallelism, metadataApplier, routes, schemaChangeBehavior);
+        return addSchemaOperator(
+                input, parallelism, metadataApplier, routes, schemaChangeBehavior, timezone);
     }
 
     public String getSchemaOperatorUid() {
@@ -68,7 +70,8 @@ private DataStream<Event> addSchemaOperator(
             int parallelism,
             MetadataApplier metadataApplier,
             List<RouteDef> routes,
-            SchemaChangeBehavior schemaChangeBehavior) {
+            SchemaChangeBehavior schemaChangeBehavior,
+            String timezone) {
         List<RouteRule> routingRules = new ArrayList<>();
         for (RouteDef route : routes) {
             routingRules.add(
@@ -82,27 +85,12 @@ private DataStream<Event> addSchemaOperator(
                         "SchemaOperator",
                         new EventTypeInfo(),
                         new SchemaOperatorFactory(
-                                metadataApplier, routingRules, rpcTimeOut, schemaChangeBehavior));
+                                metadataApplier,
+                                routingRules,
+                                rpcTimeOut,
+                                schemaChangeBehavior,
+                                timezone));
         stream.uid(schemaOperatorUid).setParallelism(parallelism);
         return stream;
     }
-
-    private DataStream<Event> dropSchemaChangeEvent(DataStream<Event> input, int parallelism) {
-        return input.filter(event -> !(event instanceof SchemaChangeEvent))
-                .setParallelism(parallelism);
-    }
-
-    private DataStream<Event> exceptionOnSchemaChange(DataStream<Event> input, int parallelism) {
-        return input.map(
-                        event -> {
-                            if (event instanceof SchemaChangeEvent) {
-                                throw new RuntimeException(
-                                        String.format(
-                                                "Aborting execution as the pipeline encountered a schema change event: %s",
-                                                event));
-                            }
-                            return event;
-                        })
-                .setParallelism(parallelism);
-    }
 }
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 81d466aab8..74afa8c4db 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -18,6 +18,10 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
+import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -57,10 +61,16 @@
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_1;
 import static org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper.TABLE_2;
@@ -1092,4 +1102,199 @@ void testRouteWithReplaceSymbol(ValuesDataSink.SinkApi sinkApi) throws Exception
                 "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
                 "DataChangeEvent{tableId=replaced_namespace.replaced_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
     }
+
+    @ParameterizedTest
+    @EnumSource
+    void testMergingTemporalTypesWithPromotedPrecisions(ValuesDataSink.SinkApi sinkApi)
+            throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        List<Event> events = generateTemporalColumnEvents("default_table_");
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, "America/New_York");
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Arrays.asList(
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_ts_\\.*",
+                                        "default_namespace.default_schema.default_table_timestamp_merged",
+                                        null,
+                                        "Merge timestamp columns with different precision"),
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_tz_\\.*",
+                                        "default_namespace.default_schema.default_table_zoned_timestamp_merged",
+                                        null,
+                                        "Merge timestamp_tz columns with different precision"),
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_ltz_\\.*",
+                                        "default_namespace.default_schema.default_table_local_zoned_timestamp_merged",
+                                        null,
+                                        "Merge timestamp_ltz columns with different precision"),
+                                new RouteDef(
+                                        "default_namespace.default_schema.default_table_\\.*",
+                                        "default_namespace.default_schema.default_everything_merged",
+                                        null,
+                                        "Merge all timestamp family columns with different precision")),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        String[] expected =
+                Stream.of(
+                                // Merging timestamp with different precision
+                                "CreateTableEvent{tableId={}_table_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}",
+                                "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}",
+                                "AlterColumnTypeEvent{tableId={}_table_timestamp_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}}",
+                                "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}",
+                                "DataChangeEvent{tableId={}_table_timestamp_merged, before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}",
+
+                                // Merging zoned timestamp with different precision
+                                "CreateTableEvent{tableId={}_table_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0) WITH TIME ZONE}, primaryKeys=id, options=()}",
+                                "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[3, Alice, 17, 2020-01-01T14:28:57Z], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId={}_table_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP(9) WITH TIME ZONE}, oldTypeMapping={birthday=TIMESTAMP(0) WITH TIME ZONE}}", + "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[4, Alice, 17, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[103, Zen, 19, 2020-01-01T14:28:57Z], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_table_zoned_timestamp_merged, before=[], after=[104, Zen, 19, 2020-01-01T14:28:57.123456789Z], op=INSERT, meta=()}", + + // Merging local-zoned timestamp with different precision + "CreateTableEvent{tableId={}_table_local_zoned_timestamp_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP_LTZ(0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[5, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId={}_table_local_zoned_timestamp_merged, typeMapping={birthday=TIMESTAMP_LTZ(9)}, oldTypeMapping={birthday=TIMESTAMP_LTZ(0)}}", + "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[6, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[105, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_table_local_zoned_timestamp_merged, before=[], after=[106, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", + + // Merging all + "CreateTableEvent{tableId={}_everything_merged, schema=columns={`id` INT,`name` STRING,`age` INT,`birthday` TIMESTAMP(0)}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[1, Alice, 17, 2020-01-01T14:28:57], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId={}_everything_merged, typeMapping={birthday=TIMESTAMP(9)}, oldTypeMapping={birthday=TIMESTAMP(0)}}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[2, Alice, 17, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[3, Alice, 17, 2020-01-01T09:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[4, Alice, 17, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[5, Alice, 17, 2020-01-01T09:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[6, Alice, 17, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[101, Zen, 19, 2020-01-01T14:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[102, Zen, 19, 2020-01-01T14:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[103, Zen, 19, 2020-01-01T09:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[104, Zen, 19, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[105, Zen, 19, 2020-01-01T09:28:57], op=INSERT, meta=()}", + "DataChangeEvent{tableId={}_everything_merged, before=[], after=[106, Zen, 19, 2020-01-01T09:28:57.123456789], op=INSERT, meta=()}") + .map(s -> s.replace("{}", "default_namespace.default_schema.default")) + .toArray(String[]::new); 
+
+        assertThat(outputEvents).containsExactlyInAnyOrder(expected);
+    }
+
+    private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
+        List<Event> events = new ArrayList<>();
+
+        // Initialize schemas
+        List<String> names = Arrays.asList("ts_0", "ts_9", "tz_0", "tz_9", "ltz_0", "ltz_9");
+
+        List<DataType> types =
+                Arrays.asList(
+                        DataTypes.TIMESTAMP(0),
+                        DataTypes.TIMESTAMP(9),
+                        DataTypes.TIMESTAMP_TZ(0),
+                        DataTypes.TIMESTAMP_TZ(9),
+                        DataTypes.TIMESTAMP_LTZ(0),
+                        DataTypes.TIMESTAMP_LTZ(9));
+
+        Instant lowPrecisionTimestamp = Instant.parse("2020-01-01T14:28:57Z");
+        Instant highPrecisionTimestamp = Instant.parse("2020-01-01T14:28:57.123456789Z");
+
+        List<Object> values =
+                Arrays.asList(
+                        TimestampData.fromLocalDateTime(
+                                LocalDateTime.ofInstant(lowPrecisionTimestamp, ZoneId.of("UTC"))),
+                        TimestampData.fromLocalDateTime(
+                                LocalDateTime.ofInstant(highPrecisionTimestamp, ZoneId.of("UTC"))),
+                        ZonedTimestampData.fromZonedDateTime(
+                                ZonedDateTime.ofInstant(lowPrecisionTimestamp, ZoneId.of("UTC"))),
+                        ZonedTimestampData.fromZonedDateTime(
+                                ZonedDateTime.ofInstant(highPrecisionTimestamp, ZoneId.of("UTC"))),
+                        LocalZonedTimestampData.fromInstant(lowPrecisionTimestamp),
+                        LocalZonedTimestampData.fromInstant(highPrecisionTimestamp));
+
+        List<Schema> schemas =
+                types.stream()
+                        .map(
+                                temporalColumnType ->
+                                        Schema.newBuilder()
+                                                .physicalColumn("id", DataTypes.INT())
+                                                .physicalColumn("name", DataTypes.STRING())
+                                                .physicalColumn("age", DataTypes.INT())
+                                                .physicalColumn("birthday", temporalColumnType)
+                                                .primaryKey("id")
+                                                .build())
+                        .collect(Collectors.toList());
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(new CreateTableEvent(generatedTableId, generatedSchema));
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 1 + i, "Alice", 17, values.get(i))));
+        }
+
+        for (int i = 0; i < names.size(); i++) {
+            TableId generatedTableId =
+                    TableId.tableId(
+                            "default_namespace", "default_schema", tableNamePrefix + names.get(i));
+            Schema generatedSchema = schemas.get(i);
+            events.add(
+                    DataChangeEvent.insertEvent(
+                            generatedTableId,
+                            generate(generatedSchema, 101 + i, "Zen", 19, values.get(i))));
+        }
+
+        return events;
+    }
+
+    BinaryRecordData generate(Schema schema, Object... fields) {
+        return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])))
+                .generate(
+                        Arrays.stream(fields)
+                                .map(
+                                        e ->
+                                                (e instanceof String)
+                                                        ? BinaryStringData.fromString((String) e)
+                                                        : e)
+                                .toArray());
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
index 438cc37815..84eeebbca3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
@@ -417,7 +417,8 @@ private void runJobWithEvents(List<Event> events) throws Exception {
                 new SchemaOperatorTranslator(
                         SchemaChangeBehavior.EVOLVE,
                         "$$_schema_operator_$$",
-                        DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
+                        DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT,
+                        "UTC");
 
         OperatorIDGenerator schemaOperatorIDGenerator =
                 new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index c5e833e545..8b433ae4ff 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -366,7 +366,8 @@ private void runJobWithEvents(List<Event> events) throws Exception {
                 new SchemaOperatorTranslator(
                         SchemaChangeBehavior.EVOLVE,
                         "$$_schema_operator_$$",
-                        DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
+                        DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT,
+                        "UTC");
 
         OperatorIDGenerator schemaOperatorIDGenerator =
                 new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
index 619dce8bc0..bdea7a879c 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java
@@ -115,7 +115,7 @@ public void testSchemaEvolveWithIncompatibleChanges() throws Exception {
                 false,
                 Collections.emptyList(),
                 Arrays.asList(
-                        "java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"",
+                        "java.lang.IllegalStateException: Incompatible types found for column `age': \"INT\" and \"DOUBLE\"",
                         "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
     }
 
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
index 50d5dfb1d0..775fd1151a 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
@@ -115,7 +115,7 @@ public void testSchemaEvolveWithIncompatibleChanges() throws Exception {
                 false,
                 Collections.emptyList(),
                 Arrays.asList(
-                        "java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"",
+                        "java.lang.IllegalStateException: Incompatible types found for column `age': \"INT\" and \"DOUBLE\"",
                         "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
     }
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index 58f524a4bb..c31912c0a0 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -20,8 +20,11 @@
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.StringData;
+import org.apache.flink.cdc.common.data.TimestampData;
+import org.apache.flink.cdc.common.data.ZonedTimestampData;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.DropTableEvent;
 import org.apache.flink.cdc.common.event.Event;
@@ -37,6 +40,7 @@
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypeFamily;
 import org.apache.flink.cdc.common.types.DataTypeRoot;
+import org.apache.flink.cdc.common.types.TimeType;
 import org.apache.flink.cdc.common.utils.ChangeEventUtils;
 import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
 import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
@@ -72,6 +76,8 @@
 import java.io.Serializable;
 import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -97,6 +103,8 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
 
     private final List<RouteRule> routingRules;
 
+    private final String timezone;
+
     /**
      * Storing route source table selector, sink table name (before symbol replacement), and replace
      * symbol in a tuple.
@@ -127,6 +135,7 @@ public SchemaOperator(List<RouteRule> routingRules) {
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
         this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE;
+        this.timezone = "UTC";
     }
 
     @VisibleForTesting
@@ -135,8 +144,10 @@ public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) {
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
         this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE;
+        this.timezone = "UTC";
     }
 
+    @VisibleForTesting
     public SchemaOperator(
             List<RouteRule> routingRules,
             Duration rpcTimeOut,
@@ -145,6 +156,19 @@ public SchemaOperator(
         this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
         this.schemaChangeBehavior = schemaChangeBehavior;
+        this.timezone = "UTC";
+    }
+
+    public SchemaOperator(
+            List<RouteRule> routingRules,
+            Duration rpcTimeOut,
+            SchemaChangeBehavior schemaChangeBehavior,
+            String timezone) {
+        this.routingRules = routingRules;
+        this.chainingStrategy = ChainingStrategy.ALWAYS;
+        this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
+        this.schemaChangeBehavior = schemaChangeBehavior;
+        this.timezone = timezone;
     }
 
     @Override
@@ -372,7 +396,11 @@ private RecordData regenerateRecordData(
             } else {
                 fieldGetters.add(
                         new TypeCoercionFieldGetter(
-                                column.getType(), fieldGetter, tolerantMode));
+                                originalSchema.getColumn(columnName).get().getType(),
+                                column.getType(),
+                                fieldGetter,
+                                tolerantMode,
+                                timezone));
             }
         }
     }
@@ -541,17 +569,23 @@ public Object getFieldOrNull(RecordData recordData) {
         }
 
     private static class TypeCoercionFieldGetter implements RecordData.FieldGetter {
+        private final DataType originalType;
         private final DataType destinationType;
         private final RecordData.FieldGetter originalFieldGetter;
         private final boolean tolerantMode;
+        private final String timezone;
 
         public TypeCoercionFieldGetter(
+                DataType originalType,
                 DataType destinationType,
                 RecordData.FieldGetter originalFieldGetter,
-                boolean tolerantMode) {
+                boolean tolerantMode,
+                String timezone) {
+            this.originalType = originalType;
             this.destinationType = destinationType;
             this.originalFieldGetter = originalFieldGetter;
             this.tolerantMode = tolerantMode;
+            this.timezone = timezone;
         }
 
         private Object fail(IllegalArgumentException e) throws IllegalArgumentException {
@@ -609,6 +643,34 @@ public Object getFieldOrNull(RecordData recordData) {
                                                 + "Currently only CHAR / VARCHAR can be accepted by a STRING column",
                                         originalField.getClass())));
                 }
+            } else if (destinationType.is(DataTypeRoot.TIME_WITHOUT_TIME_ZONE)
+                    && originalType.is(DataTypeRoot.TIME_WITHOUT_TIME_ZONE)) {
+                Integer value = (Integer) originalField;
+                int precisionDifference =
+                        ((TimeType) originalType).getPrecision()
+                                - ((TimeType) destinationType).getPrecision();
+                if (precisionDifference > 0) {
+                    return value / (int) Math.pow(10, precisionDifference);
+                } else if (precisionDifference < 0) {
+                    return value * (int) Math.pow(10, -precisionDifference);
+                } else {
+                    return value;
+                }
+            } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+                    && originalType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
+                // For now, TimestampData / ZonedTimestampData / LocalZonedTimestampData has no
+                // difference in its internal representation, so there's no need to do any precision
+                // conversion.
+                return originalField;
+            } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
+                    && originalType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) {
+                return originalField;
+            } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
+                    && originalType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
+                return originalField;
+            } else if (destinationType.is(DataTypeFamily.TIMESTAMP)
+                    && originalType.is(DataTypeFamily.TIMESTAMP)) {
+                return castToTimestamp(originalField, timezone);
             } else {
                 return fail(
                         new IllegalArgumentException(
@@ -624,4 +686,21 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
         // Needless to do anything, since AbstractStreamOperator#snapshotState and #processElement
         // is guaranteed not to be mixed together.
     }
+
+    private static TimestampData castToTimestamp(Object object, String timezone) {
+        if (object == null) {
+            return null;
+        }
+        if (object instanceof LocalZonedTimestampData) {
+            return TimestampData.fromLocalDateTime(
+                    LocalDateTime.ofInstant(
+                            ((LocalZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
+        } else if (object instanceof ZonedTimestampData) {
+            return TimestampData.fromLocalDateTime(
+                    LocalDateTime.ofInstant(
+                            ((ZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
+        } else {
+            return TimestampData.fromLocalDateTime(LocalDateTime.parse(String.valueOf(object)));
+        }
+    }
 }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
index 7cd35a20d3..367f655977 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
@@ -47,8 +47,9 @@ public SchemaOperatorFactory(
             MetadataApplier metadataApplier,
             List<RouteRule> routingRules,
             Duration rpcTimeOut,
-            SchemaChangeBehavior schemaChangeBehavior) {
-        super(new SchemaOperator(routingRules, rpcTimeOut, schemaChangeBehavior));
+            SchemaChangeBehavior schemaChangeBehavior,
+            String timezone) {
+        super(new SchemaOperator(routingRules, rpcTimeOut, schemaChangeBehavior, timezone));
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
         this.schemaChangeBehavior = schemaChangeBehavior;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
index 9883f3d5f5..98e348ca12 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java
@@ -31,9 +31,8 @@
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.types.DataType;
-import org.apache.flink.cdc.common.types.DataTypeFamily;
-import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.utils.ChangeEventUtils;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -254,7 +253,9 @@ private List<SchemaChangeEvent> handleAlterColumnTypeEvent(
                             // Check type compatibility
                             DataType widerType =
                                     getWiderType(
-                                            existedColumnInDerivedTable.getType(), dataType);
+                                            columnName,
+                                            existedColumnInDerivedTable.getType(),
+                                            dataType);
                             if (!widerType.equals(existedColumnInDerivedTable.getType())) {
                                 typeDifference.put(
                                         existedColumnInDerivedTable.getName(), widerType);
@@ -289,6 +290,7 @@ private List<SchemaChangeEvent> handleAddColumnEvent(
                         .equals(addedColumn.getAddColumn().getType())) {
                     DataType widerType =
                             getWiderType(
+                                    existedColumnInDerivedTable.getName(),
                                     existedColumnInDerivedTable.getType(),
                                     addedColumn.getAddColumn().getType());
                     if (!widerType.equals(existedColumnInDerivedTable.getType())) {
@@ -325,7 +327,10 @@ private List<SchemaChangeEvent> handleCreateTableEvent(
                 Column existedColumnInDerivedTable = optionalColumnInDerivedTable.get();
                 if (!existedColumnInDerivedTable.getType().equals(column.getType())) {
                     DataType widerType =
-                            getWiderType(existedColumnInDerivedTable.getType(), column.getType());
+                            getWiderType(
+                                    existedColumnInDerivedTable.getName(),
+                                    existedColumnInDerivedTable.getType(),
+                                    column.getType());
                     if (!widerType.equals(existedColumnInDerivedTable.getType())) {
                         newTypeMapping.put(existedColumnInDerivedTable.getName(), widerType);
                     }
@@ -343,23 +348,14 @@ private List<SchemaChangeEvent> handleCreateTableEvent(
         return schemaChangeEvents;
     }
 
-    private DataType getWiderType(DataType thisType, DataType thatType) {
-        if (thisType.equals(thatType)) {
-            return thisType;
-        }
-        if (thisType.is(DataTypeFamily.INTEGER_NUMERIC)
-                && thatType.is(DataTypeFamily.INTEGER_NUMERIC)) {
-            return DataTypes.BIGINT();
-        }
-        if (thisType.is(DataTypeFamily.CHARACTER_STRING)
-                && thatType.is(DataTypeFamily.CHARACTER_STRING)) {
-            return DataTypes.STRING();
-        }
-        if (thisType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
-                && thatType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
-            return DataTypes.DOUBLE();
+    private DataType getWiderType(String columnName, DataType thisType, DataType thatType) {
+        try {
+            return SchemaUtils.inferWiderType(thisType, thatType);
+        } catch (IllegalStateException e) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Incompatible types found for column `%s': \"%s\" and \"%s\"",
+                            columnName, thisType, thatType));
         }
-        throw new IllegalStateException(
-                String.format("Incompatible types: \"%s\" and \"%s\"", thisType, thatType));
     }
 }
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
index 9a2d1cfb4f..8620a730f6 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivationTest.java
@@ -381,7 +381,7 @@ void testIncompatibleTypes() {
                         schemaDerivation.applySchemaChange(
                                 new CreateTableEvent(TABLE_2, INCOMPATIBLE_SCHEMA)))
                 .isInstanceOf(IllegalStateException.class)
-                .hasMessage("Incompatible types: \"INT\" and \"STRING\"");
+                .hasMessage("Incompatible types found for column `age': \"INT\" and \"STRING\"");
     }
 
     @Test