From 30ecf7bbf7938824182ab0ed7ef45bcc0446dfca Mon Sep 17 00:00:00 2001 From: Thomas Cooper Date: Thu, 15 Jul 2021 22:49:22 +0100 Subject: [PATCH] Add support to switch from ZK to Kafka Admin Client for topic config provider class (#1569) --- .../config/JsonFileTopicConfigProvider.java | 54 ++++++ .../config/KafkaAdminTopicConfigProvider.java | 173 ++++++++++++++++++ .../config/KafkaTopicConfigProvider.java | 88 +++++---- .../config/TopicConfigProvider.java | 14 +- .../cruisecontrol/monitor/LoadMonitor.java | 8 +- .../KafkaCruiseControlUnitTestUtils.java | 2 +- 6 files changed, 296 insertions(+), 43 deletions(-) create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java create mode 100644 cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java new file mode 100644 index 0000000000..b835103c34 --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/JsonFileTopicConfigProvider.java @@ -0,0 +1,54 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.config; + +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Properties; + +/** + * Abstract implementation of {@link TopicConfigProvider} which provides a method for loading cluster configurations + * from a JSON file. + */ +public abstract class JsonFileTopicConfigProvider implements TopicConfigProvider { + + public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; + + /** + * Method which will find the file path from the supplied config map using the supplied cluster file config key and + * load the configs contained in that JSON file into a {@link java.util.Properties} instance. + * + * The format of the file is JSON, with properties listed as top level key/value pairs: + * + *
+     *   {
+     *     "min.insync.replicas": 1,
+     *     "an.example.cluster.config": false
+     *   }
+     * 
+ * + * @param configs The map of config key/value pairs + * @param clusterConfigsFileKey The key under which the config file path is stored. + * @return A {@link java.util.Properties} instance containing the contents of the specified JSON config file. + */ + protected static Properties loadClusterConfigs(Map configs, String clusterConfigsFileKey) { + String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, clusterConfigsFileKey); + try { + try (JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(configFile), StandardCharsets.UTF_8))) { + Gson gson = new Gson(); + return gson.fromJson(reader, Properties.class); + } + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java new file mode 100644 index 0000000000..99372b051b --- /dev/null +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -0,0 +1,173 @@ +/* + * Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information. + */ + +package com.linkedin.kafka.cruisecontrol.config; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.config.ConfigResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull; + + +/** + * The Kafka topic config provider implementation based on using the Kafka Admin Client for topic level configurations + * and files for cluster level configurations. The format of the file is JSON, listing properties: + *
+ *   {
+ *     "min.insync.replicas": 1,
+ *     "an.example.cluster.config": false
+ *   }
+ * 
+ * + */ +public class KafkaAdminTopicConfigProvider extends JsonFileTopicConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaAdminTopicConfigProvider.class); + + private Properties _clusterConfigs; + private AdminClient _adminClient; + + @Override + public Properties clusterConfigs() { + return _clusterConfigs; + } + + /** + * Fetches the configuration for the requested topic. If an error is encountered the details will be logged and an + * empty Properties instance will be returned. + * + * @param topic Topic name for which the topic-level configurations are required. + * @return Properties instance containing the topic configuration. + */ + @Override + public Properties topicConfigs(String topic) { + Config topicConfig = null; + ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic); + try { + LOG.debug("Requesting details for topic '{}'", topic); + topicConfig = _adminClient + .describeConfigs(Collections.singletonList(topicResource)) + .all() + .get() + .get(topicResource); + } catch (ExecutionException ee) { + if (org.apache.kafka.common.errors.TimeoutException.class == ee.getCause().getClass()) { + LOG.warn("Failed to retrieve configuration for topic '{}' due to describeConfigs request time out. Check for Kafka-side issues" + + " and consider increasing the configured timeout.", topic); + } else { + // e.g. could be UnknownTopicOrPartitionException due to topic deletion or InvalidTopicException + LOG.warn("Cannot retrieve configuration for topic '{}'.", topic, ee); + } + } catch (InterruptedException ie) { + LOG.debug("Interrupted while getting configuration for topic '{}'.", topic, ie); + } + + if (topicConfig != null) { + return convertTopicConfigToProperties(topicConfig); + } else { + LOG.warn("The configuration for topic '{}' could not be retrieved, returning empty Properties instance.", topic); + return new Properties(); + } + } + + /** + * Fetches the configuration for the requested topics. If an error is encountered, for each topic, the details will be + * logged and the entry for that topic will be omitted from the returned map. + * + * @param topics The set of topic names for which the topic-level configurations are required. + * @return A Map from topic name string to Properties instance containing that topic's configuration. + */ + @Override + public Map topicConfigs(Set topics) { + + Map> topicConfigs; + topicConfigs = _adminClient.describeConfigs( + topics.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList()) + ).values(); + + Map propsMap = new HashMap<>(); + if (topicConfigs != null) { + for (Map.Entry> entry : topicConfigs.entrySet()) { + try { + Config config = entry.getValue().get(); + propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(config)); + } catch (ExecutionException ee) { + if (org.apache.kafka.common.errors.TimeoutException.class == ee.getCause().getClass()) { + LOG.warn("Failed to retrieve config for topics due to describeConfigs request timing out. " + + "Check for Kafka-side issues and consider increasing the configured timeout."); + // If one has timed out then they all will so abort the loop. + break; + } else { + // e.g. could be UnknownTopicOrPartitionException due to topic deletion or InvalidTopicException + LOG.debug("Cannot retrieve config for topic {}.", entry.getKey().name(), ee); + } + } catch (InterruptedException ie) { + LOG.debug("Interrupted while getting config for topic {}.", entry.getKey().name(), ie); + } + } + } + + return propsMap; + } + + /** + * Fetches the configuration for all the topics on the Kafka cluster. If an error is encountered when retrieving the + * topic names then the error details will be logged and an empty Map instance will be returned. + * + * @return A Map from topic name string to Properties instance containing that topic's configuration. + */ + @Override + public Map allTopicConfigs() { + + // Request a map of futures for the config of each topic on the Kafka cluster + LOG.debug("Requesting configurations for all topics"); + Set topicNames = null; + try { + topicNames = _adminClient.listTopics().names().get(); + } catch (InterruptedException | ExecutionException e) { + LOG.warn("Unable to obtain list of all topic names from the Kafka Cluster"); + } + + if (topicNames == null) { + return Collections.emptyMap(); + } else { + return topicConfigs(topicNames); + } + } + + private static Properties convertTopicConfigToProperties(Config config) { + Properties props = new Properties(); + for (ConfigEntry entry : config.entries()) { + props.put(entry.name(), entry.value()); + } + return props; + } + + @Override + public void configure(Map configs) { + _adminClient = (AdminClient) validateNotNull( + configs.get(LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG), + () -> String.format("Missing %s when creating Kafka Admin Client based Topic Config Provider", + LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG)); + _clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE); + } + + @Override + public void close() { + //no-op + } +} diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java index 81ea4c6596..de9ccfa75f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaTopicConfigProvider.java @@ -4,21 +4,18 @@ package com.linkedin.kafka.cruisecontrol.config; -import com.google.gson.Gson; -import com.google.gson.stream.JsonReader; import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.Properties; import kafka.server.ConfigType; import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.collection.JavaConversions; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; /** @@ -30,14 +27,21 @@ * } * * + * @deprecated This class uses the Zookeeper based admin client that will be removed in Kafka 3.0. Therefore this class has been + * deprecated and will be removed in a future Cruise Control release. A new {@link TopicConfigProvider} implementation + * using the Kafka Admin Client has been created ({@link KafkaAdminTopicConfigProvider}) and can be set using the + * {@code topic.config.provider.class} configuration setting. + * */ -public class KafkaTopicConfigProvider implements TopicConfigProvider { - public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; +@Deprecated +public class KafkaTopicConfigProvider extends JsonFileTopicConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConfigProvider.class); public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider"; public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs"; private String _connectString; private boolean _zkSecurityEnabled; - private static Properties _clusterConfigs; + private Properties _clusterConfigs; @Override public Properties clusterConfigs() { @@ -46,10 +50,11 @@ public Properties clusterConfigs() { @Override public Properties topicConfigs(String topic) { - KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, - _zkSecurityEnabled); + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient( + _connectString, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, + _zkSecurityEnabled); try { AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); return adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic); @@ -59,30 +64,44 @@ public Properties topicConfigs(String topic) { } @Override - public Map allTopicConfigs() { - KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, - ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, - _zkSecurityEnabled); + public Map topicConfigs(Set topics) { + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient( + _connectString, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, + _zkSecurityEnabled); + + Map topicConfigs = new HashMap<>(topics.size()); try { AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); - return JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs()); + + for (String topic : topics) { + try { + Properties topicConfig = adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic); + topicConfigs.put(topic, topicConfig); + } catch (Exception e) { + LOG.warn("Unable to retrieve config for topic '{}'", topic, e); + } + } } finally { KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); } + + return topicConfigs; } - private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException { - JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8)); + @Override + public Map allTopicConfigs() { + KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient( + _connectString, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP, + ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE, + _zkSecurityEnabled); try { - Gson gson = new Gson(); - _clusterConfigs = gson.fromJson(reader, Properties.class); + AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient); + return JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs()); } finally { - try { - reader.close(); - } catch (IOException e) { - // let it go. - } + KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient); } } @@ -90,12 +109,7 @@ private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundEx public void configure(Map configs) { _connectString = (String) configs.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG); _zkSecurityEnabled = (Boolean) configs.get(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG); - String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE); - try { - loadClusterConfigs(configFile); - } catch (FileNotFoundException e) { - throw new IllegalArgumentException(e); - } + _clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE); } @Override diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/TopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/TopicConfigProvider.java index dac4e0ec13..1af25eb337 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/TopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/TopicConfigProvider.java @@ -7,6 +7,7 @@ import com.linkedin.cruisecontrol.common.CruiseControlConfigurable; import java.util.Map; import java.util.Properties; +import java.util.Set; import org.apache.kafka.common.annotation.InterfaceStability; @@ -23,12 +24,19 @@ public interface TopicConfigProvider extends CruiseControlConfigurable, AutoClos Properties clusterConfigs(); /** - * Get topic-level configs for the requested topic. - * @param topic Topic for which the topic-level configs are requested. - * @return Topic-level configs for the requested topic. + * Get topic-level configurations for the requested topic. + * @param topic Topic name for which the topic-level configurations are required. + * @return A {@link Properties} instance containing the topic-level configuration for the requested topic. */ Properties topicConfigs(String topic); + /** + * Get the topic-level configurations for the requested topics. + * @param topics The set of topic names for which the topic-level configurations are required. + * @return A map from the topic name to a {@link Properties} instance containing that topic's configuration. + */ + Map topicConfigs(Set topics); + /** * @return Topic-level configs for all topics. */ diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java index b0e62ecde4..51a49de13f 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java @@ -82,6 +82,7 @@ public class LoadMonitor { // Metadata TTL is set based on experience -- i.e. a short TTL with large metadata may cause excessive load on brokers. private static final long METADATA_TTL = TimeUnit.SECONDS.toMillis(10); private static final long METADATA_REFRESH_BACKOFF = TimeUnit.SECONDS.toMillis(5); + public static final String KAFKA_ADMIN_CLIENT_OBJECT_CONFIG = "kafka.admin.client.object"; // The maximum time allowed to make a state update. If the state value cannot be updated in time it will be invalidated. // TODO: Make this configurable. private final long _monitorStateUpdateTimeoutMs; @@ -152,8 +153,11 @@ public LoadMonitor(KafkaCruiseControlConfig config, Time time, MetricRegistry dr BrokerCapacityConfigResolver.class); long monitorStateUpdateIntervalMs = config.getLong(MonitorConfig.MONITOR_STATE_UPDATE_INTERVAL_MS_CONFIG); _monitorStateUpdateTimeoutMs = 10 * monitorStateUpdateIntervalMs; - _topicConfigProvider = config.getConfiguredInstance(MonitorConfig.TOPIC_CONFIG_PROVIDER_CLASS_CONFIG, - TopicConfigProvider.class); + _topicConfigProvider = config.getConfiguredInstance( + MonitorConfig.TOPIC_CONFIG_PROVIDER_CLASS_CONFIG, + TopicConfigProvider.class, + Collections.singletonMap(KAFKA_ADMIN_CLIENT_OBJECT_CONFIG, _adminClient) + ); _partitionMetricSampleAggregator = new KafkaPartitionMetricSampleAggregator(config, metadataClient.metadata()); _brokerMetricSampleAggregator = new KafkaBrokerMetricSampleAggregator(config); diff --git a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUnitTestUtils.java b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUnitTestUtils.java index 69e28b0c8e..21e5ba9cae 100644 --- a/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUnitTestUtils.java +++ b/cruise-control/src/test/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlUnitTestUtils.java @@ -50,7 +50,7 @@ public static Properties getKafkaCruiseControlProperties() { String clusterConfigsFile = Objects.requireNonNull(KafkaCruiseControlUnitTestUtils.class.getClassLoader().getResource( TestConstants.DEFAULT_CLUSTER_CONFIGS_FILE)).getFile(); props.setProperty(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2121"); - props.setProperty(MonitorConfig.BOOTSTRAP_SERVERS_CONFIG, "aaa"); + props.setProperty(MonitorConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.setProperty(MonitorConfig.METRIC_SAMPLER_CLASS_CONFIG, NoopSampler.class.getName()); props.setProperty(BrokerCapacityConfigFileResolver.CAPACITY_CONFIG_FILE, capacityConfigFile); props.setProperty(KafkaTopicConfigProvider.CLUSTER_CONFIGS_FILE, clusterConfigsFile);