Skip to content

Commit

Permalink
[FLINK-36184][transform] Fix transform operator swallows schema chang…
Browse files Browse the repository at this point in the history
…es from tables not present in transform rules (#3589)
  • Loading branch information
yuxiqian authored Aug 30, 2024
1 parent cb1b232 commit 06fc939
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,102 @@ public void testUnexpectedBehavior() {
() -> submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar));
}

@Test
public void testByDefaultTransform() throws Exception {
String dbName = schemaEvolveDatabase.getDatabaseName();

// We put a dummy transform block that matches nothing
// to ensure TransformOperator exists, so we could verify if TransformOperator could
// correctly handle such "bypass" tables with schema changes.
String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.members\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "\n"
+ "sink:\n"
+ " type: values\n"
+ "transform:\n"
+ " - source-table: another.irrelevant\n"
+ " projection: \"'irrelevant' AS tag\"\n"
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: evolve\n"
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
dbName,
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
validateSnapshotData(dbName, "members");

LOG.info("Starting schema evolution");
String mysqlJdbcUrl =
String.format(
"jdbc:mysql://%s:%s/%s", MYSQL.getHost(), MYSQL.getDatabasePort(), dbName);

try (Connection conn =
DriverManager.getConnection(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stmt = conn.createStatement()) {

waitForIncrementalStage(dbName, "members", stmt);

// triggers AddColumnEvent
stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;");
stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");

// triggers AlterColumnTypeEvent and RenameColumnEvent
stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;");

// triggers RenameColumnEvent
stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;");

// triggers DropColumnEvent
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");

// triggers TruncateTableEvent
stmt.execute("TRUNCATE TABLE members;");
stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");

// triggers DropTableEvent
stmt.execute("DROP TABLE members;");
}

List<String> expectedTaskManagerEvents =
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={age=precise_age}}",
"RenameColumnEvent{tableId=%s.members, nameMapping={gender=biological_sex}}",
"DropColumnEvent{tableId=%s.members, droppedColumnNames=[biological_sex]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, 16.0], op=INSERT, meta=()}",
"TruncateTableEvent{tableId=%s.members}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, 17.0], op=INSERT, meta=()}",
"DropTableEvent{tableId=%s.members}");

List<String> expectedTmEvents =
expectedTaskManagerEvents.stream()
.map(s -> String.format(s, dbName, dbName))
.collect(Collectors.toList());

validateResult(expectedTmEvents, taskManagerConsumer);
}

private void testGenericSchemaEvolution(
String behavior,
boolean mergeTable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,17 +258,29 @@ private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws
.stream())
.map(ProjectionColumn::getColumnName)
.collect(Collectors.toSet());
boolean hasAsterisk =
transforms.stream()
.filter(t -> t.getSelectors().isMatch(tableId))
.anyMatch(
t ->
TransformParser.hasAsterisk(
t.getProjection()
.map(TransformProjection::getProjection)
.orElse(null)));

hasAsteriskMap.put(tableId, hasAsterisk);
boolean notTransformed =
transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));

if (notTransformed) {
// If this TableId isn't presented in any transform block, it should behave like a
// "*" projection and should be regarded as asterisk-ful.
hasAsteriskMap.put(tableId, true);
} else {
boolean hasAsterisk =
transforms.stream()
.filter(t -> t.getSelectors().isMatch(tableId))
.anyMatch(
t ->
TransformParser.hasAsterisk(
t.getProjection()
.map(
TransformProjection
::getProjection)
.orElse(null)));

hasAsteriskMap.put(tableId, hasAsterisk);
}
projectedColumnsMap.put(
tableId,
createTableEvent.getSchema().getColumnNames().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,17 +300,26 @@ private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
.map(Column::getName)
.collect(Collectors.toSet());

boolean hasAsterisk =
transforms.stream()
.filter(t -> t.getSelectors().isMatch(tableId))
.anyMatch(
t ->
TransformParser.hasAsterisk(
t.getProjection()
.map(TransformProjection::getProjection)
.orElse(null)));
boolean notTransformed =
transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));

hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
if (notTransformed) {
// If this TableId isn't presented in any transform block, it should behave like a "*"
// projection and should be regarded as asterisk-ful.
hasAsteriskMap.put(tableId, true);
} else {
boolean hasAsterisk =
transforms.stream()
.filter(t -> t.getSelectors().isMatch(tableId))
.anyMatch(
t ->
TransformParser.hasAsterisk(
t.getProjection()
.map(TransformProjection::getProjection)
.orElse(null)));

hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
}
referencedColumnsMap.put(
createTableEvent.tableId(),
createTableEvent.getSchema().getColumnNames().stream()
Expand Down

0 comments on commit 06fc939

Please sign in to comment.