From 0b69126d525940739b65b5c72bf2e8905efee5a6 Mon Sep 17 00:00:00 2001 From: Jiabao Sun Date: Tue, 30 May 2023 02:29:52 +0800 Subject: [PATCH] [cdc-base] Close idle readers when snapshot finished --- .../base/config/BaseSourceConfig.java | 8 ++ .../base/config/JdbcSourceConfig.java | 2 + .../base/config/JdbcSourceConfigFactory.java | 27 +++++ .../connectors/base/config/SourceConfig.java | 6 +- .../base/options/SourceOptions.java | 12 ++ .../base/source/IncrementalSource.java | 1 + .../IncrementalSourceEnumerator.java | 5 + .../base/utils/EnvironmentUtils.java | 57 ++++++++++ .../base/utils/VersionComparable.java | 107 ++++++++++++++++++ .../base/experimental/MySqlSourceBuilder.java | 15 +++ .../config/MySqlSourceConfig.java | 2 + .../config/MySqlSourceConfigFactory.java | 1 + .../cdc/connectors/tests/MongoE2eITCase.java | 7 +- .../mongodb/source/MongoDBSourceBuilder.java | 18 ++- .../source/config/MongoDBSourceConfig.java | 14 ++- .../config/MongoDBSourceConfigFactory.java | 34 +++++- .../mongodb/table/MongoDBTableSource.java | 19 +++- .../table/MongoDBTableSourceFactory.java | 6 +- .../table/MongoDBTableFactoryTest.java | 13 ++- .../oracle/source/OracleSourceBuilder.java | 15 +++ .../source/config/OracleSourceConfig.java | 2 + .../config/OracleSourceConfigFactory.java | 1 + .../oracle/table/OracleTableSource.java | 15 ++- .../table/OracleTableSourceFactory.java | 7 +- .../table/OracleTableSourceFactoryTest.java | 24 ++-- 25 files changed, 386 insertions(+), 32 deletions(-) create mode 100644 flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/EnvironmentUtils.java create mode 100644 flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/VersionComparable.java diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java index 824dac83a7..46b95b39ce 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/BaseSourceConfig.java @@ -33,6 +33,7 @@ public abstract class BaseSourceConfig implements SourceConfig { protected final double distributionFactorUpper; protected final double distributionFactorLower; protected final boolean includeSchemaChanges; + protected final boolean closeIdleReaders; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -47,6 +48,7 @@ public BaseSourceConfig( double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean closeIdleReaders, Properties dbzProperties, Configuration dbzConfiguration) { this.startupOptions = startupOptions; @@ -55,6 +57,7 @@ public BaseSourceConfig( this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; this.includeSchemaChanges = includeSchemaChanges; + this.closeIdleReaders = closeIdleReaders; this.dbzProperties = dbzProperties; this.dbzConfiguration = dbzConfiguration; } @@ -87,6 +90,11 @@ public boolean isIncludeSchemaChanges() { return includeSchemaChanges; } + @Override + public boolean isCloseIdleReaders() { + return closeIdleReaders; + } + public Properties getDbzProperties() { return dbzProperties; } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfig.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfig.java index 3b5fa905c0..27dbf95452 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfig.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfig.java @@ -53,6 +53,7 @@ public JdbcSourceConfig( double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean closeIdleReaders, Properties dbzProperties, Configuration dbzConfiguration, String driverClassName, @@ -73,6 +74,7 @@ public JdbcSourceConfig( distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, dbzProperties, dbzConfiguration); this.driverClassName = driverClassName; diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java index d7f1cd1e34..09f7978e31 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/JdbcSourceConfigFactory.java @@ -17,6 +17,7 @@ package com.ververica.cdc.connectors.base.config; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; import com.ververica.cdc.connectors.base.config.SourceConfig.Factory; import com.ververica.cdc.connectors.base.options.JdbcSourceOptions; @@ -28,6 +29,8 @@ import java.util.List; import java.util.Properties; +import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.requireCheckpointsAfterTasksFinished; + /** A {@link Factory} to provide {@link SourceConfig} of JDBC data source. */ @Internal public abstract class JdbcSourceConfigFactory implements Factory { @@ -42,6 +45,7 @@ public abstract class JdbcSourceConfigFactory implements Factory tableList; protected StartupOptions startupOptions = StartupOptions.initial(); protected boolean includeSchemaChanges = false; + protected boolean closeIdleReaders = false; protected double distributionFactorUpper = SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(); protected double distributionFactorLower = @@ -209,6 +213,29 @@ public JdbcSourceConfigFactory startupOptions(StartupOptions startupOptions) { return this; } + /** + * Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public JdbcSourceConfigFactory closeIdleReaders(boolean closeIdleReaders) { + this.closeIdleReaders = closeIdleReaders; + return this; + } + + @Override + public JdbcSourceConfig create(int subtask, Configuration configuration) { + if (closeIdleReaders) { + requireCheckpointsAfterTasksFinished(configuration); + } + return create(subtask); + } + @Override public abstract JdbcSourceConfig create(int subtask); } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/SourceConfig.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/SourceConfig.java index 5dcd2b5328..3a01257981 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/SourceConfig.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/config/SourceConfig.java @@ -17,6 +17,7 @@ package com.ververica.cdc.connectors.base.config; import org.apache.flink.annotation.Experimental; +import org.apache.flink.configuration.Configuration; import com.ververica.cdc.connectors.base.options.StartupOptions; @@ -34,10 +35,13 @@ public interface SourceConfig extends Serializable { boolean isIncludeSchemaChanges(); + boolean isCloseIdleReaders(); + /** Factory for the {@code SourceConfig}. */ - @FunctionalInterface interface Factory extends Serializable { + C create(int subtask, Configuration configuration); + C create(int subtask); } } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java index 4c4ed87106..55e61d90f0 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java @@ -16,6 +16,7 @@ package com.ververica.cdc.connectors.base.options; +import org.apache.flink.annotation.Experimental; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; @@ -111,4 +112,15 @@ public class SourceOptions { + " The table chunks would use evenly calculation optimization when the data distribution is even," + " and the query for splitting would happen when it is uneven." + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + + @Experimental + public static final ConfigOption SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = + ConfigOptions.key("scan.incremental.close-idle-reader.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to close idle readers at the end of the snapshot phase. This feature depends on " + + "FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be " + + "greater than or equal to 1.14, and the configuration 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to" + + "true."); } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java index d7b37517fc..db95261abd 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java @@ -103,6 +103,7 @@ public IncrementalSourceReader createReader(SourceReaderContext readerCont throws Exception { // create source config for the given subtask (e.g. unique server id) C sourceConfig = configFactory.create(readerContext.getIndexOfSubtask()); + FutureCompletingBlockingQueue> elementsQueue = new FutureCompletingBlockingQueue<>(); diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java index c8cf10c9fb..269d6cff37 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/enumerator/IncrementalSourceEnumerator.java @@ -169,6 +169,11 @@ private void assignSplits() { context.assignSplit(sourceSplit, nextAwaiting); awaitingReader.remove(); LOG.info("Assign split {} to subtask {}", sourceSplit, nextAwaiting); + } else if (sourceConfig.isCloseIdleReaders()) { + // close idle readers when snapshot phase finished. + context.signalNoMoreSplits(nextAwaiting); + awaitingReader.remove(); + LOG.info("Close idle reader of subtask {}", nextAwaiting); } else { // there is no available splits by now, skip assigning break; diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/EnvironmentUtils.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/EnvironmentUtils.java new file mode 100644 index 0000000000..bfc07e0178 --- /dev/null +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/EnvironmentUtils.java @@ -0,0 +1,57 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.base.utils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.EnvironmentInformation; + +/** Utilities for environment information at runtime. */ +public class EnvironmentUtils { + + private EnvironmentUtils() {} + + private static final VersionComparable FLINK_1_14 = VersionComparable.fromVersionString("1.14"); + + private static final String ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH = + "execution.checkpointing.checkpoints-after-tasks-finish.enabled"; + + public static VersionComparable runtimeFlinkVersion() { + return VersionComparable.fromVersionString(EnvironmentInformation.getVersion()); + } + + public static void requireCheckpointsAfterTasksFinished(Configuration configuration) { + if (!supportCheckpointsAfterTasksFinished()) { + throw new UnsupportedOperationException( + "To enabled checkpoints after tasks finished requires flink version greater than or equal to 1.14, current version is " + + runtimeFlinkVersion()); + } + if (!enableCheckPointsAfterTaskFinishes(configuration)) { + throw new IllegalArgumentException( + String.format( + "To enabled checkpoints after tasks finished requires '%s' set to true.", + ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH)); + } + } + + public static boolean supportCheckpointsAfterTasksFinished() { + return runtimeFlinkVersion().newerThanOrEqualTo(FLINK_1_14); + } + + public static boolean enableCheckPointsAfterTaskFinishes(Configuration configuration) { + return configuration.getBoolean(ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false); + } +} diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/VersionComparable.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/VersionComparable.java new file mode 100644 index 0000000000..700c66be13 --- /dev/null +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/VersionComparable.java @@ -0,0 +1,107 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.base.utils; + +/** Used to compare version numbers at runtime. */ +public class VersionComparable implements Comparable { + + private int majorVersion; + private int minorVersion; + private int patchVersion; + private String versionString; + + public VersionComparable(String versionString) { + this.versionString = versionString; + try { + int pos = versionString.indexOf('-'); + String numberPart = versionString; + if (pos > 0) { + numberPart = versionString.substring(0, pos); + } + + String[] versions = numberPart.split("\\."); + + this.majorVersion = Integer.parseInt(versions[0]); + this.minorVersion = Integer.parseInt(versions[1]); + if (versions.length == 3) { + this.patchVersion = Integer.parseInt(versions[2]); + } + } catch (Exception e) { + throw new IllegalArgumentException( + String.format("Can not recognize version %s.", versionString)); + } + } + + public int getMajorVersion() { + return majorVersion; + } + + public int getMinorVersion() { + return minorVersion; + } + + public int getPatchVersion() { + return patchVersion; + } + + public static VersionComparable fromVersionString(String versionString) { + return new VersionComparable(versionString); + } + + @Override + public int compareTo(VersionComparable version) { + if (equalTo(version)) { + return 0; + } else if (newerThan(version)) { + return 1; + } else { + return -1; + } + } + + public boolean equalTo(VersionComparable version) { + return majorVersion == version.majorVersion + && minorVersion == version.minorVersion + && patchVersion == version.patchVersion; + } + + public boolean newerThan(VersionComparable version) { + if (majorVersion <= version.majorVersion) { + if (majorVersion < version.majorVersion) { + return false; + } else { + if (minorVersion <= version.minorVersion) { + if (minorVersion < version.patchVersion) { + return false; + } else { + return patchVersion > version.patchVersion; + } + } + } + } + return true; + } + + public boolean newerThanOrEqualTo(VersionComparable version) { + return newerThan(version) || equalTo(version); + } + + @Override + public String toString() { + return versionString; + } +} diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlSourceBuilder.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlSourceBuilder.java index 187dea6cd6..8933a25ca6 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlSourceBuilder.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlSourceBuilder.java @@ -205,6 +205,21 @@ public MySqlSourceBuilder debeziumProperties(Properties properties) { return this; } + /** + * Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public MySqlSourceBuilder closeIdleReaders(boolean closeIdleReaders) { + this.configFactory.closeIdleReaders(closeIdleReaders); + return this; + } + /** * The deserializer used to convert from consumed {@link * org.apache.kafka.connect.source.SourceRecord}. diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfig.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfig.java index dcd6fcaa8c..4e79dd0c62 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfig.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfig.java @@ -45,6 +45,7 @@ public MySqlSourceConfig( double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean closeIdleReaders, Properties dbzProperties, Configuration dbzConfiguration, String driverClassName, @@ -66,6 +67,7 @@ public MySqlSourceConfig( distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, dbzProperties, dbzConfiguration, driverClassName, diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java index db291aab91..6f68da9ae0 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java @@ -113,6 +113,7 @@ public MySqlSourceConfig create(int subtaskId) { distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, props, dbzConfiguration, driverClassName, diff --git a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java index e416525ccd..1fa89260bb 100644 --- a/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java +++ b/flink-cdc-e2e-tests/src/test/java/com/ververica/cdc/connectors/tests/MongoE2eITCase.java @@ -46,6 +46,7 @@ import java.util.Random; import java.util.stream.Stream; +import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.supportCheckpointsAfterTasksFinished; import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER; import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER_PASSWORD; import static com.ververica.cdc.connectors.mongodb.utils.MongoDBContainer.MONGODB_PORT; @@ -141,6 +142,7 @@ public void testMongoDbCDC() throws Exception { List sqlLines = Arrays.asList( "SET 'execution.checkpointing.interval' = '3s';", + "SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';", "CREATE TABLE products_source (", " _id STRING NOT NULL,", " name STRING,", @@ -156,7 +158,10 @@ public void testMongoDbCDC() throws Exception { " 'password' = '" + FLINK_USER_PASSWORD + "',", " 'collection' = 'products',", " 'heartbeat.interval.ms' = '1000',", - " 'scan.incremental.snapshot.enabled' = '" + parallelismSnapshot + "'", + " 'scan.incremental.snapshot.enabled' = '" + parallelismSnapshot + "',", + " 'scan.incremental.close-idle-reader.enabled' = '" + + supportCheckpointsAfterTasksFinished() + + "'", ");", "CREATE TABLE mongodb_products_sink (", " `id` STRING NOT NULL,", diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java index 54281e1408..f104e3902d 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java @@ -177,6 +177,23 @@ public MongoDBSourceBuilder splitMetaGroupSize(int splitMetaGroupSize) { return this; } + /** + * scan.incremental.close-idle-reader.enabled + * + *

Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public MongoDBSourceBuilder closeIdleReaders(boolean closeIdleReaders) { + this.configFactory.closeIdleReaders(closeIdleReaders); + return this; + } + /** * The deserializer used to convert from consumed {@link * org.apache.kafka.connect.source.SourceRecord}. @@ -192,7 +209,6 @@ public MongoDBSourceBuilder deserializer(DebeziumDeserializationSchema des * @return a MongoDBParallelSource with the settings made for this builder. */ public MongoDBSource build() { - configFactory.validate(); return new MongoDBSource<>(configFactory, checkNotNull(deserializer)); } } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java index 6e8134204b..0ba72c2c56 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java @@ -47,6 +47,7 @@ public class MongoDBSourceConfig implements SourceConfig { private final int heartbeatIntervalMillis; private final int splitMetaGroupSize; private final int splitSizeMB; + private final boolean closeIdleReaders; MongoDBSourceConfig( String hosts, @@ -62,7 +63,8 @@ public class MongoDBSourceConfig implements SourceConfig { StartupOptions startupOptions, int heartbeatIntervalMillis, int splitMetaGroupSize, - int splitSizeMB) { + int splitSizeMB, + boolean closeIdleReaders) { this.hosts = checkNotNull(hosts); this.username = username; this.password = password; @@ -79,6 +81,7 @@ public class MongoDBSourceConfig implements SourceConfig { this.heartbeatIntervalMillis = heartbeatIntervalMillis; this.splitMetaGroupSize = splitMetaGroupSize; this.splitSizeMB = splitSizeMB; + this.closeIdleReaders = closeIdleReaders; } public String getHosts() { @@ -149,6 +152,11 @@ public boolean isIncludeSchemaChanges() { return false; } + @Override + public boolean isCloseIdleReaders() { + return closeIdleReaders; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -166,6 +174,7 @@ public boolean equals(Object o) { && heartbeatIntervalMillis == that.heartbeatIntervalMillis && splitMetaGroupSize == that.splitMetaGroupSize && splitSizeMB == that.splitSizeMB + && closeIdleReaders == that.closeIdleReaders && Objects.equals(hosts, that.hosts) && Objects.equals(username, that.username) && Objects.equals(password, that.password) @@ -190,6 +199,7 @@ public int hashCode() { startupOptions, heartbeatIntervalMillis, splitMetaGroupSize, - splitSizeMB); + splitSizeMB, + closeIdleReaders); } } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java index ec1cd08814..4d4aa947c4 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java @@ -17,6 +17,7 @@ package com.ververica.cdc.connectors.mongodb.source.config; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; import com.ververica.cdc.connectors.base.config.SourceConfig.Factory; import com.ververica.cdc.connectors.base.options.StartupOptions; @@ -25,6 +26,7 @@ import java.util.List; import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.requireCheckpointsAfterTasksFinished; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS; @@ -48,11 +50,12 @@ public class MongoDBSourceConfigFactory implements Factory private Integer batchSize = BATCH_SIZE.defaultValue(); private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue(); private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue(); - private Boolean updateLookup = true; + private boolean updateLookup = true; private StartupOptions startupOptions = StartupOptions.initial(); private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue(); private Integer splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue(); private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue(); + private boolean closeIdleReaders = false; /** The comma-separated list of hostname and port pairs of mongodb servers. */ public MongoDBSourceConfigFactory hosts(String hosts) { @@ -187,16 +190,34 @@ public MongoDBSourceConfigFactory splitMetaGroupSize(int splitMetaGroupSize) { return this; } - /** Validate required options. */ - public void validate() { - checkNotNull(hosts, "hosts must be provided"); + /** + * Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public MongoDBSourceConfigFactory closeIdleReaders(boolean closeIdleReaders) { + this.closeIdleReaders = closeIdleReaders; + return this; + } + + @Override + public MongoDBSourceConfig create(int subtaskId, Configuration configuration) { + if (closeIdleReaders) { + requireCheckpointsAfterTasksFinished(configuration); + } + return create(subtaskId); } /** Creates a new {@link MongoDBSourceConfig} for the given subtask {@code subtaskId}. */ @Override public MongoDBSourceConfig create(int subtaskId) { return new MongoDBSourceConfig( - hosts, + checkNotNull(hosts), username, password, databaseList, @@ -209,6 +230,7 @@ public MongoDBSourceConfig create(int subtaskId) { startupOptions, heartbeatIntervalMillis, splitMetaGroupSize, - splitSizeMB); + splitSizeMB, + closeIdleReaders); } } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java index 4be17dae5b..119d7c40ff 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java @@ -78,6 +78,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad private final boolean enableParallelRead; private final Integer splitMetaGroupSize; private final Integer splitSizeMB; + private final boolean closeIdlerReaders; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -106,7 +107,8 @@ public MongoDBTableSource( ZoneId localTimeZone, boolean enableParallelRead, @Nullable Integer splitMetaGroupSize, - @Nullable Integer splitSizeMB) { + @Nullable Integer splitSizeMB, + boolean closeIdlerReaders) { this.physicalSchema = physicalSchema; this.hosts = checkNotNull(hosts); this.username = username; @@ -126,6 +128,7 @@ public MongoDBTableSource( this.enableParallelRead = enableParallelRead; this.splitMetaGroupSize = splitMetaGroupSize; this.splitSizeMB = splitSizeMB; + this.closeIdlerReaders = closeIdlerReaders; } @Override @@ -172,7 +175,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { if (enableParallelRead) { MongoDBSourceBuilder builder = - MongoDBSource.builder().hosts(hosts).deserializer(deserializer); + MongoDBSource.builder() + .hosts(hosts) + .closeIdleReaders(closeIdlerReaders) + .deserializer(deserializer); Optional.ofNullable(databaseList).ifPresent(builder::databaseList); Optional.ofNullable(collectionList).ifPresent(builder::collectionList); @@ -263,7 +269,8 @@ public DynamicTableSource copy() { localTimeZone, enableParallelRead, splitMetaGroupSize, - splitSizeMB); + splitSizeMB, + closeIdlerReaders); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -296,7 +303,8 @@ public boolean equals(Object o) { && Objects.equals(splitMetaGroupSize, that.splitMetaGroupSize) && Objects.equals(splitSizeMB, that.splitSizeMB) && Objects.equals(producedDataType, that.producedDataType) - && Objects.equals(metadataKeys, that.metadataKeys); + && Objects.equals(metadataKeys, that.metadataKeys) + && Objects.equals(closeIdlerReaders, that.closeIdlerReaders); } @Override @@ -320,7 +328,8 @@ public int hashCode() { splitMetaGroupSize, splitSizeMB, producedDataType, - metadataKeys); + metadataKeys, + closeIdlerReaders); } @Override diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java index 9c35489db0..a703fd9c0a 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java @@ -30,6 +30,7 @@ import java.util.Set; import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COLLECTION; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.CONNECTION_OPTIONS; @@ -87,6 +88,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { : ZoneId.of(zoneId); boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); + boolean enableCloseIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); int splitSizeMB = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB); int splitMetaGroupSize = config.get(CHUNK_META_GROUP_SIZE); @@ -113,7 +115,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { localTimeZone, enableParallelRead, splitMetaGroupSize, - splitSizeMB); + splitSizeMB, + enableCloseIdleReaders); } private void checkPrimaryKey(UniqueConstraint pk, String message) { @@ -151,6 +154,7 @@ public Set> optionalOptions() { options.add(SCAN_INCREMENTAL_SNAPSHOT_ENABLED); options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB); options.add(CHUNK_META_GROUP_SIZE); + options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); return options; } } diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java index 654ace41c9..09f51d84de 100644 --- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java +++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java @@ -45,6 +45,7 @@ import java.util.Map; import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING; import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS; @@ -101,6 +102,8 @@ public class MongoDBTableFactoryTest { private static final int SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue(); private static final int CHUNK_META_GROUP_SIZE_DEFAULT = CHUNK_META_GROUP_SIZE.defaultValue(); + private static final boolean SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT = + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(); @Test public void testCommonProperties() { @@ -126,7 +129,8 @@ public void testCommonProperties() { LOCAL_TIME_ZONE, SCAN_INCREMENTAL_SNAPSHOT_ENABLED_DEFAULT, CHUNK_META_GROUP_SIZE_DEFAULT, - SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT); + SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT, + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT); assertEquals(expectedSource, actualSource); } @@ -143,6 +147,7 @@ public void testOptionalProperties() { options.put("scan.incremental.snapshot.enabled", "true"); options.put("chunk-meta.group.size", "1001"); options.put("scan.incremental.snapshot.chunk.size.mb", "10"); + options.put("scan.incremental.close-idle-reader.enabled", "true"); DynamicTableSource actualSource = createTableSource(SCHEMA, options); MongoDBTableSource expectedSource = @@ -163,7 +168,8 @@ public void testOptionalProperties() { LOCAL_TIME_ZONE, true, 1001, - 10); + 10, + true); assertEquals(expectedSource, actualSource); } @@ -197,7 +203,8 @@ public void testMetadataColumns() { LOCAL_TIME_ZONE, SCAN_INCREMENTAL_SNAPSHOT_ENABLED_DEFAULT, CHUNK_META_GROUP_SIZE_DEFAULT, - SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT); + SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB_DEFAULT, + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java index 4d7b38156f..9517db29da 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java @@ -203,6 +203,21 @@ public OracleSourceBuilder debeziumProperties(Properties properties) { return this; } + /** + * Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public OracleSourceBuilder closeIdleReaders(boolean closeIdleReaders) { + this.configFactory.closeIdleReaders(closeIdleReaders); + return this; + } + /** * The deserializer used to convert from consumed {@link * org.apache.kafka.connect.source.SourceRecord}. diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java index 1e97aee446..478611d8cc 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfig.java @@ -47,6 +47,7 @@ public OracleSourceConfig( double distributionFactorUpper, double distributionFactorLower, boolean includeSchemaChanges, + boolean closeIdleReaders, Properties dbzProperties, Configuration dbzConfiguration, String driverClassName, @@ -70,6 +71,7 @@ public OracleSourceConfig( distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, dbzProperties, dbzConfiguration, driverClassName, diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java index d35016249f..aa1a6c76d2 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/config/OracleSourceConfigFactory.java @@ -114,6 +114,7 @@ public OracleSourceConfig create(int subtaskId) { distributionFactorUpper, distributionFactorLower, includeSchemaChanges, + closeIdleReaders, props, dbzConfiguration, DRIVER_ClASS_NAME, diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java index 0f5ccfe3a1..c0cdd162e1 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java @@ -77,6 +77,7 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada private final double distributionFactorUpper; private final double distributionFactorLower; private final String chunkKeyColumn; + private final boolean closeIdleReaders; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -109,7 +110,8 @@ public OracleTableSource( int connectionPoolSize, double distributionFactorUpper, double distributionFactorLower, - @Nullable String chunkKeyColumn) { + @Nullable String chunkKeyColumn, + boolean closeIdleReaders) { this.physicalSchema = physicalSchema; this.url = url; this.port = port; @@ -133,6 +135,7 @@ public OracleTableSource( this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; this.chunkKeyColumn = chunkKeyColumn; + this.closeIdleReaders = closeIdleReaders; } @Override @@ -178,6 +181,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .connectMaxRetries(connectMaxRetries) .distributionFactorUpper(distributionFactorUpper) .distributionFactorLower(distributionFactorLower) + .closeIdleReaders(closeIdleReaders) .build(); return SourceProvider.of(oracleChangeEventSource); @@ -241,7 +245,8 @@ public DynamicTableSource copy() { connectionPoolSize, distributionFactorUpper, distributionFactorLower, - chunkKeyColumn); + chunkKeyColumn, + closeIdleReaders); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -278,7 +283,8 @@ public boolean equals(Object o) { && Objects.equals(connectionPoolSize, that.connectionPoolSize) && Objects.equals(distributionFactorUpper, that.distributionFactorUpper) && Objects.equals(distributionFactorLower, that.distributionFactorLower) - && Objects.equals(chunkKeyColumn, that.chunkKeyColumn); + && Objects.equals(chunkKeyColumn, that.chunkKeyColumn) + && Objects.equals(closeIdleReaders, that.closeIdleReaders); } @Override @@ -306,7 +312,8 @@ public int hashCode() { connectionPoolSize, distributionFactorUpper, distributionFactorLower, - chunkKeyColumn); + chunkKeyColumn, + closeIdleReaders); } @Override diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java index 81d8806ab5..ef61790ac0 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java @@ -37,6 +37,7 @@ import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.DATABASE_NAME; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.HOSTNAME; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.PASSWORD; +import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SERVER_TIME_ZONE; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME; @@ -91,6 +92,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null); String serverTimezone = config.get(SERVER_TIME_ZONE); + boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + if (enableParallelRead) { validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1); @@ -122,7 +125,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { connectionPoolSize, distributionFactorUpper, distributionFactorLower, - chunkKeyColumn); + chunkKeyColumn, + closeIdlerReaders); } @Override @@ -158,6 +162,7 @@ public Set> optionalOptions() { options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); + options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); return options; } diff --git a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java index d0b9a6a858..4c95091e19 100644 --- a/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java +++ b/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactoryTest.java @@ -118,7 +118,9 @@ public void testRequiredProperties() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + JdbcSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED + .defaultValue()); assertEquals(expectedSource, actualSource); } @@ -152,7 +154,8 @@ public void testCommonProperties() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -190,7 +193,8 @@ public void testOptionalProperties() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -214,6 +218,8 @@ public void testScanIncrementalProperties() { String.valueOf(chunkSize)); options.put(SourceOptions.CHUNK_META_GROUP_SIZE.key(), String.valueOf(splitMetaGroupSize)); options.put(SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE.key(), String.valueOf(fetchSize)); + options.put(SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.key(), "true"); + options.put( JdbcSourceOptions.CONNECT_TIMEOUT.key(), String.format("%ds", connectTimeout.getSeconds())); @@ -251,7 +257,8 @@ public void testScanIncrementalProperties() { connectPoolSize, distributionFactorUpper, distributionFactorLower, - null); + null, + true); assertEquals(expectedSource, actualSource); } @@ -286,7 +293,8 @@ public void testStartupFromInitial() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -321,7 +329,8 @@ public void testStartupFromLatestOffset() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()); assertEquals(expectedSource, actualSource); } @@ -360,7 +369,8 @@ public void testMetadataColumns() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue()); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "table_name", "schema_name");