Skip to content

Commit

Permalink
[cdc-common][cdc-pipelines][cdc-runtime] Use name instead of column i…
Browse files Browse the repository at this point in the history
…n DropColumnEvent/AddColumnEvent.
  • Loading branch information
joyCurry30 committed Dec 18, 2023
1 parent 1839fb5 commit d16c153
Show file tree
Hide file tree
Showing 17 changed files with 70 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static class ColumnWithPosition implements Serializable {
private final ColumnPosition position;

/** The added column lies in the position relative to this column. */
private final @Nullable Column existingColumn;
private final @Nullable String existingColumn;

/** In the default scenario, we add fields at the end of the column. */
public ColumnWithPosition(Column addColumn) {
Expand All @@ -76,7 +76,7 @@ public ColumnWithPosition(Column addColumn) {
}

public ColumnWithPosition(
Column addColumn, ColumnPosition position, Column existingColumn) {
Column addColumn, ColumnPosition position, @Nullable String existingColumn) {
this.addColumn = addColumn;
this.position = position;
this.existingColumn = existingColumn;
Expand All @@ -90,7 +90,8 @@ public ColumnPosition getPosition() {
return position;
}

public Column getExistingColumn() {
@Nullable
public String getExistingColumn() {
return existingColumn;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.ververica.cdc.common.event;

import com.ververica.cdc.common.annotation.PublicEvolving;
import com.ververica.cdc.common.schema.Column;

import java.util.List;
import java.util.Objects;
Expand All @@ -33,16 +32,15 @@ public class DropColumnEvent implements SchemaChangeEvent {

private final TableId tableId;

private final List<Column> droppedColumns;
private final List<String> droppedColumnNames;

public DropColumnEvent(TableId tableId, List<Column> droppedColumns) {
public DropColumnEvent(TableId tableId, List<String> droppedColumnNames) {
this.tableId = tableId;
this.droppedColumns = droppedColumns;
this.droppedColumnNames = droppedColumnNames;
}

/** Returns the dropped columns. */
public List<Column> getDroppedColumns() {
return droppedColumns;
public List<String> getDroppedColumnNames() {
return droppedColumnNames;
}

@Override
Expand All @@ -55,21 +53,21 @@ public boolean equals(Object o) {
}
DropColumnEvent that = (DropColumnEvent) o;
return Objects.equals(tableId, that.tableId)
&& Objects.equals(droppedColumns, that.droppedColumns);
&& Objects.equals(droppedColumnNames, that.droppedColumnNames);
}

@Override
public int hashCode() {
return Objects.hash(tableId, droppedColumns);
return Objects.hash(tableId, droppedColumnNames);
}

@Override
public String toString() {
return "DropColumnEvent{"
+ "tableId="
+ tableId
+ ", droppedColumns="
+ droppedColumns
+ ", droppedColumnNames="
+ droppedColumnNames
+ '}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,10 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
"existingColumn could not be null in BEFORE type AddColumnEvent");
List<String> columnNames =
columns.stream().map(Column::getName).collect(Collectors.toList());
int index =
columnNames.indexOf(
columnWithPosition.getExistingColumn().getName());
int index = columnNames.indexOf(columnWithPosition.getExistingColumn());
if (index < 0) {
throw new IllegalArgumentException(
columnWithPosition.getExistingColumn().getName()
columnWithPosition.getExistingColumn()
+ " of AddColumnEvent is not existed");
}
columns.add(index, columnWithPosition.getAddColumn());
Expand All @@ -109,12 +107,10 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
"existingColumn could not be null in AFTER type AddColumnEvent");
List<String> columnNames =
columns.stream().map(Column::getName).collect(Collectors.toList());
int index =
columnNames.indexOf(
columnWithPosition.getExistingColumn().getName());
int index = columnNames.indexOf(columnWithPosition.getExistingColumn());
if (index < 0) {
throw new IllegalArgumentException(
columnWithPosition.getExistingColumn().getName()
columnWithPosition.getExistingColumn()
+ " of AddColumnEvent is not existed");
}
columns.add(index + 1, columnWithPosition.getAddColumn());
Expand All @@ -126,17 +122,19 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema
}

private static Schema applyDropColumnEvent(DropColumnEvent event, Schema oldSchema) {
event.getDroppedColumns()
event.getDroppedColumnNames()
.forEach(
column -> {
if (!oldSchema.getColumn(column.getName()).isPresent()) {
if (!oldSchema.getColumn(column).isPresent()) {
throw new IllegalArgumentException(
column.getName() + " of DropColumnEvent is not existed");
column + " of DropColumnEvent is not existed");
}
});
List<Column> columns =
oldSchema.getColumns().stream()
.filter((column -> !event.getDroppedColumns().contains(column)))
.filter(
(column ->
!event.getDroppedColumnNames().contains(column.getName())))
.collect(Collectors.toList());
return oldSchema.copy(columns);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.ververica.cdc.common.testutils.assertions;

import com.ververica.cdc.common.event.DropColumnEvent;
import com.ververica.cdc.common.schema.Column;
import org.assertj.core.internal.Iterables;

import java.util.List;
Expand All @@ -35,8 +34,8 @@ private DropColumnEventAssert(DropColumnEvent event) {
super(event, DropColumnEventAssert.class);
}

public DropColumnEventAssert containsDroppedColumns(Column... droppedColumns) {
List<Column> actualDroppedColumns = actual.getDroppedColumns();
public DropColumnEventAssert containsDroppedColumns(String... droppedColumns) {
List<String> actualDroppedColumns = actual.getDroppedColumnNames();
iterables.assertContainsExactlyInAnyOrder(info, actualDroppedColumns, droppedColumns);
return myself;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

/** A test for the {@link SchemaUtils}. */
public class SchemaUtilsTest {
Expand Down Expand Up @@ -67,7 +63,7 @@ public void testApplySchemaChangeEvent() {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col4", DataTypes.STRING()),
AddColumnEvent.ColumnPosition.BEFORE,
Column.physicalColumn("col3", DataTypes.STRING())));
"col3"));
addColumnEvent = new AddColumnEvent(tableId, addedColumns);
schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
Assert.assertEquals(
Expand All @@ -85,7 +81,7 @@ public void testApplySchemaChangeEvent() {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col5", DataTypes.STRING()),
AddColumnEvent.ColumnPosition.AFTER,
Column.physicalColumn("col4", DataTypes.STRING())));
"col4"));
addColumnEvent = new AddColumnEvent(tableId, addedColumns);
schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
Assert.assertEquals(
Expand Down Expand Up @@ -120,11 +116,7 @@ public void testApplySchemaChangeEvent() {

// drop columns
DropColumnEvent dropColumnEvent =
new DropColumnEvent(
tableId,
Arrays.asList(
Column.physicalColumn("col3", DataTypes.STRING()),
Column.physicalColumn("col5", DataTypes.STRING())));
new DropColumnEvent(tableId, Arrays.asList("col3", "col5"));
schema = SchemaUtils.applySchemaChangeEvent(schema, dropColumnEvent);
Assert.assertEquals(
schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,9 @@ private void applyAddColumnEvent(AddColumnEvent event)
private void applyDropColumnEvent(DropColumnEvent event)
throws IOException, IllegalArgumentException {
TableId tableId = event.tableId();
List<Column> droppedColumns = event.getDroppedColumns();
for (Column col : droppedColumns) {
schemaChangeManager.dropColumn(
tableId.getSchemaName(), tableId.getTableName(), col.getName());
List<String> droppedColumns = event.getDroppedColumnNames();
for (String col : droppedColumns) {
schemaChangeManager.dropColumn(tableId.getSchemaName(), tableId.getTableName(), col);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -223,12 +222,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
@Override
public void enterAlterByDropColumn(MySqlParser.AlterByDropColumnContext ctx) {
String removedColName = parser.parseName(ctx.uid());
changes.add(
new DropColumnEvent(
currentTable,
Arrays.asList(
com.ververica.cdc.common.schema.Column.physicalColumn(
removedColName, DataTypes.BIGINT()))));
changes.add(new DropColumnEvent(currentTable, Collections.singletonList(removedColName)));
super.enterAlterByDropColumn(ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,15 +479,15 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
expected.add(
new AddColumnEvent(
tableId,
Arrays.asList(
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col1", DataTypes.VARCHAR(45)),
AddColumnEvent.ColumnPosition.AFTER,
Column.physicalColumn("weight", DataTypes.BIGINT())))));
expected.add(
new AddColumnEvent(
tableId,
Arrays.asList(
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("col2", DataTypes.VARCHAR(55)),
AddColumnEvent.ColumnPosition.AFTER,
Expand All @@ -497,11 +497,7 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
String.format(
"ALTER TABLE `%s`.`products` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;",
inventoryDatabase.getDatabaseName()));
expected.add(
new DropColumnEvent(
tableId,
Collections.singletonList(
Column.physicalColumn("desc2", DataTypes.BIGINT()))));
expected.add(new DropColumnEvent(tableId, Collections.singletonList("desc2")));
expected.add(
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("desc1", DataTypes.VARCHAR(65))));
Expand All @@ -517,11 +513,7 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
String.format(
"ALTER TABLE `%s`.`products` DROP COLUMN `DESC3`;",
inventoryDatabase.getDatabaseName()));
expected.add(
new DropColumnEvent(
tableId,
Collections.singletonList(
Column.physicalColumn("DESC3", DataTypes.BIGINT()))));
expected.add(new DropColumnEvent(tableId, Collections.singletonList("DESC3")));
return expected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import static com.ververica.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType;

Expand Down Expand Up @@ -187,10 +186,7 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) {
}

private void applyDropColumn(DropColumnEvent dropColumnEvent) {
List<String> dropColumns =
dropColumnEvent.getDroppedColumns().stream()
.map(Column::getName)
.collect(Collectors.toList());
List<String> dropColumns = dropColumnEvent.getDroppedColumnNames();
TableId tableId = dropColumnEvent.tableId();
StarRocksCatalogException alterException = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ private static class ValuesTable {

private final LinkedList<Column> columns;

private List<String> columNames;

private final List<String> primaryKeys;

// indexes of primaryKeys in columns
Expand All @@ -195,6 +197,7 @@ public ValuesTable(TableId tableId, Schema schema) {
this.tableId = tableId;
this.lock = new Object();
this.columns = new LinkedList<>(schema.getColumns());
this.columNames = new LinkedList<>(schema.getColumnNames());
this.records = new HashMap<>();
this.primaryKeys = new LinkedList<>(schema.primaryKeys());
this.primaryKeyIndexes = new ArrayList<>();
Expand Down Expand Up @@ -346,11 +349,11 @@ private void applyAddColumnEvent(AddColumnEvent event) {
}

private void applyDropColumnEvent(DropColumnEvent event) {
for (Column column : event.getDroppedColumns()) {
if (!columns.remove(column)) {
throw new IllegalArgumentException(column.getName() + " is not existed");
for (String columnName : event.getDroppedColumnNames()) {
if (!columNames.remove(columnName)) {
throw new IllegalArgumentException(columnName + " is not existed");
}
records.forEach((key, record) -> record.remove(column.getName()));
records.forEach((key, record) -> record.remove(columnName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private SchemaChangeEvent recreateSchemaChangeEvent(
}
if (schemaChangeEvent instanceof DropColumnEvent) {
DropColumnEvent dropColumnEvent = (DropColumnEvent) schemaChangeEvent;
return new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumns());
return new DropColumnEvent(tableId, dropColumnEvent.getDroppedColumnNames());
}
if (schemaChangeEvent instanceof AddColumnEvent) {
AddColumnEvent addColumnEvent = (AddColumnEvent) schemaChangeEvent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@

import com.ververica.cdc.common.event.DropColumnEvent;
import com.ververica.cdc.common.event.TableId;
import com.ververica.cdc.common.schema.Column;
import com.ververica.cdc.runtime.serializer.ListSerializer;
import com.ververica.cdc.runtime.serializer.StringSerializer;
import com.ververica.cdc.runtime.serializer.TableIdSerializer;
import com.ververica.cdc.runtime.serializer.TypeSerializerSingleton;
import com.ververica.cdc.runtime.serializer.schema.ColumnSerializer;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -42,8 +41,8 @@ public class DropColumnEventSerializer extends TypeSerializerSingleton<DropColum
public static final DropColumnEventSerializer INSTANCE = new DropColumnEventSerializer();

private final TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
private final ListSerializer<Column> columnsSerializer =
new ListSerializer<>(ColumnSerializer.INSTANCE);
private final ListSerializer<String> columnNamesSerializer =
new ListSerializer<>(StringSerializer.INSTANCE);

@Override
public boolean isImmutableType() {
Expand All @@ -58,7 +57,7 @@ public DropColumnEvent createInstance() {
@Override
public DropColumnEvent copy(DropColumnEvent from) {
return new DropColumnEvent(
from.tableId(), columnsSerializer.copy(from.getDroppedColumns()));
from.tableId(), columnNamesSerializer.copy(from.getDroppedColumnNames()));
}

@Override
Expand All @@ -74,13 +73,13 @@ public int getLength() {
@Override
public void serialize(DropColumnEvent record, DataOutputView target) throws IOException {
tableIdSerializer.serialize(record.tableId(), target);
columnsSerializer.serialize(record.getDroppedColumns(), target);
columnNamesSerializer.serialize(record.getDroppedColumnNames(), target);
}

@Override
public DropColumnEvent deserialize(DataInputView source) throws IOException {
return new DropColumnEvent(
tableIdSerializer.deserialize(source), columnsSerializer.deserialize(source));
tableIdSerializer.deserialize(source), columnNamesSerializer.deserialize(source));
}

@Override
Expand Down
Loading

0 comments on commit d16c153

Please sign in to comment.