From dbe6a3f5ac710f4030ff869250d6826593e69bc3 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Tue, 5 Dec 2023 15:52:44 +0800 Subject: [PATCH] [pipeline-connector][mysql] Enable send schema change by default --- .../cdc/connectors/mysql/source/MySqlDataSourceOptions.java | 4 ++-- .../cdc/connectors/mysql/source/MySqlPipelineITCase.java | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 3e33e23db7..018e033b64 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -227,7 +227,7 @@ public class MySqlDataSourceOptions { public static final ConfigOption SCHEMA_CHANGE_ENABLED = ConfigOptions.key("schema-change.enabled") .booleanType() - .defaultValue(false) + .defaultValue(true) .withDescription( - "Whether send schema change events, by default is false. If set to false, the schema changes will not be sent."); + "Whether send schema change events, by default is true. If set to false, the schema changes will not be sent."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java index 27689d7b2e..f01fa6ec3a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlPipelineITCase.java @@ -60,6 +60,7 @@ import java.util.List; import java.util.stream.Stream; +import static com.ververica.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED; import static com.ververica.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD; import static com.ververica.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER; import static com.ververica.cdc.connectors.mysql.testutils.MySqSourceTestUtils.fetchResults; @@ -114,7 +115,7 @@ public void testInitialStartupMode() throws Exception { .startupOptions(StartupOptions.initial()) .serverId(getServerId(env.getParallelism())) .serverTimeZone("UTC") - .includeSchemaChanges(true); + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); FlinkSourceProvider sourceProvider = (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider(); @@ -242,7 +243,7 @@ public void testParseAlterStatement() throws Exception { .startupOptions(StartupOptions.latest()) .serverId(getServerId(env.getParallelism())) .serverTimeZone("UTC") - .includeSchemaChanges(true); + .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue()); FlinkSourceProvider sourceProvider = (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();