[hotfix][sqlserver] Fix sqlserver close idle and chunk key column missing #2310

Merged · 1 commit · Jul 18, 2023

SqlServerSourceBuilder.java
@@ -196,6 +196,22 @@ public SqlServerSourceBuilder<T> deserializer(DebeziumDeserializationSchema<T> d
         return this;
     }
 
+    /**
+     * Whether to close idle readers at the end of the snapshot phase. This feature depends on
+     * FLIP-147: Support Checkpoints After Tasks Finished. The Flink version is required to be
+     * greater than or equal to 1.14, and the configuration <code>
+     * 'execution.checkpointing.checkpoints-after-tasks-finish.enabled'</code> needs to be set to
+     * true.
+     *
+     * <p>See more <a
+     * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished">FLIP-147:
+     * Support Checkpoints After Tasks Finished</a>.
+     */
+    public SqlServerSourceBuilder<T> closeIdleReaders(boolean closeIdleReaders) {
+        this.configFactory.closeIdleReaders(closeIdleReaders);
+        return this;
+    }
+
     /**
      * Build the {@link SqlServerIncrementalSource}.
      *
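
For reference, here is how the new builder option is meant to be used end to end. A minimal sketch, assuming the com.ververica.cdc.connectors.sqlserver.source package layout and hypothetical connection values (an inventory database, a dbo.products table, sa credentials); only the closeIdleReaders(true) call itself is defined by this diff:

import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder.SqlServerIncrementalSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

// Builds an incremental SQL Server source whose idle readers are closed
// once the snapshot phase finishes (host/table/credentials are placeholders).
SqlServerIncrementalSource<String> source =
        new SqlServerSourceBuilder<String>()
                .hostname("localhost")
                .port(1433)
                .databaseList("inventory")
                .tableList("dbo.products")
                .username("sa")
                .password("...")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .closeIdleReaders(true)
                .build();

As the Javadoc above notes, the option only takes effect on Flink 1.14+ with 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' set to true.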

SqlServerTableFactory.java
@@ -39,6 +39,7 @@
 import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.CONNECT_TIMEOUT;
 import static com.ververica.cdc.connectors.base.options.JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
 import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
 import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
 import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
 import static com.ververica.cdc.connectors.base.options.SourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
@@ -135,6 +136,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
         double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         String chunkKeyColumn =
                 config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null);
+        boolean closeIdleReaders = config.get(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
 
         if (enableParallelRead) {
             validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
@@ -168,7 +170,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 connectMaxRetries,
                 distributionFactorUpper,
                 distributionFactorLower,
-                chunkKeyColumn);
+                chunkKeyColumn,
+                closeIdleReaders);
     }
 
     @Override
@@ -203,6 +206,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
         options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
         options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
+        options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
         return options;
     }
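
The factory change above is the actual fix behind the PR title: the chunk key column stopped at SqlServerTableSource and never reached the underlying source builder, while the close-idle-reader flag was not read from the table options at all. With the fix in place, the option can be set in DDL; a hedged sketch, assuming a Flink TableEnvironment tEnv and hypothetical schema and connection values (the option keys match the test changes below):

// Registers a SQL Server CDC table that closes idle readers after the
// snapshot phase; chunk splitting uses the explicit key column `id`.
tEnv.executeSql(
        "CREATE TABLE products (\n"
                + "  id INT,\n"
                + "  name STRING,\n"
                + "  PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'sqlserver-cdc',\n"
                + "  'hostname' = 'localhost',\n"
                + "  'port' = '1433',\n"
                + "  'username' = 'sa',\n"
                + "  'password' = '...',\n"
                + "  'database-name' = 'inventory',\n"
                + "  'table-name' = 'dbo.products',\n"
                + "  'scan.incremental.snapshot.chunk.key-column' = 'id',\n"
                + "  'scan.incremental.close-idle-reader.enabled' = 'true'\n"
                + ")");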

SqlServerTableSource.java
@@ -77,6 +77,7 @@ public class SqlServerTableSource implements ScanTableSource, SupportsReadingMet
     private final double distributionFactorUpper;
     private final double distributionFactorLower;
     private final String chunkKeyColumn;
+    private final boolean closeIdleReaders;
 
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
@@ -108,7 +109,8 @@ public SqlServerTableSource(
             int connectionPoolSize,
             double distributionFactorUpper,
             double distributionFactorLower,
-            @Nullable String chunkKeyColumn) {
+            @Nullable String chunkKeyColumn,
+            boolean closeIdleReaders) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -131,6 +133,7 @@ public SqlServerTableSource(
         this.distributionFactorUpper = distributionFactorUpper;
         this.distributionFactorLower = distributionFactorLower;
         this.chunkKeyColumn = chunkKeyColumn;
+        this.closeIdleReaders = closeIdleReaders;
     }
 
     @Override
@@ -162,6 +165,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                             .port(port)
                             .databaseList(database)
                             .tableList(tableName)
+                            .serverTimeZone(serverTimeZone.toString())
                             .username(username)
                             .password(password)
                             .startupOptions(startupOptions)
@@ -175,6 +179,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                             .connectMaxRetries(connectMaxRetries)
                             .distributionFactorUpper(distributionFactorUpper)
                             .distributionFactorLower(distributionFactorLower)
+                            .chunkKeyColumn(chunkKeyColumn)
+                            .closeIdleReaders(closeIdleReaders)
                             .build();
             return SourceProvider.of(sqlServerChangeEventSource);
         } else {
@@ -233,7 +239,8 @@ public DynamicTableSource copy() {
                         connectMaxRetries,
                         distributionFactorUpper,
                         distributionFactorLower,
-                        chunkKeyColumn);
+                        chunkKeyColumn,
+                        closeIdleReaders);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -269,7 +276,8 @@ public boolean equals(Object o) {
                 && Objects.equals(connectionPoolSize, that.connectionPoolSize)
                 && Objects.equals(distributionFactorUpper, that.distributionFactorUpper)
                 && Objects.equals(distributionFactorLower, that.distributionFactorLower)
-                && Objects.equals(chunkKeyColumn, that.chunkKeyColumn);
+                && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
+                && Objects.equals(closeIdleReaders, that.closeIdleReaders);
     }
 
     @Override
@@ -296,7 +304,8 @@ public int hashCode() {
                 connectionPoolSize,
                 distributionFactorUpper,
                 distributionFactorLower,
-                chunkKeyColumn);
+                chunkKeyColumn,
+                closeIdleReaders);
     }
 
     @Override

SqlServerTableFactoryTest.java
@@ -34,6 +34,7 @@
 import com.ververica.cdc.debezium.utils.ResolvedSchemaUtils;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -108,17 +109,62 @@ public void testCommonProperties() {
                                 .defaultValue(),
                         JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
                                 .defaultValue(),
-                        null);
+                        null,
+                        false);
         assertEquals(expectedSource, actualSource);
     }
 
+    @Test
+    public void testEnableParallelReadSource() {
+        Map<String, String> properties = getAllOptions();
+        properties.put("scan.incremental.snapshot.enabled", "true");
+        properties.put("scan.incremental.snapshot.chunk.size", "8000");
+        properties.put("chunk-meta.group.size", "3000");
+        properties.put("chunk-key.even-distribution.factor.upper-bound", "40.5");
+        properties.put("chunk-key.even-distribution.factor.lower-bound", "0.01");
+        properties.put("scan.snapshot.fetch.size", "100");
+        properties.put("connect.timeout", "45s");
+        properties.put("scan.incremental.snapshot.chunk.key-column", "testCol");
+        properties.put("scan.incremental.close-idle-reader.enabled", "true");
+
+        // validation for source
+        DynamicTableSource actualSource = createTableSource(SCHEMA, properties);
+        SqlServerTableSource expectedSource =
+                new SqlServerTableSource(
+                        SCHEMA,
+                        1433,
+                        MY_LOCALHOST,
+                        MY_DATABASE,
+                        MY_TABLE,
+                        ZoneId.of("UTC"),
+                        MY_USERNAME,
+                        MY_PASSWORD,
+                        PROPERTIES,
+                        StartupOptions.initial(),
+                        SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.defaultValue(),
+                        8000,
+                        3000,
+                        100,
+                        Duration.ofSeconds(45),
+                        JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue(),
+                        JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue(),
+                        40.5d,
+                        0.01d,
+                        "testCol",
+                        true);
+        assertEquals(expectedSource, actualSource);
+    }
+
     @Test
     public void testOptionalProperties() {
-        Map<String, String> options = getAllOptions();
-        options.put("port", "1433");
-        options.put("debezium.snapshot.mode", "initial");
+        Map<String, String> properties = getAllOptions();
+        properties.put("port", "1433");
+        properties.put("debezium.snapshot.mode", "initial");
+        properties.put("server-time-zone", "Asia/Shanghai");
+        properties.put("scan.incremental.snapshot.chunk.key-column", "testCol");
+        properties.put("scan.incremental.close-idle-reader.enabled", "true");
 
-        DynamicTableSource actualSource = createTableSource(options);
+        DynamicTableSource actualSource = createTableSource(properties);
         Properties dbzProperties = new Properties();
         dbzProperties.put("snapshot.mode", "initial");
         SqlServerTableSource expectedSource =
@@ -128,7 +174,7 @@ public void testOptionalProperties() {
                         MY_LOCALHOST,
                         MY_DATABASE,
                         MY_TABLE,
-                        ZoneId.of("UTC"),
+                        ZoneId.of("Asia/Shanghai"),
                         MY_USERNAME,
                         MY_PASSWORD,
                         dbzProperties,
@@ -144,7 +190,8 @@ public void testOptionalProperties() {
                                 .defaultValue(),
                         JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
                                 .defaultValue(),
-                        null);
+                        "testCol",
+                        true);
         assertEquals(expectedSource, actualSource);
     }
 
@@ -182,7 +229,8 @@ public void testMetadataColumns() {
                                 .defaultValue(),
                         JdbcSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND
                                 .defaultValue(),
-                        null);
+                        null,
+                        false);
         expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedSource.metadataKeys =
                 Arrays.asList("op_ts", "database_name", "schema_name", "table_name");