From e137bdf9570e968c1960a247716f2757d309f4ac Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Mon, 19 Aug 2024 15:38:57 +0800 Subject: [PATCH] hotfix: transform with schema evolution issue --- .../pipeline/tests/SchemaEvolveE2eITCase.java | 19 +++++++++++++++++-- .../SchemaRegistryRequestHandler.java | 3 +++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index 801fd164e10..bc7d69cfa3e 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -192,8 +192,12 @@ public void testLenientSchemaEvolution() throws Exception { "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`biological_sex` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null, null, 16.0, null], op=INSERT, meta=()}", "TruncateTableEvent{tableId=%s.members}", - "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, null, null, 17.0, null], op=INSERT, meta=()}", - "DropTableEvent{tableId=%s.members}")); + "DataChangeEvent{tableId=%s.members, before=[], after=[1014, Gem, null, null, 17.0, null], op=INSERT, meta=()}")); + + assertNotExists( + Collections.singletonList( + "Applied schema change event DropTableEvent{tableId=%s.members}"), + taskManagerConsumer); } @Test @@ -404,6 +408,17 @@ private void validateResult(List expectedEvents, ToStringConsumer consum } } + private void assertNotExists(List unexpectedEvents, ToStringConsumer consumer) { + String consumerLog = consumer.toUtf8String(); + System.out.println(consumerLog); + for (String event : unexpectedEvents) { + System.out.println("Assuming there's no " + event + " in logs..."); + Assert.assertFalse( + consumerLog.contains( + String.format(event, schemaEvolveDatabase.getDatabaseName()))); + } + } + private void waitUntilSpecificEvent(String event, ToStringConsumer consumer) throws Exception { boolean result = false; long endTimeout = System.currentTimeMillis() + SchemaEvolveE2eITCase.EVENT_WAITING_TIMEOUT; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index aedf75c1812..50924ceba63 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -428,6 +428,9 @@ private List lenientizeSchemaChangeEvent(SchemaChangeEvent ev } return events; } + case DROP_TABLE: + // We don't drop any tables in Lenient mode. + return Collections.emptyList(); default: return Collections.singletonList(event); }