Skip to content

Commit

Permalink
Moved JSON config loading method for TopicConfigProviders to abstract…
Browse files Browse the repository at this point in the history
… parent class

Signed-off-by: Thomas Cooper <[email protected]>
  • Loading branch information
tomncooper committed Jun 3, 2021
1 parent 9881ba0 commit 44727ee
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.config;

import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;

/**
* Abstract implementation of {@link TopicConfigProvider} which provides a method for loading cluster configurations
* from a JSON file.
*/
public abstract class JsonFileTopicConfigProvider implements TopicConfigProvider {

/**
* Method which will find the file path from the supplied config map using the supplied cluster file config key and
* load the configs contained in that JSON file into a {@link java.util.Properties} instance.
*
* The format of the file is JSON, with properties listed as top level key/value pairs:
*
* <pre>
* {
* "min.insync.replicas": 1,
* "an.example.cluster.config": false
* }
* </pre>
*
* @param configs The map of config key/value pairs
* @param clusterConfigsFileKey The key under which the config file path is stored.
* @return A {@link java.util.Properties} instance containing the contents of the specified JSON config file.
*/
protected static Properties loadClusterConfigs(Map<String, ?> configs, String clusterConfigsFileKey) {
String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, clusterConfigsFileKey);
try {
try (JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(configFile), StandardCharsets.UTF_8))) {
Gson gson = new Gson();
return gson.fromJson(reader, Properties.class);
}
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,7 @@

package com.linkedin.kafka.cruisecontrol.config;

import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -41,12 +34,12 @@
* </pre>
*
*/
public class KafkaAdminTopicConfigProvider implements TopicConfigProvider {
public class KafkaAdminTopicConfigProvider extends JsonFileTopicConfigProvider {

private static final Logger LOG = LoggerFactory.getLogger(KafkaAdminTopicConfigProvider.class);

public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file";
private static Properties _clusterConfigs;
private Properties _clusterConfigs;
private AdminClient _adminClient;
private long _adminTimeoutMs;

Expand Down Expand Up @@ -132,33 +125,13 @@ private static Properties convertTopicConfigToProperties(Config config) {
return props;
}

private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException {
JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8));
try {
Gson gson = new Gson();
_clusterConfigs = gson.fromJson(reader, Properties.class);
} finally {
try {
reader.close();
} catch (IOException e) {
// let it go.
}
}
}

@Override
public void configure(Map<String, ?> configs) {

KafkaCruiseControlConfig ccConfig = new KafkaCruiseControlConfig(configs);
_adminTimeoutMs = ccConfig.getConfiguredInstance(ExecutorConfig.ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG, Integer.class);
_adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(ccConfig));

String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE);
try {
loadClusterConfigs(configFile);
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
_clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,12 @@

package com.linkedin.kafka.cruisecontrol.config;

import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import kafka.server.ConfigType;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import scala.collection.JavaConversions;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;

Expand All @@ -31,20 +24,19 @@
* </pre>
*
* This class uses the Zookeeper based admin client that will be removed in Kafka 3.0. Therefore this class has been
* deprecated and will be removed in a future Cruise Control release. A new
* {@link com.linkedin.kafka.cruisecontrol.config.TopicConfigProvider} implementation using the Kafka Admin Client has
* been created ({@link com.linkedin.kafka.cruisecontrol.config.KafkaAdminTopicConfigProvider}) and can be set using the
* deprecated and will be removed in a future Cruise Control release. A new {@link TopicConfigProvider} implementation
* using the Kafka Admin Client has been created ({@link KafkaAdminTopicConfigProvider}) and can be set using the
* {@code topic.config.provider.class} configuration setting.
*
*/
@Deprecated
public class KafkaTopicConfigProvider implements TopicConfigProvider {
public class KafkaTopicConfigProvider extends JsonFileTopicConfigProvider {
public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file";
public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider";
public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs";
private String _connectString;
private boolean _zkSecurityEnabled;
private static Properties _clusterConfigs;
private Properties _clusterConfigs;

@Override
public Properties clusterConfigs() {
Expand Down Expand Up @@ -79,30 +71,11 @@ public Map<String, Properties> allTopicConfigs() {
}
}

private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException {
JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8));
try {
Gson gson = new Gson();
_clusterConfigs = gson.fromJson(reader, Properties.class);
} finally {
try {
reader.close();
} catch (IOException e) {
// let it go.
}
}
}

@Override
public void configure(Map<String, ?> configs) {
_connectString = (String) configs.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG);
_zkSecurityEnabled = (Boolean) configs.get(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG);
String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE);
try {
loadClusterConfigs(configFile);
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
_clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE);
}

@Override
Expand Down

0 comments on commit 44727ee

Please sign in to comment.