[FLINK-36094] CDC SchemaRegistryRequestHandler should throw exception which is not SchemaEvolveException #3558

Merged (7 commits) on Aug 22, 2024
@@ -24,9 +24,9 @@

/** An exception occurred during schema evolution. */
public class SchemaEvolveException extends FlinkRuntimeException {
private final SchemaChangeEvent applyingEvent;
private final String exceptionMessage;
private final @Nullable Throwable cause;
protected final SchemaChangeEvent applyingEvent;
protected final String exceptionMessage;
protected final @Nullable Throwable cause;

public SchemaEvolveException(SchemaChangeEvent applyingEvent, String exceptionMessage) {
this(applyingEvent, exceptionMessage, null);
@@ -19,10 +19,36 @@

import org.apache.flink.cdc.common.event.SchemaChangeEvent;

import javax.annotation.Nullable;

/** A special kind of {@link SchemaEvolveException} that sink doesn't support such event type. */
public class UnsupportedSchemaChangeEventException extends SchemaEvolveException {

public UnsupportedSchemaChangeEventException(SchemaChangeEvent applyingEvent) {
super(applyingEvent, "Sink doesn't support such schema change event.", null);
this(applyingEvent, "Sink doesn't support such schema change event.");
}

public UnsupportedSchemaChangeEventException(
SchemaChangeEvent applyingEvent, String exceptionMessage) {
this(applyingEvent, exceptionMessage, null);
}

public UnsupportedSchemaChangeEventException(
SchemaChangeEvent applyingEvent, String exceptionMessage, @Nullable Throwable cause) {
super(applyingEvent, exceptionMessage, cause);
}

@Override
public String toString() {
return "UnsupportedSchemaChangeEventException{"
+ "applyingEvent="
+ applyingEvent
+ ", exceptionMessage='"
+ exceptionMessage
+ '\''
+ ", cause='"
+ cause
+ '\''
+ '}';
}
}
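
The extra constructors and the toString() override above let a sink attach the failing event and a specific message. A minimal sketch of how a sink-side applier might raise it, assuming a hypothetical helper class (only the event and exception types come from this change):

    import org.apache.flink.cdc.common.event.SchemaChangeEvent;
    import org.apache.flink.cdc.common.event.TruncateTableEvent;
    import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;

    /** Hypothetical sink-side helper, shown for illustration only. */
    class ExampleSchemaChangeApplier {
        void apply(SchemaChangeEvent event) {
            if (event instanceof TruncateTableEvent) {
                // The new two-argument constructor keeps the rejected event attached,
                // and toString() now prints it together with the message and cause.
                throw new UnsupportedSchemaChangeEventException(
                        event, "Sink doesn't support TRUNCATE TABLE events.");
            }
            // ... apply supported event types ...
        }
    }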
@@ -221,7 +221,7 @@ private void applyAddColumnEvent(AddColumnEvent event) throws SchemaEvolveException {
tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply add column event", e);
}
}

@@ -234,7 +234,7 @@ private void applyDropColumnEvent(DropColumnEvent event) throws SchemaEvolveException {
tableId.getSchemaName(), tableId.getTableName(), col);
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply drop column event", e);
}
}

@@ -250,7 +250,7 @@ private void applyRenameColumnEvent(RenameColumnEvent event) throws SchemaEvolveException {
entry.getValue());
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply rename column event", e);
}
}

@@ -272,7 +272,7 @@ private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event)
// will be fixed after FLINK-35243 got merged.
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply alter column type event", e);
}
}
}
@@ -31,6 +31,7 @@
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -152,7 +153,7 @@ public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
tableId, ((CreateTableEvent) schemaChangeEvent).getSchema()));
}
} else {
throw new SchemaEvolveException(
throw new UnsupportedSchemaChangeEventException(
schemaChangeEvent,
"Rejected schema change event since error.on.schema.change is enabled.",
null);
@@ -341,7 +341,7 @@ private void waitUntilSpecificEvent(String event) throws Exception {
long endTimeout = System.currentTimeMillis() + MysqlE2eITCase.EVENT_WAITING_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}
@@ -800,7 +800,7 @@ public void testReplacementSymbol() throws Exception {
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEALPHA, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}",
"RenameColumnEvent{tableId=NEW_%s.NEW_TABLEBETA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEBETA, before=[], after=[10002, 15], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}",
"AlterColumnTypeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, typeMapping={VERSION=VARCHAR(19)}, oldTypeMapping={VERSION=VARCHAR(17)}}",
"RenameColumnEvent{tableId=NEW_%s.NEW_TABLEGAMMA, nameMapping={VERSION=VERSION_EX}}",
"DataChangeEvent{tableId=NEW_%s.NEW_TABLEGAMMA, before=[], after=[10003, Fluorite], op=INSERT, meta=()}",
"DropColumnEvent{tableId=NEW_%s.NEW_TABLEDELTA, droppedColumnNames=[VERSION]}",
@@ -818,7 +818,7 @@ private void waitUntilSpecificEvent(String event) throws Exception {
long endTimeout = System.currentTimeMillis() + EVENT_DEFAULT_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}
@@ -113,9 +113,9 @@ public void testSchemaEvolveWithIncompatibleChanges() throws Exception {
true,
false,
false,
Collections.singletonList(
"java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\""),
Collections.singletonList(
Collections.emptyList(),
Arrays.asList(
"java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"",
"org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
}

@@ -126,11 +126,10 @@ public void testSchemaEvolveWithException() throws Exception {
false,
true,
false,
Collections.singletonList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}"),
Collections.emptyList(),
Arrays.asList(
"Failed to apply schema change event AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}.",
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy"));
}

@@ -146,8 +145,8 @@ public void testSchemaTryEvolveWithException() throws Exception {
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null], op=INSERT, meta=()}"),
Arrays.asList(
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members.",
"SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}"));
"Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}",
"UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}"));
}

@Test
@@ -185,7 +184,7 @@ public void testLenientSchemaEvolution() throws Exception {
false,
false,
Arrays.asList(
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}",
"DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}",
"AlterColumnTypeEvent{tableId=%s.members, typeMapping={age=DOUBLE}, oldTypeMapping={age=INT}}",
"AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}",
@@ -369,7 +368,7 @@ private void testGenericSchemaEvolution(

List<String> expectedJmEvents =
expectedJobManagerEvents.stream()
.map(s -> String.format(s, dbName, dbName))
.map(s -> String.format(s, dbName, dbName, dbName))
.collect(Collectors.toList());

validateResult(expectedJmEvents, jobManagerConsumer);
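
Passing dbName a third time is needed because some of the expected JobManager messages now contain three %s placeholders, and it stays safe for the shorter templates because String.format ignores arguments beyond the last placeholder. A small illustration (the dbName value and the shortened template are made up):

    // String.format ignores surplus arguments, so one call covers templates
    // with one, two, or three %s placeholders.
    String dbName = "schema_evolve_e2e";                        // assumed value
    String oneSlot = "AddColumnEvent{tableId=%s.members, ...}"; // shortened template
    System.out.println(String.format(oneSlot, dbName, dbName, dbName));
    // prints: AddColumnEvent{tableId=schema_evolve_e2e.members, ...}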
@@ -422,7 +421,7 @@ private void waitUntilSpecificEvent(String event, ToStringConsumer consumer) throws Exception {
long endTimeout = System.currentTimeMillis() + SchemaEvolveE2eITCase.EVENT_WAITING_TIMEOUT;
while (System.currentTimeMillis() < endTimeout) {
String stdout = consumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}
@@ -736,7 +736,7 @@ private void waitUntilSpecificEvent(String event, long timeout) throws Exception
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}
@@ -386,7 +386,7 @@ private void waitUntilSpecificEvent(String event, long timeout) throws Exception
long endTimeout = System.currentTimeMillis() + timeout;
while (System.currentTimeMillis() < endTimeout) {
String stdout = taskManagerConsumer.toUtf8String();
if (stdout.contains(event)) {
if (stdout.contains(event + "\n")) {
result = true;
break;
}
@@ -17,7 +17,6 @@

package org.apache.flink.cdc.runtime.operators.schema;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
@@ -29,7 +28,6 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
@@ -40,8 +38,6 @@
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
@@ -431,56 +427,9 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)
ReleaseUpstreamResponse schemaEvolveResponse = requestReleaseUpstream();
List<SchemaChangeEvent> finishedSchemaChangeEvents =
schemaEvolveResponse.getFinishedSchemaChangeEvents();
List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChangeEvents =
schemaEvolveResponse.getFailedSchemaChangeEvents();
List<SchemaChangeEvent> ignoredSchemaChangeEvents =
schemaEvolveResponse.getIgnoredSchemaChangeEvents();

if (schemaChangeBehavior == SchemaChangeBehavior.EVOLVE
|| schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION) {
if (schemaEvolveResponse.hasException()) {
throw new RuntimeException(
String.format(
"Failed to apply schema change event %s.\nExceptions: %s",
schemaChangeEvent,
schemaEvolveResponse.getPrintableFailedSchemaChangeEvents()));
}
} else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE
|| schemaChangeBehavior == SchemaChangeBehavior.LENIENT
|| schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
if (schemaEvolveResponse.hasException()) {
schemaEvolveResponse
.getFailedSchemaChangeEvents()
.forEach(
e ->
LOG.warn(
"Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",
e.f0,
e.f1));
}
} else {
throw new SchemaEvolveException(
schemaChangeEvent,
"Unexpected schema change behavior: " + schemaChangeBehavior);
}

// Update evolved schema changes based on apply results
requestApplyEvolvedSchemaChanges(tableId, finishedSchemaChangeEvents);
finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));

LOG.info(
"Applied schema change event {} to downstream. Among {} total evolved events, {} succeeded, {} failed, and {} ignored.",
schemaChangeEvent,
expectedSchemaChangeEvents.size(),
finishedSchemaChangeEvents.size(),
failedSchemaChangeEvents.size(),
ignoredSchemaChangeEvents.size());

schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(
finishedSchemaChangeEvents.size());
schemaOperatorMetrics.increaseFailedSchemaChangeEvents(failedSchemaChangeEvents.size());
schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(
ignoredSchemaChangeEvents.size());
}
}

@@ -489,16 +438,6 @@ private SchemaChangeResponse requestSchemaChange(
return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
}

private void requestApplyOriginalSchemaChanges(
TableId tableId, SchemaChangeEvent schemaChangeEvent) {
sendRequestToCoordinator(new ApplyOriginalSchemaChangeRequest(tableId, schemaChangeEvent));
}

private void requestApplyEvolvedSchemaChanges(
TableId tableId, List<SchemaChangeEvent> schemaChangeEvents) {
sendRequestToCoordinator(new ApplyEvolvedSchemaChangeRequest(tableId, schemaChangeEvents));
}

private ReleaseUpstreamResponse requestReleaseUpstream()
throws InterruptedException, TimeoutException {
CoordinationResponse coordinationResponse =
@@ -538,7 +477,7 @@ private Schema getLatestEvolvedSchema(TableId tableId) {
return optionalSchema.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format("Unable to get latest schema for table \"%s\"", tableId));
String.format("Unable to get latest schema for table \"%s\"", tableId), e);
}
}

@@ -553,7 +492,7 @@ private Schema getLatestOriginalSchema(TableId tableId) {
return optionalSchema.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format("Unable to get latest schema for table \"%s\"", tableId));
String.format("Unable to get latest schema for table \"%s\"", tableId), e);
}
}
