Skip to content

Commit

Permalink
[cdc-common][cdc-pipelines][cdc-runtime] Use name instead of column i…
Browse files Browse the repository at this point in the history
…n DropColumnEvent/AddColumnEvent.
  • Loading branch information
joyCurry30 committed Dec 27, 2023
1 parent a0df108 commit 53aaaf4
Show file tree
Hide file tree
Showing 27 changed files with 133 additions and 190 deletions.
4 changes: 0 additions & 4 deletions .github/action/get-workflow-origin/__tests__/main.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
import * as process from 'process'
import * as cp from 'child_process'
import * as path from 'path'

test('no op', () => {})
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
import * as process from 'process'
import * as cp from 'child_process'
import * as path from 'path'

test('no op', () => {})
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,8 @@ void testMissingFlinkHome() {

@Test
void testMissingFlinkHomeWhenUseMiniCluster() throws Exception {
CliExecutor executor =
createExecutor(
pipelineDef(),
"--use-mini-cluster",
"true");
assertThat(Objects.nonNull(executor.getComposer())).isTrue();
CliExecutor executor = createExecutor(pipelineDef(), "--use-mini-cluster", "true");
assertThat(Objects.nonNull(executor.getComposer())).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,20 @@ public static class ColumnWithPosition implements Serializable {
private final ColumnPosition position;

/** The added column lies in the position relative to this column. */
private final @Nullable Column existingColumn;
private final @Nullable String existedColumnName;

/** In the default scenario, we add fields at the end of the column. */
public ColumnWithPosition(Column addColumn) {
this.addColumn = addColumn;
position = ColumnPosition.LAST;
existingColumn = null;
existedColumnName = null;
}

public ColumnWithPosition(
Column addColumn, ColumnPosition position, Column existingColumn) {
Column addColumn, ColumnPosition position, @Nullable String existedColumnName) {
this.addColumn = addColumn;
this.position = position;
this.existingColumn = existingColumn;
this.existedColumnName = existedColumnName;
}

public Column getAddColumn() {
Expand All @@ -90,8 +90,9 @@ public ColumnPosition getPosition() {
return position;
}

public Column getExistingColumn() {
return existingColumn;
@Nullable
public String getExistedColumnName() {
return existedColumnName;
}

@Override
Expand All @@ -105,12 +106,12 @@ public boolean equals(Object o) {
ColumnWithPosition position1 = (ColumnWithPosition) o;
return Objects.equals(addColumn, position1.addColumn)
&& position == position1.position
&& Objects.equals(existingColumn, position1.existingColumn);
&& Objects.equals(existedColumnName, position1.existedColumnName);
}

@Override
public int hashCode() {
return Objects.hash(addColumn, position, existingColumn);
return Objects.hash(addColumn, position, existedColumnName);
}

@Override
Expand All @@ -120,8 +121,8 @@ public String toString() {
+ addColumn
+ ", position="
+ position
+ ", existingColumn="
+ existingColumn
+ ", existedColumnName="
+ existedColumnName
+ '}';
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.ververica.cdc.common.event;

import com.ververica.cdc.common.annotation.PublicEvolving;
import com.ververica.cdc.common.schema.Column;

import java.util.List;
import java.util.Objects;
Expand All @@ -33,16 +32,15 @@ public class DropColumnEvent implements SchemaChangeEvent {

private final TableId tableId;

private final List<Column> droppedColumns;
private final List<String> droppedColumnNames;

public DropColumnEvent(TableId tableId, List<Column> droppedColumns) {
public DropColumnEvent(TableId tableId, List<String> droppedColumnNames) {
this.tableId = tableId;
this.droppedColumns = droppedColumns;
this.droppedColumnNames = droppedColumnNames;
}

/** Returns the dropped columns. */
public List<Column> getDroppedColumns() {
return droppedColumns;
public List<String> getDroppedColumnNames() {
return droppedColumnNames;
}

@Override
Expand All @@ -55,21 +53,21 @@ public boolean equals(Object o) {
}
DropColumnEvent that = (DropColumnEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(droppedColumns, that.droppedColumns);
&& Objects.equals(droppedColumnNames, that.droppedColumnNames);
}

@Override
public int hashCode() {
return Objects.hash(tableId, droppedColumns);
return Objects.hash(tableId, droppedColumnNames);
}

@Override
public String toString() {
return "DropColumnEvent{"
+ "tableId="
+ tableId
+ ", droppedColumns="
+ droppedColumns
+ ", droppedColumnNames="
+ droppedColumnNames
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,14 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
case BEFORE:
{
Preconditions.checkNotNull(
columnWithPosition.getExistingColumn(),
"existingColumn could not be null in BEFORE type AddColumnEvent");
columnWithPosition.getExistedColumnName(),
"existedColumnName could not be null in BEFORE type AddColumnEvent");
List<String> columnNames =
columns.stream().map(Column::getName).collect(Collectors.toList());
int index =
columnNames.indexOf(
columnWithPosition.getExistingColumn().getName());
int index = columnNames.indexOf(columnWithPosition.getExistedColumnName());
if (index < 0) {
throw new IllegalArgumentException(
columnWithPosition.getExistingColumn().getName()
columnWithPosition.getExistedColumnName()
+ " of AddColumnEvent is not existed");
}
columns.add(index, columnWithPosition.getAddColumn());
Expand All @@ -105,16 +103,14 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
case AFTER:
{
Preconditions.checkNotNull(
columnWithPosition.getExistingColumn(),
"existingColumn could not be null in AFTER type AddColumnEvent");
columnWithPosition.getExistedColumnName(),
"existedColumnName could not be null in AFTER type AddColumnEvent");
List<String> columnNames =
columns.stream().map(Column::getName).collect(Collectors.toList());
int index =
columnNames.indexOf(
columnWithPosition.getExistingColumn().getName());
int index = columnNames.indexOf(columnWithPosition.getExistedColumnName());
if (index < 0) {
throw new IllegalArgumentException(
columnWithPosition.getExistingColumn().getName()
columnWithPosition.getExistedColumnName()
+ " of AddColumnEvent is not existed");
}
columns.add(index + 1, columnWithPosition.getAddColumn());
Expand All @@ -126,17 +122,19 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
}

private static Schema applyDropColumnEvent(DropColumnEvent event, Schema oldSchema) {
event.getDroppedColumns()
event.getDroppedColumnNames()
.forEach(
column -> {
if (!oldSchema.getColumn(column.getName()).isPresent()) {
if (!oldSchema.getColumn(column).isPresent()) {
throw new IllegalArgumentException(
column.getName() + " of DropColumnEvent is not existed");
column + " of DropColumnEvent is not existed");
}
});
List<Column> columns =
oldSchema.getColumns().stream()
.filter((column -> !event.getDroppedColumns().contains(column)))
.filter(
(column ->
!event.getDroppedColumnNames().contains(column.getName())))
.collect(Collectors.toList());
return oldSchema.copy(columns);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.ververica.cdc.common.testutils.assertions;

import com.ververica.cdc.common.event.DropColumnEvent;
import com.ververica.cdc.common.schema.Column;
import org.assertj.core.internal.Iterables;

import java.util.List;
Expand All @@ -35,8 +34,8 @@ private DropColumnEventAssert(DropColumnEvent event) {
super(event, DropColumnEventAssert.class);
}

public DropColumnEventAssert containsDroppedColumns(Column... droppedColumns) {
List<Column> actualDroppedColumns = actual.getDroppedColumns();
public DropColumnEventAssert containsDroppedColumns(String... droppedColumns) {
List<String> actualDroppedColumns = actual.getDroppedColumnNames();
iterables.assertContainsExactlyInAnyOrder(info, actualDroppedColumns, droppedColumns);
return myself;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testApplySchemaChangeEvent() {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col4", DataTypes.STRING()),
AddColumnEvent.ColumnPosition.BEFORE,
Column.physicalColumn("col3", DataTypes.STRING())));
"col3"));
addColumnEvent = new AddColumnEvent(tableId, addedColumns);
schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
Assert.assertEquals(
Expand All @@ -85,7 +85,7 @@ public void testApplySchemaChangeEvent() {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col5", DataTypes.STRING()),
AddColumnEvent.ColumnPosition.AFTER,
Column.physicalColumn("col4", DataTypes.STRING())));
"col4"));
addColumnEvent = new AddColumnEvent(tableId, addedColumns);
schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
Assert.assertEquals(
Expand Down Expand Up @@ -120,11 +120,7 @@ public void testApplySchemaChangeEvent() {

// drop columns
DropColumnEvent dropColumnEvent =
new DropColumnEvent(
tableId,
Arrays.asList(
Column.physicalColumn("col3", DataTypes.STRING()),
Column.physicalColumn("col5", DataTypes.STRING())));
new DropColumnEvent(tableId, Arrays.asList("col3", "col5"));
schema = SchemaUtils.applySchemaChangeEvent(schema, dropColumnEvent);
Assert.assertEquals(
schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ void testSingleSplitSingleTable() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ], after=[2, x], op=UPDATE, meta=()}");
}
Expand Down Expand Up @@ -200,12 +200,12 @@ void testSingleSplitMultipleTables() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], after=[3, 3], op=INSERT, meta=()}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existingColumn=null}]}",
"AddColumnEvent{tableId=default_namespace.default_schema.table1, addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[1, 1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[2, 2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table2, before=[], after=[3, 3], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, nameMapping={col2=newCol2, col3=newCol3}}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumns=[`newCol2` STRING]}",
"DropColumnEvent{tableId=default_namespace.default_schema.table1, droppedColumnNames=[newCol2]}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1], after=[], op=DELETE, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 2], after=[2, x], op=UPDATE, meta=()}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,9 @@ private void applyAddColumnEvent(AddColumnEvent event)
private void applyDropColumnEvent(DropColumnEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
List<Column> droppedColumns = event.getDroppedColumns();
for (Column col : droppedColumns) {
schemaChangeManager.dropColumn(
tableId.getSchemaName(), tableId.getTableName(), col.getName());
List<String> droppedColumns = event.getDroppedColumnNames();
for (String col : droppedColumns) {
schemaChangeManager.dropColumn(tableId.getSchemaName(), tableId.getTableName(), col);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.ververica.cdc.common.event.RenameColumnEvent;
import com.ververica.cdc.common.event.SchemaChangeEvent;
import com.ververica.cdc.common.types.DataType;
import com.ververica.cdc.common.types.DataTypes;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.antlr.listener.AlterTableParserListener;
import io.debezium.ddl.parser.mysql.generated.MySqlParser;
Expand All @@ -35,7 +34,6 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -116,10 +114,7 @@ public void exitAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
new AddColumnEvent.ColumnWithPosition(
toCdcColumn(column),
AddColumnEvent.ColumnPosition.AFTER,
com.ververica.cdc.common.schema.Column
.physicalColumn(
afterColumn,
DataTypes.BIGINT())))));
afterColumn))));
} else {
changes.add(
new AddColumnEvent(
Expand Down Expand Up @@ -223,12 +218,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
@Override
public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) {
String removedColName = parser.parseName(ctx.uid());
changes.add(
new DropColumnEvent(
currentTable,
Arrays.asList(
com.ververica.cdc.common.schema.Column.physicalColumn(
removedColName, DataTypes.BIGINT()))));
changes.add(new DropColumnEvent(currentTable, Collections.singletonList(removedColName)));
super.enterAlterByDropColumn(ctx);
}

Expand Down
Loading

0 comments on commit 53aaaf4

Please sign in to comment.