Skip to content

Commit

Permalink
Switched KafkaAdminTopicConfigProvider to use provided Admin Client i…
Browse files Browse the repository at this point in the history
…nstance from LoadMonitor

Signed-off-by: Thomas Cooper <[email protected]>
  • Loading branch information
tomncooper committed Jun 3, 2021
1 parent 44727ee commit 0e42039
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

package com.linkedin.kafka.cruisecontrol.config;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import java.time.Duration;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -15,13 +14,18 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig;
import com.linkedin.kafka.cruisecontrol.model.Load;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull;
import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG;


/**
* The Kafka topic config provider implementation based on using the Kafka Admin Client for topic level configurations
Expand All @@ -41,7 +45,6 @@ public class KafkaAdminTopicConfigProvider extends JsonFileTopicConfigProvider {
public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file";
private Properties _clusterConfigs;
private AdminClient _adminClient;
private long _adminTimeoutMs;

@Override
public Properties clusterConfigs() {
Expand All @@ -57,17 +60,14 @@ public Properties topicConfigs(String topic) {
topicConfig = _adminClient
.describeConfigs(Collections.singletonList(topicResource))
.all()
.get(_adminTimeoutMs, TimeUnit.MILLISECONDS)
.get()
.get(topicResource);
} catch (InterruptedException e) {
LOG.error("The request for the configuration of topic '{}' was interrupted", topic);
e.printStackTrace();
} catch (ExecutionException e) {
LOG.error("The request for the configuration of topic '{}' failed", topic);
e.printStackTrace();
} catch (TimeoutException e) {
LOG.error("The request for the configuration of topic '{}' timed out", topic);
e.printStackTrace();
}

if (topicConfig != null) {
Expand All @@ -91,17 +91,14 @@ public Map<String, Properties> allTopicConfigs() {
topicNameSet.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList())
).all()
)
.get(_adminTimeoutMs, TimeUnit.MILLISECONDS)
.get(_adminTimeoutMs, TimeUnit.MILLISECONDS);
.get()
.get();
} catch (InterruptedException e) {
LOG.error("The request for the configuration of all topics was interrupted");
e.printStackTrace();
} catch (ExecutionException e) {
LOG.error("The request for the configuration of all topics failed");
e.printStackTrace();
} catch (TimeoutException e) {
LOG.error("The request for the configuration of all topics timed out");
e.printStackTrace();
}

Map<String, Properties> propsMap = new HashMap<>();
Expand All @@ -128,14 +125,14 @@ private static Properties convertTopicConfigToProperties(Config config) {

@Override
public void configure(Map<String, ?> configs) {
KafkaCruiseControlConfig ccConfig = new KafkaCruiseControlConfig(configs);
_adminTimeoutMs = ccConfig.getConfiguredInstance(ExecutorConfig.ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG, Integer.class);
_adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(ccConfig));
_adminClient = (AdminClient) validateNotNull(
configs.get(LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG),
() -> String.format("Missing %s when creating Kafka Admin Client based Topic Config Provider", LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG));
_clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE);
}

@Override
public void close() {
_adminClient.close(Duration.ofMillis(KafkaCruiseControlUtils.ADMIN_CLIENT_CLOSE_TIMEOUT_MS));
//no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class LoadMonitor {
// Metadata TTL is set based on experience -- i.e. a short TTL with large metadata may cause excessive load on brokers.
private static final long METADATA_TTL = TimeUnit.SECONDS.toMillis(10);
private static final long METADATA_REFRESH_BACKOFF = TimeUnit.SECONDS.toMillis(5);
public static final String KAFKA_ADMIN_CLIENT_OBJECT_CONFIG = "kafka.admin.client.object";
// The maximum time allowed to make a state update. If the state value cannot be updated in time it will be invalidated.
// TODO: Make this configurable.
private final long _monitorStateUpdateTimeoutMs;
Expand Down Expand Up @@ -152,8 +153,11 @@ public LoadMonitor(KafkaCruiseControlConfig config, Time time, MetricRegistry dr
BrokerCapacityConfigResolver.class);
long monitorStateUpdateIntervalMs = config.getLong(MonitorConfig.MONITOR_STATE_UPDATE_INTERVAL_MS_CONFIG);
_monitorStateUpdateTimeoutMs = 10 * monitorStateUpdateIntervalMs;
_topicConfigProvider = config.getConfiguredInstance(MonitorConfig.TOPIC_CONFIG_PROVIDER_CLASS_CONFIG,
TopicConfigProvider.class);
_topicConfigProvider = config.getConfiguredInstance(
MonitorConfig.TOPIC_CONFIG_PROVIDER_CLASS_CONFIG,
TopicConfigProvider.class,
Collections.singletonMap(KAFKA_ADMIN_CLIENT_OBJECT_CONFIG, _adminClient)
);

_partitionMetricSampleAggregator = new KafkaPartitionMetricSampleAggregator(config, metadataClient.metadata());
_brokerMetricSampleAggregator = new KafkaBrokerMetricSampleAggregator(config);
Expand Down

0 comments on commit 0e42039

Please sign in to comment.