-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix debezium reader thread lake #1358
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your work, @Matrix42 . I have left some comments.
// while loop in MySqlStreamingChangeEventSource's execute method | ||
currentTaskRunning = false; | ||
executor.shutdown(); | ||
executor.awaitTermination(5, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This timeout should be saved as a constant.
// while loop in MySqlStreamingChangeEventSource's execute method | ||
currentTaskRunning = false; | ||
executor.shutdown(); | ||
executor.awaitTermination(5, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could add a log when the awaitTermination
method failed.
@@ -330,6 +331,8 @@ public void close() { | |||
if (statefulTaskContext.getBinaryLogClient() != null) { | |||
statefulTaskContext.getBinaryLogClient().disconnect(); | |||
} | |||
executor.shutdown(); | |||
executor.awaitTermination(5, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same commnets in the BinlogSplitReader.java
.
.../src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
Show resolved
Hide resolved
Hi @ruanhang1993! Thanks for your review. I'v addressed the issue you mentioned, I'll rebase the master when the compile problem resolved. |
Hi @ruanhang1993! Please take a look when you are available. The test failed is same as #1144 CC @leonardBang |
executor.shutdown(); | ||
boolean isShutdown = | ||
executor.awaitTermination( | ||
EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS); | ||
if (!isShutdown) { | ||
LOG.warn("The thread executor of SnapshotSplitReader wasn't shutdown properly."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check not NULL before call it.
executor.shutdown(); | |
boolean isShutdown = | |
executor.awaitTermination( | |
EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS); | |
if (!isShutdown) { | |
LOG.warn("The thread executor of SnapshotSplitReader wasn't shutdown properly."); | |
} | |
if (executorService != null) { | |
executorService.shutdown(); | |
if (!executorService.awaitTermination( | |
READER_CLOSE_TIMEOUT, TimeUnit.MILLISECONDS)) { | |
LOG.warn( | |
"Failed to close the snapshot split reader in {} ms.", | |
READER_CLOSE_TIMEOUT); | |
} | |
} |
@@ -86,6 +87,8 @@ public class SnapshotSplitReader implements DebeziumReader<SourceRecord, MySqlSp | |||
public AtomicBoolean hasNextElement; | |||
public AtomicBoolean reachEnd; | |||
|
|||
private static final long EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS = 5; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private static final long EXECUTOR_AWAIT_TERMINATION_TIMEOUT_SECONDS = 5; | |
private static final long READER_CLOSE_TIMEOUT = 30L; |
And let's rename executor
to executorService
for more precise meaning.
@@ -86,4 +89,11 @@ public static void assertEqualsInOrder(List<String> expected, List<String> actua | |||
assertEquals(expected.size(), actual.size()); | |||
assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0])); | |||
} | |||
|
|||
public static void assertExecutorIsTerminated(DebeziumReader<?, ?> reader) throws Exception { | |||
Field executorField = reader.getClass().getDeclaredField("executor"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use reflection is not cool here, if you really want to check executorService
member of SnapshotReader
, you can expose a method in SnapshotReader
like:
@VisibleForTesting
public ExecutorService getExecutorService() {
return executorService;
}
executorService.shutdown(); | ||
if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { | ||
LOG.warn( | ||
"Failed to close the binlog split reader in {} ms.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Failed to close the binlog split reader in {} ms.", | |
"Failed to close the binlog split reader in {} seconds.", |
executorService.shutdown(); | ||
if (executorService.awaitTermination(READER_CLOSE_TIMEOUT, TimeUnit.SECONDS)) { | ||
LOG.warn( | ||
"Failed to close the snapshot split reader in {} ms.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Failed to close the snapshot split reader in {} ms.", | |
"Failed to close the snapshot split reader in {} seconds.", |
assertNull(snapshotSplitReader.getExecutorService()); | ||
assertTrue(snapshotSplitReader.getExecutorService().isTerminated()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
? assertNotNull
assertNull(snapshotSplitReader.getExecutorService()); | ||
assertTrue(snapshotSplitReader.getExecutorService().isTerminated()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
? assertNotNull
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I uesd wrong method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Matrix42 for the timely update, LGTM, let's wait the CI pass.
Thanks @leonardBang for the review! |
this fix #1350