From 477b45ebd6be32219835cd9f220b63c8a1d91581 Mon Sep 17 00:00:00 2001 From: gongzhongqiang Date: Mon, 17 Jul 2023 13:03:32 +0800 Subject: [PATCH] [hotfix][sqlserver] Fix sqlserver close idle and chunk key column missing --- .../source/SqlServerSourceBuilder.java | 16 +++++ .../table/SqlServerTableFactory.java | 6 +- .../sqlserver/table/SqlServerTableSource.java | 17 +++-- .../table/SqlServerTableFactoryTest.java | 64 ++++++++++++++++--- 4 files changed, 90 insertions(+), 13 deletions(-) diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceBuilder.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceBuilder.java index 8b17b436bf..935612aece 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceBuilder.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/source/SqlServerSourceBuilder.java @@ -196,6 +196,22 @@ public SqlServerSourceBuilder deserializer(DebeziumDeserializationSchema d return this; } + /** + * Whether to close idle readers at the end of the snapshot phase. This feature depends on + * FLIP-147: Support Checkpoints After Tasks Finished. The flink version is required to be + * greater than or equal to 1.14, and the configuration + * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' needs to be set to + * true. + * + *

See more FLIP-147: + * Support Checkpoints After Tasks Finished. + */ + public SqlServerSourceBuilder closeIdleReaders(boolean closeIdleReaders) { + this.configFactory.closeIdleReaders(closeIdleReaders); + return this; + } + /** * Build the {@link SqlServerIncrementalSource}. * diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java index faecb03d83..ad200c6706 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java @@ -39,6 +39,7 @@ import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT; import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE; +import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; @@ -135,6 +136,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); String chunkKeyColumn = config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null); + boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); if (enableParallelRead) { validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1); @@ -168,7 +170,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { connectMaxRetries, distributionFactorUpper, distributionFactorLower, - chunkKeyColumn); + chunkKeyColumn, + closeIdleReaders); } @Override @@ -203,6 +206,7 @@ public Set> optionalOptions() { options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); + options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); return options; } diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableSource.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableSource.java index ee3beb50d2..f877fa1b65 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableSource.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableSource.java @@ -77,6 +77,7 @@ public class SqlServerTableSource implements ScanTableSource, SupportsReadingMet private final double distributionFactorUpper; private final double distributionFactorLower; private final String chunkKeyColumn; + private final boolean closeIdleReaders; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -108,7 +109,8 @@ public SqlServerTableSource( int connectionPoolSize, double 
distributionFactorUpper, double distributionFactorLower, - @Nullable String chunkKeyColumn) { + @Nullable String chunkKeyColumn, + boolean closeIdleReaders) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -131,6 +133,7 @@ public SqlServerTableSource( this.distributionFactorUpper = distributionFactorUpper; this.distributionFactorLower = distributionFactorLower; this.chunkKeyColumn = chunkKeyColumn; + this.closeIdleReaders = closeIdleReaders; } @Override @@ -162,6 +165,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .port(port) .databaseList(database) .tableList(tableName) + .serverTimeZone(serverTimeZone.toString()) .username(username) .password(password) .startupOptions(startupOptions) @@ -175,6 +179,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .connectMaxRetries(connectMaxRetries) .distributionFactorUpper(distributionFactorUpper) .distributionFactorLower(distributionFactorLower) + .chunkKeyColumn(chunkKeyColumn) + .closeIdleReaders(closeIdleReaders) .build(); return SourceProvider.of(sqlServerChangeEventSource); } else { @@ -233,7 +239,8 @@ public DynamicTableSource copy() { connectMaxRetries, distributionFactorUpper, distributionFactorLower, - chunkKeyColumn); + chunkKeyColumn, + closeIdleReaders); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -269,7 +276,8 @@ public boolean equals(Object o) { && Objects.equals(connectionPoolSize, that.connectionPoolSize) && Objects.equals(distributionFactorUpper, that.distributionFactorUpper) && Objects.equals(distributionFactorLower, that.distributionFactorLower) - && Objects.equals(chunkKeyColumn, that.chunkKeyColumn); + && Objects.equals(chunkKeyColumn, that.chunkKeyColumn) + && Objects.equals(closeIdleReaders, that.closeIdleReaders); } @Override @@ -296,7 +304,8 @@ public int hashCode() { connectionPoolSize, distributionFactorUpper, distributionFactorLower, - chunkKeyColumn); + chunkKeyColumn, + closeIdleReaders); } @Override diff --git a/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java b/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java index a69d7bd81c..3997d869d6 100644 --- a/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java +++ b/flink-connector-sqlserver-cdc/src/test/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactoryTest.java @@ -34,6 +34,7 @@ import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils; import org.junit.Test; +import java.time.Duration; import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; @@ -108,17 +109,62 @@ public void testCommonProperties() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + false); + assertEquals(expectedSource, actualSource); + } + + @Test + public void testEnableParallelReadSource() { + Map properties = getAllOptions(); + properties.put("scan.incremental.snapshot.enabled", "true"); + properties.put("scan.incremental.snapshot.chunk.size", "8000"); + properties.put("chunk-meta.group.size", "3000"); + properties.put("chunk-key.even-distribution.factor.upper-bound", "40.5"); + properties.put("chunk-key.even-distribution.factor.lower-bound", "0.01"); + properties.put("scan.snapshot.fetch.size", "100"); + 
properties.put("connect.timeout", "45s"); + properties.put("scan.incremental.snapshot.chunk.key-column", "testCol"); + properties.put("scan.incremental.close-idle-reader.enabled", "true"); + + // validation for source + DynamicTableSource actualSource = createTableSource(SCHEMA, properties); + SqlServerTableSource expectedSource = + new SqlServerTableSource( + SCHEMA, + 1433, + MY_LOCALHOST, + MY_DATABASE, + MY_TABLE, + ZoneId.of("UTC"), + MY_USERNAME, + MY_PASSWORD, + PROPERTIES, + StartupOptions.initial(), + SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.defaultValue(), + 8000, + 3000, + 100, + Duration.ofSeconds(45), + JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue(), + JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue(), + 40.5d, + 0.01d, + "testCol", + true); assertEquals(expectedSource, actualSource); } @Test public void testOptionalProperties() { - Map options = getAllOptions(); - options.put("port", "1433"); - options.put("debezium.snapshot.mode", "initial"); + Map properties = getAllOptions(); + properties.put("port", "1433"); + properties.put("debezium.snapshot.mode", "initial"); + properties.put("server-time-zone", "Asia/Shanghai"); + properties.put("scan.incremental.snapshot.chunk.key-column", "testCol"); + properties.put("scan.incremental.close-idle-reader.enabled", "true"); - DynamicTableSource actualSource = createTableSource(options); + DynamicTableSource actualSource = createTableSource(properties); Properties dbzProperties = new Properties(); dbzProperties.put("snapshot.mode", "initial"); SqlServerTableSource expectedSource = @@ -128,7 +174,7 @@ public void testOptionalProperties() { MY_LOCALHOST, MY_DATABASE, MY_TABLE, - ZoneId.of("UTC"), + ZoneId.of("Asia/Shanghai"), MY_USERNAME, MY_PASSWORD, dbzProperties, @@ -144,7 +190,8 @@ public void testOptionalProperties() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + "testCol", + true); assertEquals(expectedSource, actualSource); } @@ -182,7 +229,8 @@ public void testMetadataColumns() { .defaultValue(), JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND .defaultValue(), - null); + null, + false); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name", "schema_name", "table_name");