diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 79583d83e0..8d86895b26 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -19,9 +19,11 @@ import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; +import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.event.DropColumnEvent; import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataType; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; @@ -30,6 +32,7 @@ import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener; import io.debezium.relational.Column; import io.debezium.relational.ColumnEditor; +import io.debezium.relational.TableEditor; import io.debezium.relational.TableId; import org.antlr.v4.runtime.tree.ParseTreeListener; import org.slf4j.Logger; @@ -41,6 +44,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn; @@ -58,6 +62,7 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener { private List columnEditors; private CustomColumnDefinitionParserListener columnDefinitionListener; + private TableEditor tableEditor; private int parsingColumnIndex = STARTING_INDEX; @@ -70,6 +75,109 @@ public CustomAlterTableParserListener( this.changes = changes; } + @Override + public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) { + TableId tableId = parser.parseQualifiedTableId(ctx.tableName().fullId()); + if (parser.databaseTables().forTable(tableId) == null) { + tableEditor = parser.databaseTables().editOrCreateTable(tableId); + } + super.enterColumnCreateTable(ctx); + } + + @Override + public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) { + parser.runIfNotNull( + () -> { + // Make sure that the table's character set has been set ... + if (!tableEditor.hasDefaultCharsetName()) { + tableEditor.setDefaultCharsetName( + parser.charsetForTable(tableEditor.tableId())); + } + listeners.remove(columnDefinitionListener); + columnDefinitionListener = null; + // remove column definition parser listener + final String defaultCharsetName = tableEditor.create().defaultCharsetName(); + tableEditor.setColumns( + tableEditor.columns().stream() + .map( + column -> { + final ColumnEditor columnEditor = column.edit(); + if (columnEditor.charsetNameOfTable() == null) { + columnEditor.charsetNameOfTable( + defaultCharsetName); + } + return columnEditor; + }) + .map(ColumnEditor::create) + .collect(Collectors.toList())); + parser.databaseTables().overwriteTable(tableEditor.create()); + parser.signalCreateTable(tableEditor.tableId(), ctx); + + Schema.Builder builder = Schema.newBuilder(); + tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column))); + if (tableEditor.hasPrimaryKey()) { + builder.primaryKey(tableEditor.primaryKeyColumnNames()); + } + changes.add( + new CreateTableEvent( + toCdcTableId(tableEditor.tableId()), builder.build())); + }, + tableEditor); + super.exitColumnCreateTable(ctx); + } + + @Override + public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) { + parser.runIfNotNull( + () -> { + String columnName = parser.parseName(ctx.uid()); + ColumnEditor columnEditor = Column.editor().name(columnName); + if (columnDefinitionListener == null) { + columnDefinitionListener = + new CustomColumnDefinitionParserListener( + tableEditor, columnEditor, parser, listeners); + listeners.add(columnDefinitionListener); + } else { + columnDefinitionListener.setColumnEditor(columnEditor); + } + }, + tableEditor); + super.enterColumnDeclaration(ctx); + } + + @Override + public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) { + parser.runIfNotNull( + () -> { + tableEditor.addColumn(columnDefinitionListener.getColumn()); + }, + tableEditor, + columnDefinitionListener); + super.exitColumnDeclaration(ctx); + } + + @Override + public void enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext ctx) { + parser.runIfNotNull( + () -> { + parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor); + }, + tableEditor); + super.enterPrimaryKeyTableConstraint(ctx); + } + + @Override + public void enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) { + parser.runIfNotNull( + () -> { + if (!tableEditor.hasPrimaryKey()) { + parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor); + } + }, + tableEditor); + super.enterUniqueKeyTableConstraint(ctx); + } + @Override public void enterAlterTable(MySqlParser.AlterTableContext ctx) { this.currentTable = toCdcTableId(parser.parseQualifiedTableId(ctx.tableName().fullId())); @@ -88,7 +196,8 @@ public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) { String columnName = parser.parseName(ctx.uid(0)); ColumnEditor columnEditor = Column.editor().name(columnName); columnDefinitionListener = - new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + new CustomColumnDefinitionParserListener( + tableEditor, columnEditor, parser, listeners); listeners.add(columnDefinitionListener); super.exitAlterByAddColumn(ctx); } @@ -140,7 +249,8 @@ public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) { columnEditors.add(Column.editor().name(columnName)); } columnDefinitionListener = - new CustomColumnDefinitionParserListener(columnEditors.get(0), parser, listeners); + new CustomColumnDefinitionParserListener( + tableEditor, columnEditors.get(0), parser, listeners); listeners.add(columnDefinitionListener); super.enterAlterByAddColumns(ctx); } @@ -190,7 +300,8 @@ public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) columnEditor.unsetDefaultValueExpression(); columnDefinitionListener = - new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + new CustomColumnDefinitionParserListener( + tableEditor, columnEditor, parser, listeners); listeners.add(columnDefinitionListener); super.enterAlterByChangeColumn(ctx); } @@ -229,7 +340,8 @@ public void enterAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx) String oldColumnName = parser.parseName(ctx.oldColumn); ColumnEditor columnEditor = Column.editor().name(oldColumnName); columnDefinitionListener = - new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + new CustomColumnDefinitionParserListener( + tableEditor, columnEditor, parser, listeners); listeners.add(columnDefinitionListener); super.enterAlterByRenameColumn(ctx); } @@ -241,7 +353,8 @@ public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) columnEditor.unsetDefaultValueExpression(); columnDefinitionListener = - new CustomColumnDefinitionParserListener(columnEditor, parser, listeners); + new CustomColumnDefinitionParserListener( + tableEditor, columnEditor, parser, listeners); listeners.add(columnDefinitionListener); super.enterAlterByModifyColumn(ctx); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java index 5918364186..e886580b31 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java @@ -25,6 +25,7 @@ import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener; import io.debezium.relational.Column; import io.debezium.relational.ColumnEditor; +import io.debezium.relational.TableEditor; import io.debezium.relational.ddl.DataType; import io.debezium.util.Strings; import org.antlr.v4.runtime.tree.ParseTreeListener; @@ -50,13 +51,16 @@ public class CustomColumnDefinitionParserListener extends MySqlParserBaseListene private boolean uniqueColumn; private AtomicReference optionalColumn = new AtomicReference<>(); private DefaultValueParserListener defaultValueListener; + private final TableEditor tableEditor; private final List listeners; public CustomColumnDefinitionParserListener( + TableEditor tableEditor, ColumnEditor columnEditor, MySqlAntlrDdlParser parser, List listeners) { + this.tableEditor = tableEditor; this.columnEditor = columnEditor; this.parser = parser; this.dataTypeResolver = parser.dataTypeResolver(); @@ -106,6 +110,8 @@ public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstrai // this rule will be parsed only if no primary key is set in a table // otherwise the statement can't be executed due to multiple primary key error optionalColumn.set(Boolean.FALSE); + tableEditor.addColumn(columnEditor.create()); + tableEditor.setPrimaryKeyNames(columnEditor.name()); super.enterPrimaryKeyColumnConstraint(ctx); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java index f019e98d8c..8f613e0ef9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -62,6 +62,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED; @@ -372,6 +373,281 @@ public void testParseAlterStatement() throws Exception { assertThat(actual).isEqualTo(expected); } + @Test + public void testSchemaChangeEvents() throws Exception { + env.setParallelism(1); + inventoryDatabase.createAndInitialize(); + MySqlSourceConfigFactory configFactory = + new MySqlSourceConfigFactory() + .hostname(MYSQL8_CONTAINER.getHost()) + .port(MYSQL8_CONTAINER.getDatabasePort()) + .username(TEST_USER) + .password(TEST_PASSWORD) + .databaseList(inventoryDatabase.getDatabaseName()) + .tableList(inventoryDatabase.getDatabaseName() + ".*") + .startupOptions(StartupOptions.latest()) + .serverId(getServerId(env.getParallelism())) + .serverTimeZone("UTC") + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); + + FlinkSourceProvider sourceProvider = + (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); + CloseableIterator events = + env.fromSource( + sourceProvider.getSource(), + WatermarkStrategy.noWatermarks(), + MySqlDataSourceFactory.IDENTIFIER, + new EventTypeInfo()) + .executeAndCollect(); + Thread.sleep(5_000); + + List expected = + new ArrayList<>( + getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName())); + + try (Connection connection = inventoryDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` ADD COLUMN `newcol1` INT NULL;", + inventoryDatabase.getDatabaseName())); + expected.add( + new AddColumnEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonList( + new AddColumnEvent.ColumnWithPosition( + Column.physicalColumn("newcol1", DataTypes.INT()))))); + + // Test MODIFY COLUMN DDL + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` MODIFY COLUMN `newcol1` DOUBLE;", + inventoryDatabase.getDatabaseName())); + + expected.add( + new AlterColumnTypeEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("newcol1", DataTypes.DOUBLE()))); + + // Test CHANGE COLUMN DDL + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` CHANGE COLUMN `newcol1` `newcol2` INT;", + inventoryDatabase.getDatabaseName())); + + expected.add( + new AlterColumnTypeEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("newcol1", DataTypes.INT()))); + + expected.add( + new RenameColumnEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("newcol1", "newcol2"))); + + statement.execute( + String.format( + "ALTER TABLE `%s`.`customers` CHANGE COLUMN `newcol2` `newcol1` DOUBLE;", + inventoryDatabase.getDatabaseName())); + + expected.add( + new AlterColumnTypeEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("newcol2", DataTypes.DOUBLE()))); + + expected.add( + new RenameColumnEvent( + TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"), + Collections.singletonMap("newcol2", "newcol1"))); + + // Test create table DDL + statement.execute( + String.format( + "CREATE TABLE `%s`.`newlyAddedTable1`(" + + " id SERIAL,\n" + + " tiny_c TINYINT,\n" + + " tiny_un_c TINYINT UNSIGNED,\n" + + " tiny_un_z_c TINYINT UNSIGNED ZEROFILL,\n" + + " small_c SMALLINT,\n" + + " small_un_c SMALLINT UNSIGNED,\n" + + " small_un_z_c SMALLINT UNSIGNED ZEROFILL,\n" + + " medium_c MEDIUMINT,\n" + + " medium_un_c MEDIUMINT UNSIGNED,\n" + + " medium_un_z_c MEDIUMINT UNSIGNED ZEROFILL,\n" + + " int_c INTEGER,\n" + + " int_un_c INTEGER UNSIGNED,\n" + + " int_un_z_c INTEGER UNSIGNED ZEROFILL,\n" + + " int11_c INT(11),\n" + + " big_c BIGINT,\n" + + " big_un_c BIGINT UNSIGNED,\n" + + " big_un_z_c BIGINT UNSIGNED ZEROFILL,\n" + + " varchar_c VARCHAR(255),\n" + + " char_c CHAR(3),\n" + + " real_c REAL,\n" + + " float_c FLOAT,\n" + + " float_un_c FLOAT UNSIGNED,\n" + + " float_un_z_c FLOAT UNSIGNED ZEROFILL,\n" + + " double_c DOUBLE,\n" + + " double_un_c DOUBLE UNSIGNED,\n" + + " double_un_z_c DOUBLE UNSIGNED ZEROFILL,\n" + + " decimal_c DECIMAL(8, 4),\n" + + " decimal_un_c DECIMAL(8, 4) UNSIGNED,\n" + + " decimal_un_z_c DECIMAL(8, 4) UNSIGNED ZEROFILL,\n" + + " numeric_c NUMERIC(6, 0),\n" + + " big_decimal_c DECIMAL(65, 1),\n" + + " bit1_c BIT,\n" + + " bit3_c BIT(3),\n" + + " tiny1_c TINYINT(1),\n" + + " boolean_c BOOLEAN,\n" + + " file_uuid BINARY(16),\n" + + " bit_c BIT(64),\n" + + " text_c TEXT,\n" + + " tiny_blob_c TINYBLOB,\n" + + " blob_c BLOB,\n" + + " medium_blob_c MEDIUMBLOB,\n" + + " long_blob_c LONGBLOB,\n" + + " enum_c enum('red', 'white'),\n" + + " json_c JSON,\n" + + " point_c POINT,\n" + + " geometry_c GEOMETRY,\n" + + " linestring_c LINESTRING,\n" + + " polygon_c POLYGON,\n" + + " multipoint_c MULTIPOINT,\n" + + " multiline_c MULTILINESTRING,\n" + + " multipolygon_c MULTIPOLYGON,\n" + + " geometrycollection_c GEOMETRYCOLLECTION," + + " year_c YEAR,\n" + + " date_c DATE,\n" + + " time_c TIME(0),\n" + + " time_3_c TIME(3),\n" + + " time_6_c TIME(6),\n" + + " datetime_c DATETIME(0),\n" + + " datetime3_c DATETIME(3),\n" + + " datetime6_c DATETIME(6),\n" + + " decimal_c0 DECIMAL(6, 2),\n" + + " decimal_c1 DECIMAL(9, 4),\n" + + " decimal_c2 DECIMAL(20, 4),\n" + + " timestamp_c TIMESTAMP(0) NULL,\n" + + " timestamp3_c TIMESTAMP(3) NULL,\n" + + " timestamp6_c TIMESTAMP(6) NULL," + + "primary key(id));", + inventoryDatabase.getDatabaseName())); + + expected.add( + new CreateTableEvent( + TableId.tableId( + inventoryDatabase.getDatabaseName(), "newlyAddedTable1"), + Schema.newBuilder() + .physicalColumn("id", DataTypes.DECIMAL(20, 0).notNull()) + .physicalColumn("tiny_c", DataTypes.TINYINT()) + .physicalColumn("tiny_un_c", DataTypes.SMALLINT()) + .physicalColumn("tiny_un_z_c", DataTypes.SMALLINT()) + .physicalColumn("small_c", DataTypes.SMALLINT()) + .physicalColumn("small_un_c", DataTypes.INT()) + .physicalColumn("small_un_z_c", DataTypes.INT()) + .physicalColumn("medium_c", DataTypes.INT()) + .physicalColumn("medium_un_c", DataTypes.INT()) + .physicalColumn("medium_un_z_c", DataTypes.INT()) + .physicalColumn("int_c", DataTypes.INT()) + .physicalColumn("int_un_c", DataTypes.BIGINT()) + .physicalColumn("int_un_z_c", DataTypes.BIGINT()) + .physicalColumn("int11_c", DataTypes.INT()) + .physicalColumn("big_c", DataTypes.BIGINT()) + .physicalColumn("big_un_c", DataTypes.DECIMAL(20, 0)) + .physicalColumn("big_un_z_c", DataTypes.DECIMAL(20, 0)) + .physicalColumn("varchar_c", DataTypes.VARCHAR(255)) + .physicalColumn("char_c", DataTypes.CHAR(3)) + .physicalColumn("real_c", DataTypes.DOUBLE()) + .physicalColumn("float_c", DataTypes.FLOAT()) + .physicalColumn("float_un_c", DataTypes.FLOAT()) + .physicalColumn("float_un_z_c", DataTypes.FLOAT()) + .physicalColumn("double_c", DataTypes.DOUBLE()) + .physicalColumn("double_un_c", DataTypes.DOUBLE()) + .physicalColumn("double_un_z_c", DataTypes.DOUBLE()) + .physicalColumn("decimal_c", DataTypes.DECIMAL(8, 4)) + .physicalColumn("decimal_un_c", DataTypes.DECIMAL(8, 4)) + .physicalColumn("decimal_un_z_c", DataTypes.DECIMAL(8, 4)) + .physicalColumn("numeric_c", DataTypes.DECIMAL(6, 0)) + .physicalColumn("big_decimal_c", DataTypes.STRING()) + .physicalColumn("bit1_c", DataTypes.BOOLEAN()) + .physicalColumn("bit3_c", DataTypes.BINARY(1)) + .physicalColumn("tiny1_c", DataTypes.BOOLEAN()) + .physicalColumn("boolean_c", DataTypes.BOOLEAN()) + .physicalColumn("file_uuid", DataTypes.BINARY(16)) + .physicalColumn("bit_c", DataTypes.BINARY(8)) + .physicalColumn("text_c", DataTypes.STRING()) + .physicalColumn("tiny_blob_c", DataTypes.BYTES()) + .physicalColumn("blob_c", DataTypes.BYTES()) + .physicalColumn("medium_blob_c", DataTypes.BYTES()) + .physicalColumn("long_blob_c", DataTypes.BYTES()) + .physicalColumn("enum_c", DataTypes.STRING()) + .physicalColumn("json_c", DataTypes.STRING()) + .physicalColumn("point_c", DataTypes.STRING()) + .physicalColumn("geometry_c", DataTypes.STRING()) + .physicalColumn("linestring_c", DataTypes.STRING()) + .physicalColumn("polygon_c", DataTypes.STRING()) + .physicalColumn("multipoint_c", DataTypes.STRING()) + .physicalColumn("multiline_c", DataTypes.STRING()) + .physicalColumn("multipolygon_c", DataTypes.STRING()) + .physicalColumn("geometrycollection_c", DataTypes.STRING()) + .physicalColumn("year_c", DataTypes.INT()) + .physicalColumn("date_c", DataTypes.DATE()) + .physicalColumn("time_c", DataTypes.TIME(0)) + .physicalColumn("time_3_c", DataTypes.TIME(3)) + .physicalColumn("time_6_c", DataTypes.TIME(6)) + .physicalColumn("datetime_c", DataTypes.TIMESTAMP(0)) + .physicalColumn("datetime3_c", DataTypes.TIMESTAMP(3)) + .physicalColumn("datetime6_c", DataTypes.TIMESTAMP(6)) + .physicalColumn("decimal_c0", DataTypes.DECIMAL(6, 2)) + .physicalColumn("decimal_c1", DataTypes.DECIMAL(9, 4)) + .physicalColumn("decimal_c2", DataTypes.DECIMAL(20, 4)) + .physicalColumn("timestamp_c", DataTypes.TIMESTAMP_LTZ(0)) + .physicalColumn("timestamp3_c", DataTypes.TIMESTAMP_LTZ(3)) + .physicalColumn("timestamp6_c", DataTypes.TIMESTAMP_LTZ(6)) + .primaryKey("id") + .build())); + + // Test create table DDL with inline primary key + statement.execute( + String.format( + "CREATE TABLE `%s`.`newlyAddedTable2`(id SERIAL PRIMARY KEY);", + inventoryDatabase.getDatabaseName())); + expected.add( + new CreateTableEvent( + TableId.tableId( + inventoryDatabase.getDatabaseName(), "newlyAddedTable2"), + Schema.newBuilder() + .physicalColumn("id", DataTypes.DECIMAL(20, 0).notNull()) + .primaryKey("id") + .build())); + + // Test create table DDL with multiple primary keys + statement.execute( + String.format( + "CREATE TABLE `%s`.`newlyAddedTable3`(" + + "id SERIAL," + + "name VARCHAR(17)," + + "notes TEXT," + + "PRIMARY KEY (id, name));", + inventoryDatabase.getDatabaseName())); + expected.add( + new CreateTableEvent( + TableId.tableId( + inventoryDatabase.getDatabaseName(), "newlyAddedTable3"), + Schema.newBuilder() + .physicalColumn("id", DataTypes.DECIMAL(20, 0).notNull()) + .physicalColumn("name", DataTypes.VARCHAR(17).notNull()) + .physicalColumn("notes", DataTypes.STRING()) + .primaryKey("id", "name") + .build())); + } + List actual = fetchResults(events, expected.size()); + assertEqualsInAnyOrder( + expected.stream().map(Object::toString).collect(Collectors.toList()), + actual.stream().map(Object::toString).collect(Collectors.toList())); + } + private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { return new CreateTableEvent( tableId, @@ -384,6 +660,46 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { .build()); } + private List getInventoryCreateAllTableEvents(String databaseName) { + return Arrays.asList( + new CreateTableEvent( + TableId.tableId(databaseName, "products"), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(255).notNull(), "flink") + .physicalColumn("description", DataTypes.VARCHAR(512)) + .physicalColumn("weight", DataTypes.FLOAT()) + .primaryKey(Collections.singletonList("id")) + .build()), + new CreateTableEvent( + TableId.tableId(databaseName, "customers"), + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT().notNull()) + .physicalColumn("first_name", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("last_name", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("email", DataTypes.VARCHAR(255).notNull()) + .primaryKey(Collections.singletonList("id")) + .build()), + new CreateTableEvent( + TableId.tableId(databaseName, "orders"), + Schema.newBuilder() + .physicalColumn("order_number", DataTypes.INT().notNull()) + .physicalColumn("order_date", DataTypes.DATE().notNull()) + .physicalColumn("purchaser", DataTypes.INT().notNull()) + .physicalColumn("quantity", DataTypes.INT().notNull()) + .physicalColumn("product_id", DataTypes.INT().notNull()) + .primaryKey(Collections.singletonList("order_number")) + .build()), + new CreateTableEvent( + TableId.tableId(databaseName, "multi_max_table"), + Schema.newBuilder() + .physicalColumn("order_id", DataTypes.VARCHAR(128).notNull()) + .physicalColumn("index", DataTypes.INT().notNull()) + .physicalColumn("desc", DataTypes.VARCHAR(512).notNull()) + .primaryKey(Arrays.asList("order_id", "index")) + .build())); + } + private List getSnapshotExpected(TableId tableId) { RowType rowType = RowType.of(