diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/OptionUtils.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/OptionUtils.java index 094fa72fb5d..73ade369a98 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/OptionUtils.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/OptionUtils.java @@ -17,33 +17,25 @@ package com.ververica.cdc.connectors.base.utils; import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.ConfigurationUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.List; +import java.util.Map; /** A utility class to print configuration of connectors. */ public class OptionUtils { private static final Logger LOG = LoggerFactory.getLogger(OptionUtils.class); - private static final List SENSITIVE_OPTIONS = Arrays.asList("password"); - - private static final String SHADE = "**********"; - /** Utility class can not be instantiated. */ private OptionUtils() {} - public static void printOptions(ReadableConfig config, ConfigOption... options) { + public static void printOptions(Map config, ConfigOption... options) { + Map hideMap = ConfigurationUtils.hideSensitiveValues(config); for (ConfigOption option : options) { - if (SENSITIVE_OPTIONS.contains(option.key())) { - LOG.info("{} = {}", option.key(), SHADE); - } else { - LOG.info("{} = {}", option.key(), config.get(option)); - } + LOG.info("{} = {}", option.key(), hideMap.get(option.key())); } } } diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/utils/OptionUtilsTest.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/utils/OptionUtilsTest.java index 834038a1e24..ea99fa716c6 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/utils/OptionUtilsTest.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/utils/OptionUtilsTest.java @@ -18,8 +18,8 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.util.HashMap; @@ -34,10 +34,9 @@ public void testOptionsPrint() { options.put("host", "localhost:3306"); options.put("user", "tyrantlucifer"); options.put("password", "tyrantlucifer"); - Configuration configuration = Configuration.fromMap(options); ConfigOption host = ConfigOptions.key("host").stringType().noDefaultValue(); ConfigOption user = ConfigOptions.key("user").stringType().noDefaultValue(); ConfigOption password = ConfigOptions.key("password").stringType().noDefaultValue(); - OptionUtils.printOptions(configuration, host, user, password); + OptionUtils.printOptions(options, host, user, password); } } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java index 20388f06756..0c5cf3fe25a 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java @@ -17,6 +17,7 @@ package com.ververica.cdc.connectors.mongodb.table; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.ResolvedSchema; @@ -99,7 +100,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { checkPrimaryKey(physicalSchema.getPrimaryKey().get(), "Primary key must be _id field"); OptionUtils.printOptions( - config, + ((Configuration) config).toMap(), HOSTS, USERNAME, PASSWORD, diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index fbf37000ced..8d8cefc862b 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -17,6 +17,7 @@ package com.ververica.cdc.connectors.mysql.table; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ResolvedSchema; @@ -124,7 +125,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { } OptionUtils.printOptions( - config, + ((Configuration) config).toMap(), HOSTNAME, PORT, USERNAME, diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java index 100c99c1786..159e79e2711 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java @@ -18,6 +18,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -190,7 +191,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { String workingMode = config.get(WORKING_MODE); OptionUtils.printOptions( - config, + ((Configuration) config).toMap(), SCAN_STARTUP_MODE, USERNAME, PASSWORD, diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java index 7ec36feb2c1..42a0f707bee 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSourceFactory.java @@ -17,6 +17,7 @@ package com.ververica.cdc.connectors.oracle.table; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ResolvedSchema; @@ -104,7 +105,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { } OptionUtils.printOptions( - config, + ((Configuration) config).toMap(), HOSTNAME, USERNAME, PASSWORD, diff --git a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java index 2187a007eb1..98a3040f639 100644 --- a/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java +++ b/flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/table/PostgreSQLTableFactory.java @@ -18,6 +18,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -138,7 +139,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c } OptionUtils.printOptions( - config, + ((Configuration) config).toMap(), HOSTNAME, PORT, USERNAME, diff --git a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java index 1b81a27ee21..12d41318e6c 100644 --- a/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java +++ b/flink-connector-sqlserver-cdc/src/main/java/com/ververica/cdc/connectors/sqlserver/table/SqlServerTableFactory.java @@ -18,6 +18,7 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ResolvedSchema; @@ -118,7 +119,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { getPhysicalSchema(context.getCatalogTable().getResolvedSchema()); OptionUtils.printOptions( - config, + ((Configuration) config).toMap(), HOSTNAME, PORT, USERNAME, diff --git a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactory.java b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactory.java index 0733f911049..e1dfb0a76a2 100644 --- a/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactory.java +++ b/flink-connector-tidb-cdc/src/main/java/com/ververica/cdc/connectors/tidb/table/TiDBTableSourceFactory.java @@ -17,6 +17,7 @@ package com.ververica.cdc.connectors.tidb.table; import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ResolvedSchema; @@ -60,7 +61,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { getPhysicalSchema(context.getCatalogTable().getResolvedSchema()); OptionUtils.printOptions( - config, + ((Configuration) config).toMap(), DATABASE_NAME, TABLE_NAME, SCAN_STARTUP_MODE,