Skip to content

Commit

Permalink
Add support to switch from ZK to Kafka Admin Client for topic config …
Browse files Browse the repository at this point in the history
…provider class (#1569)
  • Loading branch information
tomncooper authored Jul 15, 2021
1 parent deb5101 commit 30ecf7b
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.config;

import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;

/**
* Abstract implementation of {@link TopicConfigProvider} which provides a method for loading cluster configurations
* from a JSON file.
*/
public abstract class JsonFileTopicConfigProvider implements TopicConfigProvider {

public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file";

/**
* Method which will find the file path from the supplied config map using the supplied cluster file config key and
* load the configs contained in that JSON file into a {@link java.util.Properties} instance.
*
* The format of the file is JSON, with properties listed as top level key/value pairs:
*
* <pre>
* {
* "min.insync.replicas": 1,
* "an.example.cluster.config": false
* }
* </pre>
*
* @param configs The map of config key/value pairs
* @param clusterConfigsFileKey The key under which the config file path is stored.
* @return A {@link java.util.Properties} instance containing the contents of the specified JSON config file.
*/
protected static Properties loadClusterConfigs(Map<String, ?> configs, String clusterConfigsFileKey) {
String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, clusterConfigsFileKey);
try {
try (JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(configFile), StandardCharsets.UTF_8))) {
Gson gson = new Gson();
return gson.fromJson(reader, Properties.class);
}
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.config;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;


/**
* The Kafka topic config provider implementation based on using the Kafka Admin Client for topic level configurations
* and files for cluster level configurations. The format of the file is JSON, listing properties:
* <pre>
* {
* "min.insync.replicas": 1,
* "an.example.cluster.config": false
* }
* </pre>
*
*/
public class KafkaAdminTopicConfigProvider extends JsonFileTopicConfigProvider {

private static final Logger LOG = LoggerFactory.getLogger(KafkaAdminTopicConfigProvider.class);

private Properties _clusterConfigs;
private AdminClient _adminClient;

@Override
public Properties clusterConfigs() {
return _clusterConfigs;
}

/**
* Fetches the configuration for the requested topic. If an error is encountered the details will be logged and an
* empty Properties instance will be returned.
*
* @param topic Topic name for which the topic-level configurations are required.
* @return Properties instance containing the topic configuration.
*/
@Override
public Properties topicConfigs(String topic) {
Config topicConfig = null;
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
try {
LOG.debug("Requesting details for topic '{}'", topic);
topicConfig = _adminClient
.describeConfigs(Collections.singletonList(topicResource))
.all()
.get()
.get(topicResource);
} catch (ExecutionException ee) {
if (org.apache.kafka.common.errors.TimeoutException.class == ee.getCause().getClass()) {
LOG.warn("Failed to retrieve configuration for topic '{}' due to describeConfigs request time out. Check for Kafka-side issues"
+ " and consider increasing the configured timeout.", topic);
} else {
// e.g. could be UnknownTopicOrPartitionException due to topic deletion or InvalidTopicException
LOG.warn("Cannot retrieve configuration for topic '{}'.", topic, ee);
}
} catch (InterruptedException ie) {
LOG.debug("Interrupted while getting configuration for topic '{}'.", topic, ie);
}

if (topicConfig != null) {
return convertTopicConfigToProperties(topicConfig);
} else {
LOG.warn("The configuration for topic '{}' could not be retrieved, returning empty Properties instance.", topic);
return new Properties();
}
}

/**
* Fetches the configuration for the requested topics. If an error is encountered, for each topic, the details will be
* logged and the entry for that topic will be omitted from the returned map.
*
* @param topics The set of topic names for which the topic-level configurations are required.
* @return A Map from topic name string to Properties instance containing that topic's configuration.
*/
@Override
public Map<String, Properties> topicConfigs(Set<String> topics) {

Map<ConfigResource, KafkaFuture<Config>> topicConfigs;
topicConfigs = _adminClient.describeConfigs(
topics.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList())
).values();

Map<String, Properties> propsMap = new HashMap<>();
if (topicConfigs != null) {
for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : topicConfigs.entrySet()) {
try {
Config config = entry.getValue().get();
propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(config));
} catch (ExecutionException ee) {
if (org.apache.kafka.common.errors.TimeoutException.class == ee.getCause().getClass()) {
LOG.warn("Failed to retrieve config for topics due to describeConfigs request timing out. "
+ "Check for Kafka-side issues and consider increasing the configured timeout.");
// If one has timed out then they all will so abort the loop.
break;
} else {
// e.g. could be UnknownTopicOrPartitionException due to topic deletion or InvalidTopicException
LOG.debug("Cannot retrieve config for topic {}.", entry.getKey().name(), ee);
}
} catch (InterruptedException ie) {
LOG.debug("Interrupted while getting config for topic {}.", entry.getKey().name(), ie);
}
}
}

return propsMap;
}

/**
* Fetches the configuration for all the topics on the Kafka cluster. If an error is encountered when retrieving the
* topic names then the error details will be logged and an empty Map instance will be returned.
*
* @return A Map from topic name string to Properties instance containing that topic's configuration.
*/
@Override
public Map<String, Properties> allTopicConfigs() {

// Request a map of futures for the config of each topic on the Kafka cluster
LOG.debug("Requesting configurations for all topics");
Set<String> topicNames = null;
try {
topicNames = _adminClient.listTopics().names().get();
} catch (InterruptedException | ExecutionException e) {
LOG.warn("Unable to obtain list of all topic names from the Kafka Cluster");
}

if (topicNames == null) {
return Collections.emptyMap();
} else {
return topicConfigs(topicNames);
}
}

private static Properties convertTopicConfigToProperties(Config config) {
Properties props = new Properties();
for (ConfigEntry entry : config.entries()) {
props.put(entry.name(), entry.value());
}
return props;
}

@Override
public void configure(Map<String, ?> configs) {
_adminClient = (AdminClient) validateNotNull(
configs.get(LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG),
() -> String.format("Missing %s when creating Kafka Admin Client based Topic Config Provider",
LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG));
_clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE);
}

@Override
public void close() {
//no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,18 @@

package com.linkedin.kafka.cruisecontrol.config;

import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import kafka.server.ConfigType;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;


/**
Expand All @@ -30,14 +27,21 @@
* }
* </pre>
*
* @deprecated This class uses the Zookeeper based admin client that will be removed in Kafka 3.0. Therefore this class has been
* deprecated and will be removed in a future Cruise Control release. A new {@link TopicConfigProvider} implementation
* using the Kafka Admin Client has been created ({@link KafkaAdminTopicConfigProvider}) and can be set using the
* {@code topic.config.provider.class} configuration setting.
*
*/
public class KafkaTopicConfigProvider implements TopicConfigProvider {
public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file";
@Deprecated
public class KafkaTopicConfigProvider extends JsonFileTopicConfigProvider {

private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConfigProvider.class);
public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider";
public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs";
private String _connectString;
private boolean _zkSecurityEnabled;
private static Properties _clusterConfigs;
private Properties _clusterConfigs;

@Override
public Properties clusterConfigs() {
Expand All @@ -46,10 +50,11 @@ public Properties clusterConfigs() {

@Override
public Properties topicConfigs(String topic) {
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(
_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);
try {
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
return adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic);
Expand All @@ -59,43 +64,52 @@ public Properties topicConfigs(String topic) {
}

@Override
public Map<String, Properties> allTopicConfigs() {
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);
public Map<String, Properties> topicConfigs(Set<String> topics) {
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(
_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);

Map<String, Properties> topicConfigs = new HashMap<>(topics.size());
try {
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
return JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());

for (String topic : topics) {
try {
Properties topicConfig = adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic);
topicConfigs.put(topic, topicConfig);
} catch (Exception e) {
LOG.warn("Unable to retrieve config for topic '{}'", topic, e);
}
}
} finally {
KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient);
}

return topicConfigs;
}

private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException {
JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8));
@Override
public Map<String, Properties> allTopicConfigs() {
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(
_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);
try {
Gson gson = new Gson();
_clusterConfigs = gson.fromJson(reader, Properties.class);
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
return JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());
} finally {
try {
reader.close();
} catch (IOException e) {
// let it go.
}
KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient);
}
}

@Override
public void configure(Map<String, ?> configs) {
_connectString = (String) configs.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG);
_zkSecurityEnabled = (Boolean) configs.get(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG);
String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE);
try {
loadClusterConfigs(configFile);
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
_clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE);
}

@Override
Expand Down
Loading

0 comments on commit 30ecf7b

Please sign in to comment.