Skip to content

Commit

Permalink
[mysql] allow users to select one column in the primary key as the ch…
Browse files Browse the repository at this point in the history
…unk key to get chunks. (#1108)
  • Loading branch information
ruanhang1993 committed Jul 27, 2022
1 parent 4a6a899 commit 58326b3
Show file tree
Hide file tree
Showing 12 changed files with 328 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,24 @@ public class SourceOptions {
"The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups.");

public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
ConfigOptions.key("split-key.even-distribution.factor.upper-bound")
ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
.doubleType()
.defaultValue(1000.0d)
.withFallbackKeys("split-key.even-distribution.factor.upper-bound")
.withDescription(
"The upper bound of split key distribution factor. The distribution factor is used to determine whether the"
"The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");

public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
ConfigOptions.key("split-key.even-distribution.factor.lower-bound")
ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
.doubleType()
.defaultValue(0.05d)
.withFallbackKeys("split-key.even-distribution.factor.lower-bound")
.withDescription(
"The lower bound of split key distribution factor. The distribution factor is used to determine whether the"
"The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ public MySqlSourceBuilder<T> serverTimeZone(String timeZone) {
return this;
}

/**
* Sets the chunk key of the table snapshot. Captured tables are split into multiple splits by
* the chunk key when reading the snapshot of the table.
*
* @param chunkKey name of the column to use as the chunk key; forwarded unchanged to the
*     underlying config factory
* @return this builder, to allow call chaining
*/
public MySqlSourceBuilder<T> chunkKey(String chunkKey) {
this.configFactory.chunkKey(chunkKey);
return this;
}

/**
* The split size (number of rows) of table snapshot, captured tables are split into multiple
* splits when read the snapshot of table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) {
long start = System.currentTimeMillis();

Table table = mySqlSchema.getTableSchema(jdbc, tableId).getTable();
Column splitColumn = ChunkUtils.getSplitColumn(table);
Column splitColumn = ChunkUtils.getSplitColumn(table, sourceConfig.getChunkKey());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class MySqlSourceConfig implements Serializable {
private final boolean includeSchemaChanges;
private final boolean scanNewlyAddedTableEnabled;
private final Properties jdbcProperties;
@Nullable private final String chunkKey;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
Expand Down Expand Up @@ -84,7 +85,8 @@ public class MySqlSourceConfig implements Serializable {
boolean includeSchemaChanges,
boolean scanNewlyAddedTableEnabled,
Properties dbzProperties,
Properties jdbcProperties) {
Properties jdbcProperties,
@Nullable String chunkKey) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
Expand All @@ -108,6 +110,7 @@ public class MySqlSourceConfig implements Serializable {
this.dbzConfiguration = Configuration.from(dbzProperties);
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
this.jdbcProperties = jdbcProperties;
this.chunkKey = chunkKey;
}

public String getHostname() {
Expand Down Expand Up @@ -206,4 +209,9 @@ public RelationalTableFilters getTableFilters() {
public Properties getJdbcProperties() {
return jdbcProperties;
}

/**
* Returns the chunk key this config was constructed with.
*
* @return the chunk key, or {@code null} when none was supplied to the constructor
*/
@Nullable
public String getChunkKey() {
return chunkKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Properties;
import java.util.UUID;

import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
Expand All @@ -37,8 +39,6 @@
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** A factory to construct {@link MySqlSourceConfig}. */
Expand All @@ -63,14 +63,15 @@ public class MySqlSourceConfigFactory implements Serializable {
private int connectMaxRetries = CONNECT_MAX_RETRIES.defaultValue();
private int connectionPoolSize = CONNECTION_POOL_SIZE.defaultValue();
private double distributionFactorUpper =
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
private double distributionFactorLower =
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
private boolean scanNewlyAddedTableEnabled = false;
private Properties jdbcProperties;
private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue();
private Properties dbzProperties;
private String chunkKey;

public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
Expand Down Expand Up @@ -140,6 +141,15 @@ public MySqlSourceConfigFactory serverTimeZone(String timeZone) {
return this;
}

/**
* Sets the chunk key of the table snapshot. Captured tables are split into multiple splits by
* the chunk key when reading the snapshot of the table.
*
* @param chunkKey name of the column to use as the chunk key; stored on this factory and
*     handed to the {@link MySqlSourceConfig} built by {@code createConfig}
* @return this factory, to allow call chaining
*/
public MySqlSourceConfigFactory chunkKey(String chunkKey) {
this.chunkKey = chunkKey;
return this;
}

/**
* The split size (number of rows) of table snapshot, captured tables are split into multiple
* splits when read the snapshot of table.
Expand Down Expand Up @@ -329,6 +339,7 @@ public MySqlSourceConfig createConfig(int subtaskId) {
includeSchemaChanges,
scanNewlyAddedTableEnabled,
props,
jdbcProperties);
jdbcProperties,
chunkKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,24 +181,26 @@ public class MySqlSourceOptions {
"The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups.");

@Experimental
public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
ConfigOptions.key("split-key.even-distribution.factor.upper-bound")
public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
.doubleType()
.defaultValue(1000.0d)
.withFallbackKeys("split-key.even-distribution.factor.upper-bound")
.withDescription(
"The upper bound of split key distribution factor. The distribution factor is used to determine whether the"
"The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");

@Experimental
public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
ConfigOptions.key("split-key.even-distribution.factor.lower-bound")
public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
.doubleType()
.defaultValue(0.05d)
.withFallbackKeys("split-key.even-distribution.factor.lower-bound")
.withDescription(
"The lower bound of split key distribution factor. The distribution factor is used to determine whether the"
"The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
Expand All @@ -211,4 +213,14 @@ public class MySqlSourceOptions {
.defaultValue(false)
.withDescription(
"Whether capture the scan the newly added tables or not, by default is false.");

/**
 * Optional string option {@code scan.incremental.snapshot.chunk.key} naming the column used to
 * split a table into chunks while reading its snapshot. The option has no default value.
 */
@Experimental
public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY =
        ConfigOptions.key("scan.incremental.snapshot.chunk.key")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        // Each continuation literal starts with a space so the concatenated
                        // sentences do not run together ("table.By default...").
                        "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
                                + " By default, the chunk key is the first column of the primary key."
                                + " This column must be a column of the primary key.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
import io.debezium.relational.Column;
import io.debezium.relational.Table;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
Expand Down Expand Up @@ -54,7 +58,7 @@ public static RowType getSplitType(Column splitColumn) {
.getLogicalType();
}

public static Column getSplitColumn(Table table) {
public static Column getSplitColumn(Table table, @Nullable String chunkKey) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
Expand All @@ -64,7 +68,21 @@ public static Column getSplitColumn(Table table) {
table.id()));
}

// use first field in primary key as the split key
if (chunkKey != null) {
Optional<Column> targetPkColumn =
primaryKeys.stream().filter(col -> chunkKey.equals(col.name())).findFirst();
if (targetPkColumn.isPresent()) {
return targetPkColumn.get();
}
throw new ValidationException(
String.format(
"Chunk key '%s' doesn't exist in the primary key [%s] of the table %s.",
chunkKey,
primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
table.id()));
}

// use first field in primary key as the split key by default
return primaryKeys.get(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final boolean scanNewlyAddedTableEnabled;
private final Properties jdbcProperties;
private final Duration heartbeatInterval;
private final String chunkKey;

// --------------------------------------------------------------------------------------------
// Mutable attributes
Expand Down Expand Up @@ -135,7 +136,8 @@ public MySqlTableSource(
startupOptions,
false,
new Properties(),
heartbeatInterval);
heartbeatInterval,
null);
}

public MySqlTableSource(
Expand All @@ -161,7 +163,8 @@ public MySqlTableSource(
StartupOptions startupOptions,
boolean scanNewlyAddedTableEnabled,
Properties jdbcProperties,
Duration heartbeatInterval) {
Duration heartbeatInterval,
@Nullable String chunkKey) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
Expand All @@ -188,6 +191,7 @@ public MySqlTableSource(
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
this.heartbeatInterval = heartbeatInterval;
this.chunkKey = chunkKey;
}

@Override
Expand Down Expand Up @@ -242,6 +246,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.jdbcProperties(jdbcProperties)
.heartbeatInterval(heartbeatInterval)
.chunkKey(chunkKey)
.build();
return SourceProvider.of(parallelSource);
} else {
Expand Down Expand Up @@ -320,7 +325,8 @@ public DynamicTableSource copy() {
startupOptions,
scanNewlyAddedTableEnabled,
jdbcProperties,
heartbeatInterval);
heartbeatInterval,
chunkKey);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
Expand Down Expand Up @@ -359,7 +365,8 @@ public boolean equals(Object o) {
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(jdbcProperties, that.jdbcProperties)
&& Objects.equals(heartbeatInterval, that.heartbeatInterval);
&& Objects.equals(heartbeatInterval, that.heartbeatInterval)
&& Objects.equals(chunkKey, that.chunkKey);
}

@Override
Expand Down Expand Up @@ -389,7 +396,8 @@ public int hashCode() {
metadataKeys,
scanNewlyAddedTableEnabled,
jdbcProperties,
heartbeatInterval);
heartbeatInterval,
chunkKey);
}

@Override
Expand Down
Loading

0 comments on commit 58326b3

Please sign in to comment.