From bd7aca03d6be8ec6ce1675285562aa7ff92b8e98 Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Thu, 10 Oct 2024 13:51:13 +0800
Subject: [PATCH] [FLINK-36461] Fix schema evolution failure with
 un-transformed tables

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
---
 .../flink/FlinkPipelineTransformITCase.java   | 95 +++++++++++++++++++
 .../transform/PostTransformOperator.java      | 11 ++-
 .../transform/PreTransformOperator.java       | 20 +++-
 3 files changed, 122 insertions(+), 4 deletions(-)

diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 4c2536011f..6332034cc2 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -688,6 +688,101 @@ void testPostAsteriskWithSchemaEvolution() throws Exception {
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[15 -> Oops, 12th, 15, Oops], after=[], op=DELETE, meta=()}");
     }
 
+    @Test
+    void testTransformUnmatchedSchemaEvolution() throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
+        List<Event> events = generateSchemaEvolutionEvents(tableId);
+
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                new TransformDef(
+                                        "foo.bar.baz", // This doesn't match given tableId
+                                        "*",
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        assertThat(outputEvents)
+                .containsExactly(
+                        // Initial stage
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22], after=[], op=DELETE, meta=()}",
+
+                        // Add column stage
+                        "AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
+
+                        // Alter column type stage
+                        "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Rename column stage
+                        "RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Drop column stage
+                        "DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
+    }
+
     private List<Event> generateSchemaEvolutionEvents(TableId tableId) {
         List<Event> events = new ArrayList<>();
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 83dd07c53e..8a607ffb56 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -44,6 +44,7 @@
 import java.lang.reflect.InvocationTargetException;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -242,6 +243,8 @@ public void processElement(StreamRecord<Event> element) throws Exception {
 
     private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws Exception {
         TableId tableId = event.tableId();
+        List<String> columnNamesBeforeChange = Collections.emptyList();
+
         if (event instanceof CreateTableEvent) {
             CreateTableEvent createTableEvent = (CreateTableEvent) event;
             Set<String> projectedColumnsSet =
@@ -286,6 +289,9 @@ private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws
                             createTableEvent.getSchema().getColumnNames().stream()
                                     .filter(projectedColumnsSet::contains)
                                     .collect(Collectors.toList()));
+        } else {
+            columnNamesBeforeChange =
+                    getPostTransformChangeInfo(tableId).getPreTransformedSchema().getColumnNames();
         }
 
         Schema schema;
@@ -304,9 +310,12 @@ private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws
 
         if (event instanceof CreateTableEvent) {
             return Optional.of(new CreateTableEvent(tableId, projectedSchema));
+        } else if (hasAsteriskMap.getOrDefault(tableId, true)) {
+            // See comments in PreTransformOperator#cacheChangeSchema method.
+            return SchemaUtils.transformSchemaChangeEvent(true, columnNamesBeforeChange, event);
         } else {
             return SchemaUtils.transformSchemaChangeEvent(
-                    hasAsteriskMap.get(tableId), projectedColumnsMap.get(tableId), event);
+                    false, projectedColumnsMap.get(tableId), event);
         }
     }
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 3b050f993c..9f80c89b22 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -265,12 +265,26 @@ private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
     private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
         TableId tableId = event.tableId();
         PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
+        List<String> columnNamesBeforeChange = tableChangeInfo.getSourceSchema().getColumnNames();
+
         Schema originalSchema =
                 SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
         Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();
-        Optional<SchemaChangeEvent> schemaChangeEvent =
-                SchemaUtils.transformSchemaChangeEvent(
-                        hasAsteriskMap.get(tableId), referencedColumnsMap.get(tableId), event);
+
+        Optional<SchemaChangeEvent> schemaChangeEvent;
+        if (hasAsteriskMap.getOrDefault(tableId, true)) {
+            // If this TableId is asterisk-ful, we should use the latest upstream schema as
+            // referenced columns to perform schema evolution, not the original ones generated
+            // when the table was created. If hasAsteriskMap has no entry for this TableId, it
+            // means that this TableId has not been captured by any transform rules, and should
+            // be regarded as asterisk-ful by default.
+            schemaChangeEvent =
+                    SchemaUtils.transformSchemaChangeEvent(true, columnNamesBeforeChange, event);
+        } else {
+            schemaChangeEvent =
+                    SchemaUtils.transformSchemaChangeEvent(
+                            false, referencedColumnsMap.get(tableId), event);
+        }
         if (schemaChangeEvent.isPresent()) {
             preTransformedSchema =
                     SchemaUtils.applySchemaChangeEvent(
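
Note: the crux of this fix is the hasAsteriskMap.getOrDefault(tableId, true)
fallback. hasAsteriskMap is only populated for tables captured by a transform
rule, so for an un-transformed table the old hasAsteriskMap.get(tableId) call
returned null, which presumably failed on unboxing when passed to the boolean
parameter of SchemaUtils.transformSchemaChangeEvent. The standalone sketch
below (not CDC runtime code; the map contents and column names are invented
for illustration) shows the failure mode and the fallback semantics:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class AsteriskFallbackSketch {
        public static void main(String[] args) {
            // Only tables matched by a transform rule get an entry here.
            Map<String, Boolean> hasAsteriskMap = new HashMap<>();
            hasAsteriskMap.put("captured_table", false);

            // Hypothetical table that no transform rule matched.
            String tableId = "default_namespace.default_schema.mytable1";

            // Pre-fix behavior: get() returns null for the missing entry;
            // assigning it to a primitive boolean would throw an NPE.
            Boolean boxed = hasAsteriskMap.get(tableId);
            System.out.println("get() returned: " + boxed); // prints null

            // Post-fix behavior: absent entries default to asterisk-ful.
            boolean hasAsterisk = hasAsteriskMap.getOrDefault(tableId, true);
            List<String> latestUpstreamColumns = Arrays.asList("id", "name", "age");
            System.out.println(hasAsterisk
                    ? "evolve against latest upstream columns: " + latestUpstreamColumns
                    : "evolve against originally projected columns");
        }
    }

Defaulting to true matches the comment added in PreTransformOperator: a table
that no transform rule touches should forward its full upstream schema, so the
latest upstream column list is the right reference for evolving it.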