Skip to content

Commit

Permalink
Merge pull request #180 from newrelic/refactor-infinite-tracing
Browse files Browse the repository at this point in the history
refactor infinite tracing for improved clarity and simplicity, add su…
  • Loading branch information
jack-berg authored Jan 20, 2021
2 parents ca26ed1 + 02caf84 commit 34a24dd
Show file tree
Hide file tree
Showing 49 changed files with 1,207 additions and 1,975 deletions.
45 changes: 40 additions & 5 deletions infinite-tracing/src/main/java/com/newrelic/BackoffPolicy.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,43 @@
package com.newrelic;

import io.grpc.Status;
import com.google.common.annotations.VisibleForTesting;

/**
 * Contract for reconnect/backoff decisions keyed on a gRPC {@link Status}.
 *
 * <p>NOTE(review): no implementation is visible here, so the method semantics below are
 * inferred from the signatures only and should be confirmed against the implementations.
 */
public interface BackoffPolicy {
// NOTE(review): presumably reports whether a reconnect should be attempted after the
// stream ended with the given status; confirm against implementations.
boolean shouldReconnect(Status status);
// NOTE(review): presumably performs (or waits out) the backoff delay; takes no arguments
// and returns nothing, so any duration must come from the implementation; confirm.
void backoff();
}
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hands out backoff durations, in seconds, from a fixed sequence.
 *
 * <p>Each call to {@link #getNextBackoffSeconds()} advances one step through
 * {@link #BACKOFF_SECONDS_SEQUENCE}; once the end is reached the final entry keeps being
 * returned until {@link #reset()} is called. The position is held in an
 * {@link AtomicInteger}.
 */
class BackoffPolicy {

    @VisibleForTesting
    static final int[] BACKOFF_SECONDS_SEQUENCE = new int[] { 15, 15, 30, 60, 120, 300 };
    private static final int DEFAULT_BACKOFF_SECONDS = 15;

    // Index of the most recently returned entry; -1 means nothing has been handed out yet.
    private final AtomicInteger backoffSequenceIndex = new AtomicInteger(-1);

    /**
     * Returns the fixed default backoff, independent of the sequence position.
     *
     * @return the default backoff in seconds
     */
    int getDefaultBackoffSeconds() {
        return DEFAULT_BACKOFF_SECONDS;
    }

    /**
     * Advances the sequence by one step and returns the entry at the new position.
     * Positions past the end of the sequence yield the last entry.
     *
     * @return the number of seconds to back off for
     */
    int getNextBackoffSeconds() {
        final int lastPosition = BACKOFF_SECONDS_SEQUENCE.length - 1;
        final int position = backoffSequenceIndex.incrementAndGet();
        return BACKOFF_SECONDS_SEQUENCE[Math.min(position, lastPosition)];
    }

    /**
     * Rewinds the sequence so the next call to {@link #getNextBackoffSeconds()} returns
     * the first entry again.
     */
    void reset() {
        backoffSequenceIndex.set(-1);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
* This exception signals to the call that the channel is closing. Passing it to the
* {@link io.grpc.stub.StreamObserver#onError} call tells that call not to treat the closure as an actual error.
*/
public class ChannelClosingException extends Exception {
class ChannelClosingException extends Exception {
}
36 changes: 0 additions & 36 deletions infinite-tracing/src/main/java/com/newrelic/ChannelFactory.java

This file was deleted.

207 changes: 207 additions & 0 deletions infinite-tracing/src/main/java/com/newrelic/ChannelManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package com.newrelic;

import com.google.common.annotations.VisibleForTesting;
import com.newrelic.api.agent.Logger;
import com.newrelic.api.agent.MetricAggregator;
import com.newrelic.trace.v1.IngestServiceGrpc;
import com.newrelic.trace.v1.IngestServiceGrpc.IngestServiceStub;
import com.newrelic.trace.v1.V1;
import io.grpc.ManagedChannel;
import io.grpc.okhttp.OkHttpChannelBuilder;
import io.grpc.stub.ClientCallStreamObserver;

import javax.annotation.concurrent.GuardedBy;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

/**
 * Manages the gRPC channel and the span observer used to stream spans: lazy (re)creation,
 * cancellation, backoff, and permanent shutdown.
 *
 * <p>All mutable state is guarded by {@code lock}. Waiting for a backoff to finish and
 * sleeping through a backoff both happen outside the lock, with one exception:
 * {@link #shutdownChannelForever()} holds the lock across its zero-second backoff.
 */
class ChannelManager {

    private final Logger logger;
    private final InfiniteTracingConfig config;
    private final MetricAggregator aggregator;
    private final BackoffPolicy backoffManager;

    private final Object lock = new Object();
    @GuardedBy("lock") private boolean isShutdownForever;
    // Non-null only while a backoff is in progress; released when the backoff ends.
    @GuardedBy("lock") private CountDownLatch backoffLatch;
    @GuardedBy("lock") private ManagedChannel managedChannel;
    @GuardedBy("lock") private ClientCallStreamObserver<V1.Span> spanObserver;
    @GuardedBy("lock") private ResponseObserver responseObserver;
    @GuardedBy("lock") private String agentRunToken;
    @GuardedBy("lock") private Map<String, String> requestMetadata;

    /**
     * Creates a manager. No channel is opened until {@link #getSpanObserver()} is called.
     *
     * @param config the infinite tracing configuration; also supplies the logger
     * @param aggregator receives supportability counters
     * @param agentRunToken the agent run token sent as a request header
     * @param requestMetadata extra metadata headers to include on requests, may be {@code null}
     */
    ChannelManager(InfiniteTracingConfig config, MetricAggregator aggregator, String agentRunToken, Map<String, String> requestMetadata) {
        this.logger = config.getLogger();
        this.config = config;
        this.aggregator = aggregator;
        this.agentRunToken = agentRunToken;
        this.requestMetadata = requestMetadata;
        this.backoffManager = new BackoffPolicy();
    }

    /**
     * Update metadata included on gRPC requests. The new values are read the next time a
     * channel is built by {@link #buildChannel()}.
     *
     * @param agentRunToken the agent run token
     * @param requestMetadata any extra metadata headers that must be included
     */
    void updateMetadata(String agentRunToken, Map<String, String> requestMetadata) {
        synchronized (lock) {
            this.agentRunToken = agentRunToken;
            this.requestMetadata = requestMetadata;
        }
    }

    /**
     * Obtain a span observer. Creates a channel if one is not open. Creates a span observer if one
     * does not exist. If the channel has been shutdown and is backing off via
     * {@link #shutdownChannelAndBackoff(int)}, awaits the backoff period before recreating the channel.
     *
     * @return a span observer
     * @throws RuntimeException if the calling thread is interrupted while awaiting a backoff, or
     *         if {@link #shutdownChannelForever()} has been called
     */
    ClientCallStreamObserver<V1.Span> getSpanObserver() {
        while (true) {
            CountDownLatch latch;
            synchronized (lock) {
                latch = backoffLatch;
                if (latch == null) {
                    // The "no backoff in progress" check and the (re)creation share one lock
                    // hold, so a backoff cannot begin between them and be bypassed.
                    return getOrCreateSpanObserver();
                }
            }
            // Wait outside the lock so the backing-off thread can re-acquire it to finish.
            // Loop afterwards in case another backoff started in the meantime.
            awaitBackoff(latch);
        }
    }

    /**
     * Blocks until the given backoff latch is released.
     *
     * @throws RuntimeException if interrupted; the interrupt flag is restored first
     */
    private void awaitBackoff(CountDownLatch latch) {
        try {
            logger.log(Level.FINE, "Awaiting backoff.");
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Thread interrupted while awaiting backoff.", e);
        }
    }

    /**
     * Returns the current span observer, first creating the channel and/or the observer if
     * either is missing. Must be called while holding {@code lock}.
     *
     * @throws RuntimeException if the manager has been shut down forever
     */
    private ClientCallStreamObserver<V1.Span> getOrCreateSpanObserver() {
        if (isShutdownForever) {
            throw new RuntimeException("No longer accepting connections to gRPC.");
        }
        if (managedChannel == null) {
            logger.log(Level.FINE, "Creating gRPC channel.");
            managedChannel = buildChannel();
        }
        if (spanObserver == null) {
            logger.log(Level.FINE, "Creating gRPC span observer.");
            IngestServiceStub ingestServiceStub = buildStub(managedChannel);
            responseObserver = buildResponseObserver();
            spanObserver = (ClientCallStreamObserver<V1.Span>) ingestServiceStub.recordSpan(responseObserver);
            aggregator.incrementCounter("Supportability/InfiniteTracing/Connect");
        }
        return spanObserver;
    }

    /**
     * Builds the async ingest service stub for the given channel.
     *
     * @param managedChannel the channel the stub will use
     * @return a new stub
     */
    @VisibleForTesting
    IngestServiceStub buildStub(ManagedChannel managedChannel) {
        return IngestServiceGrpc.newStub(managedChannel);
    }

    /**
     * Builds the response observer passed to {@code recordSpan}.
     *
     * @return a new response observer wired to this manager, the aggregator, and the backoff policy
     */
    @VisibleForTesting
    ResponseObserver buildResponseObserver() {
        return new ResponseObserver(logger, this, aggregator, backoffManager);
    }

    /**
     * Cancel the span observer. The next time {@link #getSpanObserver()} is called the span observer
     * will be recreated. This cancels the span observer with a {@link ChannelClosingException}, which
     * {@link ResponseObserver#onError(Throwable)} detects and ignores.
     */
    void cancelSpanObserver() {
        synchronized (lock) {
            if (spanObserver == null) {
                return;
            }
            logger.log(Level.FINE, "Canceling gRPC span observer.");
            spanObserver.cancel("CLOSING_CONNECTION", new ChannelClosingException());
            spanObserver = null;
            responseObserver = null;
        }
    }

    /**
     * Shutdown the channel, cancel the span observer, and backoff. The next time {@link #getSpanObserver()}
     * is called, it will await the backoff and the channel will be recreated.
     *
     * <p>If a backoff is already in progress this call returns immediately. The backoff is always
     * released when this method exits, including when the sleeping thread is interrupted, so
     * threads blocked in {@link #getSpanObserver()} are never left waiting indefinitely.
     *
     * @param backoffSeconds the number of seconds to await before the channel can be recreated
     * @throws RuntimeException if the calling thread is interrupted while backing off
     */
    void shutdownChannelAndBackoff(int backoffSeconds) {
        logger.log(Level.FINE, "Shutting down gRPC channel and backing off for {0} seconds.", backoffSeconds);
        CountDownLatch latch;
        synchronized (lock) {
            if (backoffLatch != null) {
                logger.log(Level.FINE, "Backoff already in progress.");
                return;
            }

            if (managedChannel != null) {
                managedChannel.shutdown();
                managedChannel = null;
            }
            cancelSpanObserver();

            // Publish the latch last: if the teardown above throws, no backoff is left
            // registered. Other threads cannot observe the ordering within this block.
            latch = new CountDownLatch(1);
            backoffLatch = latch;
        }

        try {
            TimeUnit.SECONDS.sleep(backoffSeconds);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Thread interrupted while backing off.", e);
        } finally {
            // Release waiters and clear the backoff on every exit path.
            synchronized (lock) {
                latch.countDown();
                backoffLatch = null;
            }
        }
        logger.log(Level.FINE, "Backoff complete.");
    }

    /**
     * Shutdown the channel and do not recreate it. The next time {@link #getSpanObserver()} is called
     * an exception will be thrown.
     */
    void shutdownChannelForever() {
        synchronized (lock) {
            logger.log(Level.FINE, "Shutting down gRPC channel forever.");
            shutdownChannelAndBackoff(0);
            this.isShutdownForever = true;
        }
    }

    /**
     * Builds a new channel to the configured host and port. Every request carries the current
     * request metadata plus the {@code agent_run_token} and {@code license_key} headers, and the
     * {@code flaky}/{@code flaky_code} headers when a flaky percentage is configured.
     *
     * @return a newly built channel
     */
    @VisibleForTesting
    ManagedChannel buildChannel() {
        Map<String, String> headers;
        synchronized (lock) {
            // Copy so later updateMetadata calls do not affect an already-built channel.
            headers = requestMetadata != null ? new HashMap<>(requestMetadata) : new HashMap<String, String>();
            headers.put("agent_run_token", agentRunToken);
        }

        headers.put("license_key", config.getLicenseKey());
        if (config.getFlakyPercentage() != null) {
            logger.log(Level.WARNING, "Infinite tracing is configured with a flaky percentage! There will be errors!");
            headers.put("flaky", config.getFlakyPercentage().toString());
            if (config.getFlakyCode() != null) {
                headers.put("flaky_code", config.getFlakyCode().toString());
            }
        }

        OkHttpChannelBuilder channelBuilder = OkHttpChannelBuilder
                .forAddress(config.getHost(), config.getPort())
                .defaultLoadBalancingPolicy("pick_first")
                .intercept(new HeadersInterceptor(headers));
        if (config.getUsePlaintext()) {
            channelBuilder.usePlaintext();
        } else {
            channelBuilder.useTransportSecurity();
        }
        return channelBuilder.build();
    }

}
59 changes: 0 additions & 59 deletions infinite-tracing/src/main/java/com/newrelic/ChannelSupplier.java

This file was deleted.

Loading

0 comments on commit 34a24dd

Please sign in to comment.