diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java index 64df710c4d..c5cdd3105c 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java @@ -141,22 +141,25 @@ public void open(final Configuration config) throws Exception { @Override public void run(SourceContext ctx) throws Exception { outputCollector.context = ctx; + try { + LOG.info("Start to initial table whitelist"); + initTableWhiteList(); + + LOG.info("Start readChangeRecords process"); + readChangeRecords(); + + if (shouldReadSnapshot()) { + LOG.info("Snapshot reading started"); + readSnapshotRecords(); + LOG.info("Snapshot reading finished"); + } else { + LOG.info("Snapshot reading skipped"); + } - LOG.info("Start to initial table whitelist"); - initTableWhiteList(); - - LOG.info("Start readChangeRecords process"); - readChangeRecords(); - - if (shouldReadSnapshot()) { - LOG.info("Snapshot reading started"); - readSnapshotRecords(); - LOG.info("Snapshot reading finished"); - } else { - LOG.info("Snapshot reading skipped"); + logProxyClient.join(); + } finally { + cancel(); } - - logProxyClient.join(); } private boolean shouldReadSnapshot() {