diff --git a/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java b/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java index 2269c15463..ff592e302b 100644 --- a/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java +++ b/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java @@ -9,6 +9,8 @@ import com.newrelic.agent.bridge.AgentBridge; import com.newrelic.api.agent.NewRelic; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,6 +36,16 @@ public class NewRelicMetricsReporter implements MetricsReporter { private final Map metrics = new ConcurrentHashMap<>(); + private final List nodes; + + public NewRelicMetricsReporter() { + this.nodes = Collections.emptyList(); + } + + public NewRelicMetricsReporter(List nodes) { + this.nodes = nodes; + } + @Override public void init(final List initMetrics) { for (KafkaMetric kafkaMetric : initMetrics) { @@ -45,6 +57,16 @@ public void init(final List initMetrics) { } final String metricPrefix = "MessageBroker/Kafka/Internal/"; + + final String nodePrefix = "MessageBroker/Kafka/Nodes/"; + final List nodeMetricNames = new ArrayList(nodes.size()); + final List nodeMetricAsEventsNames = new ArrayList(nodes.size()); + for (String node : nodes) { + String metricName = nodePrefix + node; + nodeMetricNames.add(metricName); + nodeMetricAsEventsNames.add(metricName.replace('/', '.')); + } + executor.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -66,7 +88,15 @@ public void run() { } } } - + if (metricsAsEvents) { + for (String nodeMetric : nodeMetricAsEventsNames) { + eventData.put(nodeMetric, 1f); + } + } else { + for (String nodeMetric : nodeMetricNames) { + NewRelic.recordMetric(nodeMetric, 1f); + } + } if (metricsAsEvents) { NewRelic.getAgent().getInsights().recordCustomEvent("KafkaMetrics", eventData); } diff --git a/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer_Instrumentation.java b/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer_Instrumentation.java index 314c1b79c2..83a5219162 100644 --- a/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer_Instrumentation.java +++ b/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer_Instrumentation.java @@ -8,8 +8,14 @@ package org.apache.kafka.clients.consumer; import java.time.Duration; +import java.util.List; +import java.util.ArrayList; + +import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.Node; import com.newrelic.agent.bridge.AgentBridge; import com.newrelic.api.agent.DestinationType; @@ -20,20 +26,26 @@ import com.newrelic.api.agent.weaver.WeaveAllConstructors; import com.newrelic.api.agent.weaver.Weaver; import com.nr.instrumentation.kafka.NewRelicMetricsReporter; -import org.apache.kafka.common.utils.Timer; @Weave(originalName = "org.apache.kafka.clients.consumer.KafkaConsumer") public class KafkaConsumer_Instrumentation { private final Metrics metrics = Weaver.callOriginal(); + private final ConsumerMetadata metadata = Weaver.callOriginal(); + @NewField private boolean initialized; @WeaveAllConstructors public KafkaConsumer_Instrumentation() { if (!initialized) { - metrics.addReporter(new NewRelicMetricsReporter()); + List nodes = metadata.fetch().nodes(); + List nodeNames = new ArrayList<>(nodes.size()); + for (Node node : nodes) { + nodeNames.add(node.host() + ":" + node.port()); + } + metrics.addReporter(new NewRelicMetricsReporter(nodeNames)); initialized = true; } } diff --git a/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java b/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java index 62e7a3a3c8..08064053b8 100644 --- a/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java +++ b/instrumentation/kafka-clients-metrics-3.0.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java @@ -7,6 +7,10 @@ package org.apache.kafka.clients.producer; +import org.apache.kafka.clients.producer.internals.ProducerMetadata; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.Node; + import com.newrelic.agent.bridge.AgentBridge; import com.newrelic.api.agent.DestinationType; import com.newrelic.api.agent.MessageProduceParameters; @@ -18,24 +22,32 @@ import com.newrelic.api.agent.weaver.Weaver; import com.nr.instrumentation.kafka.CallbackWrapper; import com.nr.instrumentation.kafka.NewRelicMetricsReporter; -import org.apache.kafka.common.metrics.Metrics; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Future; +import java.util.List; +import java.util.ArrayList; @Weave(originalName = "org.apache.kafka.clients.producer.KafkaProducer") public class KafkaProducer_Instrumentation { private final Metrics metrics = Weaver.callOriginal(); + private final ProducerMetadata metadata = Weaver.callOriginal(); + @NewField private boolean initialized; @WeaveAllConstructors public KafkaProducer_Instrumentation() { if (!initialized) { - metrics.addReporter(new NewRelicMetricsReporter()); + List nodes = metadata.fetch().nodes(); + List nodeNames = new ArrayList<>(nodes.size()); + for (Node node : nodes) { + nodeNames.add(node.host() + ":" + node.port()); + } + metrics.addReporter(new NewRelicMetricsReporter(nodeNames)); initialized = true; } }