Skip to content

Commit

Permalink
Label thread pool names with tracing info
Browse files Browse the repository at this point in the history
Elasticsearch clients can supply X-Opaque-Id or trace.id tracing headers
in requests to help with debugging issues. This change adds any tracing
headers to the thread name for the duration of the request, so
that this information is visible in HotThreads or when using other
profiling tools (e.g. JFR/Mission Control, VisualVM).

Closes elastic#74580
  • Loading branch information
Nikola Grcevski committed Sep 15, 2021
1 parent 7ae4f35 commit 43f4821
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -111,15 +112,8 @@ public StoredContext stashContext() {
* Otherwise when context is stash, it should be empty.
*/

if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID) || context.requestHeaders.containsKey(Task.TRACE_ID)) {
Map<String, String> map = new HashMap<>(2, 1);
if (context.requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
map.put(Task.X_OPAQUE_ID, context.requestHeaders.get(Task.X_OPAQUE_ID));
}
if (context.requestHeaders.containsKey(Task.TRACE_ID)) {
map.put(Task.TRACE_ID, context.requestHeaders.get(Task.TRACE_ID));
}
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(map);
if (context.hasTracingIdInHeaders()) {
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(context.tracingHeaders());
threadLocal.set(threadContextStruct);
}
else {
Expand Down Expand Up @@ -657,12 +651,60 @@ private void writeTo(StreamOutput out, Map<String, String> defaultHeaders) throw

out.writeMap(responseHeaders, StreamOutput::writeString, StreamOutput::writeStringCollection);
}

/**
 * Reports whether the request headers carry at least one tracing identifier.
 *
 * @return {@code true} when {@code requestHeaders} has an entry for
 *         {@code Task.X_OPAQUE_ID} or {@code Task.TRACE_ID}
 */
boolean hasTracingIdInHeaders() {
    if (requestHeaders.containsKey(Task.X_OPAQUE_ID)) {
        return true;
    }
    return requestHeaders.containsKey(Task.TRACE_ID);
}

/**
 * Collects the tracing-related request headers into a fresh map.
 *
 * @return a new map holding the {@code Task.X_OPAQUE_ID} and/or {@code Task.TRACE_ID}
 *         entries that are present in {@code requestHeaders}; empty when neither is present
 */
Map<String, String> tracingHeaders() {
    // At most two entries are ever stored, so size the table to avoid a resize.
    Map<String, String> tracing = new HashMap<>(2, 1);
    for (String header : new String[] { Task.X_OPAQUE_ID, Task.TRACE_ID }) {
        if (requestHeaders.containsKey(header)) {
            tracing.put(header, requestHeaders.get(header));
        }
    }
    return tracing;
}
}

/**
 * Mixin for runnable wrappers that expose a request's tracing headers in the name of the
 * executing thread while the wrapped task runs.
 */
public interface WithTracingIdsInThreadName {
    /** Marker placed between the thread's own name and the appended tracing headers. */
    String ID_LABEL = " tracing-ids=";

    /**
     * Renders every tracing header of {@code context} as {@code [key:value]} and concatenates them.
     */
    private String tracingHeadersAsString(ThreadContextStruct context) {
        Map<String, String> tracingHeaders = context.tracingHeaders();
        StringBuilder resultBuilder = new StringBuilder();
        tracingHeaders.forEach((key, value) -> resultBuilder.append(String.format(Locale.ROOT, "[%s:%s]", key, value)));

        return resultBuilder.toString();
    }

    /**
     * Appends {@link #ID_LABEL} plus the tracing headers of {@code context} to the current thread's name.
     *
     * @param context the thread context whose tracing headers should be shown
     * @return {@code true} if this call changed the thread name and the caller must therefore call
     *         {@link #clearThreadNameLabel()}; {@code false} if there were no tracing headers or the
     *         name already carries a label
     */
    default boolean addLabelToThreadName(ThreadContextStruct context) {
        Thread current = Thread.currentThread();
        String originalName = current.getName();
        // The current thread may wrap Runnables multiple times.
        // We return true or false to establish which call sets/restores the thread name.
        if (context.hasTracingIdInHeaders() && originalName.contains(ID_LABEL) == false) {
            current.setName(originalName + ID_LABEL + tracingHeadersAsString(context));
            return true;
        }

        return false;
    }

    /**
     * Removes {@link #ID_LABEL} and everything after it from the current thread's name.
     * Does nothing when the name carries no label.
     */
    default void clearThreadNameLabel() {
        Thread current = Thread.currentThread();
        String threadName = current.getName();
        // Literal search instead of String.split: split interprets its argument as a regex and
        // returns an empty array when the whole name is just the label, which made [0] throw.
        int labelStart = threadName.indexOf(ID_LABEL);
        if (labelStart >= 0) {
            current.setName(threadName.substring(0, labelStart));
        }
    }
}

/**
* Wraps a Runnable to preserve the thread context.
*/
private class ContextPreservingRunnable implements WrappedRunnable {
private class ContextPreservingRunnable implements WrappedRunnable, WithTracingIdsInThreadName {
private final Runnable in;
private final ThreadContext.StoredContext ctx;

Expand All @@ -673,9 +715,15 @@ private ContextPreservingRunnable(Runnable in) {

/**
 * Runs the wrapped task with the captured context restored, labelling the executing thread
 * with tracing ids for the duration of the task.
 */
@Override
public void run() {
    boolean renamedByThisCall = false;
    try {
        // The stashed context is closed before the outer finally block executes.
        try (ThreadContext.StoredContext ignored = stashContext()) {
            ctx.restore();
            renamedByThisCall = addLabelToThreadName(threadLocal.get());
            in.run();
        }
    } finally {
        // Only the call that added the label is responsible for removing it.
        if (renamedByThisCall) {
            clearThreadNameLabel();
        }
    }
}

Expand All @@ -693,7 +741,7 @@ public Runnable unwrap() {
/**
* Wraps an AbstractRunnable to preserve the thread context.
*/
private class ContextPreservingAbstractRunnable extends AbstractRunnable implements WrappedRunnable {
private class ContextPreservingAbstractRunnable extends AbstractRunnable implements WrappedRunnable, WithTracingIdsInThreadName {
private final AbstractRunnable in;
private final ThreadContext.StoredContext creatorsContext;

Expand Down Expand Up @@ -734,7 +782,14 @@ public void onRejection(Exception e) {
// Stashes the executing thread's context into threadsOriginalContext, restores creatorsContext,
// and then invokes the wrapped runnable's doRun().
protected void doRun() throws Exception {
threadsOriginalContext = stashContext();
creatorsContext.restore();
// NOTE(review): in.doRun() is invoked here and again inside the try below. This first,
// unguarded call looks like the pre-change line left over from the diff rendering — confirm
// that only the guarded call is meant to remain.
in.doRun();
// addLabelToThreadName returns true only when this call changed the thread name.
boolean labelledThread = addLabelToThreadName(threadLocal.get());
try {
in.doRun();
} finally {
// Remove the label even when the wrapped task throws.
if (labelledThread) {
clearThreadNameLabel();
}
}
}

@Override
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/elasticsearch/monitor/jvm/HotThreads.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.Task;

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
Expand Down Expand Up @@ -278,6 +279,17 @@ String innerDetect(ThreadMXBean threadBean, long currentThreadId) throws Excepti
}
}
}

// Short-lived requests are, in general, not tracked accurately in HotThreads. The thread pools
// will reuse the thread-ids across multiple short requests, so the reported stacks and, in this case,
// tracing-ids are on a best-effort basis. For true performance bottlenecks the information will be accurate.
sb.append('\n')
.append("Note: Any reported tracing IDs (e.g. ")
.append(Task.X_OPAQUE_ID)
.append(',')
.append(Task.TRACE_ID)
.append(") may not be accurate for short-lived requests.");

return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.HeaderWarning;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
Expand Down Expand Up @@ -695,4 +696,172 @@ protected void doRun() throws Exception {
}
};
}

/**
 * Verifies that runnables wrapped by {@code preserveContext} carry the tracing headers in the
 * executing thread's name while they run, and that the name is restored afterwards (including
 * when the runnable throws, and when no tracing headers are present).
 */
public void testThreadNamingWithTracingIds() {
    ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
    Runnable withContext;

    // an abstract runnable that has X-Opaque-Id in its header.
    try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
        threadContext.putHeader(Task.X_OPAQUE_ID, "some_id");
        withContext = threadContext.preserveContext(new AbstractRunnable() {

            @Override
            public void onAfter() {
                assertFalse(Thread.currentThread().getName().contains("some_id"));
                assertFalse(Thread.currentThread().getName().contains(Task.X_OPAQUE_ID));
                assertFalse(Thread.currentThread().getName().contains(ThreadContext.WithTracingIdsInThreadName.ID_LABEL));
            }

            @Override
            public void onFailure(Exception e) {
                throw new RuntimeException("from onFailure", e);
            }

            @Override
            protected void doRun() {
                // Assertions are errors, turn this into exception to make sure we get the error on failure
                try {
                    assertTrue(Thread.currentThread().getName().contains("some_id"));
                    assertTrue(Thread.currentThread().getName().contains(Task.X_OPAQUE_ID));
                    assertTrue(Thread.currentThread().getName().contains(ThreadContext.WithTracingIdsInThreadName.ID_LABEL));
                } catch (Throwable t) {
                    // Keep the original throwable as the cause so the failing assertion's stack trace is not lost.
                    throw new RuntimeException("Assertion failed " + t.getMessage(), t);
                }
            }
        });
    }

    withContext.run();

    // an abstract runnable that has trace.id in its header
    try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
        threadContext.putHeader(Task.TRACE_ID, "some_trace_id");
        withContext = threadContext.preserveContext(new AbstractRunnable() {

            @Override
            public void onAfter() {
                assertFalse(Thread.currentThread().getName().contains("some_trace_id"));
                assertFalse(Thread.currentThread().getName().contains(Task.TRACE_ID));
                assertFalse(Thread.currentThread().getName().contains(ThreadContext.WithTracingIdsInThreadName.ID_LABEL));
            }

            @Override
            public void onFailure(Exception e) {
                throw new RuntimeException("from onFailure", e);
            }

            @Override
            protected void doRun() {
                try {
                    assertTrue(Thread.currentThread().getName().contains("some_trace_id"));
                    assertTrue(Thread.currentThread().getName().contains(Task.TRACE_ID));
                    assertTrue(Thread.currentThread().getName().contains(ThreadContext.WithTracingIdsInThreadName.ID_LABEL));
                } catch (Throwable t) {
                    // Keep the original throwable as the cause so the failing assertion's stack trace is not lost.
                    throw new RuntimeException("Assertion failed " + t.getMessage(), t);
                }
            }
        });
    }

    withContext.run();

    // an abstract runnable that has both X-Opaque-Id and trace.id in its headers,
    // but this time they are in the parent caller context
    ThreadContext outsideContext = new ThreadContext(Settings.EMPTY);
    outsideContext.putHeader(Task.TRACE_ID, "some_trace_id");
    outsideContext.putHeader(Task.X_OPAQUE_ID, "some_opaque_id");

    try (ThreadContext.StoredContext ignored = outsideContext.stashContext()) {
        withContext = outsideContext.preserveContext(new AbstractRunnable() {

            @Override
            public void onAfter() {
                assertFalse(Thread.currentThread().getName().contains("some_trace_id"));
                assertFalse(Thread.currentThread().getName().contains("some_opaque_id"));
                assertFalse(Thread.currentThread().getName().contains(Task.TRACE_ID));
                assertFalse(Thread.currentThread().getName().contains(Task.X_OPAQUE_ID));
                assertFalse(Thread.currentThread().getName().contains(ThreadContext.WithTracingIdsInThreadName.ID_LABEL));
            }

            @Override
            public void onFailure(Exception e) {
                throw new RuntimeException("from onFailure", e);
            }

            @Override
            protected void doRun() {
                try {
                    assertTrue(Thread.currentThread().getName().contains("some_trace_id"));
                    assertTrue(Thread.currentThread().getName().contains("some_opaque_id"));
                    assertTrue(Thread.currentThread().getName().contains(Task.TRACE_ID));
                    assertTrue(Thread.currentThread().getName().contains(Task.X_OPAQUE_ID));
                    assertTrue(Thread.currentThread().getName().contains(ThreadContext.WithTracingIdsInThreadName.ID_LABEL));
                } catch (Throwable t) {
                    // Keep the original throwable as the cause so the failing assertion's stack trace is not lost.
                    throw new RuntimeException("Assertion failed " + t.getMessage(), t);
                }
            }
        });
    }

    withContext.run();

    // Make sure naming works with Runnable that's not wrapped in AbstractRunnable
    try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
        threadContext.putHeader(Task.X_OPAQUE_ID, "foo-bar-id");
        withContext = threadContext.preserveContext(() -> {
            assertTrue(Thread.currentThread().getName().contains("foo-bar-id"));
            assertTrue(Thread.currentThread().getName().contains(Task.X_OPAQUE_ID));
            assertTrue(Thread.currentThread().getName().contains(ThreadContext.WithTracingIdsInThreadName.ID_LABEL));
        });
    }

    withContext.run();

    // Ensure we cleaned up the thread name in case of an exception in the runnable
    try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
        threadContext.putHeader(Task.X_OPAQUE_ID, "some_id");
        withContext = threadContext.preserveContext(new AbstractRunnable() {

            @Override
            public void onAfter() {
                assertFalse(Thread.currentThread().getName().contains("some_id"));
                assertFalse(Thread.currentThread().getName().contains(Task.X_OPAQUE_ID));
                assertFalse(Thread.currentThread().getName().contains(ThreadContext.WithTracingIdsInThreadName.ID_LABEL));
            }

            @Override
            public void onFailure(Exception e) {
                assertFalse(Thread.currentThread().getName().contains("some_id"));
                assertFalse(Thread.currentThread().getName().contains(Task.X_OPAQUE_ID));
                assertFalse(Thread.currentThread().getName().contains(ThreadContext.WithTracingIdsInThreadName.ID_LABEL));
                throw new RuntimeException("from onFailure", e);
            }

            @Override
            protected void doRun() throws Exception {
                assertTrue(Thread.currentThread().getName().contains("some_id"));
                assertTrue(Thread.currentThread().getName().contains(Task.X_OPAQUE_ID));
                assertTrue(Thread.currentThread().getName().contains(ThreadContext.WithTracingIdsInThreadName.ID_LABEL));
                throw new Exception("from doRun");
            }
        });
    }

    RuntimeException e = expectThrows(RuntimeException.class, withContext::run);
    assertEquals("from onFailure", e.getMessage());
    assertEquals("from doRun", e.getCause().getMessage());

    // Make sure there's no thread label if no tracing ids were present in the headers
    try (ThreadContext.StoredContext ignored = threadContext.stashContext()) {
        threadContext.putHeader("foo", "bar");
        withContext = threadContext.preserveContext(() -> {
            assertFalse(Thread.currentThread().getName().contains("bar"));
            assertFalse(Thread.currentThread().getName().contains(ThreadContext.WithTracingIdsInThreadName.ID_LABEL));
        });
    }

    withContext.run();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public void testEnsureInnerDetectSkipsCurrentThread() throws Exception {

String innerResult = hotThreads.innerDetect(mockedMXBean, mockCurrentThreadId);

assertEquals(1, innerResult.lines().count());
assertEquals(3, innerResult.lines().count());
}

public void testReportTypeValueGetter() {
Expand Down

0 comments on commit 43f4821

Please sign in to comment.