diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java index e7eb866e50..dba621777d 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java @@ -89,22 +89,24 @@ public class SourceOptions { "The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups."); public static final ConfigOption SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = - ConfigOptions.key("split-key.even-distribution.factor.upper-bound") + ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound") .doubleType() .defaultValue(1000.0d) + .withFallbackKeys("split-key.even-distribution.factor.upper-bound") .withDescription( - "The upper bound of split key distribution factor. The distribution factor is used to determine whether the" + "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the" + " table is evenly distribution or not." + " The table chunks would use evenly calculation optimization when the data distribution is even," + " and the query MySQL for splitting would happen when it is uneven." + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); public static final ConfigOption SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = - ConfigOptions.key("split-key.even-distribution.factor.lower-bound") + ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound") .doubleType() .defaultValue(0.05d) + .withFallbackKeys("split-key.even-distribution.factor.lower-bound") .withDescription( - "The lower bound of split key distribution factor. The distribution factor is used to determine whether the" + "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the" + " table is evenly distribution or not." + " The table chunks would use evenly calculation optimization when the data distribution is even," + " and the query MySQL for splitting would happen when it is uneven." diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java index a749e9f18e..76492ff853 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java @@ -206,8 +206,9 @@ private boolean shouldEmit(SourceRecord sourceRecord) { // only the table who captured snapshot splits need to filter if (finishedSplitsInfo.containsKey(tableId)) { RowType splitKeyType = - ChunkUtils.getSplitType( - statefulTaskContext.getDatabaseSchema().tableFor(tableId)); + ChunkUtils.getChunkKeyColumnType( + statefulTaskContext.getDatabaseSchema().tableFor(tableId), + statefulTaskContext.getSourceConfig().getChunkKeyColumn()); Object[] key = getSplitKey( splitKeyType, diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java index 56327fddc9..64d749810d 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -119,6 +119,15 @@ public MySqlSourceBuilder serverTimeZone(String timeZone) { return this; } + /** + * The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk + * key column when read the snapshot of table. + */ + public MySqlSourceBuilder chunkKeyColumn(String chunkKeyColumn) { + this.configFactory.chunkKeyColumn(chunkKeyColumn); + return this; + } + /** * The split size (number of rows) of table snapshot, captured tables are split into multiple * splits when read the snapshot of table. diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java index 444fd74a77..b9440dd2cd 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java @@ -78,17 +78,18 @@ public Collection generateSplits(TableId tableId) { long start = System.currentTimeMillis(); Table table = mySqlSchema.getTableSchema(jdbc, tableId).getTable(); - Column splitColumn = ChunkUtils.getSplitColumn(table); + Column chunkKeyColumn = + ChunkUtils.getChunkKeyColumn(table, sourceConfig.getChunkKeyColumn()); final List chunks; try { - chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); + chunks = splitTableIntoChunks(jdbc, tableId, chunkKeyColumn); } catch (SQLException e) { throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e); } // convert chunks into splits List splits = new ArrayList<>(); - RowType splitType = ChunkUtils.getSplitType(splitColumn); + RowType chunkKeyColumnType = ChunkUtils.getChunkKeyColumnType(chunkKeyColumn); for (int i = 0; i < chunks.size(); i++) { ChunkRange chunk = chunks.get(i); MySqlSnapshotSplit split = @@ -96,7 +97,7 @@ public Collection generateSplits(TableId tableId) { jdbc, tableId, i, - splitType, + chunkKeyColumnType, chunk.getChunkStart(), chunk.getChunkEnd()); splits.add(split); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 812bfa4fce..7af461f869 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -55,6 +55,7 @@ public class MySqlSourceConfig implements Serializable { private final boolean includeSchemaChanges; private final boolean scanNewlyAddedTableEnabled; private final Properties jdbcProperties; + @Nullable private final String chunkKeyColumn; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -84,7 +85,8 @@ public class MySqlSourceConfig implements Serializable { boolean includeSchemaChanges, boolean scanNewlyAddedTableEnabled, Properties dbzProperties, - Properties jdbcProperties) { + Properties jdbcProperties, + @Nullable String chunkKeyColumn) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -108,6 +110,7 @@ public class MySqlSourceConfig implements Serializable { this.dbzConfiguration = Configuration.from(dbzProperties); this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration); this.jdbcProperties = jdbcProperties; + this.chunkKeyColumn = chunkKeyColumn; } public String getHostname() { @@ -206,4 +209,9 @@ public RelationalTableFilters getTableFilters() { public Properties getJdbcProperties() { return jdbcProperties; } + + @Nullable + public String getChunkKeyColumn() { + return chunkKeyColumn; + } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 0fa2087d9a..433900e131 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -29,6 +29,8 @@ import java.util.Properties; import java.util.UUID; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES; @@ -37,8 +39,6 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.util.Preconditions.checkNotNull; /** A factory to construct {@link MySqlSourceConfig}. */ @@ -63,14 +63,15 @@ public class MySqlSourceConfigFactory implements Serializable { private int connectMaxRetries = CONNECT_MAX_RETRIES.defaultValue(); private int connectionPoolSize = CONNECTION_POOL_SIZE.defaultValue(); private double distributionFactorUpper = - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(); + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(); private double distributionFactorLower = - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(); + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(); private boolean includeSchemaChanges = false; private boolean scanNewlyAddedTableEnabled = false; private Properties jdbcProperties; private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue(); private Properties dbzProperties; + private String chunkKeyColumn; public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -140,6 +141,15 @@ public MySqlSourceConfigFactory serverTimeZone(String timeZone) { return this; } + /** + * The chunk key of table snapshot, captured tables are split into multiple chunks by the chunk + * key column when read the snapshot of table. + */ + public MySqlSourceConfigFactory chunkKeyColumn(String chunkKeyColumn) { + this.chunkKeyColumn = chunkKeyColumn; + return this; + } + /** * The split size (number of rows) of table snapshot, captured tables are split into multiple * splits when read the snapshot of table. @@ -329,6 +339,7 @@ public MySqlSourceConfig createConfig(int subtaskId) { includeSchemaChanges, scanNewlyAddedTableEnabled, props, - jdbcProperties); + jdbcProperties, + chunkKeyColumn); } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index 44da4ca736..85ef061fa1 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -181,24 +181,26 @@ public class MySqlSourceOptions { "The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups."); @Experimental - public static final ConfigOption SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = - ConfigOptions.key("split-key.even-distribution.factor.upper-bound") + public static final ConfigOption CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = + ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound") .doubleType() .defaultValue(1000.0d) + .withFallbackKeys("split-key.even-distribution.factor.upper-bound") .withDescription( - "The upper bound of split key distribution factor. The distribution factor is used to determine whether the" + "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the" + " table is evenly distribution or not." + " The table chunks would use evenly calculation optimization when the data distribution is even," + " and the query MySQL for splitting would happen when it is uneven." + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); @Experimental - public static final ConfigOption SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = - ConfigOptions.key("split-key.even-distribution.factor.lower-bound") + public static final ConfigOption CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = + ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound") .doubleType() .defaultValue(0.05d) + .withFallbackKeys("split-key.even-distribution.factor.lower-bound") .withDescription( - "The lower bound of split key distribution factor. The distribution factor is used to determine whether the" + "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the" + " table is evenly distribution or not." + " The table chunks would use evenly calculation optimization when the data distribution is even," + " and the query MySQL for splitting would happen when it is uneven." @@ -211,4 +213,14 @@ public class MySqlSourceOptions { .defaultValue(false) .withDescription( "Whether capture the scan the newly added tables or not, by default is false."); + + @Experimental + public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN = + ConfigOptions.key("scan.incremental.snapshot.chunk.key-column") + .stringType() + .noDefaultValue() + .withDescription( + "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table." + + "By default, the chunk key is the first column of the primary key." + + "This column must be a column of the primary key."); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java index 421eadc0a1..91c4f1cdc3 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java @@ -24,7 +24,11 @@ import io.debezium.relational.Column; import io.debezium.relational.Table; +import javax.annotation.Nullable; + import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; import static org.apache.flink.table.api.DataTypes.FIELD; import static org.apache.flink.table.api.DataTypes.ROW; @@ -34,27 +38,17 @@ public class ChunkUtils { private ChunkUtils() {} - public static RowType getSplitType(Table table) { - List primaryKeys = table.primaryKeyColumns(); - if (primaryKeys.isEmpty()) { - throw new ValidationException( - String.format( - "Incremental snapshot for tables requires primary key," - + " but table %s doesn't have primary key.", - table.id())); - } - - // use first field in primary key as the split key - return getSplitType(primaryKeys.get(0)); + public static RowType getChunkKeyColumnType(Table table, @Nullable String chunkKeyColumn) { + return getChunkKeyColumnType(getChunkKeyColumn(table, chunkKeyColumn)); } - public static RowType getSplitType(Column splitColumn) { + public static RowType getChunkKeyColumnType(Column chunkKeyColumn) { return (RowType) - ROW(FIELD(splitColumn.name(), MySqlTypeUtils.fromDbzColumn(splitColumn))) + ROW(FIELD(chunkKeyColumn.name(), MySqlTypeUtils.fromDbzColumn(chunkKeyColumn))) .getLogicalType(); } - public static Column getSplitColumn(Table table) { + public static Column getChunkKeyColumn(Table table, @Nullable String chunkKeyColumn) { List primaryKeys = table.primaryKeyColumns(); if (primaryKeys.isEmpty()) { throw new ValidationException( @@ -64,7 +58,23 @@ public static Column getSplitColumn(Table table) { table.id())); } - // use first field in primary key as the split key + if (chunkKeyColumn != null) { + Optional targetPkColumn = + primaryKeys.stream() + .filter(col -> chunkKeyColumn.equals(col.name())) + .findFirst(); + if (targetPkColumn.isPresent()) { + return targetPkColumn.get(); + } + throw new ValidationException( + String.format( + "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.", + chunkKeyColumn, + primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")), + table.id())); + } + + // use the first column of primary key columns as the chunk key column by default return primaryKeys.get(0); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java index 19a4d28e09..1dec806940 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java @@ -79,6 +79,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat private final boolean scanNewlyAddedTableEnabled; private final Properties jdbcProperties; private final Duration heartbeatInterval; + private final String chunkKeyColumn; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -135,7 +136,8 @@ public MySqlTableSource( startupOptions, false, new Properties(), - heartbeatInterval); + heartbeatInterval, + null); } public MySqlTableSource( @@ -161,7 +163,8 @@ public MySqlTableSource( StartupOptions startupOptions, boolean scanNewlyAddedTableEnabled, Properties jdbcProperties, - Duration heartbeatInterval) { + Duration heartbeatInterval, + @Nullable String chunkKeyColumn) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -188,6 +191,7 @@ public MySqlTableSource( this.producedDataType = physicalSchema.toPhysicalRowDataType(); this.metadataKeys = Collections.emptyList(); this.heartbeatInterval = heartbeatInterval; + this.chunkKeyColumn = chunkKeyColumn; } @Override @@ -242,6 +246,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .jdbcProperties(jdbcProperties) .heartbeatInterval(heartbeatInterval) + .chunkKeyColumn(chunkKeyColumn) .build(); return SourceProvider.of(parallelSource); } else { @@ -320,7 +325,8 @@ public DynamicTableSource copy() { startupOptions, scanNewlyAddedTableEnabled, jdbcProperties, - heartbeatInterval); + heartbeatInterval, + chunkKeyColumn); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -359,7 +365,8 @@ public boolean equals(Object o) { && Objects.equals(producedDataType, that.producedDataType) && Objects.equals(metadataKeys, that.metadataKeys) && Objects.equals(jdbcProperties, that.jdbcProperties) - && Objects.equals(heartbeatInterval, that.heartbeatInterval); + && Objects.equals(heartbeatInterval, that.heartbeatInterval) + && Objects.equals(chunkKeyColumn, that.chunkKeyColumn); } @Override @@ -389,7 +396,8 @@ public int hashCode() { metadataKeys, scanNewlyAddedTableEnabled, jdbcProperties, - heartbeatInterval); + heartbeatInterval, + chunkKeyColumn); } @Override diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 9af4672279..d9bd0d42bf 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -35,6 +35,8 @@ import java.util.Set; import java.util.regex.Pattern; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES; @@ -44,6 +46,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.HOSTNAME; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PASSWORD; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PORT; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; @@ -54,8 +57,6 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_ID; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.TABLE_NAME; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.USERNAME; import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare; @@ -96,8 +97,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { Duration connectTimeout = config.get(CONNECT_TIMEOUT); int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); - double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); - double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + double distributionFactorUpper = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); + double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL); @@ -137,7 +138,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { startupOptions, scanNewlyAddedTableEnabled, JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()), - heartbeatInterval); + heartbeatInterval, + config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN).orElse(null)); } @Override @@ -172,11 +174,12 @@ public Set> optionalOptions() { options.add(SCAN_SNAPSHOT_FETCH_SIZE); options.add(CONNECT_TIMEOUT); options.add(CONNECTION_POOL_SIZE); - options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); - options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); + options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(CONNECT_MAX_RETRIES); options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); options.add(HEARTBEAT_INTERVAL); + options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); return options; } @@ -288,7 +291,7 @@ private void validateDistributionFactorUpper(double distributionFactorUpper) { doubleCompare(distributionFactorUpper, 1.0d) >= 0, String.format( "The value of option '%s' must larger than or equals %s, but is %s", - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(), 1.0d, distributionFactorUpper)); } @@ -300,7 +303,7 @@ private void validateDistributionFactorLower(double distributionFactorLower) { && doubleCompare(distributionFactorLower, 1.0d) <= 0, String.format( "The value of option '%s' must between %s and %s inclusively, but is %s", - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(), 0.0d, 1.0d, distributionFactorLower)); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 6f12589745..0bf4520fd1 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -36,10 +36,11 @@ import java.util.Optional; import java.util.stream.Collectors; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** Tests for {@link MySqlSnapshotSplitAssigner}. */ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { @@ -62,8 +63,8 @@ public void testAssignSingleTableSplits() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"}); assertEquals(expected, splits); } @@ -74,8 +75,8 @@ public void testAssignTableWhoseRowCntLessSplitSize() { List splits = getTestAssignSnapshotSplits( 2000, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customers"}); assertEquals(expected, splits); } @@ -93,8 +94,8 @@ public void testAssignMultipleTableSplits() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customers_even_dist", customerDatabase.getDatabaseName() + ".customers_sparse_dist" @@ -102,6 +103,64 @@ public void testAssignMultipleTableSplits() { assertEquals(expected, splits); } + @Test + public void testAssignCompositePkTableSplitsUnevenlyWithChunkKeyColumn() { + List expected = + Arrays.asList( + "shopping_cart null [KIND_007]", + "shopping_cart [KIND_007] [KIND_008]", + "shopping_cart [KIND_008] [KIND_009]", + "shopping_cart [KIND_009] [KIND_100]", + "shopping_cart [KIND_100] null"); + List splits = + getTestAssignSnapshotSplits( + customerDatabase, + 4, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"}, + "product_kind"); + assertEquals(expected, splits); + } + + @Test + public void testAssignCompositePkTableSplitsEvenlyWithChunkKeyColumn() { + List expected = + Arrays.asList( + "evenly_shopping_cart null [105]", + "evenly_shopping_cart [105] [109]", + "evenly_shopping_cart [109] null"); + List splits = + getTestAssignSnapshotSplits( + customerDatabase, + 4, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".evenly_shopping_cart"}, + "product_no"); + assertEquals(expected, splits); + } + + @Test + public void testAssignCompositePkTableWithWrongChunkKeyColumn() { + try { + getTestAssignSnapshotSplits( + customerDatabase, + 4, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + new String[] {customerDatabase.getDatabaseName() + ".customer_card"}, + "errorCol"); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + ExceptionUtils.findThrowableWithMessage( + t, + "Chunk key column 'errorCol' doesn't exist in the primary key [card_no,level] of the table") + .isPresent()); + } + } + @Test public void testEnableAutoIncrementedKeyOptimization() { List expected = @@ -109,8 +168,8 @@ public void testEnableAutoIncrementedKeyOptimization() { List splits = getTestAssignSnapshotSplits( 2, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_big"}); assertEquals(expected, splits); } @@ -125,8 +184,8 @@ public void testAssignSnapshotSplitsWithRandomPrimaryKey() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".address"}); assertEquals(expected, splits); } @@ -140,8 +199,8 @@ public void testAssignSnapshotSplitsWithDecimalKey() { List splits = getTestAssignSnapshotSplits( 2, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_dec"}); assertEquals(expected, splits); } @@ -159,8 +218,8 @@ public void testAssignTableWithMultipleKey() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customer_card"}); assertEquals(expected, splits); } @@ -178,7 +237,7 @@ public void testAssignTableWithSparseDistributionSplitKey() { getTestAssignSnapshotSplits( 4, 2000.0d, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customers_sparse_dist" }); @@ -194,7 +253,7 @@ public void testAssignTableWithSparseDistributionSplitKey() { getTestAssignSnapshotSplits( 4, 2.0d, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customers_sparse_dist" }); @@ -212,8 +271,8 @@ public void testAssignTableWithDenseDistributionSplitKey() { List splits = getTestAssignSnapshotSplits( 2, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customers_dense_dist" }); @@ -225,7 +284,7 @@ public void testAssignTableWithDenseDistributionSplitKey() { List splits1 = getTestAssignSnapshotSplits( 2, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), 0.9d, new String[] { customerDatabase.getDatabaseName() + ".customers_dense_dist" @@ -239,8 +298,8 @@ public void testAssignTableWithSingleLine() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customer_card_single_line" }); @@ -258,8 +317,8 @@ public void testAssignTableWithCombinedIntSplitKey() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"}); assertEquals(expected, splits); } @@ -275,8 +334,8 @@ public void testAssignTableWithConfiguredStringSplitKey() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"}); assertEquals(expected, splits); } @@ -293,8 +352,8 @@ public void testAssignMinSplitSize() { List splits = getTestAssignSnapshotSplits( 2, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"}); assertEquals(expected, splits); } @@ -305,8 +364,8 @@ public void testAssignMaxSplitSize() { List splits = getTestAssignSnapshotSplits( 8096, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"}); assertEquals(expected, splits); } @@ -316,8 +375,8 @@ public void testUnMatchedPrimaryKey() { try { getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customer_card"}); } catch (Throwable t) { assertTrue( @@ -334,8 +393,8 @@ public void testTableWithoutPrimaryKey() { try { getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {tableWithoutPrimaryKey}); } catch (Throwable t) { assertTrue( @@ -353,9 +412,45 @@ private List getTestAssignSnapshotSplits( double distributionFactorUpper, double distributionFactorLower, String[] captureTables) { + return getTestAssignSnapshotSplits( + customerDatabase, + splitSize, + distributionFactorUpper, + distributionFactorLower, + captureTables, + null); + } + + private List getTestAssignSnapshotSplits( + UniqueDatabase database, + int splitSize, + double distributionFactorUpper, + double distributionFactorLower, + String[] captureTables) { + return getTestAssignSnapshotSplits( + database, + splitSize, + distributionFactorUpper, + distributionFactorLower, + captureTables, + null); + } + + private List getTestAssignSnapshotSplits( + UniqueDatabase database, + int splitSize, + double distributionFactorUpper, + double distributionFactorLower, + String[] captureTables, + String chunkKeyColumn) { MySqlSourceConfig configuration = getConfig( - splitSize, distributionFactorUpper, distributionFactorLower, captureTables); + database, + splitSize, + distributionFactorUpper, + distributionFactorLower, + captureTables, + chunkKeyColumn); List remainingTables = Arrays.stream(captureTables).map(TableId::parse).collect(Collectors.toList()); final MySqlSnapshotSplitAssigner assigner = @@ -390,13 +485,15 @@ private List getTestAssignSnapshotSplits( } private MySqlSourceConfig getConfig( + UniqueDatabase database, int splitSize, double distributionFactorUpper, double distributionLower, - String[] captureTables) { + String[] captureTables, + String chunkKeyColumn) { return new MySqlSourceConfigFactory() .startupOptions(StartupOptions.initial()) - .databaseList(customerDatabase.getDatabaseName()) + .databaseList(database.getDatabaseName()) .tableList(captureTables) .hostname(MYSQL_CONTAINER.getHost()) .port(MYSQL_CONTAINER.getDatabasePort()) @@ -404,9 +501,10 @@ private MySqlSourceConfig getConfig( .fetchSize(2) .distributionFactorUpper(distributionFactorUpper) .distributionFactorLower(distributionLower) - .username(customerDatabase.getUsername()) - .password(customerDatabase.getPassword()) + .username(database.getUsername()) + .password(database.getPassword()) .serverTimeZone(ZoneId.of("UTC").toString()) + .chunkKeyColumn(chunkKeyColumn) .createConfig(0); } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index 9b31a6581e..55d00894a0 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -43,6 +43,8 @@ import java.util.Map; import java.util.Properties; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES; @@ -51,8 +53,6 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -117,12 +117,13 @@ public void testCommonProperties() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -133,10 +134,11 @@ public void testEnableParallelReadSource() { properties.put("server-id", "123-126"); properties.put("scan.incremental.snapshot.chunk.size", "8000"); properties.put("chunk-meta.group.size", "3000"); - properties.put("split-key.even-distribution.factor.upper-bound", "40.5"); - properties.put("split-key.even-distribution.factor.lower-bound", "0.01"); + properties.put("chunk-key.even-distribution.factor.upper-bound", "40.5"); + properties.put("chunk-key.even-distribution.factor.lower-bound", "0.01"); properties.put("scan.snapshot.fetch.size", "100"); properties.put("connect.timeout", "45s"); + properties.put("scan.incremental.snapshot.chunk.key-column", "testCol"); // validation for source DynamicTableSource actualSource = createTableSource(properties); @@ -164,7 +166,8 @@ public void testEnableParallelReadSource() { StartupOptions.initial(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + "testCol"); assertEquals(expectedSource, actualSource); } @@ -198,12 +201,13 @@ public void testEnableParallelReadSourceWithSingleServerId() { Duration.ofSeconds(45), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -235,12 +239,13 @@ public void testEnableParallelReadSourceLatestOffset() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.latest(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -254,6 +259,7 @@ public void testOptionalProperties() { options.put("debezium.snapshot.mode", "never"); options.put("jdbc.properties.useSSL", "false"); options.put("heartbeat.interval", "15213ms"); + options.put("scan.incremental.snapshot.chunk.key-column", "testCol"); DynamicTableSource actualSource = createTableSource(options); Properties dbzProperties = new Properties(); @@ -279,12 +285,13 @@ public void testOptionalProperties() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), true, jdbcProperties, - Duration.ofMillis(15213)); + Duration.ofMillis(15213), + "testCol"); assertEquals(expectedSource, actualSource); } @@ -338,12 +345,13 @@ public void testStartupFromInitial() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -406,12 +414,13 @@ public void testStartupFromLatestOffset() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.latest(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -446,12 +455,13 @@ public void testMetadataColumns() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + null); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); @@ -549,7 +559,7 @@ public void testValidation() { assertThat( t, containsMessage( - "The value of option 'split-key.even-distribution.factor.upper-bound' must larger than or equals 1.0, but is 0.8")); + "The value of option 'chunk-key.even-distribution.factor.upper-bound' must larger than or equals 1.0, but is 0.8")); } // validate illegal connection pool size diff --git a/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql b/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql index 3e810ab742..9ba69bf9a0 100644 --- a/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql +++ b/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql @@ -231,6 +231,29 @@ VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'), (404, 'KIND_008', 'user_5', 'leo list'), (600, 'KIND_009', 'user_6', 'my shopping cart'); +-- table has combined primary key and one of the primary key is evenly +CREATE TABLE evenly_shopping_cart ( + product_no INT NOT NULL, + product_kind VARCHAR(255), + user_id VARCHAR(255) NOT NULL, + description VARCHAR(255) NOT NULL, + PRIMARY KEY(product_kind, product_no, user_id) +); + +insert into evenly_shopping_cart +VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'), + (102, 'KIND_002', 'user_1', 'my shopping cart'), + (103, 'KIND_007', 'user_1', 'my shopping cart'), + (104, 'KIND_008', 'user_1', 'my shopping cart'), + (105, 'KIND_100', 'user_2', 'my shopping list'), + (105, 'KIND_999', 'user_3', 'my shopping list'), + (107, 'KIND_010', 'user_4', 'my shopping list'), + (108, 'KIND_009', 'user_4', 'my shopping list'), + (109, 'KIND_002', 'user_5', 'leo list'), + (111, 'KIND_007', 'user_5', 'leo list'), + (111, 'KIND_008', 'user_5', 'leo list'), + (112, 'KIND_009', 'user_6', 'my shopping cart'); + -- table has bigint unsigned auto increment primary key CREATE TABLE shopping_cart_big ( product_no BIGINT UNSIGNED AUTO_INCREMENT NOT NULL,