diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AddColumnEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AddColumnEvent.java index ab23ef59e7..9881e0c9ee 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AddColumnEvent.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/AddColumnEvent.java @@ -66,20 +66,20 @@ public static class ColumnWithPosition implements Serializable { private final ColumnPosition position; /** The added column lies in the position relative to this column. */ - private final @Nullable Column existingColumn; + private final @Nullable String existedColumnName; /** In the default scenario, we add fields at the end of the column. */ public ColumnWithPosition(Column addColumn) { this.addColumn = addColumn; position = ColumnPosition.LAST; - existingColumn = null; + existedColumnName = null; } public ColumnWithPosition( - Column addColumn, ColumnPosition position, Column existingColumn) { + Column addColumn, ColumnPosition position, @Nullable String existedColumnName) { this.addColumn = addColumn; this.position = position; - this.existingColumn = existingColumn; + this.existedColumnName = existedColumnName; } public Column getAddColumn() { @@ -90,8 +90,9 @@ public ColumnPosition getPosition() { return position; } - public Column getExistingColumn() { - return existingColumn; + @Nullable + public String getExistedColumnName() { + return existedColumnName; } @Override @@ -105,12 +106,12 @@ public boolean equals(Object o) { ColumnWithPosition position1 = (ColumnWithPosition) o; return Objects.equals(addColumn, position1.addColumn) && position == position1.position - && Objects.equals(existingColumn, position1.existingColumn); + && Objects.equals(existedColumnName, position1.existedColumnName); } @Override public int hashCode() { - return Objects.hash(addColumn, position, existingColumn); + return Objects.hash(addColumn, position, existedColumnName); } @Override @@ -120,8 +121,8 @@ public String toString() { + addColumn + ", position=" + position - + ", existingColumn=" - + existingColumn + + ", existedColumnName=" + + existedColumnName + '}'; } } diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DropColumnEvent.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DropColumnEvent.java index 47afa8fb9d..50251c80d0 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DropColumnEvent.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/event/DropColumnEvent.java @@ -17,7 +17,6 @@ package com.ververica.cdc.common.event; import com.ververica.cdc.common.annotation.PublicEvolving; -import com.ververica.cdc.common.schema.Column; import java.util.List; import java.util.Objects; @@ -33,16 +32,15 @@ public class DropColumnEvent implements SchemaChangeEvent { private final TableId tableId; - private final List droppedColumns; + private final List droppedColumnNames; - public DropColumnEvent(TableId tableId, List droppedColumns) { + public DropColumnEvent(TableId tableId, List droppedColumnNames) { this.tableId = tableId; - this.droppedColumns = droppedColumns; + this.droppedColumnNames = droppedColumnNames; } - /** Returns the dropped columns. */ - public List getDroppedColumns() { - return droppedColumns; + public List getDroppedColumnNames() { + return droppedColumnNames; } @Override @@ -55,12 +53,12 @@ public boolean equals(Object o) { } DropColumnEvent that = (DropColumnEvent) o; return Objects.equals(tableId, that.tableId) - && Objects.equals(droppedColumns, that.droppedColumns); + && Objects.equals(droppedColumnNames, that.droppedColumnNames); } @Override public int hashCode() { - return Objects.hash(tableId, droppedColumns); + return Objects.hash(tableId, droppedColumnNames); } @Override @@ -68,8 +66,8 @@ public String toString() { return "DropColumnEvent{" + "tableId=" + tableId - + ", droppedColumns=" - + droppedColumns + + ", droppedColumnNames=" + + droppedColumnNames + '}'; } diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/SchemaUtils.java index b352c319f9..463cb4a7e2 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/SchemaUtils.java @@ -87,16 +87,14 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema case BEFORE: { Preconditions.checkNotNull( - columnWithPosition.getExistingColumn(), - "existingColumn could not be null in BEFORE type AddColumnEvent"); + columnWithPosition.getExistedColumnName(), + "existedColumnName could not be null in BEFORE type AddColumnEvent"); List columnNames = columns.stream().map(Column::getName).collect(Collectors.toList()); - int index = - columnNames.indexOf( - columnWithPosition.getExistingColumn().getName()); + int index = columnNames.indexOf(columnWithPosition.getExistedColumnName()); if (index < 0) { throw new IllegalArgumentException( - columnWithPosition.getExistingColumn().getName() + columnWithPosition.getExistedColumnName() + " of AddColumnEvent is not existed"); } columns.add(index, columnWithPosition.getAddColumn()); @@ -105,16 +103,14 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema case AFTER: { Preconditions.checkNotNull( - columnWithPosition.getExistingColumn(), - "existingColumn could not be null in AFTER type AddColumnEvent"); + columnWithPosition.getExistedColumnName(), + "existedColumnName could not be null in AFTER type AddColumnEvent"); List columnNames = columns.stream().map(Column::getName).collect(Collectors.toList()); - int index = - columnNames.indexOf( - columnWithPosition.getExistingColumn().getName()); + int index = columnNames.indexOf(columnWithPosition.getExistedColumnName()); if (index < 0) { throw new IllegalArgumentException( - columnWithPosition.getExistingColumn().getName() + columnWithPosition.getExistedColumnName() + " of AddColumnEvent is not existed"); } columns.add(index + 1, columnWithPosition.getAddColumn()); @@ -126,17 +122,19 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema } private static Schema applyDropColumnEvent(DropColumnEvent event, Schema oldSchema) { - event.getDroppedColumns() + event.getDroppedColumnNames() .forEach( column -> { - if (!oldSchema.getColumn(column.getName()).isPresent()) { + if (!oldSchema.getColumn(column).isPresent()) { throw new IllegalArgumentException( - column.getName() + " of DropColumnEvent is not existed"); + column + " of DropColumnEvent is not existed"); } }); List columns = oldSchema.getColumns().stream() - .filter((column -> !event.getDroppedColumns().contains(column))) + .filter( + (column -> + !event.getDroppedColumnNames().contains(column.getName()))) .collect(Collectors.toList()); return oldSchema.copy(columns); } diff --git a/flink-cdc-common/src/test/java/com/ververica/cdc/common/testutils/assertions/DropColumnEventAssert.java b/flink-cdc-common/src/test/java/com/ververica/cdc/common/testutils/assertions/DropColumnEventAssert.java index 633b47997b..1065bd485a 100644 --- a/flink-cdc-common/src/test/java/com/ververica/cdc/common/testutils/assertions/DropColumnEventAssert.java +++ b/flink-cdc-common/src/test/java/com/ververica/cdc/common/testutils/assertions/DropColumnEventAssert.java @@ -17,7 +17,6 @@ package com.ververica.cdc.common.testutils.assertions; import com.ververica.cdc.common.event.DropColumnEvent; -import com.ververica.cdc.common.schema.Column; import org.assertj.core.internal.Iterables; import java.util.List; @@ -35,8 +34,8 @@ private DropColumnEventAssert(DropColumnEvent event) { super(event, DropColumnEventAssert.class); } - public DropColumnEventAssert containsDroppedColumns(Column... droppedColumns) { - List actualDroppedColumns = actual.getDroppedColumns(); + public DropColumnEventAssert containsDroppedColumns(String... droppedColumns) { + List actualDroppedColumns = actual.getDroppedColumnNames(); iterables.assertContainsExactlyInAnyOrder(info, actualDroppedColumns, droppedColumns); return myself; } diff --git a/flink-cdc-common/src/test/java/com/ververica/cdc/common/utils/SchemaUtilsTest.java b/flink-cdc-common/src/test/java/com/ververica/cdc/common/utils/SchemaUtilsTest.java index d0483d74b0..16cd404ab0 100644 --- a/flink-cdc-common/src/test/java/com/ververica/cdc/common/utils/SchemaUtilsTest.java +++ b/flink-cdc-common/src/test/java/com/ververica/cdc/common/utils/SchemaUtilsTest.java @@ -67,7 +67,7 @@ public void testApplySchemaChangeEvent() { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("col4", DataTypes.STRING()), AddColumnEvent.ColumnPosition.BEFORE, - Column.physicalColumn("col3", DataTypes.STRING()))); + "col3")); addColumnEvent = new AddColumnEvent(tableId, addedColumns); schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent); Assert.assertEquals( @@ -85,7 +85,7 @@ public void testApplySchemaChangeEvent() { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("col5", DataTypes.STRING()), AddColumnEvent.ColumnPosition.AFTER, - Column.physicalColumn("col4", DataTypes.STRING()))); + "col4")); addColumnEvent = new AddColumnEvent(tableId, addedColumns); schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent); Assert.assertEquals( @@ -120,11 +120,7 @@ public void testApplySchemaChangeEvent() { // drop columns DropColumnEvent dropColumnEvent = - new DropColumnEvent( - tableId, - Arrays.asList( - Column.physicalColumn("col3", DataTypes.STRING()), - Column.physicalColumn("col5", DataTypes.STRING()))); + new DropColumnEvent(tableId, Arrays.asList("col3", "col5")); schema = SchemaUtils.applySchemaChangeEvent(schema, dropColumnEvent); Assert.assertEquals( schema, diff --git a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java index cb0758923b..1e580d731d 100644 --- a/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/com/ververica/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -137,9 +137,9 @@ void testSingleSplitSingleTable() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}", - "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", - "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}", + "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ], after=[2, x], op=UPDATE, meta=()}"); } @@ -198,12 +198,12 @@ void testSingleSplitMultipleTables() throws Exception { "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}", - "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}", + "AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}", "RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}", - "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}", + "DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}"); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisMetadataApplier.java index a7691a5aa0..b7f5917c40 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/com/ververica/cdc/connectors/doris/sink/DorisMetadataApplier.java @@ -173,10 +173,9 @@ private void applyAddColumnEvent(AddColumnEvent event) private void applyDropColumnEvent(DropColumnEvent event) throws IOException, IllegalArgumentException { TableId tableId = event.tableId(); - List droppedColumns = event.getDroppedColumns(); - for (Column col : droppedColumns) { - schemaChangeManager.dropColumn( - tableId.getSchemaName(), tableId.getTableName(), col.getName()); + List droppedColumns = event.getDroppedColumnNames(); + for (String col : droppedColumns) { + schemaChangeManager.dropColumn(tableId.getSchemaName(), tableId.getTableName(), col); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java index 93d738767a..6bc7a8a555 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java @@ -22,7 +22,6 @@ import com.ververica.cdc.common.event.RenameColumnEvent; import com.ververica.cdc.common.event.SchemaChangeEvent; import com.ververica.cdc.common.types.DataType; -import com.ververica.cdc.common.types.DataTypes; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.connector.mysql.antlr.listener.AlterTableParserListener; import io.debezium.ddl.parser.mysql.generated.MySqlParser; @@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -116,10 +114,7 @@ public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) { new AddColumnEvent.ColumnWithPosition( toCdcColumn(column), AddColumnEvent.ColumnPosition.AFTER, - com.ververica.cdc.common.schema.Column - .physicalColumn( - afterColumn, - DataTypes.BIGINT()))))); + afterColumn)))); } else { changes.add( new AddColumnEvent( @@ -223,12 +218,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx) @Override public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) { String removedColName = parser.parseName(ctx.uid()); - changes.add( - new DropColumnEvent( - currentTable, - Arrays.asList( - com.ververica.cdc.common.schema.Column.physicalColumn( - removedColName, DataTypes.BIGINT())))); + changes.add(new DropColumnEvent(currentTable, Collections.singletonList(removedColName))); super.enterAlterByDropColumn(ctx); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 00bdda4bae..5e6ed2ff39 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -313,7 +313,7 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) { .physicalColumn("name", DataTypes.VARCHAR(255).notNull()) .physicalColumn("description", DataTypes.VARCHAR(512)) .physicalColumn("weight", DataTypes.FLOAT()) - .primaryKey(Arrays.asList("id")) + .primaryKey(Collections.singletonList("id")) .build()); } @@ -466,11 +466,11 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st expected.add( new AddColumnEvent( tableId, - Arrays.asList( + Collections.singletonList( new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("desc1", DataTypes.VARCHAR(45)), AddColumnEvent.ColumnPosition.AFTER, - Column.physicalColumn("weight", DataTypes.BIGINT()))))); + "weight")))); statement.execute( String.format( @@ -479,29 +479,25 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st expected.add( new AddColumnEvent( tableId, - Arrays.asList( + Collections.singletonList( new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("col1", DataTypes.VARCHAR(45)), AddColumnEvent.ColumnPosition.AFTER, - Column.physicalColumn("weight", DataTypes.BIGINT()))))); + "weight")))); expected.add( new AddColumnEvent( tableId, - Arrays.asList( + Collections.singletonList( new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("col2", DataTypes.VARCHAR(55)), AddColumnEvent.ColumnPosition.AFTER, - Column.physicalColumn("desc1", DataTypes.BIGINT()))))); + "desc1")))); statement.execute( String.format( "ALTER TABLE `%s`.`products` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;", inventoryDatabase.getDatabaseName())); - expected.add( - new DropColumnEvent( - tableId, - Collections.singletonList( - Column.physicalColumn("desc2", DataTypes.BIGINT())))); + expected.add(new DropColumnEvent(tableId, Collections.singletonList("desc2"))); expected.add( new AlterColumnTypeEvent( tableId, Collections.singletonMap("desc1", DataTypes.VARCHAR(65)))); @@ -517,11 +513,7 @@ private List executeAlterAndProvideExpected(TableId tableId, Statement st String.format( "ALTER TABLE `%s`.`products` DROP COLUMN `DESC3`;", inventoryDatabase.getDatabaseName())); - expected.add( - new DropColumnEvent( - tableId, - Collections.singletonList( - Column.physicalColumn("DESC3", DataTypes.BIGINT())))); + expected.add(new DropColumnEvent(tableId, Collections.singletonList("DESC3"))); // Should not catch SchemaChangeEvent of tables other than `products` statement.execute( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java index ba5c5b4515..b43c687dfa 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/com/ververica/cdc/connectors/starrocks/sink/StarRocksMetadataApplier.java @@ -34,7 +34,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; import static com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType; @@ -187,10 +186,7 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) { } private void applyDropColumn(DropColumnEvent dropColumnEvent) { - List dropColumns = - dropColumnEvent.getDroppedColumns().stream() - .map(Column::getName) - .collect(Collectors.toList()); + List dropColumns = dropColumnEvent.getDroppedColumnNames(); TableId tableId = dropColumnEvent.tableId(); StarRocksCatalogException alterException = null; try { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java index 0616f64155..47bf9458b7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/EventRecordSerializationSchemaTest.java @@ -71,6 +71,7 @@ import java.time.ZoneOffset; import java.util.Arrays; import java.util.HashMap; +import java.util.Objects; import java.util.OptionalLong; import java.util.SortedMap; import java.util.TreeMap; @@ -234,15 +235,11 @@ public void testMixedSchemaAndDataChanges() throws Exception { verifySerializeResult( table1, "{\"col1\":4,\"col2\":true,\"col3\":\"2023-11-27 21:00:00\",\"col4\":83.23,\"col5\":9,\"col6\":\"2023-11-27 19:00:00\",\"__op\":1}", - serializer.serialize(deleteEvent2)); + Objects.requireNonNull(serializer.serialize(deleteEvent2))); // 4. drop columns from table2, and insert data DropColumnEvent dropColumnEvent = - new DropColumnEvent( - table2, - Arrays.asList( - Column.physicalColumn("col2", new FloatType()), - Column.physicalColumn("col3", new VarCharType(20)))); + new DropColumnEvent(table2, Arrays.asList("col2", "col3")); Schema newSchema2 = SchemaUtils.applySchemaChangeEvent(schema2, dropColumnEvent); BinaryRecordDataGenerator newGenerator2 = new BinaryRecordDataGenerator( @@ -255,7 +252,9 @@ public void testMixedSchemaAndDataChanges() throws Exception { newGenerator2.generate( new Object[] {(int) LocalDate.of(2023, 11, 28).toEpochDay()})); verifySerializeResult( - table2, "{\"col1\":\"2023-11-28\",\"__op\":0}", serializer.serialize(insertEvent3)); + table2, + "{\"col1\":\"2023-11-28\",\"__op\":0}", + Objects.requireNonNull(serializer.serialize(insertEvent3))); } private void verifySerializeResult( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java index 8430ce25f8..549a9194fd 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/com/ververical/cdc/connectors/starrocks/sink/StarRocksMetadataApplierTest.java @@ -200,11 +200,7 @@ public void testDropColumn() throws Exception { metadataApplier.applySchemaChange(createTableEvent); DropColumnEvent dropColumnEvent = - new DropColumnEvent( - tableId, - Arrays.asList( - Column.physicalColumn("col2", new BooleanType()), - Column.physicalColumn("col3", new TimestampType()))); + new DropColumnEvent(tableId, Arrays.asList("col2", "col3")); metadataApplier.applySchemaChange(dropColumnEvent); StarRocksTable actualTable = diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/ValuesDatabase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/ValuesDatabase.java index 3bfc3654bf..7969b7b673 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/ValuesDatabase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/ValuesDatabase.java @@ -45,6 +45,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -186,6 +187,8 @@ private static class ValuesTable { private final LinkedList columns; + private final LinkedList columnNames; + private final List primaryKeys; // indexes of primaryKeys in columns @@ -195,6 +198,7 @@ public ValuesTable(TableId tableId, Schema schema) { this.tableId = tableId; this.lock = new Object(); this.columns = new LinkedList<>(schema.getColumns()); + this.columnNames = new LinkedList<>(schema.getColumnNames()); this.records = new HashMap<>(); this.primaryKeys = new LinkedList<>(schema.primaryKeys()); this.primaryKeyIndexes = new ArrayList<>(); @@ -278,7 +282,7 @@ public void applySchemaChangeEvent(SchemaChangeEvent event) { } private void updatePrimaryKeyIndexes() { - Preconditions.checkArgument(primaryKeys.size() > 0, "primaryKeys couldn't be empty"); + Preconditions.checkArgument(!primaryKeys.isEmpty(), "primaryKeys couldn't be empty"); primaryKeyIndexes.clear(); for (String primaryKey : primaryKeys) { for (int i = 0; i < columns.size(); i++) { @@ -322,23 +326,29 @@ private void applyAddColumnEvent(AddColumnEvent event) { case FIRST: { columns.addFirst(columnWithPosition.getAddColumn()); + columnNames.addFirst(columnWithPosition.getAddColumn().getName()); break; } case LAST: { columns.addLast(columnWithPosition.getAddColumn()); + columnNames.addLast(columnWithPosition.getAddColumn().getName()); break; } case BEFORE: { - int index = columns.indexOf(columnWithPosition.getExistingColumn()); + int index = + columnNames.indexOf(columnWithPosition.getExistedColumnName()); columns.add(index, columnWithPosition.getAddColumn()); + columnNames.add(index, columnWithPosition.getAddColumn().getName()); break; } case AFTER: { - int index = columns.indexOf(columnWithPosition.getExistingColumn()); + int index = + columnNames.indexOf(columnWithPosition.getExistedColumnName()); columns.add(index + 1, columnWithPosition.getAddColumn()); + columnNames.add(index + 1, columnWithPosition.getAddColumn().getName()); break; } } @@ -346,14 +356,24 @@ private void applyAddColumnEvent(AddColumnEvent event) { } private void applyDropColumnEvent(DropColumnEvent event) { - for (Column column : event.getDroppedColumns()) { - if (!columns.remove(column)) { - throw new IllegalArgumentException(column.getName() + " is not existed"); + for (String columnName : event.getDroppedColumnNames()) { + if (!removeColumn(columnName)) { + throw new IllegalArgumentException(columnName + " is not existed"); } - records.forEach((key, record) -> record.remove(column.getName())); + records.forEach((key, record) -> record.remove(columnName)); } } + private boolean removeColumn(String columnName) { + int index = columnNames.indexOf(columnName); + if (index == -1) { + return false; + } + + return Objects.nonNull(columnNames.remove(index)) + && Objects.nonNull(columns.remove(index)); + } + private void applyRenameColumnEvent(RenameColumnEvent event) { event.getNameMapping() .forEach( @@ -364,6 +384,7 @@ private void applyRenameColumnEvent(RenameColumnEvent event) { columns.set( i, Column.physicalColumn(afterName, column.getType())); + columnNames.set(i, afterName); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java index 0a3b7549a7..6eba2e29ad 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/com/ververica/cdc/connectors/values/source/ValuesDataSourceHelper.java @@ -168,10 +168,7 @@ public static List> singleSplitSingleTable() { // drop column DropColumnEvent dropColumnEvent = - new DropColumnEvent( - TABLE_1, - Collections.singletonList( - Column.physicalColumn("newCol2", DataTypes.STRING()))); + new DropColumnEvent(TABLE_1, Collections.singletonList("newCol2")); split1.add(dropColumnEvent); // delete @@ -296,10 +293,7 @@ public static List> singleSplitMultiTables() { // drop column DropColumnEvent dropColumnEvent = - new DropColumnEvent( - TABLE_1, - Collections.singletonList( - Column.physicalColumn("newCol2", DataTypes.STRING()))); + new DropColumnEvent(TABLE_1, Collections.singletonList("newCol2")); split1.add(dropColumnEvent); // delete diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/ValuesDatabaseTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/ValuesDatabaseTest.java index b286279942..2214eaca44 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/ValuesDatabaseTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/test/java/com/ververica/cdc/connectors/values/ValuesDatabaseTest.java @@ -188,10 +188,7 @@ public void testApplySchemaChangeEvent() { Assert.assertEquals(schema, metadataAccessor.getTableSchema(table1)); DropColumnEvent dropColumnEvent = - new DropColumnEvent( - table1, - Collections.singletonList( - Column.physicalColumn("newCol2", new CharType()))); + new DropColumnEvent(table1, Collections.singletonList("newCol2")); metadataApplier.applySchemaChange(dropColumnEvent); schema = Schema.newBuilder() @@ -284,10 +281,7 @@ public void testSchemaChangeWithExistedData() { Assert.assertEquals(results, ValuesDatabase.getResults(table1)); DropColumnEvent dropColumnEvent = - new DropColumnEvent( - table1, - Collections.singletonList( - Column.physicalColumn("newCol2", new CharType()))); + new DropColumnEvent(table1, Collections.singletonList("newCol2")); metadataApplier.applySchemaChange(dropColumnEvent); results.clear(); results.add("default.default.table1:col1=1;newCol3="); diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java index 3d8d1c5b32..6fde31817d 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/operators/route/RouteFunction.java @@ -157,7 +157,7 @@ private SchemaChangeEvent recreateSchemaChangeEvent( } if (schemaChangeEvent instanceof DropColumnEvent) { DropColumnEvent dropColumnEvent = (DropColumnEvent) schemaChangeEvent; - return new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumns()); + return new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumnNames()); } if (schemaChangeEvent instanceof AddColumnEvent) { AddColumnEvent addColumnEvent = (AddColumnEvent) schemaChangeEvent; diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/event/DropColumnEventSerializer.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/event/DropColumnEventSerializer.java index 157da6de3c..2e0ed6be4e 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/event/DropColumnEventSerializer.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/event/DropColumnEventSerializer.java @@ -24,11 +24,10 @@ import com.ververica.cdc.common.event.DropColumnEvent; import com.ververica.cdc.common.event.TableId; -import com.ververica.cdc.common.schema.Column; import com.ververica.cdc.runtime.serializer.ListSerializer; +import com.ververica.cdc.runtime.serializer.StringSerializer; import com.ververica.cdc.runtime.serializer.TableIdSerializer; import com.ververica.cdc.runtime.serializer.TypeSerializerSingleton; -import com.ververica.cdc.runtime.serializer.schema.ColumnSerializer; import java.io.IOException; import java.util.Collections; @@ -42,8 +41,8 @@ public class DropColumnEventSerializer extends TypeSerializerSingleton columnsSerializer = - new ListSerializer<>(ColumnSerializer.INSTANCE); + private final ListSerializer columnNamesSerializer = + new ListSerializer<>(StringSerializer.INSTANCE); @Override public boolean isImmutableType() { @@ -58,7 +57,7 @@ public DropColumnEvent createInstance() { @Override public DropColumnEvent copy(DropColumnEvent from) { return new DropColumnEvent( - from.tableId(), columnsSerializer.copy(from.getDroppedColumns())); + from.tableId(), columnNamesSerializer.copy(from.getDroppedColumnNames())); } @Override @@ -74,13 +73,13 @@ public int getLength() { @Override public void serialize(DropColumnEvent record, DataOutputView target) throws IOException { tableIdSerializer.serialize(record.tableId(), target); - columnsSerializer.serialize(record.getDroppedColumns(), target); + columnNamesSerializer.serialize(record.getDroppedColumnNames(), target); } @Override public DropColumnEvent deserialize(DataInputView source) throws IOException { return new DropColumnEvent( - tableIdSerializer.deserialize(source), columnsSerializer.deserialize(source)); + tableIdSerializer.deserialize(source), columnNamesSerializer.deserialize(source)); } @Override diff --git a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/schema/ColumnWithPositionSerializer.java b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/schema/ColumnWithPositionSerializer.java index d3080c1bd3..87776b85ac 100644 --- a/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/schema/ColumnWithPositionSerializer.java +++ b/flink-cdc-runtime/src/main/java/com/ververica/cdc/runtime/serializer/schema/ColumnWithPositionSerializer.java @@ -27,6 +27,7 @@ import com.ververica.cdc.common.types.DataTypes; import com.ververica.cdc.runtime.serializer.EnumSerializer; import com.ververica.cdc.runtime.serializer.NullableSerializerWrapper; +import com.ververica.cdc.runtime.serializer.StringSerializer; import com.ververica.cdc.runtime.serializer.TypeSerializerSingleton; import java.io.IOException; @@ -40,8 +41,11 @@ public class ColumnWithPositionSerializer /** Sharable instance of the TableIdSerializer. */ public static final ColumnWithPositionSerializer INSTANCE = new ColumnWithPositionSerializer(); - private final TypeSerializer columnSerializer = + private final TypeSerializer addColumnSerializer = new NullableSerializerWrapper<>(ColumnSerializer.INSTANCE); + + private final TypeSerializer existedColumnNameSerializer = StringSerializer.INSTANCE; + private final EnumSerializer positionEnumSerializer = new EnumSerializer<>(AddColumnEvent.ColumnPosition.class); @@ -59,9 +63,9 @@ public AddColumnEvent.ColumnWithPosition createInstance() { @Override public AddColumnEvent.ColumnWithPosition copy(AddColumnEvent.ColumnWithPosition from) { return new AddColumnEvent.ColumnWithPosition( - columnSerializer.copy(from.getAddColumn()), + addColumnSerializer.copy(from.getAddColumn()), from.getPosition(), - columnSerializer.copy(from.getExistingColumn())); + existedColumnNameSerializer.copy(from.getExistedColumnName())); } @Override @@ -78,25 +82,17 @@ public int getLength() { @Override public void serialize(AddColumnEvent.ColumnWithPosition record, DataOutputView target) throws IOException { - columnSerializer.serialize(record.getAddColumn(), target); + addColumnSerializer.serialize(record.getAddColumn(), target); positionEnumSerializer.serialize(record.getPosition(), target); - if (record.getExistingColumn() == null) { - target.writeInt(0); - } else { - target.writeInt(1); - columnSerializer.serialize(record.getExistingColumn(), target); - } + existedColumnNameSerializer.serialize(record.getExistedColumnName(), target); } @Override public AddColumnEvent.ColumnWithPosition deserialize(DataInputView source) throws IOException { - Column addColumn = columnSerializer.deserialize(source); + Column addColumn = addColumnSerializer.deserialize(source); AddColumnEvent.ColumnPosition position = positionEnumSerializer.deserialize(source); - if (source.readInt() == 1) { - return new AddColumnEvent.ColumnWithPosition( - addColumn, position, columnSerializer.deserialize(source)); - } - return new AddColumnEvent.ColumnWithPosition(addColumn, position, null); + return new AddColumnEvent.ColumnWithPosition( + addColumn, position, existedColumnNameSerializer.deserialize(source)); } @Override diff --git a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java index 78adc6f64d..057745fff9 100644 --- a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java +++ b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/route/RouteFunctionTest.java @@ -164,12 +164,12 @@ void testSchemaChangeEventRouting() throws Exception { // DropColumnEvent PhysicalColumn droppedColumn = Column.physicalColumn("address", DataTypes.STRING()); - List droppedColumns = Collections.singletonList(droppedColumn); + List droppedColumns = Collections.singletonList(droppedColumn.getName()); DropColumnEvent dropColumnEvent = new DropColumnEvent(CUSTOMERS, droppedColumns); assertThat(router.map(dropColumnEvent)) .asSchemaChangeEvent() .asDropColumnEvent() - .containsDroppedColumns(droppedColumn) + .containsDroppedColumns(droppedColumn.getName()) .hasTableId(NEW_CUSTOMERS); // RenameColumnEvent diff --git a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java index a1c096a41f..488f07cb65 100644 --- a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java +++ b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/operators/schema/coordinator/SchemaManagerTest.java @@ -86,11 +86,11 @@ void testHandlingAddColumnEvent() { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("append_after_id", DataTypes.BIGINT()), AddColumnEvent.ColumnPosition.AFTER, - Column.physicalColumn("id", DataTypes.INT())), + "id"), new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("append_before_phone", DataTypes.BIGINT()), AddColumnEvent.ColumnPosition.BEFORE, - Column.physicalColumn("phone", DataTypes.BIGINT()))); + "phone")); schemaManager.applySchemaChange(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)); schemaManager.applySchemaChange(new AddColumnEvent(CUSTOMERS, newColumns)); @@ -129,11 +129,7 @@ void testHandlingDropColumnEvent() { SchemaManager schemaManager = new SchemaManager(); schemaManager.applySchemaChange(new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA)); schemaManager.applySchemaChange( - new DropColumnEvent( - CUSTOMERS, - Arrays.asList( - Column.physicalColumn("name", DataTypes.STRING()), - Column.physicalColumn("phone", DataTypes.BIGINT())))); + new DropColumnEvent(CUSTOMERS, Arrays.asList("name", "phone"))); assertThat(schemaManager.getLatestSchema(CUSTOMERS)) .contains( Schema.newBuilder() diff --git a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/AddColumnEventSerializerTest.java b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/AddColumnEventSerializerTest.java index f7f4844472..15476f2c86 100644 --- a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/AddColumnEventSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/AddColumnEventSerializerTest.java @@ -54,7 +54,7 @@ protected AddColumnEvent[] getTestData() { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("testCol2", DataTypes.DOUBLE(), "desc"), AddColumnEvent.ColumnPosition.AFTER, - Column.physicalColumn("testCol1", DataTypes.TIMESTAMP())))), + "testCol1"))), new AddColumnEvent( TableId.tableId("schema", "table"), Arrays.asList( @@ -65,7 +65,7 @@ protected AddColumnEvent[] getTestData() { new AddColumnEvent.ColumnWithPosition( Column.metadataColumn("testCol2", DataTypes.DOUBLE(), "mKey"), AddColumnEvent.ColumnPosition.BEFORE, - Column.metadataColumn("testCol1", DataTypes.TIMESTAMP())))), + "testCol1"))), new AddColumnEvent( TableId.tableId("namespace", "schema", "table"), Arrays.asList( @@ -77,7 +77,7 @@ protected AddColumnEvent[] getTestData() { Column.metadataColumn( "testCol2", DataTypes.DOUBLE(), "mKey", "desc"), AddColumnEvent.ColumnPosition.BEFORE, - Column.physicalColumn("testCol1", DataTypes.TIMESTAMP())))) + "testCol1"))) }; } } diff --git a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/DropColumnEventSerializerTest.java b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/DropColumnEventSerializerTest.java index d35917c888..f8efd20342 100644 --- a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/DropColumnEventSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/DropColumnEventSerializerTest.java @@ -20,8 +20,6 @@ import com.ververica.cdc.common.event.DropColumnEvent; import com.ververica.cdc.common.event.TableId; -import com.ververica.cdc.common.schema.Column; -import com.ververica.cdc.common.types.DataTypes; import com.ververica.cdc.runtime.serializer.SerializerTestBase; import java.util.Arrays; @@ -46,22 +44,11 @@ protected Class getTypeClass() { @Override protected DropColumnEvent[] getTestData() { return new DropColumnEvent[] { + new DropColumnEvent(TableId.tableId("table"), Arrays.asList("c1", "c2")), new DropColumnEvent( - TableId.tableId("table"), - Arrays.asList( - Column.physicalColumn("c1", DataTypes.TIMESTAMP()), - Column.physicalColumn("c2", DataTypes.DOUBLE(), "desc"))), + TableId.tableId("schema", "table"), Arrays.asList("m1", "m2", "m3")), new DropColumnEvent( - TableId.tableId("schema", "table"), - Arrays.asList( - Column.metadataColumn("m1", DataTypes.TIMESTAMP()), - Column.metadataColumn("m2", DataTypes.DOUBLE(), "mKey"), - Column.metadataColumn("m3", DataTypes.DOUBLE(), "mKey", "desc"))), - new DropColumnEvent( - TableId.tableId("namespace", "schema", "table"), - Arrays.asList( - Column.physicalColumn("c1", DataTypes.TIMESTAMP()), - Column.physicalColumn("c2", DataTypes.DOUBLE(), "desc"))) + TableId.tableId("namespace", "schema", "table"), Arrays.asList("c1", "c2")) }; } } diff --git a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/SchemaChangeEventSerializerTest.java b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/SchemaChangeEventSerializerTest.java index 310e263e3f..44102b9efb 100644 --- a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/SchemaChangeEventSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/event/SchemaChangeEventSerializerTest.java @@ -80,15 +80,11 @@ protected SchemaChangeEvent[] getTestData() { new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("testCol2", DataTypes.DOUBLE(), "desc"), AddColumnEvent.ColumnPosition.AFTER, - Column.physicalColumn("testCol1", DataTypes.TIMESTAMP())))), + "testCol1"))), new AlterColumnTypeEvent(TableId.tableId("namespace", "schema", "table"), alterTypeMap), new CreateTableEvent(TableId.tableId("schema", "table"), schema), new DropColumnEvent( - TableId.tableId("schema", "table"), - Arrays.asList( - Column.metadataColumn("m1", DataTypes.TIMESTAMP()), - Column.metadataColumn("m2", DataTypes.DOUBLE(), "mKey"), - Column.metadataColumn("m3", DataTypes.DOUBLE(), "mKey", "desc"))), + TableId.tableId("schema", "table"), Arrays.asList("m1", "m2", "m3")), new RenameColumnEvent(TableId.tableId("table"), renameMap) }; } diff --git a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/schema/ColumnWithPositionSerializerTest.java b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/schema/ColumnWithPositionSerializerTest.java index e7d7b98830..500073fd24 100644 --- a/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/schema/ColumnWithPositionSerializerTest.java +++ b/flink-cdc-runtime/src/test/java/com/ververica/cdc/runtime/serializer/schema/ColumnWithPositionSerializerTest.java @@ -51,13 +51,13 @@ protected AddColumnEvent.ColumnWithPosition[] getTestData() { new AddColumnEvent.ColumnWithPosition( Column.metadataColumn("testCol2", DataTypes.DOUBLE(), "mKey"), AddColumnEvent.ColumnPosition.BEFORE, - Column.metadataColumn("testCol1", DataTypes.TIMESTAMP())), + "testCol1"), new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("testCol1", DataTypes.TIMESTAMP())), new AddColumnEvent.ColumnWithPosition( Column.physicalColumn("testCol2", DataTypes.DOUBLE(), "desc"), AddColumnEvent.ColumnPosition.AFTER, - Column.physicalColumn("testCol1", DataTypes.TIMESTAMP())) + "testCol1") }; } }