diff --git a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java index 6a416469c..53379ec21 100644 --- a/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java +++ b/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConnector.java @@ -11,6 +11,7 @@ import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.rocketmq.client.exception.MQClientException; @@ -43,6 +44,7 @@ public class JdbcSinkConnector extends SinkConnector { private volatile boolean adminStarted; + private ScheduledFuture listenerHandle; public JdbcSinkConnector() { topicRouteMap = new HashMap<>(); dbConnectorConfig = new SinkDbConnectorConfig(); @@ -94,7 +96,7 @@ public void start() { } public void startListener() { - executor.scheduleAtFixedRate(new Runnable() { + listenerHandle = executor.scheduleAtFixedRate(new Runnable() { boolean first = true; HashMap> origin = null; @@ -169,9 +171,10 @@ public void buildRoute() { } } + @Override public void stop() { - + listenerHandle.cancel(true); } @Override