From b631183a6a636737667c1bdf5e96f1d0fdf3ea45 Mon Sep 17 00:00:00 2001 From: Matrix42 <934336389@qq.com> Date: Sun, 10 Jul 2022 23:55:30 +0800 Subject: [PATCH 1/6] fix SnapshotSplitReader and BinlogSplitReader thread lake --- .../mysql/debezium/reader/BinlogSplitReader.java | 6 ++++++ .../mysql/debezium/reader/SnapshotSplitReader.java | 3 +++ .../mysql/debezium/reader/BinlogSplitReaderTest.java | 5 +++++ .../debezium/reader/SnapshotSplitReaderTest.java | 4 ++++ .../connectors/mysql/source/MySqlSourceTestBase.java | 11 +++++++++++ 5 files changed, 29 insertions(+) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 76492ff853..c3f8de1658 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -51,6 +51,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSplitKey; @@ -176,6 +177,11 @@ public void close() { if (statefulTaskContext.getBinaryLogClient() != null) { statefulTaskContext.getBinaryLogClient().disconnect(); } + // set currentTaskRunning to false to terminate the + // while loop in MySqlStreamingChangeEventSource's execute method + currentTaskRunning = false; + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.SECONDS); } catch (Exception e) { LOG.error("Close binlog reader error", e); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index ec54d46e3e..feae7db2ce 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -54,6 +54,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.formatMessageTimestamp; @@ -328,6 +329,8 @@ public void close() { if (statefulTaskContext.getBinaryLogClient() != null) { statefulTaskContext.getBinaryLogClient().disconnect(); } + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.SECONDS); } catch (Exception e) { LOG.error("Close snapshot reader error", e); } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 39524112d8..c9257999bb 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -441,6 +441,7 @@ private List readBinlogSplits( } } } + snapshotSplitReader.close(); // step-2: create binlog split according the finished snapshot splits List finishedSplitsInfo = @@ -483,6 +484,10 @@ private List readBinlogSplits( pollRecordsFromReader(binlogReader, RecordUtils::isDataChangeRecord), dataType)); } + binlogReader.close(); + + assertExecutorIsTerminated(binlogReader); + return actual; } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 9433f2e5eb..1a44f42e09 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -411,6 +411,10 @@ private List readTableSnapshotSplits( if (binaryLogClient != null) { binaryLogClient.disconnect(); } + snapshotSplitReader.close(); + + assertExecutorIsTerminated(snapshotSplitReader); + return formatResult(result, dataType); } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java index 82f13f82eb..3e855f27e6 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.mysql.source; +import com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader; import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.util.MiniClusterWithClientResource; @@ -30,7 +31,9 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; +import java.lang.reflect.Field; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -86,4 +89,12 @@ public static void assertEqualsInOrder(List expected, List actua assertEquals(expected.size(), actual.size()); assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); } + + public static void assertExecutorIsTerminated(DebeziumReader reader) throws Exception { + Field executorField = reader.getClass().getDeclaredField("executor"); + executorField.setAccessible(true); + ExecutorService executorService = (ExecutorService) executorField.get(reader); + assertTrue(executorService.isTerminated()); + } + } From 1ef57998be757e59d0c3af45720940b6be8624c0 Mon Sep 17 00:00:00 2001 From: Matrix42 <934336389@qq.com> Date: Mon, 11 Jul 2022 00:03:52 +0800 Subject: [PATCH 2/6] fix checkstyle --- .../cdc/connectors/mysql/source/MySqlSourceTestBase.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java index 3e855f27e6..642d9846f3 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java @@ -16,12 +16,12 @@ package com.ververica.cdc.connectors.mysql.source; -import com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader; import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; +import com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader; import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer; import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion; import org.junit.BeforeClass; @@ -96,5 +96,4 @@ public static void assertExecutorIsTerminated(DebeziumReader reader) throw ExecutorService executorService = (ExecutorService) executorField.get(reader); assertTrue(executorService.isTerminated()); } - } From c1e3c938bcc1cdfe6af636ab22b1ec4a7fc345ce Mon Sep 17 00:00:00 2001 From: Matrix42 <934336389@qq.com> Date: Mon, 1 Aug 2022 21:36:10 +0800 Subject: [PATCH 3/6] fix review issue --- .../mysql/debezium/reader/BinlogSplitReader.java | 9 ++++++++- .../mysql/debezium/reader/SnapshotSplitReader.java | 9 ++++++++- .../mysql/debezium/reader/BinlogSplitReaderTest.java | 2 ++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index c3f8de1658..844a3509ed 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -80,6 +80,8 @@ public class BinlogSplitReader implements DebeziumReader pureBinlogPhaseTables; private Tables.TableFilter capturedTableFilter; + private static final long EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS = 5; + public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) { this.statefulTaskContext = statefulTaskContext; ThreadFactory threadFactory = @@ -181,7 +183,12 @@ public void close() { // while loop in MySqlStreamingChangeEventSource's execute method currentTaskRunning = false; executor.shutdown(); - executor.awaitTermination(5, TimeUnit.SECONDS); + boolean isShutdown = + executor.awaitTermination( + EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!isShutdown) { + LOG.warn("The thread executor of BinlogSplitReader wasn't shutdown properly."); + } } catch (Exception e) { LOG.error("Close binlog reader error", e); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index feae7db2ce..9988396b73 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -87,6 +87,8 @@ public class SnapshotSplitReader implements DebeziumReader readBinlogSplits( } snapshotSplitReader.close(); + assertExecutorIsTerminated(snapshotSplitReader); + // step-2: create binlog split according the finished snapshot splits List finishedSplitsInfo = getFinishedSplitsInfo(sqlSplits, snapshotRecords); From ca86b686cdca46d60dc612acfb0747281189e42d Mon Sep 17 00:00:00 2001 From: "deqiang.sun" <934336389@qq.com> Date: Tue, 2 Aug 2022 15:44:14 +0800 Subject: [PATCH 4/6] fix review issue --- .../debezium/reader/BinlogSplitReader.java | 27 ++++++++++++------- .../debezium/reader/SnapshotSplitReader.java | 27 ++++++++++++------- .../reader/BinlogSplitReaderTest.java | 8 ++++-- .../reader/SnapshotSplitReaderTest.java | 4 ++- .../mysql/source/MySqlSourceTestBase.java | 10 ------- 5 files changed, 43 insertions(+), 33 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 844a3509ed..6b14178cc4 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.mysql.debezium.reader; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.FlinkRuntimeException; @@ -66,7 +67,7 @@ public class BinlogSplitReader implements DebeziumReader queue; private volatile boolean currentTaskRunning; @@ -80,13 +81,13 @@ public class BinlogSplitReader implements DebeziumReader pureBinlogPhaseTables; private Tables.TableFilter capturedTableFilter; - private static final long EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS = 5; + private static final long READER_CLOSE_TIMEOUT = 30L; public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subTaskId) { this.statefulTaskContext = statefulTaskContext; ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build(); - this.executor = Executors.newSingleThreadExecutor(threadFactory); + this.executorService = Executors.newSingleThreadExecutor(threadFactory); this.currentTaskRunning = true; this.pureBinlogPhaseTables = new HashSet<>(); } @@ -111,7 +112,7 @@ public void submitSplit(MySqlSplit mySqlSplit) { statefulTaskContext.getStreamingChangeEventSourceMetrics(), currentBinlogSplit); - executor.submit( + executorService.submit( () -> { try { binlogSplitReadTask.execute( @@ -182,12 +183,13 @@ public void close() { // set currentTaskRunning to false to terminate the // while loop in MySqlStreamingChangeEventSource's execute method currentTaskRunning = false; - executor.shutdown(); - boolean isShutdown = - executor.awaitTermination( - EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (!isShutdown) { - LOG.warn("The thread executor of BinlogSplitReader wasn't shutdown properly."); + if (executorService != null) { + executorService.shutdown(); + if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { + LOG.warn( + "Failed to close the binlog split reader in {} ms.", + READER_CLOSE_TIMEOUT); + } } } catch (Exception e) { LOG.error("Close binlog reader error", e); @@ -297,4 +299,9 @@ private void configureFilter() { public void stopBinlogReadTask() { this.currentTaskRunning = false; } + + @VisibleForTesting + public ExecutorService getExecutorService() { + return executorService; + } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index 9988396b73..52c3737155 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.mysql.debezium.reader; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -74,7 +75,7 @@ public class SnapshotSplitReader implements DebeziumReader queue; private volatile boolean currentTaskRunning; @@ -87,13 +88,13 @@ public class SnapshotSplitReader implements DebeziumReader { try { currentTaskRunning = true; @@ -331,18 +332,24 @@ public void close() { if (statefulTaskContext.getBinaryLogClient() != null) { statefulTaskContext.getBinaryLogClient().disconnect(); } - executor.shutdown(); - boolean isShutdown = - executor.awaitTermination( - EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (!isShutdown) { - LOG.warn("The thread executor of SnapshotSplitReader wasn't shutdown properly."); + if (executorService != null) { + executorService.shutdown(); + if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { + LOG.warn( + "Failed to close the snapshot split reader in {} ms.", + READER_CLOSE_TIMEOUT); + } } } catch (Exception e) { LOG.error("Close snapshot reader error", e); } } + @VisibleForTesting + public ExecutorService getExecutorService() { + return executorService; + } + /** * {@link ChangeEventSource.ChangeEventSourceContext} implementation that keeps low/high * watermark for each {@link MySqlSnapshotSplit}. diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 40f7c689a0..29e31300f2 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -65,6 +65,8 @@ import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; /** Tests for {@link BinlogSplitReader}. */ public class BinlogSplitReaderTest extends MySqlSourceTestBase { @@ -443,7 +445,8 @@ private List readBinlogSplits( } snapshotSplitReader.close(); - assertExecutorIsTerminated(snapshotSplitReader); + assertNull(snapshotSplitReader.getExecutorService()); + assertTrue(snapshotSplitReader.getExecutorService().isTerminated()); // step-2: create binlog split according the finished snapshot splits List finishedSplitsInfo = @@ -488,7 +491,8 @@ private List readBinlogSplits( } binlogReader.close(); - assertExecutorIsTerminated(binlogReader); + assertNull(snapshotSplitReader.getExecutorService()); + assertTrue(snapshotSplitReader.getExecutorService().isTerminated()); return actual; } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 1a44f42e09..7ddebf69a0 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -53,6 +53,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -413,7 +414,8 @@ private List readTableSnapshotSplits( } snapshotSplitReader.close(); - assertExecutorIsTerminated(snapshotSplitReader); + assertNull(snapshotSplitReader.getExecutorService()); + assertTrue(snapshotSplitReader.getExecutorService().isTerminated()); return formatResult(result, dataType); } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java index 642d9846f3..82f13f82eb 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java @@ -21,7 +21,6 @@ import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.TestLogger; -import com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader; import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer; import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion; import org.junit.BeforeClass; @@ -31,9 +30,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; -import java.lang.reflect.Field; import java.util.List; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -89,11 +86,4 @@ public static void assertEqualsInOrder(List expected, List actua assertEquals(expected.size(), actual.size()); assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); } - - public static void assertExecutorIsTerminated(DebeziumReader reader) throws Exception { - Field executorField = reader.getClass().getDeclaredField("executor"); - executorField.setAccessible(true); - ExecutorService executorService = (ExecutorService) executorField.get(reader); - assertTrue(executorService.isTerminated()); - } } From 580bbca431d39eb3ff9f52dfaa3b646c6d1c4f5a Mon Sep 17 00:00:00 2001 From: "deqiang.sun" <934336389@qq.com> Date: Tue, 2 Aug 2022 16:00:05 +0800 Subject: [PATCH 5/6] fix review issue --- .../mysql/debezium/reader/BinlogSplitReaderTest.java | 6 +++--- .../mysql/debezium/reader/SnapshotSplitReaderTest.java | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index 29e31300f2..163d8c392c 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -65,7 +65,7 @@ import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; /** Tests for {@link BinlogSplitReader}. */ @@ -445,7 +445,7 @@ private List readBinlogSplits( } snapshotSplitReader.close(); - assertNull(snapshotSplitReader.getExecutorService()); + assertNotNull(snapshotSplitReader.getExecutorService()); assertTrue(snapshotSplitReader.getExecutorService().isTerminated()); // step-2: create binlog split according the finished snapshot splits @@ -491,7 +491,7 @@ private List readBinlogSplits( } binlogReader.close(); - assertNull(snapshotSplitReader.getExecutorService()); + assertNotNull(snapshotSplitReader.getExecutorService()); assertTrue(snapshotSplitReader.getExecutorService().isTerminated()); return actual; diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 7ddebf69a0..5065b61e2f 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -53,7 +53,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -414,7 +414,7 @@ private List readTableSnapshotSplits( } snapshotSplitReader.close(); - assertNull(snapshotSplitReader.getExecutorService()); + assertNotNull(snapshotSplitReader.getExecutorService()); assertTrue(snapshotSplitReader.getExecutorService().isTerminated()); return formatResult(result, dataType); From 087e61992bdb61375ab752accf1a92b92d86b65f Mon Sep 17 00:00:00 2001 From: "deqiang.sun" <934336389@qq.com> Date: Tue, 2 Aug 2022 16:01:43 +0800 Subject: [PATCH 6/6] fix review issue --- .../cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java | 2 +- .../connectors/mysql/debezium/reader/SnapshotSplitReader.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index 6b14178cc4..dc95119ce0 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -187,7 +187,7 @@ public void close() { executorService.shutdown(); if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { LOG.warn( - "Failed to close the binlog split reader in {} ms.", + "Failed to close the binlog split reader in {} seconds.", READER_CLOSE_TIMEOUT); } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java index 52c3737155..2a8db8de49 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java @@ -336,7 +336,7 @@ public void close() { executorService.shutdown(); if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { LOG.warn( - "Failed to close the snapshot split reader in {} ms.", + "Failed to close the snapshot split reader in {} seconds.", READER_CLOSE_TIMEOUT); } }