diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index 7551add086..43ff5305bd 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -255,6 +255,102 @@ public void testUnexpectedBehavior() { () -> submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar)); } + @Test + public void testByDefaultTransform() throws Exception { + String dbName = schemaEvolveDatabase.getDatabaseName(); + + // We put a dummy transform block that matches nothing + // to ensure TransformOperator exists, so we could verify if TransformOperator could + // correctly handle such "bypass" tables with schema changes. + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.members\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "transform:\n" + + " - source-table: another.irrelevant\n" + + " projection: \"'irrelevant' AS tag\"\n" + + "\n" + + "pipeline:\n" + + " schema.change.behavior: evolve\n" + + " parallelism: %d", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + dbName, + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + validateSnapshotData(dbName, "members"); + + LOG.info("Starting schema evolution"); + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", MYSQL.getHost(), MYSQL.getDatabasePort(), dbName); + + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stmt = conn.createStatement()) { + + waitForIncrementalStage(dbName, "members", stmt); + + // triggers AddColumnEvent + stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;"); + stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);"); + + // triggers AlterColumnTypeEvent and RenameColumnEvent + stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;"); + + // triggers RenameColumnEvent + stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;"); + + // triggers DropColumnEvent + stmt.execute("ALTER TABLE members DROP COLUMN biological_sex"); + stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);"); + + // triggers TruncateTableEvent + stmt.execute("TRUNCATE TABLE members;"); + stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);"); + + // triggers DropTableEvent + stmt.execute("DROP TABLE members;"); + } + + List expectedTaskManagerEvents = + Arrays.asList( + "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", + "RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}", + "RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}", + "DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0], op=INSERT, meta=()}", + "TruncateTableEvent{tableId=%s.members}", + "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, 17.0], op=INSERT, meta=()}", + "DropTableEvent{tableId=%s.members}"); + + List expectedTmEvents = + expectedTaskManagerEvents.stream() + .map(s -> String.format(s, dbName, dbName)) + .collect(Collectors.toList()); + + validateResult(expectedTmEvents, taskManagerConsumer); + } + private void testGenericSchemaEvolution( String behavior, boolean mergeTable, diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java index 30c78d202f..83dd07c53e 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java @@ -258,17 +258,29 @@ private Optional cacheSchema(SchemaChangeEvent event) throws .stream()) .map(ProjectionColumn::getColumnName) .collect(Collectors.toSet()); - boolean hasAsterisk = - transforms.stream() - .filter(t -> t.getSelectors().isMatch(tableId)) - .anyMatch( - t -> - TransformParser.hasAsterisk( - t.getProjection() - .map(TransformProjection::getProjection) - .orElse(null))); - hasAsteriskMap.put(tableId, hasAsterisk); + boolean notTransformed = + transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId)); + + if (notTransformed) { + // If this TableId isn't presented in any transform block, it should behave like a + // "*" projection and should be regarded as asterisk-ful. + hasAsteriskMap.put(tableId, true); + } else { + boolean hasAsterisk = + transforms.stream() + .filter(t -> t.getSelectors().isMatch(tableId)) + .anyMatch( + t -> + TransformParser.hasAsterisk( + t.getProjection() + .map( + TransformProjection + ::getProjection) + .orElse(null))); + + hasAsteriskMap.put(tableId, hasAsterisk); + } projectedColumnsMap.put( tableId, createTableEvent.getSchema().getColumnNames().stream() diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java index 5d87980684..3b050f993c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java @@ -300,17 +300,26 @@ private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) { .map(Column::getName) .collect(Collectors.toSet()); - boolean hasAsterisk = - transforms.stream() - .filter(t -> t.getSelectors().isMatch(tableId)) - .anyMatch( - t -> - TransformParser.hasAsterisk( - t.getProjection() - .map(TransformProjection::getProjection) - .orElse(null))); + boolean notTransformed = + transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId)); - hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk); + if (notTransformed) { + // If this TableId isn't presented in any transform block, it should behave like a "*" + // projection and should be regarded as asterisk-ful. + hasAsteriskMap.put(tableId, true); + } else { + boolean hasAsterisk = + transforms.stream() + .filter(t -> t.getSelectors().isMatch(tableId)) + .anyMatch( + t -> + TransformParser.hasAsterisk( + t.getProjection() + .map(TransformProjection::getProjection) + .orElse(null))); + + hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk); + } referencedColumnsMap.put( createTableEvent.tableId(), createTableEvent.getSchema().getColumnNames().stream()