Newrelic-57520 kafka streams #1170

Merged 40 commits on Mar 8, 2023

Commits
fd40eb0
add kafka-streams-2.6.0 instrumentation module
obenkenobi Jan 24, 2023
b710557
add kafka-streams-1.0.0 instrumentation module
obenkenobi Jan 24, 2023
633a63a
add missing copyright comments
obenkenobi Jan 24, 2023
991d0ee
Merge branch 'main' of github.com:newrelic/newrelic-java-agent into N…
obenkenobi Jan 24, 2023
8f26985
add kafka-streams-0.11.0 instrumentation module
obenkenobi Jan 24, 2023
8d8e9da
Prevent kafka-streams:0.10.2.2 from applying to 2.6.0 module
obenkenobi Jan 24, 2023
a4486bd
change distributed trace header passing (still needs improvement)
obenkenobi Jan 25, 2023
d082f2a
add missing imports
obenkenobi Jan 25, 2023
72532c3
uncomment utils call from spans instrumentation
obenkenobi Jan 25, 2023
bb5d60c
undo kafka clients spans changes
obenkenobi Jan 26, 2023
c20ef1a
fix missing distributed tracing & removed older version streams modul…
obenkenobi Jan 26, 2023
00f2707
distributed tracing on streams now properly connects on both the prod…
obenkenobi Jan 27, 2023
57d0195
refactor header removal
obenkenobi Jan 27, 2023
8f03251
undo kafka-clients-spans changes & start txn in event loop
obenkenobi Feb 2, 2023
aca3b8a
remove unneeded SinkNode span & throw back event loop error
obenkenobi Feb 6, 2023
aad332e
ignore transaction if nothing polled & processed
obenkenobi Feb 6, 2023
2ed880b
report to external outgoing topics
obenkenobi Feb 9, 2023
348d3eb
Merge branch 'main' of github.com:newrelic/newrelic-java-agent into N…
obenkenobi Feb 13, 2023
4a8108a
refactor to have per topic segments
obenkenobi Feb 14, 2023
6e51c6b
include records added to txn trace
obenkenobi Feb 15, 2023
42b66d2
add metrics exporter & rename add records segment
obenkenobi Feb 16, 2023
adf29cd
split kafka streams module into spans & metrics modules
obenkenobi Feb 16, 2023
8bb9410
rename transaction and metrics
obenkenobi Feb 17, 2023
f703c7c
update metric names
obenkenobi Feb 17, 2023
32ee11d
adding application id to transaction name
obenkenobi Feb 21, 2023
b28227d
add client.id to txn name
obenkenobi Feb 22, 2023
eaf63d0
change streams metric prefix to Kafka/Streams/
obenkenobi Feb 24, 2023
7a5e1c0
add streams spans modules up until 2.0.0
obenkenobi Feb 24, 2023
6187517
add un-versioned files from last streams commit for 2.0.0
obenkenobi Feb 24, 2023
8ea1cc2
Merge branch 'main' of github.com:newrelic/newrelic-java-agent into N…
obenkenobi Feb 24, 2023
e5738ff
refactor settings.gradle for kafka streams spans 2.1.0
obenkenobi Feb 24, 2023
b2157da
added kafka streams tests & increased clients test timing
obenkenobi Feb 27, 2023
92fc585
rename kafka streams tests & add streams metric test
obenkenobi Mar 1, 2023
aec7ddf
rename kafka streams tests
obenkenobi Mar 1, 2023
a05919f
Merge branch 'main' of github.com:newrelic/newrelic-java-agent into N…
obenkenobi Mar 1, 2023
6507a19
add readme and better documentation for kafka streams spans
obenkenobi Mar 1, 2023
8b3d9c5
reformat kafka streams spans Readme.md
obenkenobi Mar 1, 2023
50f55b8
re-ignore kafka clients test
obenkenobi Mar 1, 2023
2540ce9
add more readmes to kafka streams spans modules
obenkenobi Mar 2, 2023
bf59176
removed debug code & updated method/variable naming for Kafka Streams
obenkenobi Mar 7, 2023
@@ -41,7 +41,7 @@
@InstrumentationTestConfig(includePrefixes = "org.apache.kafka")
public class Kafka3MessageTest {
@Rule
-    public KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.0"));
+    public KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));

private final String TOPIC = "life-universe-everything";
private final String ANOTHER_TOPIC = "vogon-poetry";
@@ -84,7 +84,7 @@ private boolean readMessages() {
consumer.subscribe(Collections.singleton(TOPIC));

// setting a timeout so this does not drag forever if something goes wrong.
-        long waitUntil = System.currentTimeMillis() + 5000L;
+        long waitUntil = System.currentTimeMillis() + 15000L;
while (waitUntil > System.currentTimeMillis()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
messagesRead += records.count();
@@ -7,18 +7,17 @@

package org.apache.kafka.clients.producer;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.agent.bridge.NoOpDistributedTracePayload;
import com.newrelic.agent.bridge.Transaction;
import com.newrelic.api.agent.DistributedTracePayload;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.Transaction;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

@Weave(originalName = "org.apache.kafka.clients.producer.KafkaProducer")
public class KafkaProducer_Instrumentation<K, V> {

22 changes: 22 additions & 0 deletions instrumentation/kafka-streams-metrics-1.0.0/build.gradle
@@ -0,0 +1,22 @@
jar {
manifest {
attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-streams-metrics-1.0.0',
'Implementation-Title-Alias': 'kafka-streams-metrics'
}
}

dependencies {
implementation(project(":agent-bridge"))
implementation("org.apache.kafka:kafka-streams:1.0.0")

testImplementation("org.testcontainers:kafka:1.16.3")
}

verifyInstrumentation {
passesOnly 'org.apache.kafka:kafka-streams:[1.0.0,)'
}

site {
title 'Kafka'
type 'Messaging'
}
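A note on the `verifyInstrumentation` block above: as I understand the New Relic verify-instrumentation Gradle plugin, `passesOnly 'org.apache.kafka:kafka-streams:[1.0.0,)'` asks the build to confirm that this module weaves cleanly against every `kafka-streams` release from 1.0.0 onward, and against nothing older.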
134 changes: 134 additions & 0 deletions instrumentation/kafka-streams-metrics-1.0.0/src/main/java/com/nr/instrumentation/kafka/streams/NewRelicMetricsReporter.java
@@ -0,0 +1,134 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package com.nr.instrumentation.kafka.streams;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

public class NewRelicMetricsReporter implements MetricsReporter {

private static final boolean KAFKA_METRICS_DEBUG = NewRelic.getAgent().getConfig().getValue("kafka.metrics.debug.enabled", false);

private static final boolean METRICS_AS_EVENTS = NewRelic.getAgent().getConfig().getValue("kafka.metrics.as_events.enabled", false);

private static final long REPORTING_INTERVAL_IN_SECONDS = NewRelic.getAgent().getConfig().getValue("kafka.metrics.interval", 30);

private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, buildThreadFactory("com.nr.instrumentation.kafka.streams.NewRelicMetricsReporter-%d"));

private final Map<String, KafkaMetric> metrics = new ConcurrentHashMap<>();

@Override
public void init(final List<KafkaMetric> initMetrics) {
for (KafkaMetric kafkaMetric : initMetrics) {
String metricGroupAndName = getMetricGroupAndName(kafkaMetric);
if (KAFKA_METRICS_DEBUG) {
AgentBridge.getAgent().getLogger().log(Level.FINEST, "init(): {0} = {1}", metricGroupAndName, kafkaMetric.metricName());
}
metrics.put(metricGroupAndName, kafkaMetric);
}

final String metricPrefix = "Kafka/Streams/";
executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
Map<String, Object> eventData = new HashMap<>();
for (final Map.Entry<String, KafkaMetric> metric : metrics.entrySet()) {
Object metricValue = metric.getValue().metricValue();
if (metricValue instanceof Double) {
final float value = ((Double) metricValue).floatValue();
if (KAFKA_METRICS_DEBUG) {
AgentBridge.getAgent().getLogger().log(Level.FINEST, "getMetric: {0} = {1}", metric.getKey(), value);
}
if (!Float.isNaN(value) && !Float.isInfinite(value)) {
if (METRICS_AS_EVENTS) {
eventData.put(metric.getKey().replace('/', '.'), value);
} else {
NewRelic.recordMetric(metricPrefix + metric.getKey(), value);
}
}
}
}

if (METRICS_AS_EVENTS) {
NewRelic.getAgent().getInsights().recordCustomEvent("KafkaStreamsMetrics", eventData);
}
} catch (Exception e) {
AgentBridge.getAgent().getLogger().log(Level.FINE, e, "Unable to record kafka metrics");
}
}
}, 0L, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
}

@Override
public void metricChange(final KafkaMetric metric) {
String metricGroupAndName = getMetricGroupAndName(metric);
if (KAFKA_METRICS_DEBUG) {
AgentBridge.getAgent().getLogger().log(Level.FINEST, "metricChange(): {0} = {1}", metricGroupAndName, metric.metricName());
}
metrics.put(metricGroupAndName, metric);
}

@Override
public void metricRemoval(final KafkaMetric metric) {
String metricGroupAndName = getMetricGroupAndName(metric);
if (KAFKA_METRICS_DEBUG) {
AgentBridge.getAgent().getLogger().log(Level.FINEST, "metricRemoval(): {0} = {1}", metricGroupAndName, metric.metricName());
}
metrics.remove(metricGroupAndName);
}

private String getMetricGroupAndName(final KafkaMetric metric) {
if (metric.metricName().tags().containsKey("topic")) {
// Special case for handling topic names in metrics
return metric.metricName().group() + "/" + metric.metricName().tags().get("topic") + "/" + metric.metricName().name();
}
return metric.metricName().group() + "/" + metric.metricName().name();
}

@Override
public void close() {
executor.shutdown();
metrics.clear();
}

@Override
public void configure(final Map<String, ?> configs) {
}

private static ThreadFactory buildThreadFactory(final String nameFormat) {
// fail fast if the format is invalid
String.format(nameFormat, 0);

final ThreadFactory factory = Executors.defaultThreadFactory();
final AtomicInteger count = new AtomicInteger();

return new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
final Thread thread = factory.newThread(runnable);
thread.setName(String.format(nameFormat, count.incrementAndGet()));
thread.setDaemon(true);
return thread;
}
};
}

}
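The reporter is driven by three agent configuration values, read through `NewRelic.getAgent().getConfig()`: `kafka.metrics.debug.enabled` (default `false`) turns on FINEST-level logging of metric registration and reporting, `kafka.metrics.as_events.enabled` (default `false`) sends the values as a `KafkaStreamsMetrics` custom event instead of timeslice metrics, and `kafka.metrics.interval` (default 30 seconds) sets the reporting cadence. These are ordinary agent config keys, so they should be settable the usual ways, e.g. in `newrelic.yml` or via `-Dnewrelic.config.*` system properties.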
30 changes: 30 additions & 0 deletions instrumentation/kafka-streams-metrics-1.0.0/src/main/java/org/apache/kafka/streams/KafkaStreams_Instrumentation.java
@@ -0,0 +1,30 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package org.apache.kafka.streams;

import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.kafka.streams.NewRelicMetricsReporter;
import org.apache.kafka.common.metrics.Metrics;

@Weave(originalName = "org.apache.kafka.streams.KafkaStreams")
public class KafkaStreams_Instrumentation {
private final Metrics metrics = Weaver.callOriginal();

@NewField
private boolean nrMetricsInitialized;

@WeaveAllConstructors
public KafkaStreams_Instrumentation() {
if (!nrMetricsInitialized) {
metrics.addReporter(new NewRelicMetricsReporter());
nrMetricsInitialized = true;
}
}
}
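For readers unfamiliar with the weaver: the class above injects the reporter into every `KafkaStreams` constructor, with the `nrMetricsInitialized` flag guarding against chained constructors registering it twice. Functionally this is close to registering a reporter yourself through Kafka's standard `metric.reporters` setting. A rough sketch of that manual alternative follows, with hypothetical property values; this is not how the agent wires it up:

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class ManualReporterRegistration {
    // Sketch only: the agent normally installs the reporter via the weave above.
    public static KafkaStreams build(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical
        // Kafka instantiates each listed class and delivers the same init()/
        // metricChange()/metricRemoval() callbacks the weave-installed reporter gets.
        props.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
                "com.nr.instrumentation.kafka.streams.NewRelicMetricsReporter");
        return new KafkaStreams(topology, props);
    }
}
```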
87 changes: 87 additions & 0 deletions instrumentation/kafka-streams-metrics-1.0.0/src/test/java/KafkaStreams1MetricsTest.java
@@ -0,0 +1,87 @@
import com.newrelic.agent.introspec.InstrumentationTestConfig;
import com.newrelic.agent.introspec.InstrumentationTestRunner;
import com.newrelic.agent.introspec.MetricsHelper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.junit.Assert.assertTrue;

@RunWith(InstrumentationTestRunner.class)
@InstrumentationTestConfig(includePrefixes = {"org.apache.kafka.streams"})
public class KafkaStreams1MetricsTest {
@Rule
public KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"));

private final String TOPIC = "life-universe-everything";
private final String OUTPUT_TOPIC = "vogon-poetry";

@Before
public void before() {
kafkaContainer.start();
}

@After
public void after() {
kafkaContainer.stop();
}

@Test
public void testStreams() throws ExecutionException, InterruptedException {
sendMessages();
runStreams();
assertMetrics();

}

private void sendMessages() throws ExecutionException, InterruptedException {
try (KafkaProducer<String, String> producer = KafkaStreamsHelper.newProducer(kafkaContainer)) {
List<Future<RecordMetadata>> futures = Arrays.asList(
producer.send(new ProducerRecord<>(TOPIC, "Life, don't talk to me about life.")),
producer.send(new ProducerRecord<>(TOPIC, "Don't Panic")),
producer.send(new ProducerRecord<>(OUTPUT_TOPIC, "Oh freddled gruntbuggly"))
);
for (Future<RecordMetadata> future : futures) {
future.get();
}
}
}

private void runStreams() throws InterruptedException {
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
stream.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams kafkaStreams = KafkaStreamsHelper.newKafkaStreams(builder.build(), kafkaContainer);
try {
kafkaStreams.start();
Thread.sleep(20000);
} finally {
kafkaStreams.close();
}
}

private void assertMetrics() {
int metricCount = MetricsHelper.getUnscopedMetricCount("Kafka/Streams/kafka-metrics-count/count");
assertTrue("Metric count for \"Kafka/Streams/kafka-metrics-count/count\" is not greater than or equal 1", metricCount >= 1);
}


}
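The asserted name follows directly from the reporter's naming scheme: Kafka's built-in `kafka-metrics-count` group exposes a `count` metric, `getMetricGroupAndName` joins group and name with a slash, and the scheduled task prepends the `Kafka/Streams/` prefix.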
43 changes: 43 additions & 0 deletions instrumentation/kafka-streams-metrics-1.0.0/src/test/java/KafkaStreamsHelper.java
@@ -0,0 +1,43 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.testcontainers.containers.KafkaContainer;

import java.util.Properties;

public class KafkaStreamsHelper {
public static final String APPLICATION_ID = "test-streams-app";
public static final String CLIENT_ID = "test-client-id";
public static KafkaProducer<String, String> newProducer(KafkaContainer kafkaContainer) {
Properties props = getProps(kafkaContainer.getBootstrapServers(), true);
return new KafkaProducer<>(props);
}

public static KafkaStreams newKafkaStreams(Topology topology, KafkaContainer kafkaContainer) {
Properties props = getProps(kafkaContainer.getBootstrapServers(), false);
return new KafkaStreams(topology, props);
}

public static Properties getProps(String bootstrapServers, boolean isClientProps) {
Properties props = new Properties();
props.put("bootstrap.servers", bootstrapServers);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
if (isClientProps) {
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "test-consumer-group");
} else {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
props.put(StreamsConfig.CLIENT_ID_CONFIG, CLIENT_ID);
}
return props;
}

}
16 changes: 16 additions & 0 deletions instrumentation/kafka-streams-spans-2.0.0/Readme.md
@@ -0,0 +1,16 @@
# Kafka Streams Spans instrumentation
Kafka Streams is a library that runs on top of Kafka clients to stream and process data from Kafka.
This instrumentation module creates a transaction every time records are polled from Kafka and then processed.

## Troubleshooting

If you are using Kafka Streams and encounter a transaction with the name `Kafka/Streams/APPLICATION_ID_UNKNOWN`,
here are the possible causes:

1. You are running at least two Kafka Streams instances configured with the same `client.id` and have since closed one of them.
A possible workaround is to give each instance a different `client.id`; another is to run each instance in a separate app.
2. If the above does not apply, and you are using the latest Kafka Streams instrumentation module, this may be a silent failure caused by a newer
Kafka Streams version, most likely one that names its threads differently. Under the hood, we name transactions by parsing the name of the
thread the transaction began on to extract the client id, then use that client id to look up the `application.id` configured for your
Kafka Streams instance (see the sketch below). If this happens, please report it, as it signals that we need a new instrumentation
module for more up-to-date Kafka Streams versions.
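
A minimal sketch of the parsing described above, assuming Kafka Streams' conventional `<client.id>-StreamThread-<n>` thread naming; the helper is hypothetical and not the agent's exact code:

```java
// Illustrative only: recovers the client id from a Kafka Streams thread name,
// assuming the conventional "<client.id>-StreamThread-<n>" pattern.
public static String clientIdFromThreadName(String threadName) {
    int idx = threadName.lastIndexOf("-StreamThread-");
    // If a future Kafka Streams release renames its threads, this parse fails
    // and the transaction is named Kafka/Streams/APPLICATION_ID_UNKNOWN.
    return idx >= 0 ? threadName.substring(0, idx) : null;
}
```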