Skip to content

Commit

Permalink
[hotfix][pipeline-connector/mysql] Fix primary key restraints missing…
Browse files Browse the repository at this point in the history
… when using inline `PRIMARY KEY` declaration syntax

This closes  #3579.
  • Loading branch information
yuxiqian authored Aug 28, 2024
1 parent a876af2 commit 0e9a176
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId());
if (parser.databaseTables().forTable(tableId) == null) {
tableEditor = parser.databaseTables().editOrCreateTable(tableId);
super.enterColumnCreateTable(ctx);
}
super.enterColumnCreateTable(ctx);
}

@Override
Expand Down Expand Up @@ -113,14 +113,17 @@ public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
.collect(Collectors.toList()));
parser.databaseTables().overwriteTable(tableEditor.create());
parser.signalCreateTable(tableEditor.tableId(), ctx);

Schema.Builder builder = Schema.newBuilder();
tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column)));
if (tableEditor.hasPrimaryKey()) {
builder.primaryKey(tableEditor.primaryKeyColumnNames());
}
changes.add(
new CreateTableEvent(
toCdcTableId(tableEditor.tableId()), builder.build()));
},
tableEditor);
Schema.Builder builder = Schema.newBuilder();
tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column)));
if (tableEditor.hasPrimaryKey()) {
builder.primaryKey(tableEditor.primaryKeyColumnNames());
}
changes.add(new CreateTableEvent(toCdcTableId(tableEditor.tableId()), builder.build()));
super.exitColumnCreateTable(ctx);
}

Expand All @@ -133,7 +136,7 @@ public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
if (columnDefinitionListener == null) {
columnDefinitionListener =
new CustomColumnDefinitionParserListener(
columnEditor, parser, listeners);
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
} else {
columnDefinitionListener.setColumnEditor(columnEditor);
Expand Down Expand Up @@ -194,7 +197,8 @@ public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
String columnName = parser.parseName(ctx.uid(0));
ColumnEditor columnEditor = Column.editor().name(columnName);
columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.exitAlterByAddColumn(ctx);
}
Expand Down Expand Up @@ -246,7 +250,8 @@ public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) {
columnEditors.add(Column.editor().name(columnName));
}
columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditors.get(0), parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditors.get(0), parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByAddColumns(ctx);
}
Expand Down Expand Up @@ -296,7 +301,8 @@ public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
columnEditor.unsetDefaultValueExpression();

columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByChangeColumn(ctx);
}
Expand Down Expand Up @@ -335,7 +341,8 @@ public void enterAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx)
String oldColumnName = parser.parseName(ctx.oldColumn);
ColumnEditor columnEditor = Column.editor().name(oldColumnName);
columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByRenameColumn(ctx);
}
Expand All @@ -347,7 +354,8 @@ public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
columnEditor.unsetDefaultValueExpression();

columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByModifyColumn(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableEditor;
import io.debezium.relational.ddl.DataType;
import io.debezium.util.Strings;
import org.antlr.v4.runtime.tree.ParseTreeListener;
Expand All @@ -50,13 +51,16 @@ public class CustomColumnDefinitionParserListener extends MySqlParserBaseListene
private boolean uniqueColumn;
private AtomicReference<Boolean> optionalColumn = new AtomicReference<>();
private DefaultValueParserListener defaultValueListener;
private final TableEditor tableEditor;

private final List<ParseTreeListener> listeners;

public CustomColumnDefinitionParserListener(
TableEditor tableEditor,
ColumnEditor columnEditor,
MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners) {
this.tableEditor = tableEditor;
this.columnEditor = columnEditor;
this.parser = parser;
this.dataTypeResolver = parser.dataTypeResolver();
Expand Down Expand Up @@ -106,6 +110,8 @@ public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstrai
// this rule will be parsed only if no primary key is set in a table
// otherwise the statement can't be executed due to multiple primary key error
optionalColumn.set(Boolean.FALSE);
tableEditor.addColumn(columnEditor.create());
tableEditor.setPrimaryKeyNames(columnEditor.name());
super.enterPrimaryKeyColumnConstraint(ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,40 @@ public void testSchemaChangeEvents() throws Exception {
.physicalColumn("timestamp6_c", DataTypes.TIMESTAMP_LTZ(6))
.primaryKey("id")
.build()));

// Test create table DDL with inline primary key
statement.execute(
String.format(
"CREATE TABLE `%s`.`newlyAddedTable2`(id SERIAL PRIMARY KEY);",
inventoryDatabase.getDatabaseName()));
expected.add(
new CreateTableEvent(
TableId.tableId(
inventoryDatabase.getDatabaseName(), "newlyAddedTable2"),
Schema.newBuilder()
.physicalColumn("id", DataTypes.DECIMAL(20, 0).notNull())
.primaryKey("id")
.build()));

// Test create table DDL with multiple primary keys
statement.execute(
String.format(
"CREATE TABLE `%s`.`newlyAddedTable3`("
+ "id SERIAL,"
+ "name VARCHAR(17),"
+ "notes TEXT,"
+ "PRIMARY KEY (id, name));",
inventoryDatabase.getDatabaseName()));
expected.add(
new CreateTableEvent(
TableId.tableId(
inventoryDatabase.getDatabaseName(), "newlyAddedTable3"),
Schema.newBuilder()
.physicalColumn("id", DataTypes.DECIMAL(20, 0).notNull())
.physicalColumn("name", DataTypes.VARCHAR(17).notNull())
.physicalColumn("notes", DataTypes.STRING())
.primaryKey("id", "name")
.build()));
}
List<Event> actual = fetchResults(events, expected.size());
assertEqualsInAnyOrder(
Expand Down

0 comments on commit 0e9a176

Please sign in to comment.