Skip to content

Commit

Permalink
Merge pull request #243 from veklov/fix-reactor-lambda-subscribers
Browse files Browse the repository at this point in the history
Fix of reactor lambda subscribers instrumentation
  • Loading branch information
tspring authored Mar 15, 2021
2 parents fd3221f + 45fd454 commit 5ca67a2
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,17 @@ public final void onComplete() {
if (token != null) {
token.expire();
this.nrContext = null;
}
Weaver.callOriginal();
}

public final void onError(Throwable t) {
Token token = this.currentContext().getOrDefault("newrelic-token", null);
if (token != null) {
token.expire();
this.nrContext = null;
}
Weaver.callOriginal();
}

public Context currentContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,17 @@ public final void onComplete() {
if (token != null) {
token.expire();
this.nrContext = null;
}
Weaver.callOriginal();
}

public final void onError(Throwable t) {
Token token = this.currentContext().getOrDefault("newrelic-token", null);
if (token != null) {
token.expire();
this.nrContext = null;
}
Weaver.callOriginal();
}

public Context currentContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.tokenLift;
import static org.hamcrest.CoreMatchers.is;
Expand Down Expand Up @@ -199,6 +200,98 @@ public void testMonoNestedInFlatMap() {
assertCapturedData(hadTransaction);
}

@Test(timeout = 10000L)
public void testLambdaMonoSubscriberOnSuccess() {
AtomicBoolean hadTransaction = new AtomicBoolean();
CountDownLatch done = new CountDownLatch(1);
inTransaction(() -> {
Token token = createToken();
Mono.empty()
.subscribeOn(Schedulers.elastic())
.doOnSuccess(v ->
checkTransaction(hadTransaction))

// it is not need as LambdaMonoSubscriber instrumentation creates token
// and puts it into the context
//.subscriberContext(with(token))

// Call countDown in onComplete to see that instrumentation code calls original method
.subscribe(nil(), nil(), done::countDown);
await(done);
token.expire();
});
assertCapturedData(hadTransaction);
}

@Test(timeout = 10000L)
public void testLambdaMonoSubscriberOnError() {
AtomicBoolean hadTransaction = new AtomicBoolean();
CountDownLatch done = new CountDownLatch(1);
inTransaction(() -> {
Token token = createToken();
Mono.error(new RuntimeException())
.subscribeOn(Schedulers.elastic())
.doOnError(v ->
checkTransaction(hadTransaction))

// it is not need as LambdaMonoSubscriber instrumentation creates token
// and puts it into the context
//.subscriberContext(with(token))

// Call countDown in onError to see that instrumentation code calls original method
.subscribe(nil(), v -> done.countDown());
await(done);
token.expire();
});
assertCapturedData(hadTransaction);
}

@Test(timeout = 10000L)
public void testLambdaSubscriberOnComplete() {
AtomicBoolean hadTransaction = new AtomicBoolean();
CountDownLatch done = new CountDownLatch(1);
inTransaction(() -> {
Token token = createToken();
Flux.empty()
.subscribeOn(Schedulers.elastic())
.doOnComplete(() ->
checkTransaction(hadTransaction))

// it is not need as LambdaSubscriber instrumentation creates token
// and puts it into the context
//.subscriberContext(with(token))

// Call countDown in onComplete to see that instrumentation code calls original method
.subscribe(nil(), nil(), done::countDown);
await(done);
token.expire();
});
assertCapturedData(hadTransaction);
}

@Test(timeout = 10000L)
public void testLambdaSubscriberOnError() {
AtomicBoolean hadTransaction = new AtomicBoolean();
CountDownLatch done = new CountDownLatch(1);
inTransaction(() -> {
Token token = createToken();
Flux.error(new RuntimeException())
.subscribeOn(Schedulers.elastic())
.doOnError(v ->
checkTransaction(hadTransaction))

// it is not need as LambdaSubscriber instrumentation creates token
// and puts it into the context
//.subscriberContext(with(token))

// Call countDown in onError to see that instrumentation code calls original method
.subscribe(nil(), v -> done.countDown());
await(done);
token.expire();
});
assertCapturedData(hadTransaction);
}

@Trace(dispatcher = true)
public void inTransaction(Runnable actions) {
actions.run();
Expand Down Expand Up @@ -226,6 +319,18 @@ public void checkTransaction(AtomicBoolean hadTransaction) {
hadTransaction.set(AgentBridge.getAgent().getTransaction(false) != null);
}

private <T> Consumer<T> nil() {
return v -> {
};
}

private void await(CountDownLatch done) {
try {
done.await();
} catch (InterruptedException ignore) {
}
}

private void assertCapturedData(AtomicBoolean hadTransaction) {
assertTrue("Did not have transaction", hadTransaction.get());

Expand Down

0 comments on commit 5ca67a2

Please sign in to comment.