#1731 support pg
wuzhenhua01 committed Aug 16, 2023
1 parent f7df47e commit 5beee5f
Showing 5 changed files with 49 additions and 10 deletions. The commit brings the scan.incremental.close-idle-reader.enabled option to the Postgres CDC connector: a closeIdleReaders(boolean) setter on PostgresSourceBuilder, a runtime support check in PostgresSourceConfigFactory, option wiring through PostgreSQLTableFactory and PostgreSQLTableSource, and updated PostgreSQLTableFactoryTest expectations.
PostgresSourceBuilder.java
@@ -215,6 +215,23 @@ public PostgresSourceBuilder<T> debeziumProperties(Properties properties) {
return this;
}

+/**
+ * scan.incremental.close-idle-reader.enabled
+ *
+ * <p>Whether to close idle readers at the end of the snapshot phase. This feature depends on
+ * FLIP-147: Support Checkpoints After Tasks Finished. The Flink version must be 1.14 or
+ * later, and the configuration <code>
+ * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled'</code> needs to be set to
+ * true.
+ *
+ * <p>See more at
+ * https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished.
+ */
+public PostgresSourceBuilder<T> closeIdleReaders(boolean closeIdleReaders) {
+this.configFactory.closeIdleReaders(closeIdleReaders);
+return this;
+}
+
/**
* The deserializer used to convert from consumed {@link
* org.apache.kafka.connect.source.SourceRecord}.
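A minimal DataStream API sketch of the new setter in use (illustration only, not part of the commit; hostnames, credentials, and table names are placeholders):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import com.ververica.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

    public class PostgresCloseIdleReadersDemo {
        public static void main(String[] args) throws Exception {
            PostgresIncrementalSource<String> source =
                    PostgresIncrementalSource.<String>builder()
                            .hostname("localhost")          // placeholder connection settings
                            .port(5432)
                            .database("postgres")
                            .schemaList("inventory")
                            .tableList("inventory.products")
                            .username("postgres")
                            .password("postgres")
                            .slotName("flink")
                            .decodingPluginName("pgoutput")
                            .deserializer(new JsonDebeziumDeserializationSchema())
                            .closeIdleReaders(true)         // the setter added by this commit
                            .build();

            // closeIdleReaders relies on FLIP-147, so the corresponding switch
            // must be enabled on a Flink 1.14+ runtime.
            Configuration conf = new Configuration();
            conf.setString(
                    "execution.checkpointing.checkpoints-after-tasks-finish.enabled", "true");
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.enableCheckpointing(3000);

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "PostgresIncrementalSource")
                    .print();
            env.execute("close-idle-readers demo");
        }
    }

The builder only records the flag; validation happens in PostgresSourceConfigFactory below, when the per-subtask config is created.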
PostgresSourceConfigFactory.java
@@ -28,6 +28,7 @@
import java.util.Properties;
import java.util.UUID;

+import static com.ververica.cdc.connectors.base.utils.EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.util.Preconditions.checkNotNull;

@@ -51,6 +51,7 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
+checkSupportCheckpointsAfterTasksFinished(closeIdleReaders);
Properties props = new Properties();
props.setProperty("connector.class", PostgresConnector.class.getCanonicalName());
props.setProperty("plugin.name", pluginName);
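The guard fails fast when closeIdleReaders is requested on a runtime that cannot honor it. The EnvironmentUtils implementation itself is not part of this diff; the following is a hypothetical sketch of such a check, not the actual code:

    import org.apache.flink.runtime.util.EnvironmentInformation;

    public final class VersionGuardSketch {

        // Hypothetical stand-in for EnvironmentUtils.checkSupportCheckpointsAfterTasksFinished;
        // the real method lives in the cdc-base module and may differ.
        public static void checkSupportCheckpointsAfterTasksFinished(boolean closeIdleReaders) {
            if (!closeIdleReaders) {
                return; // nothing to validate when the feature is off
            }
            // Crude version parse for illustration: "1.14-SNAPSHOT" -> [1, 14].
            String[] parts = EnvironmentInformation.getVersion().split("[.-]");
            int major = Integer.parseInt(parts[0]);
            int minor = Integer.parseInt(parts[1]);
            if (major < 1 || (major == 1 && minor < 14)) {
                throw new UnsupportedOperationException(
                        "scan.incremental.close-idle-reader.enabled requires Flink 1.14+ "
                                + "(FLIP-147: checkpoints after tasks finished).");
            }
        }
    }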
PostgreSQLTableFactory.java
@@ -42,6 +42,7 @@
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCHEMA_NAME;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.TABLE_NAME;
import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.USERNAME;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.base.utils.ObjectUtils.doubleCompare;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHANGELOG_MODE;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE;
@@ -108,6 +109,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
String chunkKeyColumn =
config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);

+boolean closeIdlerReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
+
if (enableParallelRead) {
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(SCAN_SNAPSHOT_FETCH_SIZE, fetchSize, 1);
@@ -148,7 +151,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
distributionFactorLower,
heartbeatInterval,
startupOptions,
-chunkKeyColumn);
+chunkKeyColumn,
+closeIdlerReaders);
}

@Override
@@ -187,6 +191,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CONNECT_MAX_RETRIES);
options.add(CONNECTION_POOL_SIZE);
options.add(HEARTBEAT_INTERVAL);
+options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
return options;
}

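With SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED registered as an optional option, the flag becomes settable from SQL DDL. A Table API sketch (illustration only; connection values are placeholders, not from this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PostgresCdcSqlDemo {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // The FLIP-147 switch required by close-idle-reader (Flink 1.14+),
            // plus a checkpoint interval so checkpoints actually run.
            tEnv.getConfig().getConfiguration().setString(
                    "execution.checkpointing.checkpoints-after-tasks-finish.enabled", "true");
            tEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "3s");

            tEnv.executeSql(
                    "CREATE TABLE products ("
                            + "  id INT,"
                            + "  name STRING,"
                            + "  PRIMARY KEY (id) NOT ENFORCED"
                            + ") WITH ("
                            + "  'connector' = 'postgres-cdc',"
                            + "  'hostname' = 'localhost',"
                            + "  'port' = '5432',"
                            + "  'username' = 'postgres',"
                            + "  'password' = 'postgres',"
                            + "  'database-name' = 'postgres',"
                            + "  'schema-name' = 'inventory',"
                            + "  'table-name' = 'products',"
                            + "  'slot.name' = 'flink',"
                            + "  'scan.incremental.snapshot.enabled' = 'true'," // incremental source path
                            + "  'scan.incremental.close-idle-reader.enabled' = 'true'"
                            + ")");

            tEnv.executeSql("SELECT * FROM products").print();
        }
    }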
PostgreSQLTableSource.java
@@ -81,6 +81,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata {
private final Duration heartbeatInterval;
private final StartupOptions startupOptions;
private final String chunkKeyColumn;
+private final boolean closeIdleReaders;

// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -116,7 +117,8 @@ public PostgreSQLTableSource(
double distributionFactorLower,
Duration heartbeatInterval,
StartupOptions startupOptions,
-@Nullable String chunkKeyColumn) {
+@Nullable String chunkKeyColumn,
+boolean closeIdleReaders) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -144,6 +146,7 @@ public PostgreSQLTableSource(
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
+this.closeIdleReaders = closeIdleReaders;
}

@Override
@@ -202,6 +205,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.startupOptions(startupOptions)
.chunkKeyColumn(chunkKeyColumn)
.heartbeatInterval(heartbeatInterval)
+.closeIdleReaders(closeIdleReaders)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -266,7 +270,8 @@ public DynamicTableSource copy() {
distributionFactorLower,
heartbeatInterval,
startupOptions,
-chunkKeyColumn);
+chunkKeyColumn,
+closeIdleReaders);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -306,7 +311,8 @@ public boolean equals(Object o) {
&& Objects.equals(distributionFactorLower, that.distributionFactorLower)
&& Objects.equals(heartbeatInterval, that.heartbeatInterval)
&& Objects.equals(startupOptions, that.startupOptions)
-&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn);
+&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
+&& Objects.equals(closeIdleReaders, that.closeIdleReaders);
}

@Override
@@ -337,7 +343,8 @@ public int hashCode() {
distributionFactorLower,
heartbeatInterval,
startupOptions,
-chunkKeyColumn);
+chunkKeyColumn,
+closeIdleReaders);
}

@Override
PostgreSQLTableFactoryTest.java
@@ -49,6 +49,7 @@
import java.util.Map;
import java.util.Properties;

+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions.CONNECT_MAX_RETRIES;
@@ -110,6 +111,8 @@ public class PostgreSQLTableFactoryTest {
private static final String MY_SCHEMA = "public";
private static final String MY_SLOT_NAME = "flinktest";
private static final Properties PROPERTIES = new Properties();
+private static final boolean SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT =
+        SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED.defaultValue();

@Test
public void testCommonProperties() {
@@ -142,7 +145,8 @@ public void testCommonProperties() {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
HEARTBEAT_INTERVAL.defaultValue(),
StartupOptions.initial(),
-null);
+null,
+SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
assertEquals(expectedSource, actualSource);
}

@@ -182,7 +186,8 @@ public void testOptionalProperties() {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
HEARTBEAT_INTERVAL.defaultValue(),
StartupOptions.initial(),
-null);
+null,
+SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
assertEquals(expectedSource, actualSource);
}

@@ -222,7 +227,8 @@ public void testMetadataColumns() {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
HEARTBEAT_INTERVAL.defaultValue(),
StartupOptions.initial(),
-null);
+null,
+SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("op_ts", "database_name", "schema_name", "table_name");
@@ -272,7 +278,8 @@ public void testEnableParallelReadSource() {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
HEARTBEAT_INTERVAL.defaultValue(),
StartupOptions.initial(),
-null);
+null,
+SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
assertEquals(expectedSource, actualSource);
}

@@ -312,7 +319,8 @@ public void testStartupFromLatestOffset() {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
HEARTBEAT_INTERVAL.defaultValue(),
StartupOptions.latest(),
-null);
+null,
+SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED_DEFAULT);
assertEquals(expectedSource, actualSource);
}
