Skip to content

Commit

Permalink
[FLINK-36076][minor][cdc-runtime] Set isSchemaChangeApplying as volat…
Browse files Browse the repository at this point in the history
…ile for thread safe consideration

This closes apache#3556.
  • Loading branch information
loserwang1024 authored and qiaozongmi committed Sep 23, 2024
1 parent a3cb355 commit 59ec921
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
// Build Source Operator
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());
sourceTranslator.translate(
pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);

// Build PreTransformOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
Expand All @@ -41,12 +40,14 @@
public class DataSourceTranslator {

public DataStreamSource<Event> translate(
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
SourceDef sourceDef,
StreamExecutionEnvironment env,
Configuration pipelineConfig,
int sourceParallelism) {
// Create data source
DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig);

// Get source provider
final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
if (eventSourceProvider instanceof FlinkSourceProvider) {
// Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
private final Set<Integer> flushedSinkWriters;

/** Status of the execution of current schema change request. */
private boolean isSchemaChangeApplying;
private volatile boolean isSchemaChangeApplying;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;

Expand Down

0 comments on commit 59ec921

Please sign in to comment.