Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36115][pipeline-connector][mysql] add scan.incremental.newly-added-table.enabled option #3560

Merged
merged 4 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,17 @@ pipeline:
<td>Boolean</td>
<td>是否启用动态加表特性,默认关闭。 此配置项只有作业从savepoint/checkpoint启动时才生效。</td>
</tr>
<tr>
<td>scan.binlog.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>在 binlog 读取阶段,是否读取新增表的表结构变更和数据变更,默认值是 false。 <br>
scan.newly-added-table.enabled 和 scan.binlog.newly-added-table.enabled 参数的不同在于: <br>
scan.newly-added-table.enabled: 在作业重启后,对新增表的全量和增量数据进行读取; <br>
scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
11 changes: 11 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,17 @@ pipeline:
<td>Boolean</td>
<td>Whether to enable scan the newly added tables feature or not, by default is false. This option is only useful when we start the job from a savepoint/checkpoint.</td>
</tr>
<tr>
<td>scan.binlog.newly-added-table.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false. <br>
The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: <br>
scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; <br>
scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
Expand All @@ -38,6 +39,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;

import io.debezium.relational.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -61,6 +63,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
Expand Down Expand Up @@ -128,6 +131,8 @@ public DataSource createDataSource(Context context) {
int connectMaxRetries = config.get(CONNECT_MAX_RETRIES);
int connectionPoolSize = config.get(CONNECTION_POOL_SIZE);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
boolean scanIncrementalNewlyAddedTableEnabled =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
boolean scanIncrementalNewlyAddedTableEnabled =
boolean scanInBinlogPhaseNewlyAddedTableEnabled =

config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down Expand Up @@ -166,26 +171,32 @@ public DataSource createDataSource(Context context) {
.jdbcProperties(getJdbcProperties(configMap))
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled);

Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
List<String> capturedTables = getTableList(configFactory.createConfig(0), selectors);
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table by the option 'tables' = " + tables);
}
if (tablesExclude != null) {
Selectors selectExclude =
new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
List<String> excludeTables = getTableList(configFactory.createConfig(0), selectExclude);
if (!excludeTables.isEmpty()) {
capturedTables.removeAll(excludeTables);
}
if (scanIncrementalNewlyAddedTableEnabled) {
String newTables = validateTableAndReturnDebeziumStyle(tables);
configFactory.tableList(newTables);
} else {
Selectors selectors = new Selectors.SelectorsBuilder().includeTables(tables).build();
List<String> capturedTables = getTableList(configFactory.createConfig(0), selectors);
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table with by the option 'tables.exclude' = "
+ tablesExclude);
"Cannot find any table by the option 'tables' = " + tables);
}
if (tablesExclude != null) {
Selectors selectExclude =
new Selectors.SelectorsBuilder().includeTables(tablesExclude).build();
List<String> excludeTables =
getTableList(configFactory.createConfig(0), selectExclude);
if (!excludeTables.isEmpty()) {
capturedTables.removeAll(excludeTables);
}
if (capturedTables.isEmpty()) {
throw new IllegalArgumentException(
"Cannot find any table with by the option 'tables.exclude' = "
+ tablesExclude);
}
}
configFactory.tableList(capturedTables.toArray(new String[0]));
}
configFactory.tableList(capturedTables.toArray(new String[0]));

String chunkKeyColumns = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
if (chunkKeyColumns != null) {
Expand Down Expand Up @@ -256,6 +267,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CHUNK_META_GROUP_SIZE);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
return options;
}

Expand Down Expand Up @@ -410,6 +422,33 @@ && doubleCompare(distributionFactorLower, 1.0d) <= 0,
distributionFactorLower));
}

/**
* Currently, The supported regular syntax is not exactly the same in {@link Selectors} and
* {@link Tables.TableFilter}.
*
* <p>The main distinction are :
*
* <p>1) {@link Selectors} use `,` to split table names and {@link Tables.TableFilter} use use
* `|` to split table names.
*
* <p>2) If there is a need to use a dot (.) in a regular expression to match any character, it
* is necessary to escape the dot with a backslash, refer to {@link
* MySqlDataSourceOptions#TABLES}.
*/
private String validateTableAndReturnDebeziumStyle(String tables) {
// MySQL table names are not allowed to have `,` character.
if (tables.contains(",")) {
throw new IllegalArgumentException(
"the `,` in "
+ tables
+ " is not supported when "
+ SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED
+ " was enabled.");
}

return tables.replace("\\.", ".");
}

/** Replaces the default timezone placeholder with session timezone, if applicable. */
private static ZoneId getServerTimeZone(Configuration config) {
final String serverTimeZone = config.get(SERVER_TIME_ZONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,15 @@ public class MySqlDataSourceOptions {
+ "If there is a need to use a dot (.) in a regular expression to match any character, "
+ "it is necessary to escape the dot with a backslash."
+ "eg. db0.\\.*, db1.user_table_[0-9]+, db[1-2].[app|web]_order_\\.*");

@Experimental
public static final ConfigOption<Boolean> SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED =
ConfigOptions.key("scan.binlog.newly-added-table.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"In binlog reading stage, whether to scan the ddl and dml statements of newly added tables or not, by default is false. \n"
+ "The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: \n"
+ "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; \n"
+ "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
Expand Down Expand Up @@ -83,7 +84,10 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_ID;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
Expand Down Expand Up @@ -149,6 +153,40 @@ private MySqlConnection getConnection() {
return DebeziumUtils.createMySqlConnection(configuration, new Properties());
}

@Test
public void testScanBinlogNewlyAddedTableEnabled() throws Exception {
List<String> tables = Collections.singletonList("address_\\.*");
Map<String, String> options = new HashMap<>();
options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
options.put(SCAN_STARTUP_MODE.key(), "timestamp");
options.put(
SCAN_STARTUP_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));

FlinkSourceProvider sourceProvider = getFlinkSourceProvider(tables, 4, options);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
env.enableCheckpointing(200);
DataStreamSource<Event> source =
env.fromSource(
sourceProvider.getSource(),
WatermarkStrategy.noWatermarks(),
MySqlDataSourceFactory.IDENTIFIER,
new EventTypeInfo());

TypeSerializer<Event> serializer =
source.getTransformation().getOutputType().createSerializer(env.getConfig());
CheckpointedCollectResultBuffer<Event> resultBuffer =
new CheckpointedCollectResultBuffer<>(serializer);
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectResultIterator<Event> iterator =
addCollector(env, source, resultBuffer, serializer, accumulatorName);
env.executeAsync("AddNewlyTablesWhenReadingBinlog");
initialAddressTables(getConnection(), Collections.singletonList("address_beijing"));
List<Event> actual = fetchResults(iterator, 4);
assertThat(((ChangeEvent) actual.get(0)).tableId())
.isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), "address_beijing"));
}

@Test
public void testAddNewTableOneByOneSingleParallelism() throws Exception {
TestParam testParam =
Expand Down Expand Up @@ -228,7 +266,7 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except
List<String> listenTablesFirstRound = testParam.getFirstRoundListenTables();

FlinkSourceProvider sourceProvider =
getFlinkSourceProvider(listenTablesFirstRound, parallelism);
getFlinkSourceProvider(listenTablesFirstRound, parallelism, new HashMap<>());
DataStreamSource<Event> source =
env.fromSource(
sourceProvider.getSource(),
Expand Down Expand Up @@ -272,7 +310,7 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except
getStreamExecutionEnvironment(finishedSavePointPath, parallelism);
List<String> listenTablesSecondRound = testParam.getSecondRoundListenTables();
FlinkSourceProvider restoredSourceProvider =
getFlinkSourceProvider(listenTablesSecondRound, parallelism);
getFlinkSourceProvider(listenTablesSecondRound, parallelism, new HashMap<>());
DataStreamSource<Event> restoreSource =
restoredEnv.fromSource(
restoredSourceProvider.getSource(),
Expand Down Expand Up @@ -432,7 +470,8 @@ private void initialAddressTables(JdbcConnection connection, List<String> addres
}
}

private FlinkSourceProvider getFlinkSourceProvider(List<String> tables, int parallelism) {
private FlinkSourceProvider getFlinkSourceProvider(
List<String> tables, int parallelism, Map<String, String> additionalOptions) {
List<String> fullTableNames =
tables.stream()
.map(table -> customDatabase.getDatabaseName() + "." + table)
Expand All @@ -446,6 +485,7 @@ private FlinkSourceProvider getFlinkSourceProvider(List<String> tables, int para
options.put(TABLES.key(), StringUtils.join(fullTableNames, ","));
options.put(SERVER_ID.key(), getServerId(parallelism));
options.put(SCAN_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
options.putAll(additionalOptions);
Factory.Context context =
new FactoryHelper.DefaultContext(
org.apache.flink.cdc.common.configuration.Configuration.fromMap(options),
Expand Down
Loading