From deae29102439a29e73813e74bd239f81101d7f0f Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Sat, 2 Dec 2023 22:23:48 +0800 Subject: [PATCH] [pipeline-common] use column name to judge whether a column is existed in a specific schema. --- .../com/ververica/cdc/common/utils/SchemaUtils.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/SchemaUtils.java index b01952e616..b352c319f9 100644 --- a/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/SchemaUtils.java +++ b/flink-cdc-common/src/main/java/com/ververica/cdc/common/utils/SchemaUtils.java @@ -89,7 +89,11 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema Preconditions.checkNotNull( columnWithPosition.getExistingColumn(), "existingColumn could not be null in BEFORE type AddColumnEvent"); - int index = columns.indexOf(columnWithPosition.getExistingColumn()); + List columnNames = + columns.stream().map(Column::getName).collect(Collectors.toList()); + int index = + columnNames.indexOf( + columnWithPosition.getExistingColumn().getName()); if (index < 0) { throw new IllegalArgumentException( columnWithPosition.getExistingColumn().getName() @@ -103,7 +107,11 @@ private static Schema applyAddColumnEvent(AddColumnEvent event, Schema oldSchema Preconditions.checkNotNull( columnWithPosition.getExistingColumn(), "existingColumn could not be null in AFTER type AddColumnEvent"); - int index = columns.indexOf(columnWithPosition.getExistingColumn()); + List columnNames = + columns.stream().map(Column::getName).collect(Collectors.toList()); + int index = + columnNames.indexOf( + columnWithPosition.getExistingColumn().getName()); if (index < 0) { throw new IllegalArgumentException( columnWithPosition.getExistingColumn().getName()