Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36094] CDC SchemaRegistryRequestHandler should throw exception which is not SchemaEvolveException #3558

Merged
merged 7 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,22 @@

import org.apache.flink.cdc.common.event.SchemaChangeEvent;

import javax.annotation.Nullable;

/** A special kind of {@link SchemaEvolveException} that sink doesn't support such event type. */
public class UnsupportedSchemaChangeEventException extends SchemaEvolveException {

    /**
     * Creates an exception with the default message indicating that the sink does not support the
     * given schema change event.
     *
     * @param applyingEvent the schema change event that could not be applied
     */
    public UnsupportedSchemaChangeEventException(SchemaChangeEvent applyingEvent) {
        // Diff residue removed: the stale `super(...)` line that preceded this delegation was
        // illegal Java alongside `this(...)` — an explicit constructor invocation must be the
        // first and only such statement.
        this(applyingEvent, "Sink doesn't support such schema change event.");
    }

    /**
     * Creates an exception with a custom message and no cause.
     *
     * @param applyingEvent the schema change event that could not be applied
     * @param exceptionMessage detail message describing why the event is unsupported
     */
    public UnsupportedSchemaChangeEventException(
            SchemaChangeEvent applyingEvent, String exceptionMessage) {
        this(applyingEvent, exceptionMessage, null);
    }

    /**
     * Creates an exception with a custom message and an optional underlying cause.
     *
     * @param applyingEvent the schema change event that could not be applied
     * @param exceptionMessage detail message describing why the event is unsupported
     * @param cause the underlying cause, or {@code null} if there is none
     */
    public UnsupportedSchemaChangeEventException(
            SchemaChangeEvent applyingEvent, String exceptionMessage, @Nullable Throwable cause) {
        super(applyingEvent, exceptionMessage, cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private void applyAddColumnEvent(AddColumnEvent event) throws SchemaEvolveExcept
tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply add column event", e);
loserwang1024 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -234,7 +234,7 @@ private void applyDropColumnEvent(DropColumnEvent event) throws SchemaEvolveExce
tableId.getSchemaName(), tableId.getTableName(), col);
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply drop column event", e);
}
}

Expand All @@ -250,7 +250,7 @@ private void applyRenameColumnEvent(RenameColumnEvent event) throws SchemaEvolve
entry.getValue());
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply rename column event", e);
}
}

Expand All @@ -272,7 +272,7 @@ private void applyAlterColumnTypeEvent(AlterColumnTypeEvent event)
// will be fixed after FLINK-35243 got merged.
}
} catch (Exception e) {
throw new SchemaEvolveException(event, e.getMessage(), e);
throw new SchemaEvolveException(event, "fail to apply alter column type event", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.flink.cdc.runtime.operators.schema;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
Expand All @@ -29,7 +28,6 @@
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Column;
Expand All @@ -40,8 +38,6 @@
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
Expand Down Expand Up @@ -431,56 +427,9 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh
ReleaseUpstreamResponse schemaEvolveResponse = requestReleaseUpstream();
List<SchemaChangeEvent> finishedSchemaChangeEvents =
schemaEvolveResponse.getFinishedSchemaChangeEvents();
List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChangeEvents =
schemaEvolveResponse.getFailedSchemaChangeEvents();
List<SchemaChangeEvent> ignoredSchemaChangeEvents =
schemaEvolveResponse.getIgnoredSchemaChangeEvents();

if (schemaChangeBehavior == SchemaChangeBehavior.EVOLVE
|| schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION) {
if (schemaEvolveResponse.hasException()) {
throw new RuntimeException(
String.format(
"Failed to apply schema change event %s.\nExceptions: %s",
schemaChangeEvent,
schemaEvolveResponse.getPrintableFailedSchemaChangeEvents()));
}
} else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE
|| schemaChangeBehavior == SchemaChangeBehavior.LENIENT
|| schemaChangeBehavior == SchemaChangeBehavior.IGNORE) {
if (schemaEvolveResponse.hasException()) {
schemaEvolveResponse
.getFailedSchemaChangeEvents()
.forEach(
e ->
LOG.warn(
"Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",
e.f0,
e.f1));
}
} else {
throw new SchemaEvolveException(
schemaChangeEvent,
"Unexpected schema change behavior: " + schemaChangeBehavior);
}

// Update evolved schema changes based on apply results
requestApplyEvolvedSchemaChanges(tableId, finishedSchemaChangeEvents);
finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));

LOG.info(
"Applied schema change event {} to downstream. Among {} total evolved events, {} succeeded, {} failed, and {} ignored.",
schemaChangeEvent,
expectedSchemaChangeEvents.size(),
finishedSchemaChangeEvents.size(),
failedSchemaChangeEvents.size(),
ignoredSchemaChangeEvents.size());

schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(
finishedSchemaChangeEvents.size());
schemaOperatorMetrics.increaseFailedSchemaChangeEvents(failedSchemaChangeEvents.size());
schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(
ignoredSchemaChangeEvents.size());
}
}

Expand All @@ -489,16 +438,6 @@ private SchemaChangeResponse requestSchemaChange(
return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
}

private void requestApplyOriginalSchemaChanges(
TableId tableId, SchemaChangeEvent schemaChangeEvent) {
sendRequestToCoordinator(new ApplyOriginalSchemaChangeRequest(tableId, schemaChangeEvent));
}

private void requestApplyEvolvedSchemaChanges(
TableId tableId, List<SchemaChangeEvent> schemaChangeEvents) {
sendRequestToCoordinator(new ApplyEvolvedSchemaChangeRequest(tableId, schemaChangeEvents));
}

private ReleaseUpstreamResponse requestReleaseUpstream()
throws InterruptedException, TimeoutException {
CoordinationResponse coordinationResponse =
Expand Down Expand Up @@ -538,7 +477,7 @@ private Schema getLatestEvolvedSchema(TableId tableId) {
return optionalSchema.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format("Unable to get latest schema for table \"%s\"", tableId));
String.format("Unable to get latest schema for table \"%s\"", tableId), e);
}
}

Expand All @@ -553,7 +492,7 @@ private Schema getLatestOriginalSchema(TableId tableId) {
return optionalSchema.get();
} catch (Exception e) {
throw new IllegalStateException(
String.format("Unable to get latest schema for table \"%s\"", tableId));
String.format("Unable to get latest schema for table \"%s\"", tableId), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse;
Expand Down Expand Up @@ -202,16 +200,6 @@ public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
} else if (request instanceof GetOriginalSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
} else if (request instanceof ApplyOriginalSchemaChangeRequest) {
return CompletableFuture.completedFuture(
wrap(
handleApplyOriginalSchemaChangeRequest(
(ApplyOriginalSchemaChangeRequest) request)));
} else if (request instanceof ApplyEvolvedSchemaChangeRequest) {
return CompletableFuture.completedFuture(
wrap(
handleApplyEvolvedSchemaChangeRequest(
(ApplyEvolvedSchemaChangeRequest) request)));
} else if (request instanceof SchemaChangeResultRequest) {
return requestHandler.getSchemaChangeResult();
} else if (request instanceof RefreshPendingListsRequest) {
Expand Down Expand Up @@ -342,18 +330,15 @@ private GetOriginalSchemaResponse handleGetOriginalSchemaRequest(
}
}

private ApplyOriginalSchemaChangeResponse handleApplyOriginalSchemaChangeRequest(
ApplyOriginalSchemaChangeRequest applyOriginalSchemaChangeRequest) {
schemaManager.applyOriginalSchemaChange(
applyOriginalSchemaChangeRequest.getSchemaChangeEvent());
return new ApplyOriginalSchemaChangeResponse();
// --------------------Only visible for test -----------------

    /**
     * Applies the given schema change event to the original (upstream) schema tracked by {@code
     * schemaManager}.
     *
     * <p>Only visible for testing; production code no longer routes this through a coordination
     * request.
     *
     * @param schemaChangeEvent the original schema change event to record
     */
    @VisibleForTesting
    public void handleApplyOriginalSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) {
        schemaManager.applyOriginalSchemaChange(schemaChangeEvent);
    }

private ApplyEvolvedSchemaChangeResponse handleApplyEvolvedSchemaChangeRequest(
ApplyEvolvedSchemaChangeRequest applyEvolvedSchemaChangeRequest) {
applyEvolvedSchemaChangeRequest
.getSchemaChangeEvent()
.forEach(schemaManager::applyEvolvedSchemaChange);
return new ApplyEvolvedSchemaChangeResponse();
    /**
     * Applies the given schema change event to the evolved (downstream) schema tracked by {@code
     * schemaManager}.
     *
     * <p>Only visible for testing; production code no longer routes this through a coordination
     * request.
     *
     * @param schemaChangeEvent the evolved schema change event to record
     */
    @VisibleForTesting
    public void handleApplyEvolvedSchemaChangeRequest(SchemaChangeEvent schemaChangeEvent) {
        schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.flink.cdc.runtime.operators.schema.coordinator;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
Expand All @@ -28,7 +27,7 @@
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
Expand Down Expand Up @@ -88,14 +87,15 @@ public class SchemaRegistryRequestHandler implements Closeable {
private final List<PendingSchemaChange> pendingSchemaChanges;

private final List<SchemaChangeEvent> finishedSchemaChanges;
private final List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChanges;
private final List<SchemaChangeEvent> ignoredSchemaChanges;

/** Sink writers which have sent flush success events for the request. */
private final Set<Integer> flushedSinkWriters;

/** Status of the execution of current schema change request. */
private volatile boolean isSchemaChangeApplying;
/** Actual exception if failed to apply schema change. */
private volatile Throwable schemaChangeException;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;

Expand All @@ -111,7 +111,6 @@ public SchemaRegistryRequestHandler(
this.flushedSinkWriters = new HashSet<>();
this.pendingSchemaChanges = new LinkedList<>();
this.finishedSchemaChanges = new LinkedList<>();
this.failedSchemaChanges = new LinkedList<>();
this.ignoredSchemaChanges = new LinkedList<>();
this.schemaManager = schemaManager;
this.schemaDerivation = schemaDerivation;
Expand All @@ -129,8 +128,8 @@ public SchemaRegistryRequestHandler(
private void applySchemaChange(
TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) {
isSchemaChangeApplying = true;
schemaChangeException = null;
finishedSchemaChanges.clear();
failedSchemaChanges.clear();
ignoredSchemaChanges.clear();

for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
Expand All @@ -147,17 +146,27 @@ private void applySchemaChange(
try {
metadataApplier.applySchemaChange(changeEvent);
LOG.debug("Applied schema change {} to table {}.", changeEvent, tableId);
schemaManager.applyEvolvedSchemaChange(changeEvent);
finishedSchemaChanges.add(changeEvent);
} catch (SchemaEvolveException e) {
} catch (Throwable t) {
LOG.error(
"Failed to apply schema change {} to table {}. Caused by: {}",
changeEvent,
tableId,
e);
failedSchemaChanges.add(Tuple2.of(changeEvent, e));
t);
if (!shouldIgnoreException(t)) {
schemaChangeException = t;
break;
} else {
LOG.warn(
"Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}",
changeEvent,
e);
}
}
}
}

PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) {
startNextSchemaChangeRequest();
Expand Down Expand Up @@ -254,19 +263,18 @@ public void flushSuccess(TableId tableId, int sinkSubtask) throws InterruptedExc
() -> applySchemaChange(tableId, waitFlushSuccess.derivedSchemaChangeEvents));
Thread.sleep(1000);

if (schemaChangeException != null) {
throw new RuntimeException("Failed to apply schema change.", schemaChangeException);
}

if (isSchemaChangeApplying) {
waitFlushSuccess
.getResponseFuture()
.complete(wrap(new SchemaChangeProcessingResponse()));
} else {
waitFlushSuccess
.getResponseFuture()
.complete(
wrap(
new ReleaseUpstreamResponse(
finishedSchemaChanges,
failedSchemaChanges,
ignoredSchemaChanges)));
.complete(wrap(new ReleaseUpstreamResponse(finishedSchemaChanges)));
}
}
}
Expand Down Expand Up @@ -305,16 +313,15 @@ public CompletableFuture<CoordinationResponse> refreshPendingLists() {
}

public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
if (schemaChangeException != null) {
throw new RuntimeException("Failed to apply schema change.", schemaChangeException);
}

if (isSchemaChangeApplying) {
return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse()));
} else {
return CompletableFuture.supplyAsync(
() ->
wrap(
new ReleaseUpstreamResponse(
finishedSchemaChanges,
failedSchemaChanges,
ignoredSchemaChanges)));
() -> wrap(new ReleaseUpstreamResponse(finishedSchemaChanges)));
}
}

Expand Down Expand Up @@ -437,6 +444,15 @@ private List<SchemaChangeEvent> lenientizeSchemaChangeEvent(SchemaChangeEvent ev
}
}

    /**
     * Decides whether a failure during schema change application should be tolerated instead of
     * aborting the pipeline.
     *
     * <p>Only an {@link UnsupportedSchemaChangeEventException} under {@code TRY_EVOLVE} behavior is
     * tolerated; any other throwable (or any other behavior mode) is propagated.
     *
     * @param throwable the failure raised while applying a schema change event
     * @return {@code true} if the failure may be logged and skipped
     */
    private boolean shouldIgnoreException(Throwable throwable) {

        // In IGNORE mode, will never try to apply schema change events
        // In EVOLVE and LENIENT mode, such failure will not be tolerated
        // In EXCEPTION mode, an exception will be thrown once captured
        return (throwable instanceof UnsupportedSchemaChangeEventException)
                && (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE);
    }

private static class PendingSchemaChange {
private final SchemaChangeRequest changeRequest;
private List<SchemaChangeEvent> derivedSchemaChangeEvents;
Expand Down
Loading
Loading