Skip to content

Commit

Permalink
Added new method to TopicConfigProvider and refactored implementations
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Cooper <[email protected]>
  • Loading branch information
tomncooper committed Jun 7, 2021
1 parent d1adc6d commit 83ecc95
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
Expand Down Expand Up @@ -45,6 +46,13 @@ public Properties clusterConfigs() {
return _clusterConfigs;
}

/**
* Fetches the configuration for the requested topic. If an error is encountered the details will be logged and an
* empty Properties instance will be returned.
*
* @param topic Topic name for which the topic-level configurations are required.
* @return Properties instance containing the topic configuration.
*/
@Override
public Properties topicConfigs(String topic) {
Config topicConfig = null;
Expand All @@ -58,44 +66,64 @@ public Properties topicConfigs(String topic) {
.get(topicResource);
} catch (ExecutionException ee) {
if (org.apache.kafka.common.errors.TimeoutException.class == ee.getCause().getClass()) {
LOG.warn("Failed to retrieve config for topic '{}' due to describeConfigs request time out. Check for Kafka-side issues"
LOG.warn("Failed to retrieve configuration for topic '{}' due to describeConfigs request time out. Check for Kafka-side issues"
+ " and consider increasing the configured timeout.", topic);
} else {
// e.g. could be UnknownTopicOrPartitionException due to topic deletion or InvalidTopicException
LOG.debug("Cannot retrieve config for topic {}.", topic, ee);
LOG.warn("Cannot retrieve configuration for topic '{}'.", topic, ee);
}
} catch (InterruptedException ie) {
LOG.debug("Interrupted while getting config for topic {}.", topic, ie);
LOG.debug("Interrupted while getting configuration for topic '{}'.", topic, ie);
}

if (topicConfig != null) {
return convertTopicConfigToProperties(topicConfig);
} else {
LOG.error("The configuration for topic '{}' could not be retrieved", topic);
LOG.warn("The configuration for topic '{}' could not be retrieved, returning empty Properties instance.", topic);
return new Properties();
}
}

/**
* Fetches the configuration for all the topics on the Kafka cluster. If an error is encountered when retrieving the
* topic names then the error details will be logged and an empty Map instance will be returned.
*
* @return A Map from topic name string to Properties instance containing that topic's configuration.
*/
@Override
public Map<String, Properties> allTopicConfigs() {

// Request a map of futures for the config of each topic on the Kafka cluster
Map<ConfigResource, KafkaFuture<Config>> topicConfigs = null;
LOG.debug("Requesting configurations for all topics");
Set<String> topicNames = null;
try {
LOG.debug("Requesting configurations for all topics");
topicConfigs = _adminClient
.listTopics()
.names()
.thenApply(
topicNameSet -> _adminClient.describeConfigs(
topicNameSet.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList())
).values()
)
.get();
topicNames = _adminClient.listTopics().names().get();
} catch (InterruptedException | ExecutionException e) {
LOG.warn("Unable to get topic configuration futures for all topics via Kafka admin client", e);
LOG.warn("Unable to obtain list of all topic names from the Kafka Cluster");
}

if (topicNames == null) {
return new HashMap<>();
} else {
return topicConfigs(topicNames);
}
}

/**
* Fetches the configuration for the requested topics. If an error is encountered, for each topic, the details will be
* logged and the entry for that topic will be omitted from the returned map.
*
* @param topics The set of topic names for which the topic-level configurations are required.
* @return A Map from topic name string to Properties instance containing that topic's configuration.
*/
@Override
public Map<String, Properties> topicConfigs(Set<String> topics) {

Map<ConfigResource, KafkaFuture<Config>> topicConfigs;
topicConfigs = _adminClient.describeConfigs(
topics.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList())
).values();

Map<String, Properties> propsMap = new HashMap<>();
if (topicConfigs != null) {
for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : topicConfigs.entrySet()) {
Expand All @@ -116,11 +144,7 @@ public Map<String, Properties> allTopicConfigs() {
}
}

if (!propsMap.isEmpty()) {
return propsMap;
} else {
throw new RuntimeException("Unable to retrieve topic configuration for any topics in the Kafka cluster");
}
return propsMap;
}

private static Properties convertTopicConfigToProperties(Config config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@
import kafka.server.ConfigType;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;


/**
Expand All @@ -31,6 +36,8 @@
*/
@Deprecated
public class KafkaTopicConfigProvider extends JsonFileTopicConfigProvider {

private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicConfigProvider.class);
public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP = "KafkaTopicConfigProvider";
public static final String ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE = "GetAllActiveTopicConfigs";
private String _connectString;
Expand All @@ -56,10 +63,33 @@ public Properties topicConfigs(String topic) {
}
}

@Override
public Map<String, Properties> topicConfigs(Set<String> topics) {
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);

AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);

Map<String, Properties> topicConfigs = new HashMap<>();
for (String topic : topics) {
try {
Properties topicConfig = adminZkClient.fetchEntityConfig(ConfigType.Topic(), topic);
topicConfigs.put(topic, topicConfig);
} catch (Exception e) {
LOG.warn("Unable to retrieve config for topic '{}'", topic, e);
}
}

KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient);
return topicConfigs;
}

@Override
public Map<String, Properties> allTopicConfigs() {
KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(_connectString,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_GROUP,
ZK_KAFKA_TOPIC_CONFIG_PROVIDER_METRIC_TYPE,
_zkSecurityEnabled);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.linkedin.cruisecontrol.common.CruiseControlConfigurable;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.common.annotation.InterfaceStability;


Expand All @@ -23,12 +25,19 @@ public interface TopicConfigProvider extends CruiseControlConfigurable, AutoClos
Properties clusterConfigs();

/**
* Get topic-level configs for the requested topic.
* @param topic Topic for which the topic-level configs are requested.
* @return Topic-level configs for the requested topic.
* Get topic-level configurations for the requested topic.
* @param topic Topic name for which the topic-level configurations are required.
* @return A {@link Properties} instance containing the topic-level configuration for the requested topic.
*/
Properties topicConfigs(String topic);

/**
* Get the topic-level configurations for the requested topics.
* @param topics The set of topic names for which the topic-level configurations are required.
* @return A map from the topic name to a {@link Properties} instance containing that topic's configuration.
*/
Map<String, Properties> topicConfigs(Set<String> topics);

/**
* @return Topic-level configs for all topics.
*/
Expand Down

0 comments on commit 83ecc95

Please sign in to comment.