diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 3b30b3c494..da71c413c1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -34,6 +34,7 @@ import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener; import io.debezium.relational.Column; import io.debezium.relational.ColumnEditor; +import io.debezium.relational.Table; import io.debezium.relational.TableEditor; import io.debezium.relational.TableId; import org.antlr.v4.runtime.tree.ParseTreeListener; @@ -76,6 +77,29 @@ public CustomAlterTableParserListener( this.changes = changes; } + @Override + public void exitCopyCreateTable(MySqlParser.CopyCreateTableContext ctx) { + TableId tableId = parser.parseQualifiedTableId(ctx.tableName(0).fullId()); + TableId originalTableId = parser.parseQualifiedTableId(ctx.tableName(1).fullId()); + Table original = parser.databaseTables().forTable(originalTableId); + if (original != null) { + parser.databaseTables() + .overwriteTable( + tableId, + original.columns(), + original.primaryKeyColumnNames(), + original.defaultCharsetName()); + parser.signalCreateTable(tableId, ctx); + Schema.Builder builder = Schema.newBuilder(); + original.columns().forEach(column -> builder.column(toCdcColumn(column))); + if (!original.primaryKeyColumnNames().isEmpty()) { + builder.primaryKey(original.primaryKeyColumnNames()); + } + changes.add(new CreateTableEvent(toCdcTableId(tableId), builder.build())); + } + super.exitCopyCreateTable(ctx); + } + @Override public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) { TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId()); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index f2b4ccb82f..ae2b292d68 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -684,6 +684,23 @@ public void testSchemaChangeEvents() throws Exception { .physicalColumn("notes", DataTypes.STRING()) .primaryKey("id", "name") .build())); + + // Test create table DDL with like syntax + statement.execute( + String.format( + "CREATE TABLE `%s`.`newlyAddedTable4` like `%s`.`newlyAddedTable3`", + inventoryDatabase.getDatabaseName(), + inventoryDatabase.getDatabaseName())); + expected.add( + new CreateTableEvent( + TableId.tableId( + inventoryDatabase.getDatabaseName(), "newlyAddedTable4"), + Schema.newBuilder() + .physicalColumn("id", DataTypes.DECIMAL(20, 0).notNull()) + .physicalColumn("name", DataTypes.VARCHAR(17).notNull()) + .physicalColumn("notes", DataTypes.STRING()) + .primaryKey("id", "name") + .build())); } List actual = fetchResults(events, expected.size()); assertEqualsInAnyOrder(