diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java index e7eb866e507..dba621777da 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/options/SourceOptions.java @@ -89,22 +89,24 @@ public class SourceOptions { "The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups."); public static final ConfigOption SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = - ConfigOptions.key("split-key.even-distribution.factor.upper-bound") + ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound") .doubleType() .defaultValue(1000.0d) + .withFallbackKeys("split-key.even-distribution.factor.upper-bound") .withDescription( - "The upper bound of split key distribution factor. The distribution factor is used to determine whether the" + "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the" + " table is evenly distribution or not." + " The table chunks would use evenly calculation optimization when the data distribution is even," + " and the query MySQL for splitting would happen when it is uneven." + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); public static final ConfigOption SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = - ConfigOptions.key("split-key.even-distribution.factor.lower-bound") + ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound") .doubleType() .defaultValue(0.05d) + .withFallbackKeys("split-key.even-distribution.factor.lower-bound") .withDescription( - "The lower bound of split key distribution factor. The distribution factor is used to determine whether the" + "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the" + " table is evenly distribution or not." + " The table chunks would use evenly calculation optimization when the data distribution is even," + " and the query MySQL for splitting would happen when it is uneven." diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java index 56327fddc9f..f9fb4f9997e 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -119,6 +119,15 @@ public MySqlSourceBuilder serverTimeZone(String timeZone) { return this; } + /** + * The chunk key of table snapshot, captured tables are split into multiple splits by the chunk + * key when read the snapshot of table. + */ + public MySqlSourceBuilder chunkKey(String chunkKey) { + this.configFactory.chunkKey(chunkKey); + return this; + } + /** * The split size (number of rows) of table snapshot, captured tables are split into multiple * splits when read the snapshot of table. 
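Note (not part of the patch): the rename above keeps the old keys working because they are registered via withFallbackKeys. A minimal sketch of that resolution, assuming Flink's standard Configuration fallback-key handling; the 40.5 value is illustrative:

import org.apache.flink.configuration.Configuration;

import com.ververica.cdc.connectors.base.options.SourceOptions;

public class FallbackKeySketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A job that still sets the pre-rename key ...
        conf.setString("split-key.even-distribution.factor.upper-bound", "40.5");
        // ... resolves through the renamed option, since the old key is now a fallback key.
        Double upper = conf.get(SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
        System.out.println(upper); // prints 40.5 rather than the 1000.0 default
    }
}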
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java index 444fd74a773..fa34268a829 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/ChunkSplitter.java @@ -78,7 +78,7 @@ public Collection generateSplits(TableId tableId) { long start = System.currentTimeMillis(); Table table = mySqlSchema.getTableSchema(jdbc, tableId).getTable(); - Column splitColumn = ChunkUtils.getSplitColumn(table); + Column splitColumn = ChunkUtils.getSplitColumn(table, sourceConfig.getChunkKey()); final List chunks; try { chunks = splitTableIntoChunks(jdbc, tableId, splitColumn); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 812bfa4fce4..cf43087d39b 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -55,6 +55,7 @@ public class MySqlSourceConfig implements Serializable { private final boolean includeSchemaChanges; private final boolean scanNewlyAddedTableEnabled; private final Properties jdbcProperties; + @Nullable private final String chunkKey; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -84,7 +85,8 @@ public class MySqlSourceConfig implements Serializable { boolean includeSchemaChanges, boolean scanNewlyAddedTableEnabled, Properties dbzProperties, - Properties jdbcProperties) { + Properties jdbcProperties, + @Nullable String chunkKey) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -108,6 +110,7 @@ public class MySqlSourceConfig implements Serializable { this.dbzConfiguration = Configuration.from(dbzProperties); this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration); this.jdbcProperties = jdbcProperties; + this.chunkKey = chunkKey; } public String getHostname() { @@ -206,4 +209,9 @@ public RelationalTableFilters getTableFilters() { public Properties getJdbcProperties() { return jdbcProperties; } + + @Nullable + public String getChunkKey() { + return chunkKey; + } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 0fa2087d9a4..6566bf42659 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -29,6 +29,8 @@ import java.util.Properties; import java.util.UUID; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static 
com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES; @@ -37,8 +39,6 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.util.Preconditions.checkNotNull; /** A factory to construct {@link MySqlSourceConfig}. */ @@ -63,14 +63,15 @@ public class MySqlSourceConfigFactory implements Serializable { private int connectMaxRetries = CONNECT_MAX_RETRIES.defaultValue(); private int connectionPoolSize = CONNECTION_POOL_SIZE.defaultValue(); private double distributionFactorUpper = - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(); + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(); private double distributionFactorLower = - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(); + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(); private boolean includeSchemaChanges = false; private boolean scanNewlyAddedTableEnabled = false; private Properties jdbcProperties; private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue(); private Properties dbzProperties; + private String chunkKey; public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -140,6 +141,15 @@ public MySqlSourceConfigFactory serverTimeZone(String timeZone) { return this; } + /** + * The chunk key of table snapshot, captured tables are split into multiple splits by the chunk + * key when read the snapshot of table. + */ + public MySqlSourceConfigFactory chunkKey(String chunkKey) { + this.chunkKey = chunkKey; + return this; + } + /** * The split size (number of rows) of table snapshot, captured tables are split into multiple * splits when read the snapshot of table. 
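Note (not part of the patch): a minimal DataStream API sketch of how the chunkKey(...) setter added above is meant to be used; the database, table, credentials, and the `id` chunk-key column are illustrative assumptions:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

// Sketch only: assumes a captured table mydb.orders whose primary key contains an `id` column.
MySqlSource<String> source =
        MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .databaseList("mydb")
                .tableList("mydb.orders")
                .username("flinkuser")
                .password("flinkpw")
                // New in this change: choose which primary-key column is used to split snapshot chunks.
                .chunkKey("id")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();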
@@ -329,6 +339,7 @@ public MySqlSourceConfig createConfig(int subtaskId) { includeSchemaChanges, scanNewlyAddedTableEnabled, props, - jdbcProperties); + jdbcProperties, + chunkKey); } } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index 44da4ca736e..9a20ab1c724 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -181,24 +181,26 @@ public class MySqlSourceOptions { "The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups."); @Experimental - public static final ConfigOption SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = - ConfigOptions.key("split-key.even-distribution.factor.upper-bound") + public static final ConfigOption CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND = + ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound") .doubleType() .defaultValue(1000.0d) + .withFallbackKeys("split-key.even-distribution.factor.upper-bound") .withDescription( - "The upper bound of split key distribution factor. The distribution factor is used to determine whether the" + "The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the" + " table is evenly distribution or not." + " The table chunks would use evenly calculation optimization when the data distribution is even," + " and the query MySQL for splitting would happen when it is uneven." + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); @Experimental - public static final ConfigOption SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = - ConfigOptions.key("split-key.even-distribution.factor.lower-bound") + public static final ConfigOption CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND = + ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound") .doubleType() .defaultValue(0.05d) + .withFallbackKeys("split-key.even-distribution.factor.lower-bound") .withDescription( - "The lower bound of split key distribution factor. The distribution factor is used to determine whether the" + "The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the" + " table is evenly distribution or not." + " The table chunks would use evenly calculation optimization when the data distribution is even," + " and the query MySQL for splitting would happen when it is uneven." @@ -211,4 +213,14 @@ public class MySqlSourceOptions { .defaultValue(false) .withDescription( "Whether capture the scan the newly added tables or not, by default is false."); + + @Experimental + public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY = + ConfigOptions.key("scan.incremental.snapshot.chunk.key") + .stringType() + .noDefaultValue() + .withDescription( + "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table. " + + "By default, the chunk key is the first column of the primary key. "
+ + "This column must be a column of the primary key."); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java index 421eadc0a16..aab4d21d367 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/ChunkUtils.java @@ -24,7 +24,11 @@ import io.debezium.relational.Column; import io.debezium.relational.Table; +import javax.annotation.Nullable; + import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; import static org.apache.flink.table.api.DataTypes.FIELD; import static org.apache.flink.table.api.DataTypes.ROW; @@ -54,7 +58,7 @@ public static RowType getSplitType(Column splitColumn) { .getLogicalType(); } - public static Column getSplitColumn(Table table) { + public static Column getSplitColumn(Table table, @Nullable String chunkKey) { List primaryKeys = table.primaryKeyColumns(); if (primaryKeys.isEmpty()) { throw new ValidationException( @@ -64,7 +68,21 @@ public static Column getSplitColumn(Table table) { table.id())); } - // use first field in primary key as the split key + if (chunkKey != null) { + Optional targetPkColumn = + primaryKeys.stream().filter(col -> chunkKey.equals(col.name())).findFirst(); + if (targetPkColumn.isPresent()) { + return targetPkColumn.get(); + } + throw new ValidationException( + String.format( + "Chunk key '%s' doesn't exist in the primary key [%s] of the table %s.", + chunkKey, + primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")), + table.id())); + } + + // use first field in primary key as the split key by default return primaryKeys.get(0); } diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java index 19a4d28e095..eb68d764997 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java @@ -79,6 +79,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat private final boolean scanNewlyAddedTableEnabled; private final Properties jdbcProperties; private final Duration heartbeatInterval; + private final String chunkKey; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -135,7 +136,8 @@ public MySqlTableSource( startupOptions, false, new Properties(), - heartbeatInterval); + heartbeatInterval, + null); } public MySqlTableSource( @@ -161,7 +163,8 @@ public MySqlTableSource( StartupOptions startupOptions, boolean scanNewlyAddedTableEnabled, Properties jdbcProperties, - Duration heartbeatInterval) { + Duration heartbeatInterval, + @Nullable String chunkKey) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -188,6 +191,7 @@ public MySqlTableSource( this.producedDataType = physicalSchema.toPhysicalRowDataType(); this.metadataKeys = Collections.emptyList(); this.heartbeatInterval = heartbeatInterval; + this.chunkKey = chunkKey; } @Override @@ -242,6 +246,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext 
scanContext) { .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled) .jdbcProperties(jdbcProperties) .heartbeatInterval(heartbeatInterval) + .chunkKey(chunkKey) .build(); return SourceProvider.of(parallelSource); } else { @@ -320,7 +325,8 @@ public DynamicTableSource copy() { startupOptions, scanNewlyAddedTableEnabled, jdbcProperties, - heartbeatInterval); + heartbeatInterval, + chunkKey); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -359,7 +365,8 @@ public boolean equals(Object o) { && Objects.equals(producedDataType, that.producedDataType) && Objects.equals(metadataKeys, that.metadataKeys) && Objects.equals(jdbcProperties, that.jdbcProperties) - && Objects.equals(heartbeatInterval, that.heartbeatInterval); + && Objects.equals(heartbeatInterval, that.heartbeatInterval) + && Objects.equals(chunkKey, that.chunkKey); } @Override @@ -389,7 +396,8 @@ public int hashCode() { metadataKeys, scanNewlyAddedTableEnabled, jdbcProperties, - heartbeatInterval); + heartbeatInterval, + chunkKey); } @Override diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 9af46722798..e9debcf7844 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -35,6 +35,8 @@ import java.util.Set; import java.util.regex.Pattern; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES; @@ -44,6 +46,7 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.HOSTNAME; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PASSWORD; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PORT; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; @@ -54,8 +57,6 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_ID; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static 
com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.TABLE_NAME; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.USERNAME; import static com.ververica.cdc.connectors.mysql.source.utils.ObjectUtils.doubleCompare; @@ -96,8 +97,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { Duration connectTimeout = config.get(CONNECT_TIMEOUT); int connectMaxRetries = config.get(CONNECT_MAX_RETRIES); int connectionPoolSize = config.get(CONNECTION_POOL_SIZE); - double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); - double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + double distributionFactorUpper = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); + double distributionFactorLower = config.get(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED); Duration heartbeatInterval = config.get(HEARTBEAT_INTERVAL); @@ -137,7 +138,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { startupOptions, scanNewlyAddedTableEnabled, JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()), - heartbeatInterval); + heartbeatInterval, + config.getOptional(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY).orElse(null)); } @Override @@ -172,11 +174,12 @@ public Set> optionalOptions() { options.add(SCAN_SNAPSHOT_FETCH_SIZE); options.add(CONNECT_TIMEOUT); options.add(CONNECTION_POOL_SIZE); - options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); - options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); + options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); + options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(CONNECT_MAX_RETRIES); options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); options.add(HEARTBEAT_INTERVAL); + options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY); return options; } @@ -288,7 +291,7 @@ private void validateDistributionFactorUpper(double distributionFactorUpper) { doubleCompare(distributionFactorUpper, 1.0d) >= 0, String.format( "The value of option '%s' must larger than or equals %s, but is %s", - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.key(), 1.0d, distributionFactorUpper)); } @@ -300,7 +303,7 @@ private void validateDistributionFactorLower(double distributionFactorLower) { && doubleCompare(distributionFactorLower, 1.0d) <= 0, String.format( "The value of option '%s' must between %s and %s inclusively, but is %s", - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.key(), 0.0d, 1.0d, distributionFactorLower)); diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index 6f125897458..2fafbe3cada 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -36,20 +36,24 @@ import java.util.Optional; import java.util.stream.Collectors; -import static 
com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** Tests for {@link MySqlSnapshotSplitAssigner}. */ public class MySqlSnapshotSplitAssignerTest extends MySqlSourceTestBase { private static final UniqueDatabase customerDatabase = new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); + private static final UniqueDatabase compositePkDatabase = + new UniqueDatabase(MYSQL_CONTAINER, "composite_pk", "mysqluser", "mysqlpw"); @BeforeClass public static void init() { customerDatabase.createAndInitialize(); + compositePkDatabase.createAndInitialize(); } @Test @@ -62,8 +66,8 @@ public void testAssignSingleTableSplits() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"}); assertEquals(expected, splits); } @@ -74,8 +78,8 @@ public void testAssignTableWhoseRowCntLessSplitSize() { List splits = getTestAssignSnapshotSplits( 2000, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customers"}); assertEquals(expected, splits); } @@ -93,8 +97,8 @@ public void testAssignMultipleTableSplits() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customers_even_dist", customerDatabase.getDatabaseName() + ".customers_sparse_dist" @@ -102,6 +106,68 @@ public void testAssignMultipleTableSplits() { assertEquals(expected, splits); } + @Test + public void testAssignCompositePkTableSplitsUnevenly() { + List expected = + Arrays.asList( + "composite_pk_table null [name2]", + "composite_pk_table [name2] [name3]", + "composite_pk_table [name3] [name4]", + "composite_pk_table [name4] null"); + List splits = + getTestAssignSnapshotSplits( + compositePkDatabase, + 4, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + new String[] { + compositePkDatabase.getDatabaseName() + ".composite_pk_table" + }, + "name"); + assertEquals(expected, splits); + } + + @Test + public void testAssignCompositePkTableSplitsEvenly() { + List expected = + Arrays.asList( + "composite_pk_table null [5]", + "composite_pk_table [5] [9]", + "composite_pk_table [9] [13]", + "composite_pk_table 
[13] null"); + List splits = + getTestAssignSnapshotSplits( + compositePkDatabase, + 4, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + new String[] { + compositePkDatabase.getDatabaseName() + ".composite_pk_table" + }, + "id"); + assertEquals(expected, splits); + } + + @Test + public void testAssignCompositePkTableWithWrongChunkKey() { + try { + getTestAssignSnapshotSplits( + compositePkDatabase, + 4, + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + new String[] {compositePkDatabase.getDatabaseName() + ".composite_pk_table"}, + "errorCol"); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + ExceptionUtils.findThrowableWithMessage( + t, + "Chunk key 'errorCol' doesn't exist in the primary key [nickname,id,name] of the table") + .isPresent()); + } + } + @Test public void testEnableAutoIncrementedKeyOptimization() { List expected = @@ -109,8 +175,8 @@ public void testEnableAutoIncrementedKeyOptimization() { List splits = getTestAssignSnapshotSplits( 2, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_big"}); assertEquals(expected, splits); } @@ -125,8 +191,8 @@ public void testAssignSnapshotSplitsWithRandomPrimaryKey() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".address"}); assertEquals(expected, splits); } @@ -140,8 +206,8 @@ public void testAssignSnapshotSplitsWithDecimalKey() { List splits = getTestAssignSnapshotSplits( 2, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".shopping_cart_dec"}); assertEquals(expected, splits); } @@ -159,8 +225,8 @@ public void testAssignTableWithMultipleKey() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customer_card"}); assertEquals(expected, splits); } @@ -178,7 +244,7 @@ public void testAssignTableWithSparseDistributionSplitKey() { getTestAssignSnapshotSplits( 4, 2000.0d, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customers_sparse_dist" }); @@ -194,7 +260,7 @@ public void testAssignTableWithSparseDistributionSplitKey() { getTestAssignSnapshotSplits( 4, 2.0d, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + 
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customers_sparse_dist" }); @@ -212,8 +278,8 @@ public void testAssignTableWithDenseDistributionSplitKey() { List splits = getTestAssignSnapshotSplits( 2, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customers_dense_dist" }); @@ -225,7 +291,7 @@ public void testAssignTableWithDenseDistributionSplitKey() { List splits1 = getTestAssignSnapshotSplits( 2, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), 0.9d, new String[] { customerDatabase.getDatabaseName() + ".customers_dense_dist" @@ -239,8 +305,8 @@ public void testAssignTableWithSingleLine() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] { customerDatabase.getDatabaseName() + ".customer_card_single_line" }); @@ -258,8 +324,8 @@ public void testAssignTableWithCombinedIntSplitKey() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"}); assertEquals(expected, splits); } @@ -275,8 +341,8 @@ public void testAssignTableWithConfiguredStringSplitKey() { List splits = getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".shopping_cart"}); assertEquals(expected, splits); } @@ -293,8 +359,8 @@ public void testAssignMinSplitSize() { List splits = getTestAssignSnapshotSplits( 2, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"}); assertEquals(expected, splits); } @@ -305,8 +371,8 @@ public void testAssignMaxSplitSize() { List splits = getTestAssignSnapshotSplits( 8096, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customers_even_dist"}); assertEquals(expected, splits); } @@ -316,8 +382,8 @@ public void testUnMatchedPrimaryKey() { try { getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - 
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {customerDatabase.getDatabaseName() + ".customer_card"}); } catch (Throwable t) { assertTrue( @@ -334,8 +400,8 @@ public void testTableWithoutPrimaryKey() { try { getTestAssignSnapshotSplits( 4, - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), new String[] {tableWithoutPrimaryKey}); } catch (Throwable t) { assertTrue( @@ -353,9 +419,45 @@ private List getTestAssignSnapshotSplits( double distributionFactorUpper, double distributionFactorLower, String[] captureTables) { + return getTestAssignSnapshotSplits( + customerDatabase, + splitSize, + distributionFactorUpper, + distributionFactorLower, + captureTables, + null); + } + + private List getTestAssignSnapshotSplits( + UniqueDatabase database, + int splitSize, + double distributionFactorUpper, + double distributionFactorLower, + String[] captureTables) { + return getTestAssignSnapshotSplits( + database, + splitSize, + distributionFactorUpper, + distributionFactorLower, + captureTables, + null); + } + + private List getTestAssignSnapshotSplits( + UniqueDatabase database, + int splitSize, + double distributionFactorUpper, + double distributionFactorLower, + String[] captureTables, + String chunkKey) { MySqlSourceConfig configuration = getConfig( - splitSize, distributionFactorUpper, distributionFactorLower, captureTables); + database, + splitSize, + distributionFactorUpper, + distributionFactorLower, + captureTables, + chunkKey); List remainingTables = Arrays.stream(captureTables).map(TableId::parse).collect(Collectors.toList()); final MySqlSnapshotSplitAssigner assigner = @@ -390,13 +492,15 @@ private List getTestAssignSnapshotSplits( } private MySqlSourceConfig getConfig( + UniqueDatabase database, int splitSize, double distributionFactorUpper, double distributionLower, - String[] captureTables) { + String[] captureTables, + String chunkKey) { return new MySqlSourceConfigFactory() .startupOptions(StartupOptions.initial()) - .databaseList(customerDatabase.getDatabaseName()) + .databaseList(database.getDatabaseName()) .tableList(captureTables) .hostname(MYSQL_CONTAINER.getHost()) .port(MYSQL_CONTAINER.getDatabasePort()) @@ -404,9 +508,10 @@ private MySqlSourceConfig getConfig( .fetchSize(2) .distributionFactorUpper(distributionFactorUpper) .distributionFactorLower(distributionLower) - .username(customerDatabase.getUsername()) - .password(customerDatabase.getPassword()) + .username(database.getUsername()) + .password(database.getPassword()) .serverTimeZone(ZoneId.of("UTC").toString()) + .chunkKey(chunkKey) .createConfig(0); } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index 9b31a6581ea..d98d714278a 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -43,6 +43,8 @@ import java.util.Map; import java.util.Properties; +import static 
com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; +import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES; @@ -51,8 +53,6 @@ import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED; import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; -import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -117,12 +117,13 @@ public void testCommonProperties() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -133,10 +134,11 @@ public void testEnableParallelReadSource() { properties.put("server-id", "123-126"); properties.put("scan.incremental.snapshot.chunk.size", "8000"); properties.put("chunk-meta.group.size", "3000"); - properties.put("split-key.even-distribution.factor.upper-bound", "40.5"); - properties.put("split-key.even-distribution.factor.lower-bound", "0.01"); + properties.put("chunk-key.even-distribution.factor.upper-bound", "40.5"); + properties.put("chunk-key.even-distribution.factor.lower-bound", "0.01"); properties.put("scan.snapshot.fetch.size", "100"); properties.put("connect.timeout", "45s"); + properties.put("scan.incremental.snapshot.chunk.key", "testCol"); // validation for source DynamicTableSource actualSource = createTableSource(properties); @@ -164,7 +166,8 @@ public void testEnableParallelReadSource() { StartupOptions.initial(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + "testCol"); assertEquals(expectedSource, actualSource); } @@ -198,12 +201,13 @@ public void testEnableParallelReadSourceWithSingleServerId() { Duration.ofSeconds(45), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + 
null); assertEquals(expectedSource, actualSource); } @@ -235,12 +239,13 @@ public void testEnableParallelReadSourceLatestOffset() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.latest(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -254,6 +259,7 @@ public void testOptionalProperties() { options.put("debezium.snapshot.mode", "never"); options.put("jdbc.properties.useSSL", "false"); options.put("heartbeat.interval", "15213ms"); + options.put("scan.incremental.snapshot.chunk.key", "testCol"); DynamicTableSource actualSource = createTableSource(options); Properties dbzProperties = new Properties(); @@ -279,12 +285,13 @@ public void testOptionalProperties() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), true, jdbcProperties, - Duration.ofMillis(15213)); + Duration.ofMillis(15213), + "testCol"); assertEquals(expectedSource, actualSource); } @@ -338,12 +345,13 @@ public void testStartupFromInitial() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -406,12 +414,13 @@ public void testStartupFromLatestOffset() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.latest(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -446,12 +455,13 @@ public void testMetadataColumns() { CONNECT_TIMEOUT.defaultValue(), CONNECT_MAX_RETRIES.defaultValue(), CONNECTION_POOL_SIZE.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), - SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(), + CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(), StartupOptions.initial(), false, new Properties(), - HEARTBEAT_INTERVAL.defaultValue()); + HEARTBEAT_INTERVAL.defaultValue(), + null); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", 
"database_name"); @@ -549,7 +559,7 @@ public void testValidation() { assertThat( t, containsMessage( - "The value of option 'split-key.even-distribution.factor.upper-bound' must larger than or equals 1.0, but is 0.8")); + "The value of option 'chunk-key.even-distribution.factor.upper-bound' must larger than or equals 1.0, but is 0.8")); } // validate illegal connection pool size diff --git a/flink-connector-mysql-cdc/src/test/resources/ddl/composite_pk.sql b/flink-connector-mysql-cdc/src/test/resources/ddl/composite_pk.sql new file mode 100644 index 00000000000..b332678574b --- /dev/null +++ b/flink-connector-mysql-cdc/src/test/resources/ddl/composite_pk.sql @@ -0,0 +1,42 @@ +-- Copyright 2022 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: composite_pk +-- ---------------------------------------------------------------------------------------------------------------- + +CREATE TABLE composite_pk_table ( + nickname VARCHAR(255) NOT NULL, + id BIGINT NOT NULL, + name VARCHAR(255) NOT NULL, + address VARCHAR(1024), + phone_number VARCHAR(512), + PRIMARY KEY (`nickname`, `id`,`name`) +); + +INSERT INTO composite_pk_table +VALUES ("a", 1, "name1", "Beijing", "0123"), + ("b", 2, "name1", "Beijing", "4567"), + ("c", 3, "name1", "Beijing", "8901"), + ("d", 4, "name2", "Beijing", "2345"), + ("e", 5, "name2", "Beijing", "6789"), + ("f", 6, "name2", "Beijing", "0123"), + ("a", 7, "name3", "Beijing", "4567"), + ("b", 8, "name3", "Beijing", "8901"), + ("c", 9, "name3", "Beijing", "2345"), + ("d", 10, "name4", "Beijing", "6789"), + ("e", 11, "name4", "Beijing", "0123"), + ("f", 12, "name4", "Beijing", "0123"), + ("a", 13, "name5", "Beijing", "0123"), + ("b", 14, "name5", "Beijing", "0123"), + ("c", 15, "name5", "Beijing", "0123"); \ No newline at end of file