diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java index fc5b482f14..52950fb322 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java @@ -24,9 +24,9 @@ /** An exception occurred during schema evolution. */ public class SchemaEvolveException extends FlinkRuntimeException { - private final SchemaChangeEvent applyingEvent; - private final String exceptionMessage; - private final @Nullable Throwable cause; + protected final SchemaChangeEvent applyingEvent; + protected final String exceptionMessage; + protected final @Nullable Throwable cause; public SchemaEvolveException(SchemaChangeEvent applyingEvent, String exceptionMessage) { this(applyingEvent, exceptionMessage, null); diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java index 4be20525ab..55d53e7d91 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java @@ -19,10 +19,36 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import javax.annotation.Nullable; + /** A special kind of {@link SchemaEvolveException} that sink doesn't support such event type. */ public class UnsupportedSchemaChangeEventException extends SchemaEvolveException { public UnsupportedSchemaChangeEventException(SchemaChangeEvent applyingEvent) { - super(applyingEvent, "Sink doesn't support such schema change event.", null); + this(applyingEvent, "Sink doesn't support such schema change event."); + } + + public UnsupportedSchemaChangeEventException( + SchemaChangeEvent applyingEvent, String exceptionMessage) { + this(applyingEvent, exceptionMessage, null); + } + + public UnsupportedSchemaChangeEventException( + SchemaChangeEvent applyingEvent, String exceptionMessage, @Nullable Throwable cause) { + super(applyingEvent, exceptionMessage, cause); + } + + @Override + public String toString() { + return "UnsupportedSchemaChangeEventException{" + + "applyingEvent=" + + applyingEvent + + ", exceptionMessage='" + + exceptionMessage + + '\'' + + ", cause='" + + cause + + '\'' + + '}'; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java index 5ee181985d..cad3b37fc1 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplier.java @@ -221,7 +221,7 @@ private void applyAddColumnEvent(AddColumnEvent event) throws SchemaEvolveExcept tableId.getSchemaName(), tableId.getTableName(), addFieldSchema); } } catch (Exception 
e) { - throw new SchemaEvolveException(event, e.getMessage(), e); + throw new SchemaEvolveException(event, "Failed to apply add column event", e); } } @@ -234,7 +234,7 @@ private void applyDropColumnEvent(DropColumnEvent event) throws SchemaEvolveExce tableId.getSchemaName(), tableId.getTableName(), col); } } catch (Exception e) { - throw new SchemaEvolveException(event, e.getMessage(), e); + throw new SchemaEvolveException(event, "Failed to apply drop column event", e); } } @@ -250,7 +250,7 @@ private void applyRenameColumnEvent(RenameColumnEvent event) throws SchemaEvolve entry.getValue()); } } catch (Exception e) { - throw new SchemaEvolveException(event, e.getMessage(), e); + throw new SchemaEvolveException(event, "Failed to apply rename column event", e); } } @@ -272,7 +272,7 @@ private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event) // will be fixed after FLINK-35243 got merged. } } catch (Exception e) { - throw new SchemaEvolveException(event, e.getMessage(), e); + throw new SchemaEvolveException(event, "Failed to apply alter column type event", e); } } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java index dfea5d0715..0004961f38 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java @@ -31,6 +31,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; @@ -152,7 +153,7 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) tableId, ((CreateTableEvent) schemaChangeEvent).getSchema())); } } else { - throw new SchemaEvolveException( + throw new UnsupportedSchemaChangeEventException( schemaChangeEvent, "Rejected schema change event since error.on.schema.change is enabled.", null); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index 3ce48bce23..e340468f38 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -341,7 +341,7 @@ private void waitUntilSpecificEvent(String event) throws Exception { long endTimeout = System.currentTimeMillis() + MysqlE2eITCase.EVENT_WAITING_TIMEOUT; while (System.currentTimeMillis() < endTimeout) { String stdout = taskManagerConsumer.toUtf8String(); - if (stdout.contains(event)) { + if (stdout.contains(event + "\n")) { result = true; break; } diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java index 8b3c907b89..0834ce6e77 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java @@ -800,7 +800,7 @@ public void testReplacementSymbol() throws Exception { "DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}", - "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}", + "AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}", "RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}", "DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", "DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA, droppedColumnNames=[VERSION]}", @@ -818,7 +818,7 @@ private void waitUntilSpecificEvent(String event) throws Exception { long endTimeout = System.currentTimeMillis() + EVENT_DEFAULT_TIMEOUT; while (System.currentTimeMillis() < endTimeout) { String stdout = taskManagerConsumer.toUtf8String(); - if (stdout.contains(event)) { + if (stdout.contains(event + "\n")) { result = true; break; } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index 8225558fcc..14c6a5912d 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -113,9 +113,9 @@ public void testSchemaEvolveWithIncompatibleChanges() throws Exception { true, false, false, - Collections.singletonList( - "java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\""), - Collections.singletonList( + Collections.emptyList(), + Arrays.asList( + "java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"", "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy")); } @@ -126,11 +126,10 @@ public void testSchemaEvolveWithException() throws Exception { false, true, false, - Collections.singletonList( - "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}"), + Collections.emptyList(), Arrays.asList( - "Failed to apply schema change event AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}.", - "SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since 
error.on.schema.change is enabled.', cause='null'}", + "Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", + "UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy")); } @@ -146,8 +145,8 @@ public void testSchemaTryEvolveWithException() throws Exception { "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null], op=INSERT, meta=()}"), Arrays.asList( - "Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members.", - "SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}")); + "Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. 
Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", + "UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}")); } @Test @@ -185,7 +184,7 @@ public void testLenientSchemaEvolution() throws Exception { false, false, Arrays.asList( - "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", + "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", "AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}", @@ -369,7 +368,7 @@ private void testGenericSchemaEvolution( List expectedJmEvents = expectedJobManagerEvents.stream() - .map(s -> String.format(s, dbName, dbName)) + .map(s -> String.format(s, dbName, dbName, dbName)) .collect(Collectors.toList()); validateResult(expectedJmEvents, jobManagerConsumer); @@ -422,7 +421,7 @@ private void waitUntilSpecificEvent(String event, ToStringConsumer consumer) thr long endTimeout = System.currentTimeMillis() + SchemaEvolveE2eITCase.EVENT_WAITING_TIMEOUT; while (System.currentTimeMillis() < endTimeout) { String stdout = consumer.toUtf8String(); - if (stdout.contains(event)) { + if (stdout.contains(event + "\n")) { result = true; break; } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index e279771533..6f612ca951 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -736,7 +736,7 @@ private void waitUntilSpecificEvent(String event, long timeout) throws Exception long endTimeout = System.currentTimeMillis() + timeout; while (System.currentTimeMillis() < endTimeout) { String stdout = taskManagerConsumer.toUtf8String(); - if (stdout.contains(event)) { + if (stdout.contains(event + "\n")) { result = true; break; } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java index c965e4dec7..938e2d98ec 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java @@ -386,7 +386,7 @@ private void waitUntilSpecificEvent(String event, long timeout) throws Exception 
long endTimeout = System.currentTimeMillis() + timeout; while (System.currentTimeMillis() < endTimeout) { String stdout = taskManagerConsumer.toUtf8String(); - if (stdout.contains(event)) { + if (stdout.contains(event + "\n")) { result = true; break; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index d3a9e158b6..a700fd39ce 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.runtime.operators.schema; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.annotation.VisibleForTesting; @@ -29,7 +28,6 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.schema.Column; @@ -40,8 +38,6 @@ import org.apache.flink.cdc.common.types.DataTypeRoot; import org.apache.flink.cdc.common.utils.ChangeEventUtils; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils; import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest; import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; @@ -431,56 +427,9 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh ReleaseUpstreamResponse schemaEvolveResponse = requestReleaseUpstream(); List finishedSchemaChangeEvents = schemaEvolveResponse.getFinishedSchemaChangeEvents(); - List> failedSchemaChangeEvents = - schemaEvolveResponse.getFailedSchemaChangeEvents(); - List ignoredSchemaChangeEvents = - schemaEvolveResponse.getIgnoredSchemaChangeEvents(); - - if (schemaChangeBehavior == SchemaChangeBehavior.EVOLVE - || schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION) { - if (schemaEvolveResponse.hasException()) { - throw new RuntimeException( - String.format( - "Failed to apply schema change event %s.\nExceptions: %s", - schemaChangeEvent, - schemaEvolveResponse.getPrintableFailedSchemaChangeEvents())); - } - } else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE - || schemaChangeBehavior == SchemaChangeBehavior.LENIENT - || schemaChangeBehavior == SchemaChangeBehavior.IGNORE) { - if (schemaEvolveResponse.hasException()) { - schemaEvolveResponse - .getFailedSchemaChangeEvents() - .forEach( - e -> - LOG.warn( - "Failed to apply event {}, but keeps running in tolerant mode. 
Caused by: {}", - e.f0, - e.f1)); - } - } else { - throw new SchemaEvolveException( - schemaChangeEvent, - "Unexpected schema change behavior: " + schemaChangeBehavior); - } // Update evolved schema changes based on apply results - requestApplyEvolvedSchemaChanges(tableId, finishedSchemaChangeEvents); finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e))); - - LOG.info( - "Applied schema change event {} to downstream. Among {} total evolved events, {} succeeded, {} failed, and {} ignored.", - schemaChangeEvent, - expectedSchemaChangeEvents.size(), - finishedSchemaChangeEvents.size(), - failedSchemaChangeEvents.size(), - ignoredSchemaChangeEvents.size()); - - schemaOperatorMetrics.increaseFinishedSchemaChangeEvents( - finishedSchemaChangeEvents.size()); - schemaOperatorMetrics.increaseFailedSchemaChangeEvents(failedSchemaChangeEvents.size()); - schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents( - ignoredSchemaChangeEvents.size()); } } @@ -489,16 +438,6 @@ private SchemaChangeResponse requestSchemaChange( return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent)); } - private void requestApplyOriginalSchemaChanges( - TableId tableId, SchemaChangeEvent schemaChangeEvent) { - sendRequestToCoordinator(new ApplyOriginalSchemaChangeRequest(tableId, schemaChangeEvent)); - } - - private void requestApplyEvolvedSchemaChanges( - TableId tableId, List schemaChangeEvents) { - sendRequestToCoordinator(new ApplyEvolvedSchemaChangeRequest(tableId, schemaChangeEvents)); - } - private ReleaseUpstreamResponse requestReleaseUpstream() throws InterruptedException, TimeoutException { CoordinationResponse coordinationResponse = @@ -538,7 +477,7 @@ private Schema getLatestEvolvedSchema(TableId tableId) { return optionalSchema.get(); } catch (Exception e) { throw new IllegalStateException( - String.format("Unable to get latest schema for table \"%s\"", tableId)); + String.format("Unable to get latest schema for table \"%s\"", tableId), e); } } @@ -553,7 +492,7 @@ private Schema getLatestOriginalSchema(TableId tableId) { return optionalSchema.get(); } catch (Exception e) { throw new IllegalStateException( - String.format("Unable to get latest schema for table \"%s\"", tableId)); + String.format("Unable to get latest schema for table \"%s\"", tableId), e); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java index cc7c682070..9087ae4b36 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -17,15 +17,13 @@ package org.apache.flink.cdc.runtime.operators.schema.coordinator; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeResponse; -import 
org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeResponse; import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent; import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest; import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse; @@ -151,18 +149,23 @@ public void close() throws Exception { @Override public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception { - if (event instanceof FlushSuccessEvent) { - FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event; - LOG.info( - "Sink subtask {} succeed flushing for table {}.", - flushSuccessEvent.getSubtask(), - flushSuccessEvent.getTableId().toString()); - requestHandler.flushSuccess( - flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask()); - } else if (event instanceof SinkWriterRegisterEvent) { - requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask()); - } else { - throw new FlinkException("Unrecognized Operator Event: " + event); + try { + if (event instanceof FlushSuccessEvent) { + FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event; + LOG.info( + "Sink subtask {} succeed flushing for table {}.", + flushSuccessEvent.getSubtask(), + flushSuccessEvent.getTableId().toString()); + requestHandler.flushSuccess( + flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask()); + } else if (event instanceof SinkWriterRegisterEvent) { + requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask()); + } else { + throw new FlinkException("Unrecognized Operator Event: " + event); + } + } catch (Throwable t) { + context.failJob(t); + throw t; } } @@ -180,6 +183,9 @@ public void checkpointCoordinator(long checkpointId, CompletableFuture r // Serialize SchemaDerivation mapping SchemaDerivation.serializeDerivationMapping(schemaDerivation, out); resultFuture.complete(baos.toByteArray()); + } catch (Throwable t) { + context.failJob(t); + throw t; } } @@ -191,33 +197,29 @@ public void notifyCheckpointComplete(long checkpointId) { @Override public CompletableFuture handleCoordinationRequest( CoordinationRequest request) { - if (request instanceof SchemaChangeRequest) { - SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request; - return requestHandler.handleSchemaChangeRequest(schemaChangeRequest); - } else if (request instanceof ReleaseUpstreamRequest) { - return requestHandler.handleReleaseUpstreamRequest(); - } else if (request instanceof GetEvolvedSchemaRequest) { - return CompletableFuture.completedFuture( - wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request)))); - } else if (request instanceof GetOriginalSchemaRequest) { - return CompletableFuture.completedFuture( - wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request))); - } else if (request instanceof ApplyOriginalSchemaChangeRequest) { - return CompletableFuture.completedFuture( - wrap( - handleApplyOriginalSchemaChangeRequest( - (ApplyOriginalSchemaChangeRequest) request))); - } else if (request instanceof ApplyEvolvedSchemaChangeRequest) { - return CompletableFuture.completedFuture( - wrap( - handleApplyEvolvedSchemaChangeRequest( - (ApplyEvolvedSchemaChangeRequest) request))); - } else if (request instanceof SchemaChangeResultRequest) { - return requestHandler.getSchemaChangeResult(); - } else if (request instanceof 
RefreshPendingListsRequest) { - return requestHandler.refreshPendingLists(); - } else { - throw new IllegalArgumentException("Unrecognized CoordinationRequest type: " + request); + try { + if (request instanceof SchemaChangeRequest) { + SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request; + return requestHandler.handleSchemaChangeRequest(schemaChangeRequest); + } else if (request instanceof ReleaseUpstreamRequest) { + return requestHandler.handleReleaseUpstreamRequest(); + } else if (request instanceof GetEvolvedSchemaRequest) { + return CompletableFuture.completedFuture( + wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request)))); + } else if (request instanceof GetOriginalSchemaRequest) { + return CompletableFuture.completedFuture( + wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request))); + } else if (request instanceof SchemaChangeResultRequest) { + return requestHandler.getSchemaChangeResult(); + } else if (request instanceof RefreshPendingListsRequest) { + return requestHandler.refreshPendingLists(); + } else { + throw new IllegalArgumentException( + "Unrecognized CoordinationRequest type: " + request); + } + } catch (Throwable t) { + context.failJob(t); + throw t; } } @@ -275,6 +277,9 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData throw new IOException( "Unrecognized serialization version " + schemaManagerSerializerVersion); } + } catch (Throwable t) { + context.failJob(t); + throw t; } } @@ -342,18 +347,15 @@ private GetOriginalSchemaResponse handleGetOriginalSchemaRequest( } } - private ApplyOriginalSchemaChangeResponse handleApplyOriginalSchemaChangeRequest( - ApplyOriginalSchemaChangeRequest applyOriginalSchemaChangeRequest) { - schemaManager.applyOriginalSchemaChange( - applyOriginalSchemaChangeRequest.getSchemaChangeEvent()); - return new ApplyOriginalSchemaChangeResponse(); + // --------------------Only visible for test ----------------- + + @VisibleForTesting + public void handleApplyOriginalSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) { + schemaManager.applyOriginalSchemaChange(schemaChangeEvent); } - private ApplyEvolvedSchemaChangeResponse handleApplyEvolvedSchemaChangeRequest( - ApplyEvolvedSchemaChangeRequest applyEvolvedSchemaChangeRequest) { - applyEvolvedSchemaChangeRequest - .getSchemaChangeEvent() - .forEach(schemaManager::applyEvolvedSchemaChange); - return new ApplyEvolvedSchemaChangeResponse(); + @VisibleForTesting + public void handleApplyEvolvedSchemaChangeRequest(SchemaChangeEvent schemaChangeEvent) { + schemaManager.applyEvolvedSchemaChange(schemaChangeEvent); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index c47d556f64..a84ee8d63a 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.runtime.operators.schema.coordinator; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; @@ -28,7 +27,7 @@ import 
org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema; import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; @@ -88,7 +87,6 @@ public class SchemaRegistryRequestHandler implements Closeable { private final List pendingSchemaChanges; private final List finishedSchemaChanges; - private final List> failedSchemaChanges; private final List ignoredSchemaChanges; /** Sink writers which have sent flush success events for the request. */ @@ -96,6 +94,8 @@ public class SchemaRegistryRequestHandler implements Closeable { /** Status of the execution of current schema change request. */ private volatile boolean isSchemaChangeApplying; + /** Actual exception if failed to apply schema change. */ + private volatile Throwable schemaChangeException; /** Executor service to execute schema change. */ private final ExecutorService schemaChangeThreadPool; @@ -111,7 +111,6 @@ public SchemaRegistryRequestHandler( this.flushedSinkWriters = new HashSet<>(); this.pendingSchemaChanges = new LinkedList<>(); this.finishedSchemaChanges = new LinkedList<>(); - this.failedSchemaChanges = new LinkedList<>(); this.ignoredSchemaChanges = new LinkedList<>(); this.schemaManager = schemaManager; this.schemaDerivation = schemaDerivation; @@ -129,8 +128,8 @@ public SchemaRegistryRequestHandler( private void applySchemaChange( TableId tableId, List derivedSchemaChangeEvents) { isSchemaChangeApplying = true; + schemaChangeException = null; finishedSchemaChanges.clear(); - failedSchemaChanges.clear(); ignoredSchemaChanges.clear(); for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) { @@ -147,17 +146,27 @@ private void applySchemaChange( try { metadataApplier.applySchemaChange(changeEvent); LOG.debug("Applied schema change {} to table {}.", changeEvent, tableId); + schemaManager.applyEvolvedSchemaChange(changeEvent); finishedSchemaChanges.add(changeEvent); - } catch (SchemaEvolveException e) { + } catch (Throwable t) { LOG.error( "Failed to apply schema change {} to table {}. Caused by: {}", changeEvent, tableId, - e); - failedSchemaChanges.add(Tuple2.of(changeEvent, e)); + t); + if (!shouldIgnoreException(t)) { + schemaChangeException = t; + break; + } else { + LOG.warn( + "Failed to apply event {}, but the pipeline keeps running in tolerant mode. 
Caused by: {}", + changeEvent, + t); + } } } } + PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0); if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) { startNextSchemaChangeRequest(); @@ -254,6 +263,10 @@ public void flushSuccess(TableId tableId, int sinkSubtask) throws InterruptedExc () -> applySchemaChange(tableId, waitFlushSuccess.derivedSchemaChangeEvents)); Thread.sleep(1000); + if (schemaChangeException != null) { + throw new RuntimeException("Failed to apply schema change.", schemaChangeException); + } + if (isSchemaChangeApplying) { waitFlushSuccess .getResponseFuture() @@ -261,12 +274,7 @@ public void flushSuccess(TableId tableId, int sinkSubtask) throws InterruptedExc } else { waitFlushSuccess .getResponseFuture() - .complete( - wrap( - new ReleaseUpstreamResponse( - finishedSchemaChanges, - failedSchemaChanges, - ignoredSchemaChanges))); + .complete(wrap(new ReleaseUpstreamResponse(finishedSchemaChanges))); } } } @@ -305,16 +313,15 @@ public CompletableFuture refreshPendingLists() { } public CompletableFuture getSchemaChangeResult() { + if (schemaChangeException != null) { + throw new RuntimeException("Failed to apply schema change.", schemaChangeException); + } + if (isSchemaChangeApplying) { return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse())); } else { return CompletableFuture.supplyAsync( - () -> - wrap( - new ReleaseUpstreamResponse( - finishedSchemaChanges, - failedSchemaChanges, - ignoredSchemaChanges))); + () -> wrap(new ReleaseUpstreamResponse(finishedSchemaChanges))); } } @@ -437,6 +444,15 @@ private List lenientizeSchemaChangeEvent(SchemaChangeEvent ev } } + private boolean shouldIgnoreException(Throwable throwable) { + + // In IGNORE mode, will never try to apply schema change events + // In EVOLVE and and LENIENT mode, such failure will not be tolerated + // In EXCEPTION mode, an exception will be thrown once captured + return (throwable instanceof UnsupportedSchemaChangeEventException) + && (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE); + } + private static class PendingSchemaChange { private final SchemaChangeRequest changeRequest; private List derivedSchemaChangeEvents; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java deleted file mode 100644 index f6798af519..0000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.cdc.runtime.operators.schema.event; - -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.runtime.operators.coordination.CoordinationRequest; - -import java.util.List; -import java.util.Objects; - -/** - * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request apply evolved schema - * changes. - */ -public class ApplyEvolvedSchemaChangeRequest implements CoordinationRequest { - - private static final long serialVersionUID = 1L; - - /** The sender of the request. */ - private final TableId tableId; - /** The schema changes. */ - private final List schemaChangeEvent; - - public ApplyEvolvedSchemaChangeRequest( - TableId tableId, List schemaChangeEvent) { - this.tableId = tableId; - this.schemaChangeEvent = schemaChangeEvent; - } - - public TableId getTableId() { - return tableId; - } - - public List getSchemaChangeEvent() { - return schemaChangeEvent; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof ApplyEvolvedSchemaChangeRequest)) { - return false; - } - ApplyEvolvedSchemaChangeRequest that = (ApplyEvolvedSchemaChangeRequest) o; - return Objects.equals(tableId, that.tableId) - && Objects.equals(schemaChangeEvent, that.schemaChangeEvent); - } - - @Override - public int hashCode() { - return Objects.hash(tableId, schemaChangeEvent); - } -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java deleted file mode 100644 index 787adfc5e4..0000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.runtime.operators.schema.event; - -import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.runtime.operators.coordination.CoordinationResponse; - -/** - * The response from {@link SchemaRegistry} to {@link SchemaOperator} to request apply original - * schema changes, the evolved schema changes come from original schema changes with different - * schema evolution strategy. 
- */ -public class ApplyEvolvedSchemaChangeResponse implements CoordinationResponse { - - private static final long serialVersionUID = 1L; -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java deleted file mode 100644 index d4c5d7feef..0000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.runtime.operators.schema.event; - -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.runtime.operators.coordination.CoordinationRequest; - -import java.util.Objects; - -/** - * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request apply original - * schema changes. - */ -public class ApplyOriginalSchemaChangeRequest implements CoordinationRequest { - - private static final long serialVersionUID = 1L; - - /** The sender of the request. */ - private final TableId tableId; - /** The schema changes. 
*/ - private final SchemaChangeEvent schemaChangeEvent; - - public ApplyOriginalSchemaChangeRequest(TableId tableId, SchemaChangeEvent schemaChangeEvent) { - this.tableId = tableId; - this.schemaChangeEvent = schemaChangeEvent; - } - - public TableId getTableId() { - return tableId; - } - - public SchemaChangeEvent getSchemaChangeEvent() { - return schemaChangeEvent; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof ApplyOriginalSchemaChangeRequest)) { - return false; - } - ApplyOriginalSchemaChangeRequest that = (ApplyOriginalSchemaChangeRequest) o; - return Objects.equals(tableId, that.tableId) - && Objects.equals(schemaChangeEvent, that.schemaChangeEvent); - } - - @Override - public int hashCode() { - return Objects.hash(tableId, schemaChangeEvent); - } -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java deleted file mode 100644 index 0a92e96566..0000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.runtime.operators.schema.event; - -import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.runtime.operators.coordination.CoordinationResponse; - -/** - * The response from {@link SchemaRegistry} to {@link SchemaOperator} to request apply original - * schema changes. 
- */ -public class ApplyOriginalSchemaChangeResponse implements CoordinationResponse { - - private static final long serialVersionUID = 1L; -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java index f577f1120f..bea8809620 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.runtime.operators.schema.event; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; @@ -25,7 +24,6 @@ import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; /** * The response for {@link ReleaseUpstreamRequest} from {@link SchemaRegistry} to {@link @@ -41,50 +39,19 @@ public class ReleaseUpstreamResponse implements CoordinationResponse { */ private final List finishedSchemaChangeEvents; - private final List> failedSchemaChangeEvents; - - private final List ignoredSchemaChangeEvents; - - public ReleaseUpstreamResponse( - List finishedSchemaChangeEvents, - List> failedSchemaChangeEvents, - List ignoredSchemaChangeEvents) { + public ReleaseUpstreamResponse(List finishedSchemaChangeEvents) { this.finishedSchemaChangeEvents = finishedSchemaChangeEvents; - this.failedSchemaChangeEvents = failedSchemaChangeEvents; - this.ignoredSchemaChangeEvents = ignoredSchemaChangeEvents; } public List getFinishedSchemaChangeEvents() { return finishedSchemaChangeEvents; } - public List> getFailedSchemaChangeEvents() { - return failedSchemaChangeEvents; - } - - public List getIgnoredSchemaChangeEvents() { - return ignoredSchemaChangeEvents; - } - - public String getPrintableFailedSchemaChangeEvents() { - return failedSchemaChangeEvents.stream() - .map(e -> "Failed to apply " + e.f0 + ". 
Caused by: " + e.f1) - .collect(Collectors.joining("\n")); - } - - public boolean hasException() { - return !failedSchemaChangeEvents.isEmpty(); - } - @Override public String toString() { return "ReleaseUpstreamResponse{" + "finishedSchemaChangeEvents=" + finishedSchemaChangeEvents - + ", failedSchemaChangeEvents=" - + failedSchemaChangeEvents - + ", ignoredSchemaChangeEvents=" - + ignoredSchemaChangeEvents + '}'; } @@ -97,14 +64,11 @@ public boolean equals(Object object) { return false; } ReleaseUpstreamResponse that = (ReleaseUpstreamResponse) object; - return Objects.equals(finishedSchemaChangeEvents, that.finishedSchemaChangeEvents) - && Objects.equals(failedSchemaChangeEvents, that.failedSchemaChangeEvents) - && Objects.equals(ignoredSchemaChangeEvents, that.ignoredSchemaChangeEvents); + return Objects.equals(finishedSchemaChangeEvents, that.finishedSchemaChangeEvents); } @Override public int hashCode() { - return Objects.hash( - finishedSchemaChangeEvents, failedSchemaChangeEvents, ignoredSchemaChangeEvents); + return Objects.hash(finishedSchemaChangeEvents); } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java index 325aee7a71..51acbd536e 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java @@ -1040,6 +1040,8 @@ tableId, buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)), Column.physicalColumn( "height", DOUBLE, "Height data"))))); Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents)) + .cause() + .cause() .isExactlyInstanceOf(RuntimeException.class) .hasMessageContaining("Failed to apply schema change"); harness.close(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java index b5262d2d5c..3543045991 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java @@ -25,8 +25,6 @@ import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent; import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest; import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse; @@ -56,7 +54,6 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedList; import java.util.Set; @@ -166,14 +163,10 @@ public OP getOperator() { } public void registerTableSchema(TableId tableId, Schema schema) { - schemaRegistry.handleCoordinationRequest( - new ApplyOriginalSchemaChangeRequest( - tableId, new CreateTableEvent(tableId, schema))); + 
schemaRegistry.handleApplyOriginalSchemaChangeEvent(new CreateTableEvent(tableId, schema)); schemaRegistry.handleCoordinationRequest( new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema))); - schemaRegistry.handleCoordinationRequest( - new ApplyEvolvedSchemaChangeRequest( - tableId, Collections.singletonList(new CreateTableEvent(tableId, schema)))); + schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new CreateTableEvent(tableId, schema)); } public Schema getLatestOriginalSchema(TableId tableId) throws Exception { diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java index 9398e1f1e9..9384fa7577 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.sink.MetadataApplier; import java.time.Duration; @@ -83,8 +84,7 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) try { Thread.sleep(duration.toMillis()); if (errorsOnEventTypes.contains(schemaChangeEvent.getType())) { - throw new SchemaEvolveException( - schemaChangeEvent, "Dummy metadata apply exception for test.", null); + throw new UnsupportedSchemaChangeEventException(schemaChangeEvent); } } catch (InterruptedException ignore) { // Ignores sleep interruption
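
Editor's note (not part of the patch): the sketch below illustrates how a sink connector is expected to use the exception hierarchy after this change. Throwing UnsupportedSchemaChangeEventException means "this sink has no DDL counterpart for this event type" and is tolerated only under SchemaChangeBehavior.TRY_EVOLVE (see shouldIgnoreException above); any other Throwable escaping the MetadataApplier is recorded as schemaChangeException and fails the job via context.failJob. The class name and the executeDdl helper are hypothetical; real sinks implement MetadataApplier, whose remaining methods are omitted here.

import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;

/** Hypothetical sink-side applier logic, mirroring the DorisMetadataApplier changes above. */
public class ExampleSchemaChangeApplier {

    public void applySchemaChange(SchemaChangeEvent event) throws SchemaEvolveException {
        if (event instanceof TruncateTableEvent) {
            // No DDL counterpart in this sink: signal an "unsupported" change, which
            // TRY_EVOLVE mode swallows with a warning while the pipeline keeps running.
            throw new UnsupportedSchemaChangeEventException(event);
        }
        try {
            executeDdl(event); // hypothetical sink-specific DDL call
        } catch (Exception e) {
            // A genuine apply failure: wrap it with a descriptive message and keep the
            // original cause. This is never ignored, so the SchemaRegistry fails the job.
            throw new SchemaEvolveException(event, "Failed to apply " + event.getType(), e);
        }
    }

    private void executeDdl(SchemaChangeEvent event) {
        // Sink-specific DDL execution elided.
    }
}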