diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java index 47d60eaf1c..801f3a590a 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/config/KafkaAdminTopicConfigProvider.java @@ -4,8 +4,7 @@ package com.linkedin.kafka.cruisecontrol.config; -import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils; -import java.time.Duration; +import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -15,6 +14,8 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import com.linkedin.kafka.cruisecontrol.config.constants.ExecutorConfig; +import com.linkedin.kafka.cruisecontrol.model.Load; +import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.ConfigEntry; @@ -22,6 +23,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.linkedin.cruisecontrol.common.utils.Utils.validateNotNull; +import static com.linkedin.kafka.cruisecontrol.detector.AnomalyDetectorUtils.KAFKA_CRUISE_CONTROL_OBJECT_CONFIG; + /** * The Kafka topic config provider implementation based on using the Kafka Admin Client for topic level configurations @@ -41,7 +45,6 @@ public class KafkaAdminTopicConfigProvider extends JsonFileTopicConfigProvider { public static final String CLUSTER_CONFIGS_FILE = "cluster.configs.file"; private Properties _clusterConfigs; private AdminClient _adminClient; - private long _adminTimeoutMs; @Override public Properties clusterConfigs() { @@ -57,7 +60,7 @@ public Properties topicConfigs(String topic) { topicConfig = _adminClient .describeConfigs(Collections.singletonList(topicResource)) .all() - .get(_adminTimeoutMs, TimeUnit.MILLISECONDS) + .get() .get(topicResource); } catch (InterruptedException e) { LOG.error("The request for the configuration of topic '{}' was interrupted", topic); @@ -65,9 +68,6 @@ public Properties topicConfigs(String topic) { } catch (ExecutionException e) { LOG.error("The request for the configuration of topic '{}' failed", topic); e.printStackTrace(); - } catch (TimeoutException e) { - LOG.error("The request for the configuration of topic '{}' timed out", topic); - e.printStackTrace(); } if (topicConfig != null) { @@ -91,17 +91,14 @@ public Map allTopicConfigs() { topicNameSet.stream().map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name)).collect(Collectors.toList()) ).all() ) - .get(_adminTimeoutMs, TimeUnit.MILLISECONDS) - .get(_adminTimeoutMs, TimeUnit.MILLISECONDS); + .get() + .get(); } catch (InterruptedException e) { LOG.error("The request for the configuration of all topics was interrupted"); e.printStackTrace(); } catch (ExecutionException e) { LOG.error("The request for the configuration of all topics failed"); e.printStackTrace(); - } catch (TimeoutException e) { - LOG.error("The request for the configuration of all topics timed out"); - e.printStackTrace(); } Map propsMap = new HashMap<>(); @@ -128,14 +125,14 @@ private static Properties convertTopicConfigToProperties(Config config) { @Override public void configure(Map configs) { - KafkaCruiseControlConfig ccConfig = new KafkaCruiseControlConfig(configs); - _adminTimeoutMs = ccConfig.getConfiguredInstance(ExecutorConfig.ADMIN_CLIENT_REQUEST_TIMEOUT_MS_CONFIG, Integer.class); - _adminClient = KafkaCruiseControlUtils.createAdminClient(KafkaCruiseControlUtils.parseAdminClientConfigs(ccConfig)); + _adminClient = (AdminClient) validateNotNull( + configs.get(LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG), + () -> String.format("Missing %s when creating Kafka Admin Client based Topic Config Provider", LoadMonitor.KAFKA_ADMIN_CLIENT_OBJECT_CONFIG)); _clusterConfigs = loadClusterConfigs(configs, CLUSTER_CONFIGS_FILE); } @Override public void close() { - _adminClient.close(Duration.ofMillis(KafkaCruiseControlUtils.ADMIN_CLIENT_CLOSE_TIMEOUT_MS)); + //no-op } } diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java index 7d6c69ce3d..7b136b7877 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/LoadMonitor.java @@ -82,6 +82,7 @@ public class LoadMonitor { // Metadata TTL is set based on experience -- i.e. a short TTL with large metadata may cause excessive load on brokers. private static final long METADATA_TTL = TimeUnit.SECONDS.toMillis(10); private static final long METADATA_REFRESH_BACKOFF = TimeUnit.SECONDS.toMillis(5); + public static final String KAFKA_ADMIN_CLIENT_OBJECT_CONFIG = "kafka.admin.client.object"; // The maximum time allowed to make a state update. If the state value cannot be updated in time it will be invalidated. // TODO: Make this configurable. private final long _monitorStateUpdateTimeoutMs; @@ -152,8 +153,11 @@ public LoadMonitor(KafkaCruiseControlConfig config, Time time, MetricRegistry dr BrokerCapacityConfigResolver.class); long monitorStateUpdateIntervalMs = config.getLong(MonitorConfig.MONITOR_STATE_UPDATE_INTERVAL_MS_CONFIG); _monitorStateUpdateTimeoutMs = 10 * monitorStateUpdateIntervalMs; - _topicConfigProvider = config.getConfiguredInstance(MonitorConfig.TOPIC_CONFIG_PROVIDER_CLASS_CONFIG, - TopicConfigProvider.class); + _topicConfigProvider = config.getConfiguredInstance( + MonitorConfig.TOPIC_CONFIG_PROVIDER_CLASS_CONFIG, + TopicConfigProvider.class, + Collections.singletonMap(KAFKA_ADMIN_CLIENT_OBJECT_CONFIG, _adminClient) + ); _partitionMetricSampleAggregator = new KafkaPartitionMetricSampleAggregator(config, metadataClient.metadata()); _brokerMetricSampleAggregator = new KafkaBrokerMetricSampleAggregator(config);