Skip to content

Commit

Permalink
[minor][cdc-runtime] Run schema coordinator logic asynchronously to avoid blocking the main thread
Browse files Browse the repository at this point in the history

This closes #3557
  • Loading branch information
yuxiqian authored Aug 27, 2024
1 parent afe9c3c commit a876af2
Show file tree
Hide file tree
Showing 7 changed files with 314 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/** Dummy classes for migration test. Called via reflection. */
Expand Down Expand Up @@ -69,6 +70,7 @@ public SchemaRegistry generateSchemaRegistry() {
return new SchemaRegistry(
"Dummy Name",
null,
Executors.newFixedThreadPool(1),
new MetadataApplier() {
@Override
public boolean acceptsSchemaEvolutionType(
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.FatalExitExceptionHandler;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Provider of {@link SchemaRegistry}. */
@Internal
Expand Down Expand Up @@ -57,7 +61,55 @@ public OperatorID getOperatorId() {

/**
 * Creates the {@link SchemaRegistry} coordinator for this provider.
 *
 * <p>A single-thread executor is built from a {@link CoordinatorExecutorThreadFactory} whose
 * thread is named "schema-evolution-coordinator" and uses the context's user-code class loader;
 * that executor is then passed to the registry together with the provider's own settings.
 *
 * @param context the operator coordinator context handed in by the caller
 * @return the newly constructed {@link SchemaRegistry}
 */
@Override
public OperatorCoordinator create(OperatorCoordinator.Context context) throws Exception {
CoordinatorExecutorThreadFactory coordinatorThreadFactory =
new CoordinatorExecutorThreadFactory(
"schema-evolution-coordinator", context.getUserCodeClassloader());
ExecutorService coordinatorExecutor =
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
return new SchemaRegistry(
// NOTE(review): the next line looks like the pre-change five-argument call kept by the diff
// rendering; the six-argument list that follows it (with coordinatorExecutor) appears to be
// the added version - confirm against the repository.
operatorName, context, metadataApplier, routingRules, schemaChangeBehavior);
operatorName,
context,
coordinatorExecutor,
metadataApplier,
routingRules,
schemaChangeBehavior);
}

/**
 * A {@link ThreadFactory} that creates at most one thread for the coordinator executor.
 *
 * <p>The single thread it hands out carries the configured name, context class loader and
 * uncaught-exception handler. Any further call to {@link #newThread(Runnable)} throws an {@link
 * Error}: a second request means the first thread is gone, which this class treats as fatal.
 */
public static class CoordinatorExecutorThreadFactory implements ThreadFactory {

    private final String coordinatorThreadName;
    private final ClassLoader contextClassLoader;
    private final Thread.UncaughtExceptionHandler errorHandler;

    /** The one thread created so far, or {@code null} if none has been requested yet. */
    private Thread coordinatorThread;

    /**
     * Creates a factory that uses {@link FatalExitExceptionHandler#INSTANCE} as the
     * uncaught-exception handler.
     *
     * @param coordinatorThreadName name given to the created thread
     * @param contextClassLoader context class loader set on the created thread
     */
    CoordinatorExecutorThreadFactory(
            final String coordinatorThreadName, final ClassLoader contextClassLoader) {
        this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);
    }

    /**
     * Creates a factory with an explicit uncaught-exception handler.
     *
     * @param coordinatorThreadName name given to the created thread
     * @param contextClassLoader context class loader set on the created thread
     * @param errorHandler handler installed on the created thread for uncaught exceptions
     */
    CoordinatorExecutorThreadFactory(
            final String coordinatorThreadName,
            final ClassLoader contextClassLoader,
            final Thread.UncaughtExceptionHandler errorHandler) {
        this.coordinatorThreadName = coordinatorThreadName;
        this.contextClassLoader = contextClassLoader;
        this.errorHandler = errorHandler;
    }

    /**
     * Creates the coordinator thread on the first call and fails on every later call.
     *
     * <p>The method is {@code synchronized} so the "already created?" check and the assignment
     * happen atomically.
     *
     * @param r the task the new thread will run
     * @return the newly created, not yet started thread
     * @throws Error if a thread has already been created by this factory
     */
    @Override
    public synchronized Thread newThread(Runnable r) {
        if (coordinatorThread != null) {
            // Fixed message: the original concatenation was missing the space between
            // "logs" and "to", yielding "logsto see".
            throw new Error(
                    "This indicates that a fatal error has happened and caused the "
                            + "coordinator executor thread to exit. Check the earlier logs "
                            + "to see the root cause of the problem.");
        }
        coordinatorThread = new Thread(r, coordinatorThreadName);
        coordinatorThread.setContextClassLoader(contextClassLoader);
        coordinatorThread.setUncaughtExceptionHandler(errorHandler);
        return coordinatorThread;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -95,15 +96,19 @@ public class SchemaRegistryRequestHandler implements Closeable {

private final SchemaChangeBehavior schemaChangeBehavior;

private final OperatorCoordinator.Context context;

public SchemaRegistryRequestHandler(
MetadataApplier metadataApplier,
SchemaManager schemaManager,
SchemaDerivation schemaDerivation,
SchemaChangeBehavior schemaChangeBehavior) {
SchemaChangeBehavior schemaChangeBehavior,
OperatorCoordinator.Context context) {
this.metadataApplier = metadataApplier;
this.schemaManager = schemaManager;
this.schemaDerivation = schemaDerivation;
this.schemaChangeBehavior = schemaChangeBehavior;
this.context = context;

this.activeSinkWriters = ConcurrentHashMap.newKeySet();
this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
Expand All @@ -123,8 +128,8 @@ public SchemaRegistryRequestHandler(
*
* @param request the received SchemaChangeRequest
*/
public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
SchemaChangeRequest request) {
public void handleSchemaChangeRequest(
SchemaChangeRequest request, CompletableFuture<CoordinationResponse> response) {

// We use requester subTask ID as the pending ticket, because there will be at most 1 schema
// change requests simultaneously from each subTask
Expand Down Expand Up @@ -157,7 +162,8 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
pendingSubTaskIds.add(requestSubTaskId);
}
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
response.complete(wrap(SchemaChangeResponse.busy()));
return;
}

SchemaChangeEvent event = request.getSchemaChangeEvent();
Expand All @@ -169,8 +175,8 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
request);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.duplicate()));
response.complete(wrap(SchemaChangeResponse.duplicate()));
return;
}
schemaManager.applyOriginalSchemaChange(event);
List<SchemaChangeEvent> derivedSchemaChangeEvents =
Expand All @@ -184,7 +190,9 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));

response.complete(wrap(SchemaChangeResponse.ignored()));
return;
}

LOG.info(
Expand All @@ -206,8 +214,8 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
}
});
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
return CompletableFuture.completedFuture(
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));

response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
} else {
LOG.info(
"Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).",
Expand All @@ -217,7 +225,7 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
pendingSubTaskIds.add(requestSubTaskId);
}
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
response.complete(wrap(SchemaChangeResponse.busy()));
}
}
}
Expand Down Expand Up @@ -314,7 +322,7 @@ public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
}
}

public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
public void getSchemaChangeResult(CompletableFuture<CoordinationResponse> response) {
Preconditions.checkState(
schemaChangeStatus != RequestStatus.IDLE,
"Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
Expand All @@ -326,11 +334,12 @@ public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {

// This request has been finished, return it and prepare for the next request
List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();
return CompletableFuture.supplyAsync(
() -> wrap(new SchemaChangeResultResponse(finishedEvents)));
SchemaChangeResultResponse resultResponse =
new SchemaChangeResultResponse(finishedEvents);
response.complete(wrap(resultResponse));
} else {
// Still working on schema change request, waiting it
return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse()));
response.complete(wrap(new SchemaChangeProcessingResponse()));
}
}

Expand Down Expand Up @@ -459,7 +468,8 @@ private boolean shouldIgnoreException(Throwable throwable) {

private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
if (currentChangeException != null) {
throw new RuntimeException("Failed to apply schema change.", currentChangeException);
context.failJob(
new RuntimeException("Failed to apply schema change.", currentChangeException));
}
List<SchemaChangeEvent> finishedSchemaChanges =
new ArrayList<>(currentFinishedSchemaChanges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
Expand Down Expand Up @@ -1039,11 +1040,16 @@ tableId, buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)),
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"height", DOUBLE, "Height data")))));
Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents))
processEvent(schemaOperator, addColumnEvents);
Assertions.assertThat(harness.isJobFailed()).isEqualTo(true);
Assertions.assertThat(harness.getJobFailureCause())
.cause()
.cause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessageContaining("Failed to apply schema change");
.isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class)
.matches(
e ->
((UnsupportedSchemaChangeEventException) e)
.getExceptionMessage()
.equals("Sink doesn't support such schema change event."));
harness.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
Expand All @@ -56,6 +55,7 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.Executors;

import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.unwrap;

Expand All @@ -81,6 +81,7 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
private final SchemaRegistry schemaRegistry;
private final TestingSchemaRegistryGateway schemaRegistryGateway;
private final LinkedList<StreamRecord<E>> outputRecords = new LinkedList<>();
private final MockedOperatorCoordinatorContext mockedContext;

public EventOperatorTestHarness(OP operator, int numOutputs) {
this(operator, numOutputs, null, SchemaChangeBehavior.EVOLVE);
Expand All @@ -94,11 +95,14 @@ public EventOperatorTestHarness(
OP operator, int numOutputs, Duration duration, SchemaChangeBehavior behavior) {
this.operator = operator;
this.numOutputs = numOutputs;
this.mockedContext =
new MockedOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
new MockOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
mockedContext,
Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(duration),
new ArrayList<>(),
behavior);
Expand All @@ -113,11 +117,14 @@ public EventOperatorTestHarness(
Set<SchemaChangeEventType> enabledEventTypes) {
this.operator = operator;
this.numOutputs = numOutputs;
this.mockedContext =
new MockedOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
new MockOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
mockedContext,
Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(duration, enabledEventTypes),
new ArrayList<>(),
behavior);
Expand All @@ -133,11 +140,14 @@ public EventOperatorTestHarness(
Set<SchemaChangeEventType> errorsOnEventTypes) {
this.operator = operator;
this.numOutputs = numOutputs;
this.mockedContext =
new MockedOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
new MockOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
mockedContext,
Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(
duration, enabledEventTypes, errorsOnEventTypes),
new ArrayList<>(),
Expand Down Expand Up @@ -196,6 +206,14 @@ public Schema getLatestEvolvedSchema(TableId tableId) throws Exception {
.orElse(null);
}

/**
 * Reports whether the mocked operator coordinator context used by this harness has seen a job
 * failure.
 *
 * @return the value of {@code mockedContext.isJobFailed()}
 */
public boolean isJobFailed() {
return mockedContext.isJobFailed();
}

/**
 * Returns the failure cause stored by the mocked operator coordinator context used by this
 * harness.
 *
 * @return the value of {@code mockedContext.getFailureCause()}
 */
public Throwable getJobFailureCause() {
return mockedContext.getFailureCause();
}

@Override
public void close() throws Exception {
operator.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.runtime.testutils.operators;

import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;

/**
 * Test-only {@link MockOperatorCoordinatorContext} variant that remembers the {@link Throwable}
 * passed to the most recent {@link #failJob(Throwable)} call so that it can be inspected later.
 */
public class MockedOperatorCoordinatorContext extends MockOperatorCoordinatorContext {

    /** Cause handed to the latest {@code failJob} call; {@code null} until one is made. */
    private Throwable failureCause;

    public MockedOperatorCoordinatorContext(
            OperatorID operatorID, ClassLoader userCodeClassLoader) {
        super(operatorID, userCodeClassLoader);
    }

    /**
     * Forwards the failure to the parent mock and then records its cause.
     *
     * @param cause the failure passed on to the parent and stored for later retrieval
     */
    @Override
    public void failJob(Throwable cause) {
        // Delegate first: the cause is stored only after the parent call has returned.
        super.failJob(cause);
        this.failureCause = cause;
    }

    /**
     * Returns the cause recorded by the last {@link #failJob(Throwable)} call.
     *
     * @return the stored cause, or {@code null} if {@code failJob} has not been called
     */
    public Throwable getFailureCause() {
        return this.failureCause;
    }
}

0 comments on commit a876af2

Please sign in to comment.