Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce TraceContextHolder, unifying context propagation #417

Merged
merged 6 commits into from
Jan 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public abstract class AbstractSpanImpl implements Span {
@Nonnull
// co.elastic.apm.impl.transaction.AbstractSpan
// co.elastic.apm.agent.impl.transaction.TraceContextHolder
protected final Object span;

AbstractSpanImpl(@Nonnull Object span) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public static Span currentSpan() {
}

private static Object doGetCurrentSpan() {
// co.elastic.apm.api.ElasticApmInstrumentation.CurrentSpanInstrumentation.doGetCurrentSpan
// co.elastic.apm.api.ElasticApmApiInstrumentation.CurrentSpanInstrumentation.doGetCurrentSpan
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class ScopeImpl implements Scope {

@Nonnull
@SuppressWarnings({"FieldCanBeLocal", "unused"})
// co.elastic.apm.agent.impl.transaction.TraceContextHolder
private final Object span;

ScopeImpl(@Nonnull Object span) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import co.elastic.apm.agent.bci.ElasticApmInstrumentation;
import co.elastic.apm.agent.bci.bytebuddy.SimpleMethodSignatureOffsetMappingFactory;
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.TraceContextHolder;
import co.elastic.apm.agent.matcher.WildcardMatcher;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
Expand Down Expand Up @@ -54,7 +55,7 @@ public TraceMethodInstrumentation(MethodMatcher methodMatcher) {
public static void onMethodEnter(@SimpleMethodSignatureOffsetMappingFactory.SimpleMethodSignature String signature,
@Advice.Local("span") AbstractSpan<?> span) {
if (tracer != null) {
final AbstractSpan<?> parent = tracer.activeSpan();
final TraceContextHolder<?> parent = tracer.getActive();
if (parent == null) {
span = tracer.startTransaction()
.withName(signature)
Expand All @@ -71,9 +72,8 @@ public static void onMethodEnter(@SimpleMethodSignatureOffsetMappingFactory.Simp
public static void onMethodExit(@Nullable @Advice.Local("span") AbstractSpan<?> span,
@Advice.Thrown Throwable t) {
if (span != null) {
span.captureException(t)
.deactivate()
.end();
span.captureException(t);
span.deactivate().end();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*-
* #%L
* Elastic APM Java agent
* %%
* Copyright (C) 2018 - 2019 Elastic and contributors
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package co.elastic.apm.agent.impl;

import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.TraceContextHolder;

/**
* A callback for {@link TraceContextHolder} activation and deactivaiton events
*/
public interface ActivationListener {

void init(ElasticApmTracer tracer);

/**
* A callback for {@link TraceContextHolder#activate()}
*
* @param context the {@link TraceContextHolder} which is being activated
* @throws Throwable if there was an error while calling this method
*/
void onActivate(TraceContextHolder<?> context) throws Throwable;

/**
* A callback for {@link TraceContextHolder#deactivate()}
* <p>
* Note: the corresponding span may already be {@link AbstractSpan#end() ended} and {@link AbstractSpan#resetState() recycled}.
* That's why there is no {@link TraceContextHolder} parameter.
* </p>
*
* @throws Throwable if there was an error while calling this method
*/
void onDeactivate() throws Throwable;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.Span;
import co.elastic.apm.agent.impl.transaction.TraceContext;
import co.elastic.apm.agent.impl.transaction.TraceContextHolder;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.metrics.MetricRegistry;
import co.elastic.apm.agent.objectpool.Allocator;
Expand Down Expand Up @@ -69,23 +70,23 @@ public class ElasticApmTracer {
// Maintains a stack of all the activated spans
// This way its easy to retrieve the bottom of the stack (the transaction)
// Also, the caller does not have to keep a reference to the previously active span, as that is maintained by the stack
private final ThreadLocal<Deque<Object>> activeStack = new ThreadLocal<Deque<Object>>() {
private final ThreadLocal<Deque<TraceContextHolder<?>>> activeStack = new ThreadLocal<Deque<TraceContextHolder<?>>>() {
felixbarny marked this conversation as resolved.
Show resolved Hide resolved
@Override
protected Deque<Object> initialValue() {
return new ArrayDeque<Object>();
protected Deque<TraceContextHolder<?>> initialValue() {
return new ArrayDeque<TraceContextHolder<?>>();
}
};
private final CoreConfiguration coreConfiguration;
private final List<SpanListener> spanListeners;
private final List<ActivationListener> activationListeners;
private final MetricRegistry metricRegistry = new MetricRegistry();
private Sampler sampler;

ElasticApmTracer(ConfigurationRegistry configurationRegistry, Reporter reporter, Iterable<LifecycleListener> lifecycleListeners, List<SpanListener> spanListeners) {
ElasticApmTracer(ConfigurationRegistry configurationRegistry, Reporter reporter, Iterable<LifecycleListener> lifecycleListeners, List<ActivationListener> activationListeners) {
this.configurationRegistry = configurationRegistry;
this.reporter = reporter;
this.stacktraceConfiguration = configurationRegistry.getConfig(StacktraceConfiguration.class);
this.lifecycleListeners = lifecycleListeners;
this.spanListeners = spanListeners;
this.activationListeners = activationListeners;
int maxPooledElements = configurationRegistry.getConfig(ReporterConfiguration.class).getMaxQueueSize() * 2;
coreConfiguration = configurationRegistry.getConfig(CoreConfiguration.class);
transactionPool = QueueBasedObjectPool.ofRecyclable(AtomicQueueFactory.<Transaction>newQueue(createBoundedMpmc(maxPooledElements)), false,
Expand Down Expand Up @@ -127,8 +128,8 @@ public void onChange(ConfigurationOption<?> configurationOption, Double oldValue
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.start(this);
}
for (SpanListener spanListener : spanListeners) {
spanListener.init(this);
for (ActivationListener activationListener : activationListeners) {
activationListener.init(this);
}
reporter.scheduleMetricReporting(metricRegistry, configurationRegistry.getConfig(ReporterConfiguration.class).getMetricsIntervalMs());
}
Expand Down Expand Up @@ -171,15 +172,6 @@ public Transaction currentTransaction() {
return null;
}

@Nullable
public Span currentSpan() {
final AbstractSpan<?> abstractSpan = activeSpan();
if (abstractSpan instanceof Span) {
return (Span) abstractSpan;
}
return null;
}

/**
* Starts a span with a given parent context.
* <p>
Expand Down Expand Up @@ -219,7 +211,7 @@ public Span startSpan(AbstractSpan<?> parent, long epochMicros) {
} else {
dropped = false;
}
span.start(TraceContext.fromParentSpan(), parent, epochMicros, dropped);
span.start(TraceContext.fromParent(), parent, epochMicros, dropped);
return span;
}

Expand All @@ -228,10 +220,10 @@ private boolean isTransactionSpanLimitReached(Transaction transaction) {
}

public void captureException(@Nullable Throwable e) {
captureException(System.currentTimeMillis() * 1000, e, activeSpan());
captureException(System.currentTimeMillis() * 1000, e, getActive());
}

public void captureException(long epochMicros, @Nullable Throwable e, @Nullable AbstractSpan<?> active) {
public void captureException(long epochMicros, @Nullable Throwable e, @Nullable TraceContextHolder<?> active) {
if (e != null) {
ErrorCapture error = errorPool.createInstance();
error.withTimestamp(epochMicros);
Expand All @@ -247,7 +239,7 @@ else if (active instanceof Span) {
Span span = (Span) active;
error.getContext().getTags().putAll(span.getContext().getTags());
}
error.asChildOf(active);
error.asChildOf(active.getTraceContext());
error.setTransactionSampled(active.isSampled());
} else {
error.getTraceContext().getId().setToRandomValue();
Expand Down Expand Up @@ -344,70 +336,37 @@ public Sampler getSampler() {
}

@Nullable
public AbstractSpan<?> activeSpan() {
final Object active = getActive();
if (active instanceof AbstractSpan) {
return (AbstractSpan<?>) active;
}
return null;
}

@Nullable
public TraceContext activeTraceContext() {
final Object active = getActive();
if (active instanceof TraceContext) {
return (TraceContext) active;
}
return null;
}

@Nullable
public Object getActive() {
public TraceContextHolder<?> getActive() {
felixbarny marked this conversation as resolved.
Show resolved Hide resolved
return activeStack.get().peek();
}

public void registerSpanListener(SpanListener spanListener) {
this.spanListeners.add(spanListener);
public void registerSpanListener(ActivationListener activationListener) {
this.activationListeners.add(activationListener);
}

public List<SpanListener> getSpanListeners() {
return spanListeners;
public List<ActivationListener> getActivationListeners() {
return activationListeners;
}

public void activate(AbstractSpan<?> span) {
public void activate(TraceContextHolder<?> holder) {
if (logger.isDebugEnabled()) {
logger.debug("Activating {} on thread {}", span, Thread.currentThread().getId());
logger.debug("Activating {} on thread {}", holder.getTraceContext(), Thread.currentThread().getId());
}
activeStack.get().push(span);
activeStack.get().push(holder);
}

public void activate(TraceContext traceContext) {
public void deactivate(TraceContextHolder<?> holder) {
if (logger.isDebugEnabled()) {
logger.debug("Activating trace context {} on thread {}", traceContext, Thread.currentThread().getId());
logger.debug("Deactivating {} on thread {}", holder.getTraceContext(), Thread.currentThread().getId());
}
activeStack.get().push(traceContext);
}

public void deactivate(AbstractSpan<?> span) {
if (logger.isDebugEnabled()) {
logger.debug("Deactivating {} on thread {}", span, Thread.currentThread().getId());
}
assertIsActive(span, activeStack.get().poll());
if (span instanceof Transaction) {
assertIsActive(holder, activeStack.get().poll());
if (holder instanceof Transaction) {
// a transaction is always the bottom of this stack
// clearing to avoid potential leaks in case of wrong api usage
activeStack.get().clear();
}
}

public void deactivate(TraceContext traceContext) {
if (logger.isDebugEnabled()) {
logger.debug("Deactivating trace context {} on thread {}",
traceContext, Thread.currentThread().getId());
}
assertIsActive(traceContext, activeStack.get().poll());
}

private void assertIsActive(Object span, @Nullable Object currentlyActive) {
if (span != currentlyActive) {
logger.warn("Deactivating a span ({}) which is not the currently active span ({}). " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ public class ElasticApmTracerBuilder {
@Nullable
private Iterable<LifecycleListener> lifecycleListeners;
private Map<String, String> inlineConfig = new HashMap<>();
private List<SpanListener> spanListeners = new ArrayList<>();
private List<ActivationListener> activationListeners = new ArrayList<>();

public ElasticApmTracerBuilder() {
final List<ConfigurationSource> configSources = getConfigSources();
LoggingConfiguration.init(configSources);
logger = LoggerFactory.getLogger(getClass());
for (SpanListener spanListener : ServiceLoader.load(SpanListener.class, ElasticApmTracerBuilder.class.getClassLoader())) {
spanListeners.add(spanListener);
for (ActivationListener activationListener : ServiceLoader.load(ActivationListener.class, ElasticApmTracerBuilder.class.getClassLoader())) {
activationListeners.add(activationListener);
}
}

Expand Down Expand Up @@ -97,7 +97,7 @@ public ElasticApmTracer build() {
if (lifecycleListeners == null) {
lifecycleListeners = ServiceLoader.load(LifecycleListener.class, getClass().getClassLoader());
}
return new ElasticApmTracer(configurationRegistry, reporter, lifecycleListeners, spanListeners);
return new ElasticApmTracer(configurationRegistry, reporter, lifecycleListeners, activationListeners);
}

private ConfigurationRegistry getDefaultConfigurationRegistry(List<ConfigurationSource> configSources) {
Expand Down
21 changes: 11 additions & 10 deletions apm-agent-core/src/main/java/co/elastic/apm/agent/impl/Scope.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,27 @@
*/
package co.elastic.apm.agent.impl;

import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.TraceContextHolder;

/**
* Within a scope, a {@link AbstractSpan} is active on the current thread.
* Within a scope, a {@link TraceContextHolder} is active on the current thread.
* Calling {@link #close()} detaches them from the active thread.
* In a scope, you can get the currently active {@link AbstractSpan} via
* {@link ElasticApmTracer#activeSpan()}.
* In a scope, you can get the currently active {@link TraceContextHolder} via
* {@link ElasticApmTracer#getActive()}.
* <p>
* During the duration of a {@link AbstractSpan},
* During the duration of a {@link TraceContextHolder},
* it can be active multiple times on multiple threads.
* In applications with a single thread per request model,
* there is typically one scope which lasts for the lifetime of the {@link AbstractSpan}.
* there is typically one scope which lasts for the lifetime of the {@link TraceContextHolder}.
* In reactive applications, this model does not work, as a request is handled in multiple threads.
* These types of application still might find it useful to scope a {@link AbstractSpan} on the currently processing thread.
* These types of application still might find it useful to scope a {@link TraceContextHolder} on the currently processing thread.
* For example, an instrumentation for {@link java.util.concurrent.ExecutorService} might want to propagate the currently
* active {@link AbstractSpan} to thread which runs {@link java.util.concurrent.ExecutorService#execute(Runnable)},
* so that {@link ElasticApmTracer#activeSpan()} returns the expected {@link AbstractSpan}.
* active {@link TraceContextHolder} to thread which runs {@link java.util.concurrent.ExecutorService#execute(Runnable)},
* so that {@link ElasticApmTracer#getActive()} returns the expected {@link TraceContextHolder}.
* </p>
* <p>
* Note: {@link #close() closing} a scope does not {@link AbstractSpan#end() end} it's active {@link AbstractSpan}.
* Note: {@link #close() closing} a scope does not {@link co.elastic.apm.agent.impl.transaction.AbstractSpan#end() end} it's active
* {@link co.elastic.apm.agent.impl.transaction.AbstractSpan}.
* </p>
*/
public interface Scope extends AutoCloseable {
Expand Down

This file was deleted.

Loading