Skip to content

Commit

Permalink
[hotfix][mysql] Fix primary key restraints missing when using inline …
Browse files Browse the repository at this point in the history
…`PRIMARY KEY` declaration syntax

This closes #3582
  • Loading branch information
yuxiqian authored Aug 28, 2024
1 parent 1ebefd6 commit 6ef4734
Show file tree
Hide file tree
Showing 3 changed files with 440 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;

import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
Expand All @@ -30,6 +32,7 @@
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import org.slf4j.Logger;
Expand All @@ -41,6 +44,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn;

Expand All @@ -58,6 +62,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
private List<ColumnEditor> columnEditors;

private CustomColumnDefinitionParserListener columnDefinitionListener;
private TableEditor tableEditor;

private int parsingColumnIndex = STARTING_INDEX;

Expand All @@ -70,6 +75,109 @@ public CustomAlterTableParserListener(
this.changes = changes;
}

@Override
public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId());
if (parser.databaseTables().forTable(tableId) == null) {
tableEditor = parser.databaseTables().editOrCreateTable(tableId);
}
super.enterColumnCreateTable(ctx);
}

@Override
public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
parser.runIfNotNull(
() -> {
// Make sure that the table's character set has been set ...
if (!tableEditor.hasDefaultCharsetName()) {
tableEditor.setDefaultCharsetName(
parser.charsetForTable(tableEditor.tableId()));
}
listeners.remove(columnDefinitionListener);
columnDefinitionListener = null;
// remove column definition parser listener
final String defaultCharsetName = tableEditor.create().defaultCharsetName();
tableEditor.setColumns(
tableEditor.columns().stream()
.map(
column -> {
final ColumnEditor columnEditor = column.edit();
if (columnEditor.charsetNameOfTable() == null) {
columnEditor.charsetNameOfTable(
defaultCharsetName);
}
return columnEditor;
})
.map(ColumnEditor::create)
.collect(Collectors.toList()));
parser.databaseTables().overwriteTable(tableEditor.create());
parser.signalCreateTable(tableEditor.tableId(), ctx);

Schema.Builder builder = Schema.newBuilder();
tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column)));
if (tableEditor.hasPrimaryKey()) {
builder.primaryKey(tableEditor.primaryKeyColumnNames());
}
changes.add(
new CreateTableEvent(
toCdcTableId(tableEditor.tableId()), builder.build()));
},
tableEditor);
super.exitColumnCreateTable(ctx);
}

@Override
public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
parser.runIfNotNull(
() -> {
String columnName = parser.parseName(ctx.uid());
ColumnEditor columnEditor = Column.editor().name(columnName);
if (columnDefinitionListener == null) {
columnDefinitionListener =
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
} else {
columnDefinitionListener.setColumnEditor(columnEditor);
}
},
tableEditor);
super.enterColumnDeclaration(ctx);
}

@Override
public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
parser.runIfNotNull(
() -> {
tableEditor.addColumn(columnDefinitionListener.getColumn());
},
tableEditor,
columnDefinitionListener);
super.exitColumnDeclaration(ctx);
}

@Override
public void enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext ctx) {
parser.runIfNotNull(
() -> {
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
},
tableEditor);
super.enterPrimaryKeyTableConstraint(ctx);
}

@Override
public void enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) {
parser.runIfNotNull(
() -> {
if (!tableEditor.hasPrimaryKey()) {
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
}
},
tableEditor);
super.enterUniqueKeyTableConstraint(ctx);
}

@Override
public void enterAlterTable(MySqlParser.AlterTableContext ctx) {
this.currentTable = toCdcTableId(parser.parseQualifiedTableId(ctx.tableName().fullId()));
Expand All @@ -88,7 +196,8 @@ public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
String columnName = parser.parseName(ctx.uid(0));
ColumnEditor columnEditor = Column.editor().name(columnName);
columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.exitAlterByAddColumn(ctx);
}
Expand Down Expand Up @@ -140,7 +249,8 @@ public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) {
columnEditors.add(Column.editor().name(columnName));
}
columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditors.get(0), parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditors.get(0), parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByAddColumns(ctx);
}
Expand Down Expand Up @@ -190,7 +300,8 @@ public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
columnEditor.unsetDefaultValueExpression();

columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByChangeColumn(ctx);
}
Expand Down Expand Up @@ -229,7 +340,8 @@ public void enterAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx)
String oldColumnName = parser.parseName(ctx.oldColumn);
ColumnEditor columnEditor = Column.editor().name(oldColumnName);
columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByRenameColumn(ctx);
}
Expand All @@ -241,7 +353,8 @@ public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
columnEditor.unsetDefaultValueExpression();

columnDefinitionListener =
new CustomColumnDefinitionParserListener(columnEditor, parser, listeners);
new CustomColumnDefinitionParserListener(
tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByModifyColumn(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.TableEditor;
import io.debezium.relational.ddl.DataType;
import io.debezium.util.Strings;
import org.antlr.v4.runtime.tree.ParseTreeListener;
Expand All @@ -50,13 +51,16 @@ public class CustomColumnDefinitionParserListener extends MySqlParserBaseListene
private boolean uniqueColumn;
private AtomicReference<Boolean> optionalColumn = new AtomicReference<>();
private DefaultValueParserListener defaultValueListener;
private final TableEditor tableEditor;

private final List<ParseTreeListener> listeners;

public CustomColumnDefinitionParserListener(
TableEditor tableEditor,
ColumnEditor columnEditor,
MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners) {
this.tableEditor = tableEditor;
this.columnEditor = columnEditor;
this.parser = parser;
this.dataTypeResolver = parser.dataTypeResolver();
Expand Down Expand Up @@ -106,6 +110,8 @@ public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstrai
// this rule will be parsed only if no primary key is set in a table
// otherwise the statement can't be executed due to multiple primary key error
optionalColumn.set(Boolean.FALSE);
tableEditor.addColumn(columnEditor.create());
tableEditor.setPrimaryKeyNames(columnEditor.name());
super.enterPrimaryKeyColumnConstraint(ctx);
}

Expand Down
Loading

0 comments on commit 6ef4734

Please sign in to comment.