Skip to content

Commit

Permalink
add lenient + route schema evolution tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Aug 29, 2024
1 parent 85169e6 commit 2aa81e0
Showing 1 changed file with 92 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,98 @@ public void testFineGrainedSchemaEvolution() throws Exception {
"Ignored schema change DropTableEvent{tableId=%s.members} to table %s.members."));
}

@Test
public void testLenientWithRoute() throws Exception {
String dbName = schemaEvolveDatabase.getDatabaseName();

String pipelineJob =
String.format(
"source:\n"
+ " type: mysql\n"
+ " hostname: %s\n"
+ " port: 3306\n"
+ " username: %s\n"
+ " password: %s\n"
+ " tables: %s.members\n"
+ " server-id: 5400-5404\n"
+ " server-time-zone: UTC\n"
+ "\n"
+ "route:\n"
+ " - source-table: %s.members\n"
+ " sink-table: %s.redirect\n"
+ "sink:\n"
+ " type: values\n"
+ "\n"
+ "pipeline:\n"
+ " schema.change.behavior: lenient\n"
+ " parallelism: %d",
INTER_CONTAINER_MYSQL_ALIAS,
MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD,
dbName,
dbName,
dbName,
parallelism);
Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));
LOG.info("Pipeline job is running");
validateSnapshotData(dbName, "redirect");

LOG.info("Starting schema evolution");
String mysqlJdbcUrl =
String.format(
"jdbc:mysql://%s:%s/%s", MYSQL.getHost(), MYSQL.getDatabasePort(), dbName);

try (Connection conn =
DriverManager.getConnection(
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
Statement stmt = conn.createStatement()) {

waitForIncrementalStage(dbName, "redirect", stmt);

// triggers AddColumnEvent
stmt.execute("ALTER TABLE members ADD COLUMN gender TINYINT AFTER age;");
stmt.execute("INSERT INTO members VALUES (1012, 'Eve', 17, 0);");

// triggers AlterColumnTypeEvent and RenameColumnEvent
stmt.execute("ALTER TABLE members CHANGE COLUMN age precise_age DOUBLE;");

// triggers RenameColumnEvent
stmt.execute("ALTER TABLE members RENAME COLUMN gender TO biological_sex;");

// triggers DropColumnEvent
stmt.execute("ALTER TABLE members DROP COLUMN biological_sex");
stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);");

// triggers TruncateTableEvent
stmt.execute("TRUNCATE TABLE members;");
stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);");

// triggers DropTableEvent
stmt.execute("DROP TABLE members;");
}

List<String> expectedTaskManagerEvents =
Arrays.asList(
"AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.redirect, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.redirect, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}",
"AddColumnEvent{tableId=%s.redirect, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.redirect, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.redirect, before=[], after=[1014, Gem, null, null, 17.0, null], op=INSERT, meta=()}");

List<String> expectedTmEvents =
expectedTaskManagerEvents.stream()
.map(s -> String.format(s, dbName, dbName))
.collect(Collectors.toList());

validateResult(expectedTmEvents, taskManagerConsumer);
}

@Test
public void testUnexpectedBehavior() {
String pipelineJob =
Expand Down

0 comments on commit 2aa81e0

Please sign in to comment.