Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cdc-common][cdc-pipelines][cdc-runtime] Use name instead of column in DropColumnEvent/AddColumnEvent. #2888

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,20 @@ public static class ColumnWithPosition implements Serializable {
private final ColumnPosition position;

/** The added column lies in the position relative to this column. */
private final @Nullable Column existingColumn;
joyCurry30 marked this conversation as resolved.
Show resolved Hide resolved
private final @Nullable String existedColumnName;

/** In the default scenario, we add fields at the end of the column. */
public ColumnWithPosition(Column addColumn) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a public API, could we avoid changing this method ?

Copy link
Contributor

@lvyanquan lvyanquan Dec 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a PublicEvolving API, we change this because column type is unnecessary for sink to apply this SchemaChange, and source now just pass an arbitrary column type to build it.

this.addColumn = addColumn;
position = ColumnPosition.LAST;
existingColumn = null;
existedColumnName = null;
}

public ColumnWithPosition(
Column addColumn, ColumnPosition position, Column existingColumn) {
Column addColumn, ColumnPosition position, @Nullable String existedColumnName) {
this.addColumn = addColumn;
this.position = position;
this.existingColumn = existingColumn;
this.existedColumnName = existedColumnName;
}

public Column getAddColumn() {
Expand All @@ -90,8 +90,9 @@ public ColumnPosition getPosition() {
return position;
}

public Column getExistingColumn() {
return existingColumn;
@Nullable
public String getExistedColumnName() {
return existedColumnName;
}

@Override
Expand All @@ -105,12 +106,12 @@ public boolean equals(Object o) {
ColumnWithPosition position1 = (ColumnWithPosition) o;
return Objects.equals(addColumn, position1.addColumn)
&& position == position1.position
&& Objects.equals(existingColumn, position1.existingColumn);
&& Objects.equals(existedColumnName, position1.existedColumnName);
}

@Override
public int hashCode() {
return Objects.hash(addColumn, position, existingColumn);
return Objects.hash(addColumn, position, existedColumnName);
}

@Override
Expand All @@ -120,8 +121,8 @@ public String toString() {
+ addColumn
+ ", position="
+ position
+ ", existingColumn="
+ existingColumn
+ ", existedColumnName="
+ existedColumnName
+ '}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.ververica.cdc.common.event;

import com.ververica.cdc.common.annotation.PublicEvolving;
import com.ververica.cdc.common.schema.Column;

import java.util.List;
import java.util.Objects;
Expand All @@ -33,16 +32,15 @@ public class DropColumnEvent implements SchemaChangeEvent {

private final TableId tableId;

private final List<Column> droppedColumns;
private final List<String> droppedColumnNames;

public DropColumnEvent(TableId tableId, List<Column> droppedColumns) {
public DropColumnEvent(TableId tableId, List<String> droppedColumnNames) {
this.tableId = tableId;
this.droppedColumns = droppedColumns;
this.droppedColumnNames = droppedColumnNames;
}

/** Returns the dropped columns. */
public List<Column> getDroppedColumns() {
return droppedColumns;
public List<String> getDroppedColumnNames() {
return droppedColumnNames;
}

@Override
Expand All @@ -55,21 +53,21 @@ public boolean equals(Object o) {
}
DropColumnEvent that = (DropColumnEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(droppedColumns, that.droppedColumns);
&& Objects.equals(droppedColumnNames, that.droppedColumnNames);
}

@Override
public int hashCode() {
return Objects.hash(tableId, droppedColumns);
return Objects.hash(tableId, droppedColumnNames);
}

@Override
public String toString() {
return "DropColumnEvent{"
+ "tableId="
+ tableId
+ ", droppedColumns="
+ droppedColumns
+ ", droppedColumnNames="
+ droppedColumnNames
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,14 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
case BEFORE:
{
Preconditions.checkNotNull(
columnWithPosition.getExistingColumn(),
"existingColumn could not be null in BEFORE type AddColumnEvent");
columnWithPosition.getExistedColumnName(),
"existedColumnName could not be null in BEFORE type AddColumnEvent");
List<String> columnNames =
columns.stream().map(Column::getName).collect(Collectors.toList());
int index =
columnNames.indexOf(
columnWithPosition.getExistingColumn().getName());
int index = columnNames.indexOf(columnWithPosition.getExistedColumnName());
if (index < 0) {
throw new IllegalArgumentException(
columnWithPosition.getExistingColumn().getName()
columnWithPosition.getExistedColumnName()
+ " of AddColumnEvent is not existed");
}
columns.add(index, columnWithPosition.getAddColumn());
Expand All @@ -105,16 +103,14 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
case AFTER:
{
Preconditions.checkNotNull(
columnWithPosition.getExistingColumn(),
"existingColumn could not be null in AFTER type AddColumnEvent");
columnWithPosition.getExistedColumnName(),
"existedColumnName could not be null in AFTER type AddColumnEvent");
List<String> columnNames =
columns.stream().map(Column::getName).collect(Collectors.toList());
int index =
columnNames.indexOf(
columnWithPosition.getExistingColumn().getName());
int index = columnNames.indexOf(columnWithPosition.getExistedColumnName());
if (index < 0) {
throw new IllegalArgumentException(
columnWithPosition.getExistingColumn().getName()
columnWithPosition.getExistedColumnName()
+ " of AddColumnEvent is not existed");
}
columns.add(index + 1, columnWithPosition.getAddColumn());
Expand All @@ -126,17 +122,19 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
}

private static Schema applyDropColumnEvent(DropColumnEvent event, Schema oldSchema) {
event.getDroppedColumns()
event.getDroppedColumnNames()
.forEach(
column -> {
if (!oldSchema.getColumn(column.getName()).isPresent()) {
if (!oldSchema.getColumn(column).isPresent()) {
throw new IllegalArgumentException(
column.getName() + " of DropColumnEvent is not existed");
column + " of DropColumnEvent is not existed");
}
});
List<Column> columns =
oldSchema.getColumns().stream()
.filter((column -> !event.getDroppedColumns().contains(column)))
.filter(
(column ->
!event.getDroppedColumnNames().contains(column.getName())))
.collect(Collectors.toList());
return oldSchema.copy(columns);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.ververica.cdc.common.testutils.assertions;

import com.ververica.cdc.common.event.DropColumnEvent;
import com.ververica.cdc.common.schema.Column;
import org.assertj.core.internal.Iterables;

import java.util.List;
Expand All @@ -35,8 +34,8 @@ private DropColumnEventAssert(DropColumnEvent event) {
super(event, DropColumnEventAssert.class);
}

public DropColumnEventAssert containsDroppedColumns(Column... droppedColumns) {
List<Column> actualDroppedColumns = actual.getDroppedColumns();
public DropColumnEventAssert containsDroppedColumns(String... droppedColumns) {
List<String> actualDroppedColumns = actual.getDroppedColumnNames();
iterables.assertContainsExactlyInAnyOrder(info, actualDroppedColumns, droppedColumns);
return myself;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testApplySchemaChangeEvent() {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col4", DataTypes.STRING()),
AddColumnEvent.ColumnPosition.BEFORE,
Column.physicalColumn("col3", DataTypes.STRING())));
"col3"));
addColumnEvent = new AddColumnEvent(tableId, addedColumns);
schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
Assert.assertEquals(
Expand All @@ -85,7 +85,7 @@ public void testApplySchemaChangeEvent() {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col5", DataTypes.STRING()),
AddColumnEvent.ColumnPosition.AFTER,
Column.physicalColumn("col4", DataTypes.STRING())));
"col4"));
addColumnEvent = new AddColumnEvent(tableId, addedColumns);
schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
Assert.assertEquals(
Expand Down Expand Up @@ -120,11 +120,7 @@ public void testApplySchemaChangeEvent() {

// drop columns
DropColumnEvent dropColumnEvent =
new DropColumnEvent(
tableId,
Arrays.asList(
Column.physicalColumn("col3", DataTypes.STRING()),
Column.physicalColumn("col5", DataTypes.STRING())));
new DropColumnEvent(tableId, Arrays.asList("col3", "col5"));
schema = SchemaUtils.applySchemaChangeEvent(schema, dropColumnEvent);
Assert.assertEquals(
schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ void testSingleSplitSingleTable() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ], after=[2, x], op=UPDATE, meta=()}");
}
Expand Down Expand Up @@ -198,12 +198,12 @@ void testSingleSplitMultipleTables() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,9 @@ private void applyAddColumnEvent(AddColumnEvent event)
private void applyDropColumnEvent(DropColumnEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
List<Column> droppedColumns = event.getDroppedColumns();
for (Column col : droppedColumns) {
schemaChangeManager.dropColumn(
tableId.getSchemaName(), tableId.getTableName(), col.getName());
List<String> droppedColumns = event.getDroppedColumnNames();
for (String col : droppedColumns) {
schemaChangeManager.dropColumn(tableId.getSchemaName(), tableId.getTableName(), col);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.ververica.cdc.common.event.RenameColumnEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.antlr.listener.AlterTableParserListener;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
Expand All @@ -35,7 +34,6 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -116,10 +114,7 @@ public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
new AddColumnEvent.ColumnWithPosition(
toCdcColumn(column),
AddColumnEvent.ColumnPosition.AFTER,
com.ververica.cdc.common.schema.Column
.physicalColumn(
afterColumn,
DataTypes.BIGINT())))));
afterColumn))));
} else {
changes.add(
new AddColumnEvent(
Expand Down Expand Up @@ -223,12 +218,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
@Override
public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) {
String removedColName = parser.parseName(ctx.uid());
changes.add(
new DropColumnEvent(
currentTable,
Arrays.asList(
com.ververica.cdc.common.schema.Column.physicalColumn(
removedColName, DataTypes.BIGINT()))));
changes.add(new DropColumnEvent(currentTable, Collections.singletonList(removedColName)));
super.enterAlterByDropColumn(ctx);
}

Expand Down
Loading