diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 76492ff853..dc95119ce0 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.mysql.debezium.reader; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.FlinkRuntimeException; @@ -51,6 +52,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSplitKey; @@ -65,7 +67,7 @@ public class BinlogSplitReader implements DebeziumReader queue; private volatile boolean currentTaskRunning; @@ -79,11 +81,13 @@ public class BinlogSplitReader implements DebeziumReader pureBinlogPhaseTables; private Tables.TableFilter capturedTableFilter; + private static final long READER_CLOSE_TIMEOUT = 30L; + public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) { this.statefulTaskContext = statefulTaskContext; ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build(); - this.executor = Executors.newSingleThreadExecutor(threadFactory); + this.executorService = Executors.newSingleThreadExecutor(threadFactory); this.currentTaskRunning = true; this.pureBinlogPhaseTables = new HashSet<>(); } @@ -108,7 +112,7 @@ public void submitSplit(MySqlSplit mySqlSplit) { statefulTaskContext.getStreamingChangeEventSourceMetrics(), currentBinlogSplit); - executor.submit( + executorService.submit( () -> { try { binlogSplitReadTask.execute( @@ -176,6 +180,17 @@ public void close() { if (statefulTaskContext.getBinaryLogClient() != null) { statefulTaskContext.getBinaryLogClient().disconnect(); } + // set currentTaskRunning to false to terminate the + // while loop in MySqlStreamingChangeEventSource's execute method + currentTaskRunning = false; + if (executorService != null) { + executorService.shutdown(); + if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { + LOG.warn( + "Failed to close the binlog split reader in {} seconds.", + READER_CLOSE_TIMEOUT); + } + } } catch (Exception e) { LOG.error("Close binlog reader error", e); } @@ -284,4 +299,9 @@ private void configureFilter() { public void stopBinlogReadTask() { this.currentTaskRunning = false; } + + @VisibleForTesting + public ExecutorService getExecutorService() { + return executorService; + } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index ec54d46e3e..2a8db8de49 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.mysql.debezium.reader; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -54,6 +55,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.formatMessageTimestamp; @@ -73,7 +75,7 @@ public class SnapshotSplitReader implements DebeziumReader queue; private volatile boolean currentTaskRunning; @@ -86,11 +88,13 @@ public class SnapshotSplitReader implements DebeziumReader { try { currentTaskRunning = true; @@ -328,11 +332,24 @@ public void close() { if (statefulTaskContext.getBinaryLogClient() != null) { statefulTaskContext.getBinaryLogClient().disconnect(); } + if (executorService != null) { + executorService.shutdown(); + if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { + LOG.warn( + "Failed to close the snapshot split reader in {} seconds.", + READER_CLOSE_TIMEOUT); + } + } } catch (Exception e) { LOG.error("Close snapshot reader error", e); } } + @VisibleForTesting + public ExecutorService getExecutorService() { + return executorService; + } + /** * {@link ChangeEventSource.ChangeEventSourceContext} implementation that keeps low/high * watermark for each {@link MySqlSnapshotSplit}. diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 39524112d8..163d8c392c 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -65,6 +65,8 @@ import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; /** Tests for {@link BinlogSplitReader}. */ public class BinlogSplitReaderTest extends MySqlSourceTestBase { @@ -441,6 +443,10 @@ private List readBinlogSplits( } } } + snapshotSplitReader.close(); + + assertNotNull(snapshotSplitReader.getExecutorService()); + assertTrue(snapshotSplitReader.getExecutorService().isTerminated()); // step-2: create binlog split according the finished snapshot splits List finishedSplitsInfo = @@ -483,6 +489,11 @@ private List readBinlogSplits( pollRecordsFromReader(binlogReader, RecordUtils::isDataChangeRecord), dataType)); } + binlogReader.close(); + + assertNotNull(snapshotSplitReader.getExecutorService()); + assertTrue(snapshotSplitReader.getExecutorService().isTerminated()); + return actual; } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 9433f2e5eb..5065b61e2f 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -53,6 +53,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -411,6 +412,11 @@ private List readTableSnapshotSplits( if (binaryLogClient != null) { binaryLogClient.disconnect(); } + snapshotSplitReader.close(); + + assertNotNull(snapshotSplitReader.getExecutorService()); + assertTrue(snapshotSplitReader.getExecutorService().isTerminated()); + return formatResult(result, dataType); }