Skip to content

Commit

Permalink
[FLINK-36178][pipeline-connector/mysql] Parse `CREATE TABLE ... LIKE …
Browse files Browse the repository at this point in the history
…...` syntax and create new SchemaChangeEvent for this SQL.
  • Loading branch information
lvyanquan committed Aug 29, 2024
1 parent cb1b232 commit ad50899
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import org.antlr.v4.runtime.tree.ParseTreeListener;
Expand Down Expand Up @@ -76,6 +77,29 @@ public CustomAlterTableParserListener(
this.changes = changes;
}

@Override
public void exitCopyCreateTable(MySqlParser.CopyCreateTableContext ctx) {
TableId tableId = parser.parseQualifiedTableId(ctx.tableName(0).fullId());
TableId originalTableId = parser.parseQualifiedTableId(ctx.tableName(1).fullId());
Table original = parser.databaseTables().forTable(originalTableId);
if (original != null) {
parser.databaseTables()
.overwriteTable(
tableId,
original.columns(),
original.primaryKeyColumnNames(),
original.defaultCharsetName());
parser.signalCreateTable(tableId, ctx);
Schema.Builder builder = Schema.newBuilder();
original.columns().forEach(column -> builder.column(toCdcColumn(column)));
if (!original.primaryKeyColumnNames().isEmpty()) {
builder.primaryKey(original.primaryKeyColumnNames());
}
changes.add(new CreateTableEvent(toCdcTableId(tableId), builder.build()));
}
super.exitCopyCreateTable(ctx);
}

@Override
public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,23 @@ public void testSchemaChangeEvents() throws Exception {
.physicalColumn("notes", DataTypes.STRING())
.primaryKey("id", "name")
.build()));

// Test create table DDL with like syntax
statement.execute(
String.format(
"CREATE TABLE `%s`.`newlyAddedTable4` like `%s`.`newlyAddedTable3`",
inventoryDatabase.getDatabaseName(),
inventoryDatabase.getDatabaseName()));
expected.add(
new CreateTableEvent(
TableId.tableId(
inventoryDatabase.getDatabaseName(), "newlyAddedTable4"),
Schema.newBuilder()
.physicalColumn("id", DataTypes.DECIMAL(20, 0).notNull())
.physicalColumn("name", DataTypes.VARCHAR(17).notNull())
.physicalColumn("notes", DataTypes.STRING())
.primaryKey("id", "name")
.build()));
}
List<Event> actual = fetchResults(events, expected.size());
assertEqualsInAnyOrder(
Expand Down

0 comments on commit ad50899

Please sign in to comment.