Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch from ZK to Kafka Admin Client #1569

Merged
merged 10 commits into from
Jul 15, 2021
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.config;

import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;

/**
 * Abstract implementation of {@link TopicConfigProvider} which provides a method for loading cluster configurations
 * from a JSON file.
 */
public abstract class JsonFileTopicConfigProvider implements TopicConfigProvider {

  /** Config key under which the path of the cluster-level JSON config file is supplied. */
  public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file";

  /**
   * Method which will find the file path from the supplied config map using the supplied cluster file config key and
   * load the configs contained in that JSON file into a {@link java.util.Properties} instance.
   *
   * The format of the file is JSON, with properties listed as top level key/value pairs:
   *
   * <pre>
   *   {
   *     "min.insync.replicas": 1,
   *     "an.example.cluster.config": false
   *   }
   * </pre>
   *
   * @param configs The map of config key/value pairs
   * @param clusterConfigsFileKey The key under which the config file path is stored.
   * @return A {@link java.util.Properties} instance containing the contents of the specified JSON config file.
   * @throws IllegalArgumentException if the file path is missing from {@code configs} or the file cannot be read.
   */
  protected static Properties loadClusterConfigs(Map<String, ?> configs, String clusterConfigsFileKey) {
    String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, clusterConfigsFileKey);
    // A try-with-resources with an attached catch replaces the original nested try blocks: the reader
    // is still closed before the catch executes, and FileNotFoundException is a subclass of IOException.
    try (JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(configFile), StandardCharsets.UTF_8))) {
      return new Gson().fromJson(reader, Properties.class);
    } catch (IOException e) {
      // Surface a missing/unreadable file as a configuration error, preserving the cause, and include
      // the offending path so the operator can see which file failed.
      throw new IllegalArgumentException("Unable to load cluster configs from file: " + configFile, e);
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright 2021 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
*/

package com.linkedin.kafka.cruisecontrol.config;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;


/**
 * The Kafka topic config provider implementation based on using the Kafka Admin Client for topic level configurations
 * and files for cluster level configurations. The format of the file is JSON, listing properties:
 * <pre>
 *   {
 *     "min.insync.replicas": 1,
 *     "an.example.cluster.config": false
 *   }
 * </pre>
 *
 */
public class KafkaAdminTopicConfigProvider extends JsonFileTopicConfigProvider {

  private static final Logger LOG = LoggerFactory.getLogger(KafkaAdminTopicConfigProvider.class);

  // Cluster-level configs loaded once, in configure(), from the JSON file named by CLUSTER_CONFIGS_FILE.
  private Properties _clusterConfigs;
  // Admin client shared with the rest of Cruise Control (injected via configure()); not owned here, so close() is a no-op.
  private AdminClient _adminClient;

  @Override
  public Properties clusterConfigs() {
    return _clusterConfigs;
  }

  /**
   * Fetches the configuration for the requested topic. If an error is encountered the details will be logged and an
   * empty Properties instance will be returned.
   *
   * @param topic Topic name for which the topic-level configurations are required.
   * @return Properties instance containing the topic configuration.
   */
  @Override
  public Properties topicConfigs(String topic) {
    Config topicConfig = null;
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
    try {
      LOG.debug("Requesting details for topic '{}'", topic);
      topicConfig = _adminClient
              .describeConfigs(Collections.singletonList(topicResource))
              .all()
              .get()
              .get(topicResource);
    } catch (ExecutionException ee) {
      if (org.apache.kafka.common.errors.TimeoutException.class == ee.getCause().getClass()) {
        LOG.warn("Failed to retrieve configuration for topic '{}' due to describeConfigs request time out. Check for Kafka-side issues"
            + " and consider increasing the configured timeout.", topic);
      } else {
        // e.g. could be UnknownTopicOrPartitionException due to topic deletion or InvalidTopicException
        LOG.warn("Cannot retrieve configuration for topic '{}'.", topic, ee);
      }
    } catch (InterruptedException ie) {
      LOG.debug("Interrupted while getting configuration for topic '{}'.", topic, ie);
      // Restore the interrupt status so callers up the stack can observe the interruption.
      Thread.currentThread().interrupt();
    }

    if (topicConfig != null) {
      return convertTopicConfigToProperties(topicConfig);
    } else {
      LOG.warn("The configuration for topic '{}' could not be retrieved, returning empty Properties instance.", topic);
      return new Properties();
    }
  }

  /**
   * Fetches the configuration for the requested topics. If an error is encountered, for each topic, the details will be
   * logged and the entry for that topic will be omitted from the returned map.
   *
   * @param topics The set of topic names for which the topic-level configurations are required.
   * @return A Map from topic name string to Properties instance containing that topic's configuration.
   */
  @Override
  public Map<String, Properties> topicConfigs(Set<String> topics) {

    // Issue a single describeConfigs request for all topics; each entry's future resolves independently.
    Map<ConfigResource, KafkaFuture<Config>> topicConfigs;
    topicConfigs = _adminClient.describeConfigs(
        topics.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList())
    ).values();

    Map<String, Properties> propsMap = new HashMap<>();
    if (topicConfigs != null) {
      for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : topicConfigs.entrySet()) {
        try {
          Config config = entry.getValue().get();
          propsMap.put(entry.getKey().name(), convertTopicConfigToProperties(config));
        } catch (ExecutionException ee) {
          if (org.apache.kafka.common.errors.TimeoutException.class == ee.getCause().getClass()) {
            LOG.warn("Failed to retrieve config for topics due to describeConfigs request timing out. "
                + "Check for Kafka-side issues and consider increasing the configured timeout.");
            // If one has timed out then they all will so abort the loop.
            break;
          } else {
            // e.g. could be UnknownTopicOrPartitionException due to topic deletion or InvalidTopicException
            LOG.debug("Cannot retrieve config for topic {}.", entry.getKey().name(), ee);
          }
        } catch (InterruptedException ie) {
          LOG.debug("Interrupted while getting config for topic {}.", entry.getKey().name(), ie);
          // Restore the interrupt status so callers up the stack can observe the interruption.
          Thread.currentThread().interrupt();
        }
      }
    }

    return propsMap;
  }

  /**
   * Fetches the configuration for all the topics on the Kafka cluster. If an error is encountered when retrieving the
   * topic names then the error details will be logged and an empty Map instance will be returned.
   *
   * @return A Map from topic name string to Properties instance containing that topic's configuration.
   */
  @Override
  public Map<String, Properties> allTopicConfigs() {

    // Request a map of futures for the config of each topic on the Kafka cluster
    LOG.debug("Requesting configurations for all topics");
    Set<String> topicNames = null;
    try {
      topicNames = _adminClient.listTopics().names().get();
    } catch (ExecutionException ee) {
      LOG.warn("Unable to obtain list of all topic names from the Kafka Cluster", ee);
    } catch (InterruptedException ie) {
      LOG.warn("Interrupted while obtaining list of all topic names from the Kafka Cluster", ie);
      // Restore the interrupt status so callers up the stack can observe the interruption.
      Thread.currentThread().interrupt();
    }

    if (topicNames == null) {
      return Collections.emptyMap();
    } else {
      return topicConfigs(topicNames);
    }
  }

  // Flattens a Kafka Config's entries into a Properties instance of name -> value pairs.
  private static Properties convertTopicConfigToProperties(Config config) {
    Properties props = new Properties();
    for (ConfigEntry entry : config.entries()) {
      props.put(entry.name(), entry.value());
    }
    return props;
  }

  @Override
  public void configure(Map<String, ?> configs) {
    // The admin client instance is created by the LoadMonitor and passed in via the config map.
    _adminClient = (AdminClient) validateNotNull(
        configs.get(LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG),
        () -> String.format("Missing %s when creating Kafka Admin Client based Topic Config Provider",
            LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG));
    _clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE);
  }

  @Override
  public void close() {
    // no-op: the admin client is shared and owned by the LoadMonitor, so it must not be closed here.
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,18 @@

package com.linkedin.kafka.cruisecontrol.config;

import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import kafka.server.ConfigType;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;


/**
Expand All @@ -30,14 +27,21 @@
* }
* </pre>
*
* @deprecated This class uses the Zookeeper based admin client that will be removed in Kafka 3.0. Therefore this class has been
* deprecated and will be removed in a future Cruise Control release. A new {@link TopicConfigProvider} implementation
* using the Kafka Admin Client has been created ({@link KafkaAdminTopicConfigProvider}) and can be set using the
* {@code topic.config.provider.class} configuration setting.
*
*/
public class KafkaTopicConfigProvider implements TopicConfigProvider {
public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file";
@Deprecated
tomncooper marked this conversation as resolved.
Show resolved Hide resolved
public class KafkaTopicConfigProvider extends JsonFileTopicConfigProvider {

private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConfigProvider.class);
public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider";
public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs";
private String _connectString;
private boolean _zkSecurityEnabled;
private static Properties _clusterConfigs;
private Properties _clusterConfigs;

@Override
public Properties clusterConfigs() {
Expand All @@ -46,10 +50,11 @@ public Properties clusterConfigs() {

@Override
public Properties topicConfigs(String topic) {
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(
_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);
try {
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
return adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic);
Expand All @@ -59,43 +64,52 @@ public Properties topicConfigs(String topic) {
}

@Override
public Map<String, Properties> allTopicConfigs() {
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);
public Map<String, Properties> topicConfigs(Set<String> topics) {
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(
_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);

Map<String, Properties> topicConfigs = new HashMap<>(topics.size());
try {
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
return JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());

for (String topic : topics) {
try {
Properties topicConfig = adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic);
topicConfigs.put(topic, topicConfig);
} catch (Exception e) {
LOG.warn("Unable to retrieve config for topic '{}'", topic, e);
}
}
} finally {
KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient);
}

return topicConfigs;
}

private void loadClusterConfigs(String clusterConfigsFile) throws FileNotFoundException {
JsonReader reader = new JsonReader(new InputStreamReader(new FileInputStream(clusterConfigsFile), StandardCharsets.UTF_8));
@Override
public Map<String, Properties> allTopicConfigs() {
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(
_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);
try {
Gson gson = new Gson();
_clusterConfigs = gson.fromJson(reader, Properties.class);
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
return JavaConversions.mapAsJavaMap(adminZkClient.getAllTopicConfigs());
} finally {
try {
reader.close();
} catch (IOException e) {
// let it go.
}
KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient);
}
}

@Override
public void configure(Map<String, ?> configs) {
_connectString = (String) configs.get(ExecutorConfig.ZOOKEEPER_CONNECT_CONFIG);
_zkSecurityEnabled = (Boolean) configs.get(ExecutorConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG);
String configFile = KafkaCruiseControlUtils.getRequiredConfig(configs, CLUSTER_CONFIGS_FILE);
try {
loadClusterConfigs(configFile);
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
_clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE);
}

@Override
Expand Down
Loading