Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mysql] allow users to select one column in the primary key as the chunk key to get chunks. (#1108) #1415

Merged
merged 3 commits into from
Jul 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,24 @@ public class SourceOptions {
"The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups.");

public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
ConfigOptions.key("split-key.even-distribution.factor.upper-bound")
ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
.doubleType()
.defaultValue(1000.0d)
.withFallbackKeys("split-key.even-distribution.factor.upper-bound")
.withDescription(
"The upper bound of split key distribution factor. The distribution factor is used to determine whether the"
"The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");

public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
ConfigOptions.key("split-key.even-distribution.factor.lower-bound")
ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
.doubleType()
.defaultValue(0.05d)
.withFallbackKeys("split-key.even-distribution.factor.lower-bound")
.withDescription(
"The lower bound of split key distribution factor. The distribution factor is used to determine whether the"
"The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,9 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
// only the table who captured snapshot splits need to filter
if (finishedSplitsInfo.containsKey(tableId)) {
RowType splitKeyType =
ChunkUtils.getSplitType(
statefulTaskContext.getDatabaseSchema().tableFor(tableId));
ChunkUtils.getChunkKeyColumnType(
statefulTaskContext.getDatabaseSchema().tableFor(tableId),
statefulTaskContext.getSourceConfig().getChunkKeyColumn());
Object[] key =
getSplitKey(
splitKeyType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ public MySqlSourceBuilder<T> serverTimeZone(String timeZone) {
return this;
}

/**
* Sets the chunk key column of the table snapshot. Captured tables are split into multiple
* chunks by the chunk key column when reading the snapshot of a table.
*
* @param chunkKeyColumn name of the chunk key column; forwarded unchanged to the config factory
* @return this builder, so calls can be chained
*/
public MySqlSourceBuilder<T> chunkKeyColumn(String chunkKeyColumn) {
this.configFactory.chunkKeyColumn(chunkKeyColumn);
return this;
}

/**
* The split size (number of rows) of table snapshot, captured tables are split into multiple
* splits when read the snapshot of table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,25 +78,26 @@ public Collection<MySqlSnapshotSplit> generateSplits(TableId tableId) {
long start = System.currentTimeMillis();

Table table = mySqlSchema.getTableSchema(jdbc, tableId).getTable();
Column splitColumn = ChunkUtils.getSplitColumn(table);
Column chunkKeyColumn =
ChunkUtils.getChunkKeyColumn(table, sourceConfig.getChunkKeyColumn());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
chunks = splitTableIntoChunks(jdbc, tableId, chunkKeyColumn);
} catch (SQLException e) {
throw new FlinkRuntimeException("Failed to split chunks for table " + tableId, e);
}

// convert chunks into splits
List<MySqlSnapshotSplit> splits = new ArrayList<>();
RowType splitType = ChunkUtils.getSplitType(splitColumn);
RowType chunkKeyColumnType = ChunkUtils.getChunkKeyColumnType(chunkKeyColumn);
for (int i = 0; i < chunks.size(); i++) {
ChunkRange chunk = chunks.get(i);
MySqlSnapshotSplit split =
createSnapshotSplit(
jdbc,
tableId,
i,
splitType,
chunkKeyColumnType,
chunk.getChunkStart(),
chunk.getChunkEnd());
splits.add(split);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class MySqlSourceConfig implements Serializable {
private final boolean includeSchemaChanges;
private final boolean scanNewlyAddedTableEnabled;
private final Properties jdbcProperties;
@Nullable private final String chunkKeyColumn;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
Expand Down Expand Up @@ -84,7 +85,8 @@ public class MySqlSourceConfig implements Serializable {
boolean includeSchemaChanges,
boolean scanNewlyAddedTableEnabled,
Properties dbzProperties,
Properties jdbcProperties) {
Properties jdbcProperties,
@Nullable String chunkKeyColumn) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
Expand All @@ -108,6 +110,7 @@ public class MySqlSourceConfig implements Serializable {
this.dbzConfiguration = Configuration.from(dbzProperties);
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
this.jdbcProperties = jdbcProperties;
this.chunkKeyColumn = chunkKeyColumn;
}

public String getHostname() {
Expand Down Expand Up @@ -206,4 +209,9 @@ public RelationalTableFilters getTableFilters() {
public Properties getJdbcProperties() {
return jdbcProperties;
}

/**
* Returns the chunk key column that was passed to this config at construction time.
*
* @return the chunk key column name, or {@code null} if none was supplied
*/
@Nullable
public String getChunkKeyColumn() {
return chunkKeyColumn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Properties;
import java.util.UUID;

import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_META_GROUP_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
Expand All @@ -37,8 +39,6 @@
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** A factory to construct {@link MySqlSourceConfig}. */
Expand All @@ -63,14 +63,15 @@ public class MySqlSourceConfigFactory implements Serializable {
private int connectMaxRetries = CONNECT_MAX_RETRIES.defaultValue();
private int connectionPoolSize = CONNECTION_POOL_SIZE.defaultValue();
private double distributionFactorUpper =
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue();
private double distributionFactorLower =
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
private boolean scanNewlyAddedTableEnabled = false;
private Properties jdbcProperties;
private Duration heartbeatInterval = HEARTBEAT_INTERVAL.defaultValue();
private Properties dbzProperties;
private String chunkKeyColumn;

public MySqlSourceConfigFactory hostname(String hostname) {
this.hostname = hostname;
Expand Down Expand Up @@ -140,6 +141,15 @@ public MySqlSourceConfigFactory serverTimeZone(String timeZone) {
return this;
}

/**
* Sets the chunk key column of the table snapshot. Captured tables are split into multiple
* chunks by the chunk key column when reading the snapshot of a table.
*
* @param chunkKeyColumn name of the chunk key column; stored as-is on this factory
* @return this factory, so calls can be chained
*/
public MySqlSourceConfigFactory chunkKeyColumn(String chunkKeyColumn) {
this.chunkKeyColumn = chunkKeyColumn;
return this;
}

/**
* The split size (number of rows) of table snapshot, captured tables are split into multiple
* splits when read the snapshot of table.
Expand Down Expand Up @@ -329,6 +339,7 @@ public MySqlSourceConfig createConfig(int subtaskId) {
includeSchemaChanges,
scanNewlyAddedTableEnabled,
props,
jdbcProperties);
jdbcProperties,
chunkKeyColumn);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,24 +181,26 @@ public class MySqlSourceOptions {
"The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups.");

@Experimental
public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
ConfigOptions.key("split-key.even-distribution.factor.upper-bound")
public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
ConfigOptions.key("chunk-key.even-distribution.factor.upper-bound")
.doubleType()
.defaultValue(1000.0d)
.withFallbackKeys("split-key.even-distribution.factor.upper-bound")
.withDescription(
"The upper bound of split key distribution factor. The distribution factor is used to determine whether the"
"The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");

@Experimental
public static final ConfigOption<Double> SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
ConfigOptions.key("split-key.even-distribution.factor.lower-bound")
public static final ConfigOption<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
ConfigOptions.key("chunk-key.even-distribution.factor.lower-bound")
.doubleType()
.defaultValue(0.05d)
.withFallbackKeys("split-key.even-distribution.factor.lower-bound")
.withDescription(
"The lower bound of split key distribution factor. The distribution factor is used to determine whether the"
"The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the"
+ " table is evenly distribution or not."
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
+ " and the query MySQL for splitting would happen when it is uneven."
Expand All @@ -211,4 +213,14 @@ public class MySqlSourceOptions {
.defaultValue(false)
.withDescription(
"Whether capture the scan the newly added tables or not, by default is false.");

/**
 * Option naming the column used to split a captured table into chunks while its snapshot is
 * read. The option is a string with no default value declared.
 *
 * <p>The description is assembled from several string fragments; every fragment after the
 * first starts with a space so the sentences do not run together when rendered.
 */
@Experimental
public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
        ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
                                + " By default, the chunk key is the first column of the primary key."
                                + " This column must be a column of the primary key.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@
import io.debezium.relational.Column;
import io.debezium.relational.Table;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.ROW;
Expand All @@ -34,27 +38,17 @@ public class ChunkUtils {

private ChunkUtils() {}

public static RowType getSplitType(Table table) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
String.format(
"Incremental snapshot for tables requires primary key,"
+ " but table %s doesn't have primary key.",
table.id()));
}

// use first field in primary key as the split key
return getSplitType(primaryKeys.get(0));
/**
 * Resolves the chunk key column of the given table and returns its row type.
 *
 * @param table the table whose chunk key column type is wanted
 * @param chunkKeyColumn the requested chunk key column name, may be {@code null}
 * @return the row type produced for the resolved chunk key column
 */
public static RowType getChunkKeyColumnType(Table table, @Nullable String chunkKeyColumn) {
    // First resolve the column, then derive its row type from it.
    final Column resolvedColumn = getChunkKeyColumn(table, chunkKeyColumn);
    return getChunkKeyColumnType(resolvedColumn);
}

public static RowType getSplitType(Column splitColumn) {
public static RowType getChunkKeyColumnType(Column chunkKeyColumn) {
return (RowType)
ROW(FIELD(splitColumn.name(), MySqlTypeUtils.fromDbzColumn(splitColumn)))
ROW(FIELD(chunkKeyColumn.name(), MySqlTypeUtils.fromDbzColumn(chunkKeyColumn)))
.getLogicalType();
}

public static Column getSplitColumn(Table table) {
public static Column getChunkKeyColumn(Table table, @Nullable String chunkKeyColumn) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
Expand All @@ -64,7 +58,23 @@ public static Column getSplitColumn(Table table) {
table.id()));
}

// use first field in primary key as the split key
if (chunkKeyColumn != null) {
Optional<Column> targetPkColumn =
primaryKeys.stream()
.filter(col -> chunkKeyColumn.equals(col.name()))
.findFirst();
if (targetPkColumn.isPresent()) {
return targetPkColumn.get();
}
throw new ValidationException(
String.format(
"Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
chunkKeyColumn,
primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
table.id()));
}

// use the first column of primary key columns as the chunk key column by default
return primaryKeys.get(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final boolean scanNewlyAddedTableEnabled;
private final Properties jdbcProperties;
private final Duration heartbeatInterval;
private final String chunkKeyColumn;

// --------------------------------------------------------------------------------------------
// Mutable attributes
Expand Down Expand Up @@ -135,7 +136,8 @@ public MySqlTableSource(
startupOptions,
false,
new Properties(),
heartbeatInterval);
heartbeatInterval,
null);
}

public MySqlTableSource(
Expand All @@ -161,7 +163,8 @@ public MySqlTableSource(
StartupOptions startupOptions,
boolean scanNewlyAddedTableEnabled,
Properties jdbcProperties,
Duration heartbeatInterval) {
Duration heartbeatInterval,
@Nullable String chunkKeyColumn) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
Expand All @@ -188,6 +191,7 @@ public MySqlTableSource(
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
this.heartbeatInterval = heartbeatInterval;
this.chunkKeyColumn = chunkKeyColumn;
}

@Override
Expand Down Expand Up @@ -242,6 +246,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.jdbcProperties(jdbcProperties)
.heartbeatInterval(heartbeatInterval)
.chunkKeyColumn(chunkKeyColumn)
.build();
return SourceProvider.of(parallelSource);
} else {
Expand Down Expand Up @@ -320,7 +325,8 @@ public DynamicTableSource copy() {
startupOptions,
scanNewlyAddedTableEnabled,
jdbcProperties,
heartbeatInterval);
heartbeatInterval,
chunkKeyColumn);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
Expand Down Expand Up @@ -359,7 +365,8 @@ public boolean equals(Object o) {
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(jdbcProperties, that.jdbcProperties)
&& Objects.equals(heartbeatInterval, that.heartbeatInterval);
&& Objects.equals(heartbeatInterval, that.heartbeatInterval)
&& Objects.equals(chunkKeyColumn, that.chunkKeyColumn);
}

@Override
Expand Down Expand Up @@ -389,7 +396,8 @@ public int hashCode() {
metadataKeys,
scanNewlyAddedTableEnabled,
jdbcProperties,
heartbeatInterval);
heartbeatInterval,
chunkKeyColumn);
}

@Override
Expand Down
Loading