diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java index c9e7f84f2d..748d519446 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresSourceBuilder.java @@ -215,6 +215,23 @@ public PostgresSourceBuilder debeziumProperties(Properties properties) { return this; } + /** + * scan.incremental.close-idle-reader.enabled + * + *

Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more + * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished. + */ + public PostgresSourceBuilder closeIdleReaders(boolean closeIdleReaders) { + this.configFactory.closeIdleReaders(closeIdleReaders); + return this; + } + /** * The deserializer used to convert from consumed {@link * org.apache.kafka.connect.source.SourceRecord}. diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java index 913a8484f1..bcefd5954a 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java @@ -28,6 +28,7 @@ import java.util.Properties; import java.util.UUID; +import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished; import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -51,6 +52,7 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory { /** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */ @Override public PostgresSourceConfig create(int subtaskId) { + checkSupportCheckpointsAfterTasksFinished(closeIdleReaders); Properties props = new Properties(); props.setProperty("connector.class", PostgresConnector.class.getCanonicalName()); props.setProperty("plugin.name", pluginName); diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java index 880e69504d..93cd1e682a 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java @@ -42,6 +42,7 @@ import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCHEMA_NAME; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.USERNAME; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static com.ververica.cdc.connectors.base.utils.ObjectUtils.doubleCompare; import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHANGELOG_MODE; import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE; @@ -108,6 +109,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c String chunkKeyColumn = config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null); + boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + if (enableParallelRead) { validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1); @@ -148,7 +151,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c distributionFactorLower, heartbeatInterval, startupOptions, - chunkKeyColumn); + chunkKeyColumn, + closeIdlerReaders); } @Override @@ -187,6 +191,7 @@ public Set> optionalOptions() { options.add(CONNECT_MAX_RETRIES); options.add(CONNECTION_POOL_SIZE); options.add(HEARTBEAT_INTERVAL); + options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); return options; } diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java index 7bbddc4113..bb702e2313 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableSource.java @@ -81,6 +81,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe private final Duration heartbeatInterval; private final StartupOptions startupOptions; private final String chunkKeyColumn; + private final boolean closeIdleReaders; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -116,7 +117,8 @@ public PostgreSQLTableSource( double distributionFactorLower, Duration heartbeatInterval, StartupOptions startupOptions, - @Nullable String chunkKeyColumn) { + @Nullable String chunkKeyColumn, + boolean closeIdleReaders) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -144,6 +146,7 @@ public PostgreSQLTableSource( // Mutable attributes this.producedDataType = physicalSchema.toPhysicalRowDataType(); this.metadataKeys = Collections.emptyList(); + this.closeIdleReaders = closeIdleReaders; } @Override @@ -202,6 +205,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .startupOptions(startupOptions) .chunkKeyColumn(chunkKeyColumn) .heartbeatInterval(heartbeatInterval) + .closeIdleReaders(closeIdleReaders) .build(); return SourceProvider.of(parallelSource); } else { @@ -266,7 +270,8 @@ public DynamicTableSource copy() { distributionFactorLower, heartbeatInterval, startupOptions, - chunkKeyColumn); + chunkKeyColumn, + closeIdleReaders); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -306,7 +311,8 @@ public boolean equals(Object o) { && Objects.equals(distributionFactorLower, that.distributionFactorLower) && Objects.equals(heartbeatInterval, that.heartbeatInterval) && Objects.equals(startupOptions, that.startupOptions) - && Objects.equals(chunkKeyColumn, that.chunkKeyColumn); + && Objects.equals(chunkKeyColumn, that.chunkKeyColumn) + && Objects.equals(closeIdleReaders, that.closeIdleReaders); } @Override @@ -337,7 +343,8 @@ public int hashCode() { distributionFactorLower, heartbeatInterval, startupOptions, - chunkKeyColumn); + chunkKeyColumn, + closeIdleReaders); } @Override diff --git a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java index 282ed9c365..c43f05cd7c 100644 --- a/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java +++ b/flink-connector-postgres-cdc/src/test/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java @@ -49,6 +49,7 @@ import java.util.Map; import java.util.Properties; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE; import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECTION_POOL_SIZE; import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_MAX_RETRIES; @@ -110,6 +111,8 @@ public class PostgreSQLTableFactoryTest { private static final String MY_SCHEMA = "public"; private static final String MY_SLOT_NAME = "flinktest"; private static final Properties PROPERTIES = new Properties(); + private static final boolean SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT = + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue(); @Test public void testCommonProperties() { @@ -142,7 +145,8 @@ public void testCommonProperties() { SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), HEARTBEAT_INTERVAL.defaultValue(), StartupOptions.initial(), - null); + null, + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT); assertEquals(expectedSource, actualSource); } @@ -182,7 +186,8 @@ public void testOptionalProperties() { SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), HEARTBEAT_INTERVAL.defaultValue(), StartupOptions.initial(), - null); + null, + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT); assertEquals(expectedSource, actualSource); } @@ -222,7 +227,8 @@ public void testMetadataColumns() { SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), HEARTBEAT_INTERVAL.defaultValue(), StartupOptions.initial(), - null); + null, + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "schema_name", "table_name"); @@ -272,7 +278,8 @@ public void testEnableParallelReadSource() { SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), HEARTBEAT_INTERVAL.defaultValue(), StartupOptions.initial(), - null); + null, + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT); assertEquals(expectedSource, actualSource); } @@ -312,7 +319,8 @@ public void testStartupFromLatestOffset() { SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), HEARTBEAT_INTERVAL.defaultValue(), StartupOptions.latest(), - null); + null, + SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT); assertEquals(expectedSource, actualSource); }