Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix debezium reader thread lake #1358

Merged
merged 6 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.ververica.cdc.connectors.mysql.debezium.reader;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.FlinkRuntimeException;

Expand Down Expand Up @@ -51,6 +52,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSplitKey;
Expand All @@ -65,7 +67,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli

private static final Logger LOG = LoggerFactory.getLogger(BinlogSplitReader.class);
private final StatefulTaskContext statefulTaskContext;
private final ExecutorService executor;
private final ExecutorService executorService;

private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile boolean currentTaskRunning;
Expand All @@ -79,11 +81,13 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecord, MySqlSpli
private final Set<TableId> pureBinlogPhaseTables;
private Tables.TableFilter capturedTableFilter;

private static final long READER_CLOSE_TIMEOUT = 30L;

public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) {
this.statefulTaskContext = statefulTaskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = true;
this.pureBinlogPhaseTables = new HashSet<>();
}
Expand All @@ -108,7 +112,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
currentBinlogSplit);

executor.submit(
executorService.submit(
() -> {
try {
binlogSplitReadTask.execute(
Expand Down Expand Up @@ -176,6 +180,17 @@ public void close() {
if (statefulTaskContext.getBinaryLogClient() != null) {
statefulTaskContext.getBinaryLogClient().disconnect();
}
// set currentTaskRunning to false to terminate the
// while loop in MySqlStreamingChangeEventSource's execute method
currentTaskRunning = false;
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
LOG.warn(
"Failed to close the binlog split reader in {} seconds.",
READER_CLOSE_TIMEOUT);
}
}
} catch (Exception e) {
LOG.error("Close binlog reader error", e);
}
Expand Down Expand Up @@ -284,4 +299,9 @@ private void configureFilter() {
public void stopBinlogReadTask() {
this.currentTaskRunning = false;
}

@VisibleForTesting
public ExecutorService getExecutorService() {
return executorService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.ververica.cdc.connectors.mysql.debezium.reader;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -54,6 +55,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.formatMessageTimestamp;
Expand All @@ -73,7 +75,7 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp

private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitReader.class);
private final StatefulTaskContext statefulTaskContext;
private final ExecutorService executor;
private final ExecutorService executorService;

private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile boolean currentTaskRunning;
Expand All @@ -86,11 +88,13 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp
public AtomicBoolean hasNextElement;
public AtomicBoolean reachEnd;

private static final long READER_CLOSE_TIMEOUT = 30L;

public SnapshotSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId) {
this.statefulTaskContext = statefulTaskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subtaskId).build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.currentTaskRunning = false;
this.hasNextElement = new AtomicBoolean(false);
this.reachEnd = new AtomicBoolean(false);
Expand All @@ -114,7 +118,7 @@ public void submitSplit(MySqlSplit mySqlSplit) {
statefulTaskContext.getSnapshotReceiver(),
StatefulTaskContext.getClock(),
currentSnapshotSplit);
executor.submit(
executorService.submit(
() -> {
try {
currentTaskRunning = true;
Expand Down Expand Up @@ -328,11 +332,24 @@ public void close() {
if (statefulTaskContext.getBinaryLogClient() != null) {
statefulTaskContext.getBinaryLogClient().disconnect();
}
if (executorService != null) {
executorService.shutdown();
if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
LOG.warn(
"Failed to close the snapshot split reader in {} seconds.",
READER_CLOSE_TIMEOUT);
}
}
} catch (Exception e) {
LOG.error("Close snapshot reader error", e);
}
}

@VisibleForTesting
public ExecutorService getExecutorService() {
return executorService;
}

/**
* {@link ChangeEventSource.ChangeEventSourceContext} implementation that keeps low/high
* watermark for each {@link MySqlSnapshotSplit}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/** Tests for {@link BinlogSplitReader}. */
public class BinlogSplitReaderTest extends MySqlSourceTestBase {
Expand Down Expand Up @@ -441,6 +443,10 @@ private List<String> readBinlogSplits(
}
}
}
snapshotSplitReader.close();
leonardBang marked this conversation as resolved.
Show resolved Hide resolved

assertNotNull(snapshotSplitReader.getExecutorService());
assertTrue(snapshotSplitReader.getExecutorService().isTerminated());

// step-2: create binlog split according the finished snapshot splits
List<FinishedSnapshotSplitInfo> finishedSplitsInfo =
Expand Down Expand Up @@ -483,6 +489,11 @@ private List<String> readBinlogSplits(
pollRecordsFromReader(binlogReader, RecordUtils::isDataChangeRecord),
dataType));
}
binlogReader.close();

assertNotNull(snapshotSplitReader.getExecutorService());
assertTrue(snapshotSplitReader.getExecutorService().isTerminated());

return actual;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -411,6 +412,11 @@ private List<String> readTableSnapshotSplits(
if (binaryLogClient != null) {
binaryLogClient.disconnect();
}
snapshotSplitReader.close();

assertNotNull(snapshotSplitReader.getExecutorService());
assertTrue(snapshotSplitReader.getExecutorService().isTerminated());

return formatResult(result, dataType);
}

Expand Down