From 64fa381f0ba0a8175305449dd050a777a7863900 Mon Sep 17 00:00:00 2001 From: Thomas Cooper Date: Wed, 19 May 2021 10:39:00 +0100 Subject: [PATCH 01/10] Created TopicConfigProvider using Kafka Admin Client Signed-off-by: Thomas Cooper --- .../KafkaCruiseControlUtils.java | 1 + .../config/KafkaTopicConfigProvider.java | 116 +++++++++++++----- .../config/KafkaZkTopicConfigProvider.java | 105 ++++++++++++++++ .../KafkaCruiseControlUnitTestUtils.java | 2 +- 4 files changed, 195 insertions(+), 29 deletions(-) create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaZkTopicConfigProvider.java diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java index 04406c857d..c4034415e5 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java @@ -90,6 +90,7 @@ public final class KafkaCruiseControlUtils { public static final int ZK_CONNECTION_TIMEOUT = (int) TimeUnit.MINUTES.toMillis(2); public static final long KAFKA_ZK_CLIENT_CLOSE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); public static final long ADMIN_CLIENT_CLOSE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); + public static final long ADMIN_CLIENT_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); public static final int SEC_TO_MS = (int) TimeUnit.SECONDS.toMillis(1); private static final int MIN_TO_MS = SEC_TO_MS * 60; private static final int HOUR_TO_MS = MIN_TO_MS * 60; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java index 81ea4c6596..56baac932e 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java @@ -7,18 +7,26 @@ import com.google.gson.Gson; import com.google.gson.stream.JsonReader; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; -import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Properties; -import kafka.server.ConfigType; -import kafka.zk.AdminZkClient; -import kafka.zk.KafkaZkClient; -import scala.collection.JavaConversions; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.config.ConfigResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -32,44 +40,94 @@ * */ public class KafkaTopicConfigProvider implements TopicConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConfigProvider.class); + public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; - public static final String 
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider";
-  public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs";
-  private String _connectString;
-  private boolean _zkSecurityEnabled;
   private static Properties _clusterConfigs;
+  private AdminClient _adminClient;
 
   @Override
+  //TODO: This is just returning a parsed file? Why not the cluster config from the actual cluster?
   public Properties clusterConfigs() {
     return _clusterConfigs;
   }
 
   @Override
   public Properties topicConfigs(String topic) {
-    KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString,
-                                                                              ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
-                                                                              ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
-                                                                              _zkSecurityEnabled);
+    Config topicConfig = null;
+    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
     try {
-      AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
-      return adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic);
-    } finally {
-      KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient);
+      LOG.debug("Requesting details for topic '{}'", topic);
+      topicConfig = _adminClient
+          .describeConfigs(Collections.singletonList(topicResource))
+          .all()
+          .get(KafkaCruiseControlUtils.ADMIN_CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+          .get(topicResource);
+    } catch (InterruptedException e) {
+      LOG.error("The request for the configuration of topic '{}' was interrupted", topic);
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      LOG.error("The request for the configuration of topic '{}' failed", topic);
+      e.printStackTrace();
+    } catch (TimeoutException e) {
+      LOG.error("The request for the configuration of topic '{}' timed out", topic);
+      e.printStackTrace();
+    }
+
+    if (topicConfig != null) {
+      return convertTopicConfigToProperties(topicConfig);
+    } else {
+      LOG.error("The configuration for topic '{}' could not be retrieved", topic);
+      return new Properties();
     }
   }
 
   @Override
   public Map<String, Properties> allTopicConfigs() {
-    KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString,
-                                                                              ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
-                                                                              ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
-                                                                              _zkSecurityEnabled);
+    Map<ConfigResource, Config> topicConfigs = null;
     try {
-      AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
-      return JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());
-    } finally {
-      KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient);
+      LOG.debug("Requesting configurations for all topics");
+      topicConfigs = _adminClient
+          .listTopics()
+          .names()
+          .thenApply(
+              topicNameSet -> _adminClient.describeConfigs(
+                  topicNameSet.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList())
+              ).all()
+          )
+          .get(KafkaCruiseControlUtils.ADMIN_CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+          .get(KafkaCruiseControlUtils.ADMIN_CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("The request for the configuration of all topics was interrupted");
+      e.printStackTrace();
+    } catch (ExecutionException e) {
+      LOG.error("The request for the configuration of all topics failed");
+      e.printStackTrace();
+    } catch (TimeoutException e) {
+      LOG.error("The request for the configuration of all topics timed out");
+      e.printStackTrace();
+    }
+
+    Map<String, Properties> propsMap = new HashMap<>();
+    if (topicConfigs != null) {
+      LOG.debug("Converting {} Topic Configs into Properties", topicConfigs.size());
+      for
(Map.Entry entry : topicConfigs.entrySet()) { + propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(entry.getValue())); + } + LOG.debug("Topic Config conversion complete"); + } else { + LOG.error("Topic configurations for all topics on the cluster could not be retrieved"); } + return propsMap; + } + + private static Properties convertTopicConfigToProperties(Config config) { + Properties props = new Properties(); + for (ConfigEntry entry : config.entries()) { + props.put(entry.name(), entry.value()); + } + return props; } private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException { @@ -88,8 +146,10 @@ private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundEx @Override public void configure(Map configs) { - _connectString = (String) configs.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG); - _zkSecurityEnabled = (Boolean) configs.get(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG); + + KafkaCruiseControlConfig ccConfig = new KafkaCruiseControlConfig(configs); + _adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(ccConfig)); + String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE); try { loadClusterConfigs(configFile); @@ -100,6 +160,6 @@ public void configure(Map configs) { @Override public void close() { - // nothing to do. + _adminClient.close(Duration.ofMillis(KafkaCruiseControlUtils.ADMIN_CLIENT_CLOSE_TIMEOUT_MS)); } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaZkTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaZkTopicConfigProvider.java new file mode 100644 index 0000000000..213b53d810 --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaZkTopicConfigProvider.java @@ -0,0 +1,105 @@ +/* + * Copyright 2018 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.config; + +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; +import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; +import kafka.server.ConfigType; +import kafka.zk.AdminZkClient; +import kafka.zk.KafkaZkClient; +import scala.collection.JavaConversions; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Properties; + + +/** + * The Kafka topic config provider implementation based on files. The format of the file is JSON, listing properties: + *
+ *   {
+ *     "min.insync.replicas": 1,
+ *     "an.example.cluster.config": false
+ *   }
+ * 
+ * + */ +public class KafkaZkTopicConfigProvider implements TopicConfigProvider { + public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; + public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider"; + public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs"; + private String _connectString; + private boolean _zkSecurityEnabled; + private static Properties _clusterConfigs; + + @Override + public Properties clusterConfigs() { + return _clusterConfigs; + } + + @Override + public Properties topicConfigs(String topic) { + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, + _zkSecurityEnabled); + try { + AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); + return adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic); + } finally { + KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); + } + } + + @Override + public Map allTopicConfigs() { + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, + _zkSecurityEnabled); + try { + AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); + return JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs()); + } finally { + KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); + } + } + + private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException { + JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8)); + try { + Gson gson = new Gson(); + _clusterConfigs = gson.fromJson(reader, Properties.class); + } finally { + try { + reader.close(); + } catch (IOException e) { + // let it go. + } + } + } + + @Override + public void configure(Map configs) { + _connectString = (String) configs.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG); + _zkSecurityEnabled = (Boolean) configs.get(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG); + String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE); + try { + loadClusterConfigs(configFile); + } catch (FileNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public void close() { + // nothing to do. 
+ } +} diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUnitTestUtils.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUnitTestUtils.java index 69e28b0c8e..21e5ba9cae 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUnitTestUtils.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUnitTestUtils.java @@ -50,7 +50,7 @@ public static Properties getKafkaCruiseControlProperties() { String clusterConfigsFile = Objects.requireNonNull(KafkaCruiseControlUnitTestUtils.class.getClassLoader().getResource( TestConstants.DEFAULT_CLUSTER_CONFIGS_FILE)).getFile(); props.setProperty(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2121"); - props.setProperty(MonitorConfig.BOOTSTRAP_SERVERS_CONFIG, "aaa"); + props.setProperty(MonitorConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.setProperty(MonitorConfig.METRIC_SAMPLER_CLASS_CONFIG, NoopSampler.class.getName()); props.setProperty(BrokerCapacityConfigFileResolver.CAPACITY_CONFIG_FILE, capacityConfigFile); props.setProperty(KafkaTopicConfigProvider.CLUSTER_CONFIGS_FILE, clusterConfigsFile); From 596b60f218aeabe6e801cbe8959c6c2334c5a590 Mon Sep 17 00:00:00 2001 From: Thomas Cooper Date: Mon, 24 May 2021 17:47:54 +0100 Subject: [PATCH 02/10] Addressed reviewer comments * Changed ZK based provider to use old class name and gave new Kafka Admin provider new name * Added deprecated annotation to old ZK provider and additional javadoc explaining the deprecation. * Switched to using the configured admin client timeout from the main config class * Fixed Copy Right date --- .../KafkaCruiseControlUtils.java | 1 - .../config/KafkaAdminTopicConfigProvider.java | 168 ++++++++++++++++++ .../config/KafkaTopicConfigProvider.java | 123 ++++--------- .../config/KafkaZkTopicConfigProvider.java | 105 ----------- 4 files changed, 203 insertions(+), 194 deletions(-) create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java delete mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaZkTopicConfigProvider.java diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java index c4034415e5..04406c857d 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUtils.java @@ -90,7 +90,6 @@ public final class KafkaCruiseControlUtils { public static final int ZK_CONNECTION_TIMEOUT = (int) TimeUnit.MINUTES.toMillis(2); public static final long KAFKA_ZK_CLIENT_CLOSE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); public static final long ADMIN_CLIENT_CLOSE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); - public static final long ADMIN_CLIENT_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10); public static final int SEC_TO_MS = (int) TimeUnit.SECONDS.toMillis(1); private static final int MIN_TO_MS = SEC_TO_MS * 60; private static final int HOUR_TO_MS = MIN_TO_MS * 60; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java new file mode 100644 index 0000000000..ababed13ab --- /dev/null +++ 
b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
+ */
+
+package com.linkedin.kafka.cruisecontrol.config;
+
+import com.google.gson.Gson;
+import com.google.gson.stream.JsonReader;
+import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.common.config.ConfigResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The Kafka topic config provider implementation based on using the Kafka Admin Client for topic level configurations
+ * and files for cluster level configurations. The format of the file is JSON, listing properties:
+ * <pre>
+ *   {
+ *     "min.insync.replicas": 1,
+ *     "an.example.cluster.config": false
+ *   }
+ * </pre>
+ * + */ +public class KafkaAdminTopicConfigProvider implements TopicConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaAdminTopicConfigProvider.class); + + public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; + private static Properties _clusterConfigs; + private AdminClient _adminClient; + private long _adminTimeoutMs; + + @Override + public Properties clusterConfigs() { + return _clusterConfigs; + } + + @Override + public Properties topicConfigs(String topic) { + Config topicConfig = null; + ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); + try { + LOG.debug("Requesting details for topic '{}'", topic); + topicConfig = _adminClient + .describeConfigs(Collections.singletonList(topicResource)) + .all() + .get(_adminTimeoutMs, TimeUnit.MILLISECONDS) + .get(topicResource); + } catch (InterruptedException e) { + LOG.error("The request for the configuration of topic '{}' was interrupted", topic); + e.printStackTrace(); + } catch (ExecutionException e) { + LOG.error("The request for the configuration of topic '{}' failed", topic); + e.printStackTrace(); + } catch (TimeoutException e) { + LOG.error("The request for the configuration of topic '{}' timed out", topic); + e.printStackTrace(); + } + + if (topicConfig != null) { + return convertTopicConfigToProperties(topicConfig); + } else { + LOG.error("The configuration for topic '{}' could not be retrieved", topic); + return new Properties(); + } + } + + @Override + public Map allTopicConfigs() { + Map topicConfigs = null; + try { + LOG.debug("Requesting configurations for all topics"); + topicConfigs = _adminClient + .listTopics() + .names() + .thenApply( + topicNameSet -> _adminClient.describeConfigs( + topicNameSet.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList()) + ).all() + ) + .get(_adminTimeoutMs, TimeUnit.MILLISECONDS) + .get(_adminTimeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.error("The request for the configuration of all topics was interrupted"); + e.printStackTrace(); + } catch (ExecutionException e) { + LOG.error("The request for the configuration of all topics failed"); + e.printStackTrace(); + } catch (TimeoutException e) { + LOG.error("The request for the configuration of all topics timed out"); + e.printStackTrace(); + } + + Map propsMap = new HashMap<>(); + if (topicConfigs != null) { + LOG.debug("Converting {} Topic Configs into Properties", topicConfigs.size()); + for (Map.Entry entry : topicConfigs.entrySet()) { + propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(entry.getValue())); + } + LOG.debug("Topic Config conversion complete"); + } else { + LOG.error("Topic configurations for all topics on the cluster could not be retrieved"); + } + return propsMap; + } + + private static Properties convertTopicConfigToProperties(Config config) { + Properties props = new Properties(); + for (ConfigEntry entry : config.entries()) { + props.put(entry.name(), entry.value()); + } + return props; + } + + private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException { + JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8)); + try { + Gson gson = new Gson(); + _clusterConfigs = gson.fromJson(reader, Properties.class); + } finally { + try { + reader.close(); + } catch (IOException e) { + // let it go. 
+ } + } + } + + @Override + public void configure(Map configs) { + + KafkaCruiseControlConfig ccConfig = new KafkaCruiseControlConfig(configs); + _adminTimeoutMs = ccConfig.getConfiguredInstance(ExecutorConfig.ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG, Integer.class); + _adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(ccConfig)); + + String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE); + try { + loadClusterConfigs(configFile); + } catch (FileNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public void close() { + _adminClient.close(Duration.ofMillis(KafkaCruiseControlUtils.ADMIN_CLIENT_CLOSE_TIMEOUT_MS)); + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java index 56baac932e..3b0d46e889 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java @@ -7,26 +7,18 @@ import com.google.gson.Gson; import com.google.gson.stream.JsonReader; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; +import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; +import kafka.server.ConfigType; +import kafka.zk.AdminZkClient; +import kafka.zk.KafkaZkClient; +import scala.collection.JavaConversions; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.Config; -import org.apache.kafka.clients.admin.ConfigEntry; -import org.apache.kafka.common.config.ConfigResource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** @@ -38,96 +30,53 @@ * } * * + * This class uses the Zookeeper based admin client that will be removed in Kafka 3.0. Therefore this class has been + * deprecated and will be removed in a future Cruise Control release. A new + * {@link com.linkedin.kafka.cruisecontrol.config.TopicConfigProvider} implementation using the Kafka Admin Client has + * been created ({@link com.linkedin.kafka.cruisecontrol.config.KafkaAdminTopicConfigProvider}) and can be set using the + * {@code topic.config.provider.class} configuration setting. + * */ +@Deprecated public class KafkaTopicConfigProvider implements TopicConfigProvider { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConfigProvider.class); - public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; + public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider"; + public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs"; + private String _connectString; + private boolean _zkSecurityEnabled; private static Properties _clusterConfigs; - private AdminClient _adminClient; @Override - //TODO: This is just returning a parsed file? 
Why not the cluster config from the actual cluster? public Properties clusterConfigs() { return _clusterConfigs; } @Override public Properties topicConfigs(String topic) { - Config topicConfig = null; - ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, + _zkSecurityEnabled); try { - LOG.debug("Requesting details for topic '{}'", topic); - topicConfig = _adminClient - .describeConfigs(Collections.singletonList(topicResource)) - .all() - .get(KafkaCruiseControlUtils.ADMIN_CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) - .get(topicResource); - } catch (InterruptedException e) { - LOG.error("The request for the configuration of topic '{}' was interrupted", topic); - e.printStackTrace(); - } catch (ExecutionException e) { - LOG.error("The request for the configuration of topic '{}' failed", topic); - e.printStackTrace(); - } catch (TimeoutException e) { - LOG.error("The request for the configuration of topic '{}' timed out", topic); - e.printStackTrace(); - } - - if (topicConfig != null) { - return convertTopicConfigToProperties(topicConfig); - } else { - LOG.error("The configuration for topic '{}' could not be retrieved", topic); - return new Properties(); + AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); + return adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic); + } finally { + KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); } } @Override public Map allTopicConfigs() { - Map topicConfigs = null; + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, + _zkSecurityEnabled); try { - LOG.debug("Requesting configurations for all topics"); - topicConfigs = _adminClient - .listTopics() - .names() - .thenApply( - topicNameSet -> _adminClient.describeConfigs( - topicNameSet.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList()) - ).all() - ) - .get(KafkaCruiseControlUtils.ADMIN_CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS) - .get(KafkaCruiseControlUtils.ADMIN_CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.error("The request for the configuration of all topics was interrupted"); - e.printStackTrace(); - } catch (ExecutionException e) { - LOG.error("The request for the configuration of all topics failed"); - e.printStackTrace(); - } catch (TimeoutException e) { - LOG.error("The request for the configuration of all topics timed out"); - e.printStackTrace(); - } - - Map propsMap = new HashMap<>(); - if (topicConfigs != null) { - LOG.debug("Converting {} Topic Configs into Properties", topicConfigs.size()); - for (Map.Entry entry : topicConfigs.entrySet()) { - propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(entry.getValue())); - } - LOG.debug("Topic Config conversion complete"); - } else { - LOG.error("Topic configurations for all topics on the cluster could not be retrieved"); - } - return propsMap; - } - - private static Properties convertTopicConfigToProperties(Config config) { - Properties props = new Properties(); - for (ConfigEntry entry : config.entries()) { - props.put(entry.name(), entry.value()); + AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); + return 
JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs()); + } finally { + KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); } - return props; } private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException { @@ -146,10 +95,8 @@ private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundEx @Override public void configure(Map configs) { - - KafkaCruiseControlConfig ccConfig = new KafkaCruiseControlConfig(configs); - _adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(ccConfig)); - + _connectString = (String) configs.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG); + _zkSecurityEnabled = (Boolean) configs.get(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG); String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE); try { loadClusterConfigs(configFile); @@ -160,6 +107,6 @@ public void configure(Map configs) { @Override public void close() { - _adminClient.close(Duration.ofMillis(KafkaCruiseControlUtils.ADMIN_CLIENT_CLOSE_TIMEOUT_MS)); + // nothing to do. } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaZkTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaZkTopicConfigProvider.java deleted file mode 100644 index 213b53d810..0000000000 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaZkTopicConfigProvider.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright 2018 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. - */ - -package com.linkedin.kafka.cruisecontrol.config; - -import com.google.gson.Gson; -import com.google.gson.stream.JsonReader; -import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; -import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; -import kafka.server.ConfigType; -import kafka.zk.AdminZkClient; -import kafka.zk.KafkaZkClient; -import scala.collection.JavaConversions; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.Properties; - - -/** - * The Kafka topic config provider implementation based on files. The format of the file is JSON, listing properties: - *
- *   {
- *     "min.insync.replicas": 1,
- *     "an.example.cluster.config": false
- *   }
- * 
- * - */ -public class KafkaZkTopicConfigProvider implements TopicConfigProvider { - public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; - public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider"; - public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs"; - private String _connectString; - private boolean _zkSecurityEnabled; - private static Properties _clusterConfigs; - - @Override - public Properties clusterConfigs() { - return _clusterConfigs; - } - - @Override - public Properties topicConfigs(String topic) { - KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, - _zkSecurityEnabled); - try { - AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); - return adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic); - } finally { - KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); - } - } - - @Override - public Map allTopicConfigs() { - KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, - _zkSecurityEnabled); - try { - AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); - return JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs()); - } finally { - KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); - } - } - - private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException { - JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8)); - try { - Gson gson = new Gson(); - _clusterConfigs = gson.fromJson(reader, Properties.class); - } finally { - try { - reader.close(); - } catch (IOException e) { - // let it go. - } - } - } - - @Override - public void configure(Map configs) { - _connectString = (String) configs.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG); - _zkSecurityEnabled = (Boolean) configs.get(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG); - String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE); - try { - loadClusterConfigs(configFile); - } catch (FileNotFoundException e) { - throw new IllegalArgumentException(e); - } - } - - @Override - public void close() { - // nothing to do. 
- } -} From fde2168aaca2301ce6600498e50d2309ebc13733 Mon Sep 17 00:00:00 2001 From: Thomas Cooper Date: Thu, 3 Jun 2021 17:31:30 +0100 Subject: [PATCH 03/10] Moved JSON config loading method for TopicConfigProviders to abstract parent class Signed-off-by: Thomas Cooper --- .../config/JsonFileTopicConfigProvider.java | 53 +++++++++++++++++++ .../config/KafkaAdminTopicConfigProvider.java | 33 ++---------- .../config/KafkaTopicConfigProvider.java | 37 ++----------- 3 files changed, 61 insertions(+), 62 deletions(-) create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java new file mode 100644 index 0000000000..a7cdf868a7 --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java @@ -0,0 +1,53 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.config; + +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Properties; + +/** + * Abstract implementation of {@link TopicConfigProvider} which provides a method for loading cluster configurations + * from a JSON file. + */ +public abstract class JsonFileTopicConfigProvider implements TopicConfigProvider { + + /** + * Method which will find the file path from the supplied config map using the supplied cluster file config key and + * load the configs contained in that JSON file into a {@link java.util.Properties} instance. + * + * The format of the file is JSON, with properties listed as top level key/value pairs: + * + *
+     *   {
+     *     "min.insync.replicas": 1,
+     *     "an.example.cluster.config": false
+     *   }
+     * 
+ * + * @param configs The map of config key/value pairs + * @param clusterConfigsFileKey The key under which the config file path is stored. + * @return A {@link java.util.Properties} instance containing the contents of the specified JSON config file. + */ + protected static Properties loadClusterConfigs(Map configs, String clusterConfigsFileKey) { + String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, clusterConfigsFileKey); + try { + try (JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(configFile), StandardCharsets.UTF_8))) { + Gson gson = new Gson(); + return gson.fromJson(reader, Properties.class); + } + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + + +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java index ababed13ab..47d60eaf1c 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -4,14 +4,7 @@ package com.linkedin.kafka.cruisecontrol.config; -import com.google.gson.Gson; -import com.google.gson.stream.JsonReader; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -41,12 +34,12 @@ * * */ -public class KafkaAdminTopicConfigProvider implements TopicConfigProvider { +public class KafkaAdminTopicConfigProvider extends JsonFileTopicConfigProvider { private static final Logger LOG = LoggerFactory.getLogger(KafkaAdminTopicConfigProvider.class); public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; - private static Properties _clusterConfigs; + private Properties _clusterConfigs; private AdminClient _adminClient; private long _adminTimeoutMs; @@ -132,33 +125,13 @@ private static Properties convertTopicConfigToProperties(Config config) { return props; } - private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException { - JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8)); - try { - Gson gson = new Gson(); - _clusterConfigs = gson.fromJson(reader, Properties.class); - } finally { - try { - reader.close(); - } catch (IOException e) { - // let it go. 
- } - } - } @Override public void configure(Map configs) { - KafkaCruiseControlConfig ccConfig = new KafkaCruiseControlConfig(configs); _adminTimeoutMs = ccConfig.getConfiguredInstance(ExecutorConfig.ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG, Integer.class); _adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(ccConfig)); - - String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE); - try { - loadClusterConfigs(configFile); - } catch (FileNotFoundException e) { - throw new IllegalArgumentException(e); - } + _clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE); } @Override diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java index 3b0d46e889..305254884c 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java @@ -4,19 +4,12 @@ package com.linkedin.kafka.cruisecontrol.config; -import com.google.gson.Gson; -import com.google.gson.stream.JsonReader; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; import kafka.server.ConfigType; import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; import scala.collection.JavaConversions; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.Properties; @@ -31,20 +24,19 @@ * * * This class uses the Zookeeper based admin client that will be removed in Kafka 3.0. Therefore this class has been - * deprecated and will be removed in a future Cruise Control release. A new - * {@link com.linkedin.kafka.cruisecontrol.config.TopicConfigProvider} implementation using the Kafka Admin Client has - * been created ({@link com.linkedin.kafka.cruisecontrol.config.KafkaAdminTopicConfigProvider}) and can be set using the + * deprecated and will be removed in a future Cruise Control release. A new {@link TopicConfigProvider} implementation + * using the Kafka Admin Client has been created ({@link KafkaAdminTopicConfigProvider}) and can be set using the * {@code topic.config.provider.class} configuration setting. 
* */ @Deprecated -public class KafkaTopicConfigProvider implements TopicConfigProvider { +public class KafkaTopicConfigProvider extends JsonFileTopicConfigProvider { public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider"; public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs"; private String _connectString; private boolean _zkSecurityEnabled; - private static Properties _clusterConfigs; + private Properties _clusterConfigs; @Override public Properties clusterConfigs() { @@ -79,30 +71,11 @@ public Map allTopicConfigs() { } } - private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException { - JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8)); - try { - Gson gson = new Gson(); - _clusterConfigs = gson.fromJson(reader, Properties.class); - } finally { - try { - reader.close(); - } catch (IOException e) { - // let it go. - } - } - } - @Override public void configure(Map configs) { _connectString = (String) configs.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG); _zkSecurityEnabled = (Boolean) configs.get(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG); - String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE); - try { - loadClusterConfigs(configFile); - } catch (FileNotFoundException e) { - throw new IllegalArgumentException(e); - } + _clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE); } @Override From d7d093b65bfb405420fca352f3a086e754251aa7 Mon Sep 17 00:00:00 2001 From: Thomas Cooper Date: Thu, 3 Jun 2021 18:54:28 +0100 Subject: [PATCH 04/10] Switched KafkaAdminTopicConfigProvider to use provided Admin Client instance from LoadMonitor Signed-off-by: Thomas Cooper --- .../config/KafkaAdminTopicConfigProvider.java | 29 +++++++++---------- .../cruisecontrol/monitor/LoadMonitor.java | 8 +++-- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java index 47d60eaf1c..801f3a590a 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -4,8 +4,7 @@ package com.linkedin.kafka.cruisecontrol.config; -import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; -import java.time.Duration; +import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -15,6 +14,8 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; +import com.linkedin.kafka.cruisecontrol.model.Load; +import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; @@ -22,6 +23,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull; +import static 
com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG; + /** * The Kafka topic config provider implementation based on using the Kafka Admin Client for topic level configurations @@ -41,7 +45,6 @@ public class KafkaAdminTopicConfigProvider extends JsonFileTopicConfigProvider { public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; private Properties _clusterConfigs; private AdminClient _adminClient; - private long _adminTimeoutMs; @Override public Properties clusterConfigs() { @@ -57,7 +60,7 @@ public Properties topicConfigs(String topic) { topicConfig = _adminClient .describeConfigs(Collections.singletonList(topicResource)) .all() - .get(_adminTimeoutMs, TimeUnit.MILLISECONDS) + .get() .get(topicResource); } catch (InterruptedException e) { LOG.error("The request for the configuration of topic '{}' was interrupted", topic); @@ -65,9 +68,6 @@ public Properties topicConfigs(String topic) { } catch (ExecutionException e) { LOG.error("The request for the configuration of topic '{}' failed", topic); e.printStackTrace(); - } catch (TimeoutException e) { - LOG.error("The request for the configuration of topic '{}' timed out", topic); - e.printStackTrace(); } if (topicConfig != null) { @@ -91,17 +91,14 @@ public Map allTopicConfigs() { topicNameSet.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList()) ).all() ) - .get(_adminTimeoutMs, TimeUnit.MILLISECONDS) - .get(_adminTimeoutMs, TimeUnit.MILLISECONDS); + .get() + .get(); } catch (InterruptedException e) { LOG.error("The request for the configuration of all topics was interrupted"); e.printStackTrace(); } catch (ExecutionException e) { LOG.error("The request for the configuration of all topics failed"); e.printStackTrace(); - } catch (TimeoutException e) { - LOG.error("The request for the configuration of all topics timed out"); - e.printStackTrace(); } Map propsMap = new HashMap<>(); @@ -128,14 +125,14 @@ private static Properties convertTopicConfigToProperties(Config config) { @Override public void configure(Map configs) { - KafkaCruiseControlConfig ccConfig = new KafkaCruiseControlConfig(configs); - _adminTimeoutMs = ccConfig.getConfiguredInstance(ExecutorConfig.ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG, Integer.class); - _adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(ccConfig)); + _adminClient = (AdminClient) validateNotNull( + configs.get(LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG), + () -> String.format("Missing %s when creating Kafka Admin Client based Topic Config Provider", LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG)); _clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE); } @Override public void close() { - _adminClient.close(Duration.ofMillis(KafkaCruiseControlUtils.ADMIN_CLIENT_CLOSE_TIMEOUT_MS)); + //no-op } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java index b0e62ecde4..51a49de13f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java @@ -82,6 +82,7 @@ public class LoadMonitor { // Metadata TTL is set based on experience -- i.e. a short TTL with large metadata may cause excessive load on brokers. 
private static final long METADATA_TTL = TimeUnit.SECONDS.toMillis(10); private static final long METADATA_REFRESH_BACKOFF = TimeUnit.SECONDS.toMillis(5); + public static final String KAFKA_ADMIN_CLIENT_OBJECT_CONFIG = "kafka.admin.client.object"; // The maximum time allowed to make a state update. If the state value cannot be updated in time it will be invalidated. // TODO: Make this configurable. private final long _monitorStateUpdateTimeoutMs; @@ -152,8 +153,11 @@ public LoadMonitor(KafkaCruiseControlConfig config, Time time, MetricRegistry dr BrokerCapacityConfigResolver.class); long monitorStateUpdateIntervalMs = config.getLong(MonitorConfig.MONITOR_STATE_UPDATE_INTERVAL_MS_CONFIG); _monitorStateUpdateTimeoutMs = 10 * monitorStateUpdateIntervalMs; - _topicConfigProvider = config.getConfiguredInstance(MonitorConfig.TOPIC_CONFIG_PROVIDER_CLASS_CONFIG, - TopicConfigProvider.class); + _topicConfigProvider = config.getConfiguredInstance( + MonitorConfig.TOPIC_CONFIG_PROVIDER_CLASS_CONFIG, + TopicConfigProvider.class, + Collections.singletonMap(KAFKA_ADMIN_CLIENT_OBJECT_CONFIG, _adminClient) + ); _partitionMetricSampleAggregator = new KafkaPartitionMetricSampleAggregator(config, metadataClient.metadata()); _brokerMetricSampleAggregator = new KafkaBrokerMetricSampleAggregator(config); From 0bcc1f366321ca5d8f517593d09a2f216b5a563f Mon Sep 17 00:00:00 2001 From: Thomas Cooper Date: Thu, 3 Jun 2021 19:01:01 +0100 Subject: [PATCH 05/10] Reduced ERROR level logging in KafkaAdminTopicConfigProvider Signed-off-by: Thomas Cooper --- .../config/KafkaAdminTopicConfigProvider.java | 25 +++++-------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java index 801f3a590a..8753801420 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -4,17 +4,12 @@ package com.linkedin.kafka.cruisecontrol.config; -import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; -import com.linkedin.kafka.cruisecontrol.model.Load; import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; @@ -24,7 +19,6 @@ import org.slf4j.LoggerFactory; import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull; -import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG; /** @@ -62,12 +56,8 @@ public Properties topicConfigs(String topic) { .all() .get() .get(topicResource); - } catch (InterruptedException e) { - LOG.error("The request for the configuration of topic '{}' was interrupted", topic); - e.printStackTrace(); - } catch (ExecutionException e) { - LOG.error("The request for the configuration of topic '{}' failed", topic); - e.printStackTrace(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Config check for 
topic {} failed due to failure to describe its configs.", topic, e); } if (topicConfig != null) { @@ -93,12 +83,8 @@ public Map allTopicConfigs() { ) .get() .get(); - } catch (InterruptedException e) { - LOG.error("The request for the configuration of all topics was interrupted"); - e.printStackTrace(); - } catch (ExecutionException e) { - LOG.error("The request for the configuration of all topics failed"); - e.printStackTrace(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Config check for all topics failed due to failure to describe their configs.", e); } Map propsMap = new HashMap<>(); @@ -127,7 +113,8 @@ private static Properties convertTopicConfigToProperties(Config config) { public void configure(Map configs) { _adminClient = (AdminClient) validateNotNull( configs.get(LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG), - () -> String.format("Missing %s when creating Kafka Admin Client based Topic Config Provider", LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG)); + () -> String.format("Missing %s when creating Kafka Admin Client based Topic Config Provider", + LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG)); _clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE); } From 4b26a6068bf50cc2764aea45a822e7fdf34ef90d Mon Sep 17 00:00:00 2001 From: Thomas Cooper Date: Thu, 3 Jun 2021 19:39:23 +0100 Subject: [PATCH 06/10] Switch from using .all() to .values() for alltopics config fetching in the KafkaAdminTopicConfigProvider Signed-off-by: Thomas Cooper --- .../config/KafkaAdminTopicConfigProvider.java | 34 ++++++++++++++----- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java index 8753801420..acaff18edb 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -14,6 +14,7 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.config.ConfigResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,7 +71,9 @@ public Properties topicConfigs(String topic) { @Override public Map allTopicConfigs() { - Map topicConfigs = null; + + // Request a map of futures for the config of each topic on the Kafka cluster + Map> topicConfigs = null; try { LOG.debug("Requesting configurations for all topics"); topicConfigs = _adminClient @@ -79,9 +82,8 @@ public Map allTopicConfigs() { .thenApply( topicNameSet -> _adminClient.describeConfigs( topicNameSet.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList()) - ).all() + ).values() ) - .get() .get(); } catch (InterruptedException | ExecutionException e) { LOG.warn("Config check for all topics failed due to failure to describe their configs.", e); @@ -89,14 +91,28 @@ public Map allTopicConfigs() { Map propsMap = new HashMap<>(); if (topicConfigs != null) { - LOG.debug("Converting {} Topic Configs into Properties", topicConfigs.size()); - for (Map.Entry entry : topicConfigs.entrySet()) { - propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(entry.getValue())); + + // Set a method to run when each topic 
config future completes which either logs any error or adds the config to the properties map + for (Map.Entry> entry : topicConfigs.entrySet()) { + + entry.getValue().whenComplete((config, error) -> { + if (error != null) { + LOG.warn("Topic configurations for topic '{}' on the cluster could not be retrieved due to: {}", entry.getKey(), error.getMessage()); + } else { + propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(config)); + } + }); + + } + + //Block on all the config futures completing + try { + KafkaFuture.allOf(topicConfigs.values().toArray(new KafkaFuture[0])).get(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Config check for all topics failed due to failure to describe their configs.", e); } - LOG.debug("Topic Config conversion complete"); - } else { - LOG.error("Topic configurations for all topics on the cluster could not be retrieved"); } + return propsMap; } From 9705e013671b53cf37df75d2b8f8835d976bbe78 Mon Sep 17 00:00:00 2001 From: Thomas Cooper Date: Fri, 4 Jun 2021 10:51:45 +0100 Subject: [PATCH 07/10] Added deprecated javadoc to ZK based topic config provider and moved common config key to abstract parent class Signed-off-by: Thomas Cooper --- .../cruisecontrol/config/JsonFileTopicConfigProvider.java | 2 ++ .../cruisecontrol/config/KafkaAdminTopicConfigProvider.java | 1 - .../kafka/cruisecontrol/config/KafkaTopicConfigProvider.java | 3 +-- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java index a7cdf868a7..a65730510f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java @@ -20,6 +20,8 @@ */ public abstract class JsonFileTopicConfigProvider implements TopicConfigProvider { + public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; + /** * Method which will find the file path from the supplied config map using the supplied cluster file config key and * load the configs contained in that JSON file into a {@link java.util.Properties} instance. 
diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java index acaff18edb..fe680658a7 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -37,7 +37,6 @@ public class KafkaAdminTopicConfigProvider extends JsonFileTopicConfigProvider { private static final Logger LOG = LoggerFactory.getLogger(KafkaAdminTopicConfigProvider.class); - public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; private Properties _clusterConfigs; private AdminClient _adminClient; diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java index 305254884c..92cf3c1ef9 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java @@ -23,7 +23,7 @@ * } * * - * This class uses the Zookeeper based admin client that will be removed in Kafka 3.0. Therefore this class has been + * @deprecated This class uses the Zookeeper based admin client that will be removed in Kafka 3.0. Therefore this class has been * deprecated and will be removed in a future Cruise Control release. A new {@link TopicConfigProvider} implementation * using the Kafka Admin Client has been created ({@link KafkaAdminTopicConfigProvider}) and can be set using the * {@code topic.config.provider.class} configuration setting. 
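For illustration, opting in to the Admin Client based provider via the setting mentioned in the javadoc above would look like the following in the Cruise Control configuration. The file name cruisecontrol.properties is the conventional location and is an assumption here; only the property key and the fully qualified class name come from this patch series:

# Replace the deprecated ZooKeeper-based provider with the Admin Client based one.
topic.config.provider.class=com.linkedin.kafka.cruisecontrol.config.KafkaAdminTopicConfigProvider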
@@ -31,7 +31,6 @@ */ @Deprecated public class KafkaTopicConfigProvider extends JsonFileTopicConfigProvider { - public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider"; public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs"; private String _connectString; From bf009b3b2dd8ec3ef39d906509c3d108f4c13064 Mon Sep 17 00:00:00 2001 From: Thomas Cooper Date: Mon, 7 Jun 2021 16:04:34 +0100 Subject: [PATCH 08/10] Simplified the future processing in the all topic configs method of Kafka Admin Topic Config Provider Signed-off-by: Thomas Cooper --- .../config/KafkaAdminTopicConfigProvider.java | 52 +++++++++++-------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java index fe680658a7..b10e3aceb4 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -56,8 +56,16 @@ public Properties topicConfigs(String topic) { .all() .get() .get(topicResource); - } catch (InterruptedException | ExecutionException e) { - LOG.warn("Config check for topic {} failed due to failure to describe its configs.", topic, e); + } catch (ExecutionException ee) { + if (org.apache.kafka.common.errors.TimeoutException.class == ee.getCause().getClass()) { + LOG.warn("Failed to retrieve config for topic '{}' due to describeConfigs request time out. Check for Kafka-side issues" + + " and consider increasing the configured timeout.", topic); + } else { + // e.g. 
could be UnknownTopicOrPartitionException due to topic deletion or InvalidTopicException + LOG.debug("Cannot retrieve config for topic {}.", topic, ee); + } + } catch (InterruptedException ie) { + LOG.debug("Interrupted while getting config for topic {}.", topic, ie); } if (topicConfig != null) { @@ -85,34 +93,34 @@ public Map allTopicConfigs() { ) .get(); } catch (InterruptedException | ExecutionException e) { - LOG.warn("Config check for all topics failed due to failure to describe their configs.", e); + LOG.warn("Unable to get topic configuration futures for all topics via Kafka admin client", e); } Map propsMap = new HashMap<>(); if (topicConfigs != null) { - - // Set a method to run when each topic config future completes which either logs any error or adds the config to the properties map for (Map.Entry> entry : topicConfigs.entrySet()) { - - entry.getValue().whenComplete((config, error) -> { - if (error != null) { - LOG.warn("Topic configurations for topic '{}' on the cluster could not be retrieved due to: {}", entry.getKey(), error.getMessage()); - } else { - propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(config)); - } - }); - - } - - //Block on all the config futures completing - try { - KafkaFuture.allOf(topicConfigs.values().toArray(new KafkaFuture[0])).get(); - } catch (InterruptedException | ExecutionException e) { - LOG.warn("Config check for all topics failed due to failure to describe their configs.", e); + try { + Config config = entry.getValue().get(); + propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(config)); + } catch (ExecutionException ee) { + if (org.apache.kafka.common.errors.TimeoutException.class == ee.getCause().getClass()) { + LOG.warn("Failed to retrieve config for topic '{}' due to describeConfigs request time out. Check for Kafka-side issues" + + " and consider increasing the configured timeout.", entry.getKey().name()); + } else { + // e.g. 
could be UnknownTopicOrPartitionException due to topic deletion or InvalidTopicException + LOG.debug("Cannot retrieve config for topic {}.", entry.getKey().name(), ee); + } + } catch (InterruptedException ie) { + LOG.debug("Interrupted while getting config for topic {}.", entry.getKey().name(), ie); + } } } - return propsMap; + if (!propsMap.isEmpty()) { + return propsMap; + } else { + throw new RuntimeException("Unable to retrieve topic configuration for any topics in the Kafka cluster"); + } } private static Properties convertTopicConfigToProperties(Config config) { From 4e45b4928391e7c161601eeda86f7b4541be150f Mon Sep 17 00:00:00 2001 From: Thomas Cooper Date: Mon, 7 Jun 2021 18:44:26 +0100 Subject: [PATCH 09/10] Added new method to TopicConfigProvider and refactored implementations Signed-off-by: Thomas Cooper --- .../config/JsonFileTopicConfigProvider.java | 1 - .../config/KafkaAdminTopicConfigProvider.java | 67 +++++++++++++------ .../config/KafkaTopicConfigProvider.java | 31 ++++++++- .../config/TopicConfigProvider.java | 14 +++- 4 files changed, 86 insertions(+), 27 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java index a65730510f..b835103c34 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java @@ -51,5 +51,4 @@ protected static Properties loadClusterConfigs(Map configs, String cl } } - } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java index b10e3aceb4..7e2578cfa3 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -8,6 +8,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor; @@ -45,6 +46,13 @@ public Properties clusterConfigs() { return _clusterConfigs; } + /** + * Fetches the configuration for the requested topic. If an error is encountered the details will be logged and an + * empty Properties instance will be returned. + * + * @param topic Topic name for which the topic-level configurations are required. + * @return Properties instance containing the topic configuration. + */ @Override public Properties topicConfigs(String topic) { Config topicConfig = null; @@ -58,44 +66,64 @@ public Properties topicConfigs(String topic) { .get(topicResource); } catch (ExecutionException ee) { if (org.apache.kafka.common.errors.TimeoutException.class == ee.getCause().getClass()) { - LOG.warn("Failed to retrieve config for topic '{}' due to describeConfigs request time out. Check for Kafka-side issues" + LOG.warn("Failed to retrieve configuration for topic '{}' due to describeConfigs request time out. Check for Kafka-side issues" + " and consider increasing the configured timeout.", topic); } else { // e.g. 
could be UnknownTopicOrPartitionException due to topic deletion or InvalidTopicException - LOG.debug("Cannot retrieve config for topic {}.", topic, ee); + LOG.warn("Cannot retrieve configuration for topic '{}'.", topic, ee); } } catch (InterruptedException ie) { - LOG.debug("Interrupted while getting config for topic {}.", topic, ie); + LOG.debug("Interrupted while getting configuration for topic '{}'.", topic, ie); } if (topicConfig != null) { return convertTopicConfigToProperties(topicConfig); } else { - LOG.error("The configuration for topic '{}' could not be retrieved", topic); + LOG.warn("The configuration for topic '{}' could not be retrieved, returning empty Properties instance.", topic); return new Properties(); } } + /** + * Fetches the configuration for all the topics on the Kafka cluster. If an error is encountered when retrieving the + * topic names then the error details will be logged and an empty Map instance will be returned. + * + * @return A Map from topic name string to Properties instance containing that topic's configuration. + */ @Override public Map allTopicConfigs() { // Request a map of futures for the config of each topic on the Kafka cluster - Map> topicConfigs = null; + LOG.debug("Requesting configurations for all topics"); + Set topicNames = null; try { - LOG.debug("Requesting configurations for all topics"); - topicConfigs = _adminClient - .listTopics() - .names() - .thenApply( - topicNameSet -> _adminClient.describeConfigs( - topicNameSet.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList()) - ).values() - ) - .get(); + topicNames = _adminClient.listTopics().names().get(); } catch (InterruptedException | ExecutionException e) { - LOG.warn("Unable to get topic configuration futures for all topics via Kafka admin client", e); + LOG.warn("Unable to obtain list of all topic names from the Kafka Cluster"); } + if (topicNames == null) { + return new HashMap<>(); + } else { + return topicConfigs(topicNames); + } + } + + /** + * Fetches the configuration for the requested topics. If an error is encountered, for each topic, the details will be + * logged and the entry for that topic will be omitted from the returned map. + * + * @param topics The set of topic names for which the topic-level configurations are required. + * @return A Map from topic name string to Properties instance containing that topic's configuration. 
+ */ + @Override + public Map topicConfigs(Set topics) { + + Map> topicConfigs; + topicConfigs = _adminClient.describeConfigs( + topics.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList()) + ).values(); + Map propsMap = new HashMap<>(); if (topicConfigs != null) { for (Map.Entry> entry : topicConfigs.entrySet()) { @@ -116,11 +144,7 @@ public Map allTopicConfigs() { } } - if (!propsMap.isEmpty()) { - return propsMap; - } else { - throw new RuntimeException("Unable to retrieve topic configuration for any topics in the Kafka cluster"); - } + return propsMap; } private static Properties convertTopicConfigToProperties(Config config) { @@ -131,7 +155,6 @@ private static Properties convertTopicConfigToProperties(Config config) { return props; } - @Override public void configure(Map configs) { _adminClient = (AdminClient) validateNotNull( diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java index 92cf3c1ef9..55b020a084 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java @@ -9,9 +9,13 @@ import kafka.server.ConfigType; import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.collection.JavaConversions; +import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Set; /** @@ -31,6 +35,8 @@ */ @Deprecated public class KafkaTopicConfigProvider extends JsonFileTopicConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConfigProvider.class); public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider"; public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs"; private String _connectString; @@ -56,10 +62,33 @@ public Properties topicConfigs(String topic) { } } + @Override + public Map topicConfigs(Set topics) { + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, + _zkSecurityEnabled); + + AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); + + Map topicConfigs = new HashMap<>(); + for (String topic : topics) { + try { + Properties topicConfig = adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic); + topicConfigs.put(topic, topicConfig); + } catch (Exception e) { + LOG.warn("Unable to retrieve config for topic '{}'", topic, e); + } + } + + KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); + return topicConfigs; + } + @Override public Map allTopicConfigs() { KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, _zkSecurityEnabled); try { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/TopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/TopicConfigProvider.java index dac4e0ec13..1af25eb337 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/TopicConfigProvider.java +++ 
b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/TopicConfigProvider.java @@ -7,6 +7,7 @@ import com.linkedin.cruisecontrol.common.CruiseControlConfigurable; import java.util.Map; import java.util.Properties; +import java.util.Set; import org.apache.kafka.common.annotation.InterfaceStability; @@ -23,12 +24,19 @@ public interface TopicConfigProvider extends CruiseControlConfigurable, AutoClos Properties clusterConfigs(); /** - * Get topic-level configs for the requested topic. - * @param topic Topic for which the topic-level configs are requested. - * @return Topic-level configs for the requested topic. + * Get topic-level configurations for the requested topic. + * @param topic Topic name for which the topic-level configurations are required. + * @return A {@link Properties} instance containing the topic-level configuration for the requested topic. */ Properties topicConfigs(String topic); + /** + * Get the topic-level configurations for the requested topics. + * @param topics The set of topic names for which the topic-level configurations are required. + * @return A map from the topic name to a {@link Properties} instance containing that topic's configuration. + */ + Map topicConfigs(Set topics); + /** * @return Topic-level configs for all topics. */ From 4cbfd8f729a66bf53e63e8eb956673fe20b87d3c Mon Sep 17 00:00:00 2001 From: Thomas Cooper Date: Sat, 3 Jul 2021 09:57:00 +0100 Subject: [PATCH 10/10] Addressed reviewer comments Signed-off-by: Thomas Cooper --- .../config/KafkaAdminTopicConfigProvider.java | 56 ++++++++++--------- .../config/KafkaTopicConfigProvider.java | 48 +++++++++------- 2 files changed, 56 insertions(+), 48 deletions(-) diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java index 7e2578cfa3..99372b051b 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -84,31 +84,6 @@ public Properties topicConfigs(String topic) { } } - /** - * Fetches the configuration for all the topics on the Kafka cluster. If an error is encountered when retrieving the - * topic names then the error details will be logged and an empty Map instance will be returned. - * - * @return A Map from topic name string to Properties instance containing that topic's configuration. - */ - @Override - public Map allTopicConfigs() { - - // Request a map of futures for the config of each topic on the Kafka cluster - LOG.debug("Requesting configurations for all topics"); - Set topicNames = null; - try { - topicNames = _adminClient.listTopics().names().get(); - } catch (InterruptedException | ExecutionException e) { - LOG.warn("Unable to obtain list of all topic names from the Kafka Cluster"); - } - - if (topicNames == null) { - return new HashMap<>(); - } else { - return topicConfigs(topicNames); - } - } - /** * Fetches the configuration for the requested topics. If an error is encountered, for each topic, the details will be * logged and the entry for that topic will be omitted from the returned map. 
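As a usage illustration of the Set-based lookup added to the TopicConfigProvider interface above, a caller could do something like the following. The provider instance, the topic names and the cleanup.policy lookup are hypothetical; only the topicConfigs(Set) signature and the "omit failed topics from the returned map" behavior come from the patches above:

// Hypothetical caller-side sketch (not part of the patch). It is placed in the same
// package as TopicConfigProvider purely so the interface resolves without an import.
package com.linkedin.kafka.cruisecontrol.config;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

public final class TopicConfigLookupSketch {

  static void printCleanupPolicies(TopicConfigProvider provider, Set<String> topics) {
    // Topics whose configs could not be described are simply absent from the returned map.
    Map<String, Properties> topicConfigs = provider.topicConfigs(topics);
    for (Map.Entry<String, Properties> entry : topicConfigs.entrySet()) {
      System.out.printf("%s -> cleanup.policy=%s%n",
                        entry.getKey(),
                        entry.getValue().getProperty("cleanup.policy", "<not set>"));
    }
  }
}

Returning a map that omits topics whose configs could not be fetched, rather than failing the whole call, lets callers degrade gracefully when individual topics are deleted between the listTopics and describeConfigs requests.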
@@ -132,8 +107,10 @@ public Map topicConfigs(Set topics) { propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(config)); } catch (ExecutionException ee) { if (org.apache.kafka.common.errors.TimeoutException.class == ee.getCause().getClass()) { - LOG.warn("Failed to retrieve config for topic '{}' due to describeConfigs request time out. Check for Kafka-side issues" - + " and consider increasing the configured timeout.", entry.getKey().name()); + LOG.warn("Failed to retrieve config for topics due to describeConfigs request timing out. " + + "Check for Kafka-side issues and consider increasing the configured timeout."); + // If one has timed out then they all will so abort the loop. + break; } else { // e.g. could be UnknownTopicOrPartitionException due to topic deletion or InvalidTopicException LOG.debug("Cannot retrieve config for topic {}.", entry.getKey().name(), ee); @@ -147,6 +124,31 @@ public Map topicConfigs(Set topics) { return propsMap; } + /** + * Fetches the configuration for all the topics on the Kafka cluster. If an error is encountered when retrieving the + * topic names then the error details will be logged and an empty Map instance will be returned. + * + * @return A Map from topic name string to Properties instance containing that topic's configuration. + */ + @Override + public Map allTopicConfigs() { + + // Request a map of futures for the config of each topic on the Kafka cluster + LOG.debug("Requesting configurations for all topics"); + Set topicNames = null; + try { + topicNames = _adminClient.listTopics().names().get(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Unable to obtain list of all topic names from the Kafka Cluster"); + } + + if (topicNames == null) { + return Collections.emptyMap(); + } else { + return topicConfigs(topicNames); + } + } + private static Properties convertTopicConfigToProperties(Config config) { Properties props = new Properties(); for (ConfigEntry entry : config.entries()) { diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java index 55b020a084..de9ccfa75f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java @@ -50,10 +50,11 @@ public Properties clusterConfigs() { @Override public Properties topicConfigs(String topic) { - KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, - _zkSecurityEnabled); + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient( + _connectString, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, + _zkSecurityEnabled); try { AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); return adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic); @@ -64,33 +65,38 @@ public Properties topicConfigs(String topic) { @Override public Map topicConfigs(Set topics) { - KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, - _zkSecurityEnabled); + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient( + _connectString, + 
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, + _zkSecurityEnabled); - AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); + Map topicConfigs = new HashMap<>(topics.size()); + try { + AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); - Map topicConfigs = new HashMap<>(); - for (String topic : topics) { - try { - Properties topicConfig = adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic); - topicConfigs.put(topic, topicConfig); - } catch (Exception e) { - LOG.warn("Unable to retrieve config for topic '{}'", topic, e); + for (String topic : topics) { + try { + Properties topicConfig = adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic); + topicConfigs.put(topic, topicConfig); + } catch (Exception e) { + LOG.warn("Unable to retrieve config for topic '{}'", topic, e); + } } + } finally { + KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); } - KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); return topicConfigs; } @Override public Map allTopicConfigs() { - KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, - _zkSecurityEnabled); + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient( + _connectString, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, + _zkSecurityEnabled); try { AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); return JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());