diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java new file mode 100644 index 0000000000..a7cdf868a7 --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java @@ -0,0 +1,53 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.config; + +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Properties; + +/** + * Abstract implementation of {@link TopicConfigProvider} which provides a method for loading cluster configurations + * from a JSON file. + */ +public abstract class JsonFileTopicConfigProvider implements TopicConfigProvider { + + /** + * Method which will find the file path from the supplied config map using the supplied cluster file config key and + * load the configs contained in that JSON file into a {@link java.util.Properties} instance. + * + * The format of the file is JSON, with properties listed as top level key/value pairs: + * + *
+     *   {
+     *     "min.insync.replicas": 1,
+     *     "an.example.cluster.config": false
+     *   }
+     * 
+ * + * @param configs The map of config key/value pairs + * @param clusterConfigsFileKey The key under which the config file path is stored. + * @return A {@link java.util.Properties} instance containing the contents of the specified JSON config file. + */ + protected static Properties loadClusterConfigs(Map configs, String clusterConfigsFileKey) { + String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, clusterConfigsFileKey); + try { + try (JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(configFile), StandardCharsets.UTF_8))) { + Gson gson = new Gson(); + return gson.fromJson(reader, Properties.class); + } + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + + +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java index ababed13ab..47d60eaf1c 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -4,14 +4,7 @@ package com.linkedin.kafka.cruisecontrol.config; -import com.google.gson.Gson; -import com.google.gson.stream.JsonReader; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -41,12 +34,12 @@ * * */ -public class KafkaAdminTopicConfigProvider implements TopicConfigProvider { +public class KafkaAdminTopicConfigProvider extends JsonFileTopicConfigProvider { private static final Logger LOG = LoggerFactory.getLogger(KafkaAdminTopicConfigProvider.class); public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; - private static Properties _clusterConfigs; + private Properties _clusterConfigs; private AdminClient _adminClient; private long _adminTimeoutMs; @@ -132,33 +125,13 @@ private static Properties convertTopicConfigToProperties(Config config) { return props; } - private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException { - JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8)); - try { - Gson gson = new Gson(); - _clusterConfigs = gson.fromJson(reader, Properties.class); - } finally { - try { - reader.close(); - } catch (IOException e) { - // let it go. - } - } - } @Override public void configure(Map configs) { - KafkaCruiseControlConfig ccConfig = new KafkaCruiseControlConfig(configs); _adminTimeoutMs = ccConfig.getConfiguredInstance(ExecutorConfig.ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG, Integer.class); _adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(ccConfig)); - - String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE); - try { - loadClusterConfigs(configFile); - } catch (FileNotFoundException e) { - throw new IllegalArgumentException(e); - } + _clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE); } @Override diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java index 3b0d46e889..305254884c 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java @@ -4,19 +4,12 @@ package com.linkedin.kafka.cruisecontrol.config; -import com.google.gson.Gson; -import com.google.gson.stream.JsonReader; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; import kafka.server.ConfigType; import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; import scala.collection.JavaConversions; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Properties; @@ -31,20 +24,19 @@ * * * This class uses the Zookeeper based admin client that will be removed in Kafka 3.0. Therefore this class has been - * deprecated and will be removed in a future Cruise Control release. A new - * {@link com.linkedin.kafka.cruisecontrol.config.TopicConfigProvider} implementation using the Kafka Admin Client has - * been created ({@link com.linkedin.kafka.cruisecontrol.config.KafkaAdminTopicConfigProvider}) and can be set using the + * deprecated and will be removed in a future Cruise Control release. A new {@link TopicConfigProvider} implementation + * using the Kafka Admin Client has been created ({@link KafkaAdminTopicConfigProvider}) and can be set using the * {@code topic.config.provider.class} configuration setting. * */ @Deprecated -public class KafkaTopicConfigProvider implements TopicConfigProvider { +public class KafkaTopicConfigProvider extends JsonFileTopicConfigProvider { public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider"; public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs"; private String _connectString; private boolean _zkSecurityEnabled; - private static Properties _clusterConfigs; + private Properties _clusterConfigs; @Override public Properties clusterConfigs() { @@ -79,30 +71,11 @@ public Map allTopicConfigs() { } } - private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException { - JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8)); - try { - Gson gson = new Gson(); - _clusterConfigs = gson.fromJson(reader, Properties.class); - } finally { - try { - reader.close(); - } catch (IOException e) { - // let it go. - } - } - } - @Override public void configure(Map configs) { _connectString = (String) configs.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG); _zkSecurityEnabled = (Boolean) configs.get(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG); - String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE); - try { - loadClusterConfigs(configFile); - } catch (FileNotFoundException e) { - throw new IllegalArgumentException(e); - } + _clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE); } @Override