Skip to content

Commit

Permalink
[FLINK-36094][cdc-runtime] Improve the Exception that SchemaRegistryR…
Browse files Browse the repository at this point in the history
…equestHandler thrown

 This closes apache#3558.

(cherry picked from commit 6205a5a)
  • Loading branch information
loserwang1024 authored and leonardBang committed Aug 27, 2024
1 parent bd4649c commit 0003a32
Show file tree
Hide file tree
Showing 18 changed files with 149 additions and 414 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@

/** An exception occurred during schema evolution. */
public class SchemaEvolveException extends FlinkRuntimeException {
private final SchemaChangeEvent applyingEvent;
private final String exceptionMessage;
private final @Nullable Throwable cause;
protected final SchemaChangeEvent applyingEvent;
protected final String exceptionMessage;
protected final @Nullable Throwable cause;

public SchemaEvolveException(SchemaChangeEvent applyingEvent, String exceptionMessage) {
this(applyingEvent, exceptionMessage, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,36 @@

import org.apache.flink.cdc.common.event.SchemaChangeEvent;

import javax.annotation.Nullable;

/** A special kind of {@link SchemaEvolveException} that sink doesn't support such event type. */
public class UnsupportedSchemaChangeEventException extends SchemaEvolveException {

public UnsupportedSchemaChangeEventException(SchemaChangeEvent applyingEvent) {
super(applyingEvent, "Sink doesn't support such schema change event.", null);
this(applyingEvent, "Sink doesn't support such schema change event.");
}

public UnsupportedSchemaChangeEventException(
SchemaChangeEvent applyingEvent, String exceptionMessage) {
this(applyingEvent, exceptionMessage, null);
}

public UnsupportedSchemaChangeEventException(
SchemaChangeEvent applyingEvent, String exceptionMessage, @Nullable Throwable cause) {
super(applyingEvent, exceptionMessage, cause);
}

@Override
public String toString() {
return "UnsupportedSchemaChangeEventException{"
+ "applyingEvent="
+ applyingEvent
+ ", exceptionMessage='"
+ exceptionMessage
+ '\''
+ ", cause='"
+ cause
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
Expand Down Expand Up @@ -150,7 +151,7 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
tableId, ((CreateTableEvent) schemaChangeEvent).getSchema()));
}
} else {
throw new SchemaEvolveException(
throw new UnsupportedSchemaChangeEventException(
schemaChangeEvent,
"Rejected schema change event since error.on.schema.change is enabled.",
null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private void waitUntilSpecificEvent(String event) throws Exception {
long endTimeout = System.currentTimeMillis() + MysqlE2eITCase.EVENT_WAITING_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ public void testSchemaEvolveWithIncompatibleChanges() throws Exception {
true,
false,
false,
Collections.singletonList(
"java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\""),
Collections.singletonList(
Collections.emptyList(),
Arrays.asList(
"java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"",
"org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
}

Expand All @@ -123,11 +123,10 @@ public void testSchemaEvolveWithException() throws Exception {
false,
true,
false,
Collections.singletonList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}"),
Collections.emptyList(),
Arrays.asList(
"Failed to apply schema change event AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}.",
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
}

Expand All @@ -143,8 +142,8 @@ public void testSchemaTryEvolveWithException() throws Exception {
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null], op=INSERT, meta=()}"),
Arrays.asList(
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members.",
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}"));
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}"));
}

@Test
Expand Down Expand Up @@ -181,7 +180,7 @@ public void testLenientSchemaEvolution() throws Exception {
false,
false,
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}",
Expand Down Expand Up @@ -348,7 +347,7 @@ private void testGenericSchemaEvolution(

List<String> expectedJmEvents =
expectedJobManagerEvents.stream()
.map(s -> String.format(s, dbName, dbName))
.map(s -> String.format(s, dbName, dbName, dbName))
.collect(Collectors.toList());

validateResult(expectedJmEvents, jobManagerConsumer);
Expand Down Expand Up @@ -392,7 +391,7 @@ private void waitUntilSpecificEvent(String event, ToStringConsumer consumer) thr
long endTimeout = System.currentTimeMillis() + SchemaEvolveE2eITCase.EVENT_WAITING_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = consumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ private void waitUntilSpecificEvent(String event, long timeout) throws Exception
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ private void waitUntilSpecificEvent(String event, long timeout) throws Exception
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.flink.cdc.runtime.operators.schema;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
Expand All @@ -29,7 +28,6 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
Expand All @@ -40,8 +38,6 @@
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
Expand Down Expand Up @@ -431,56 +427,9 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
ReleaseUpstreamResponse schemaEvolveResponse = requestReleaseUpstream();
List<SchemaChangeEvent> finishedSchemaChangeEvents =
schemaEvolveResponse.getFinishedSchemaChangeEvents();
List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChangeEvents =
schemaEvolveResponse.getFailedSchemaChangeEvents();
List<SchemaChangeEvent> ignoredSchemaChangeEvents =
schemaEvolveResponse.getIgnoredSchemaChangeEvents();

if (schemaChangeBehavior == SchemaChangeBehavior.EVOLVE
|| schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION) {
if (schemaEvolveResponse.hasException()) {
throw new RuntimeException(
String.format(
"Failed to apply schema change event %s.\nExceptions: %s",
schemaChangeEvent,
schemaEvolveResponse.getPrintableFailedSchemaChangeEvents()));
}
} else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE
|| schemaChangeBehavior == SchemaChangeBehavior.LENIENT
|| schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
if (schemaEvolveResponse.hasException()) {
schemaEvolveResponse
.getFailedSchemaChangeEvents()
.forEach(
e ->
LOG.warn(
"Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",
e.f0,
e.f1));
}
} else {
throw new SchemaEvolveException(
schemaChangeEvent,
"Unexpected schema change behavior: " + schemaChangeBehavior);
}

// Update evolved schema changes based on apply results
requestApplyEvolvedSchemaChanges(tableId, finishedSchemaChangeEvents);
finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));

LOG.info(
"Applied schema change event {} to downstream. Among {} total evolved events, {} succeeded, {} failed, and {} ignored.",
schemaChangeEvent,
expectedSchemaChangeEvents.size(),
finishedSchemaChangeEvents.size(),
failedSchemaChangeEvents.size(),
ignoredSchemaChangeEvents.size());

schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(
finishedSchemaChangeEvents.size());
schemaOperatorMetrics.increaseFailedSchemaChangeEvents(failedSchemaChangeEvents.size());
schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(
ignoredSchemaChangeEvents.size());
}
}

Expand All @@ -489,16 +438,6 @@ private SchemaChangeResponse requestSchemaChange(
return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
}

private void requestApplyOriginalSchemaChanges(
TableId tableId, SchemaChangeEvent schemaChangeEvent) {
sendRequestToCoordinator(new ApplyOriginalSchemaChangeRequest(tableId, schemaChangeEvent));
}

private void requestApplyEvolvedSchemaChanges(
TableId tableId, List<SchemaChangeEvent> schemaChangeEvents) {
sendRequestToCoordinator(new ApplyEvolvedSchemaChangeRequest(tableId, schemaChangeEvents));
}

private ReleaseUpstreamResponse requestReleaseUpstream()
throws InterruptedException, TimeoutException {
CoordinationResponse coordinationResponse =
Expand Down Expand Up @@ -538,7 +477,7 @@ private Schema getLatestEvolvedSchema(TableId tableId) {
return optionalSchema.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format("Unable to get latest schema for table \"%s\"", tableId));
String.format("Unable to get latest schema for table \"%s\"", tableId), e);
}
}

Expand All @@ -553,7 +492,7 @@ private Schema getLatestOriginalSchema(TableId tableId) {
return optionalSchema.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format("Unable to get latest schema for table \"%s\"", tableId));
String.format("Unable to get latest schema for table \"%s\"", tableId), e);
}
}

Expand Down
Loading

0 comments on commit 0003a32

Please sign in to comment.