diff --git a/build.gradle b/build.gradle index 73a33647a490..1c052a3a254f 100644 --- a/build.gradle +++ b/build.gradle @@ -191,8 +191,9 @@ subprojects { google_auth_credentials: 'com.google.auth:google-auth-library-credentials:0.4.0', okhttp: 'com.squareup.okhttp:okhttp:2.5.0', okio: 'com.squareup.okio:okio:1.6.0', - opencensus_api: 'io.opencensus:opencensus-api:0.7.0', - opencensus_impl: 'io.opencensus:opencensus-impl:0.7.0', + opencensus_api: 'io.opencensus:opencensus-api:0.8.0-SNAPSHOT', + opencensus_contrib_grpc_metrics: 'io.opencensus:opencensus-contrib-grpc-metrics:0.8.0-SNAPSHOT', + opencensus_impl: 'io.opencensus:opencensus-impl:0.8.0-SNAPSHOT', instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3', protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}", protobuf_lite: "com.google.protobuf:protobuf-lite:3.0.1", diff --git a/core/build.gradle b/core/build.gradle index b6cd5f51fac0..74a31f3bdd0a 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -19,6 +19,12 @@ dependencies { // we'll always be more up-to-date exclude group: 'io.grpc', module: 'grpc-context' } + compile (libraries.opencensus_contrib_grpc_metrics) { + // prefer 3.0.0 from libraries instead of 3.0.1 + exclude group: 'com.google.code.findbugs', module: 'jsr305' + // we'll always be more up-to-date + exclude group: 'io.grpc', module: 'grpc-context' + } testCompile project(':grpc-testing') diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 471f8292b9a5..75dad9290ff1 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -21,8 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; -import com.google.instrumentation.stats.Stats; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.Attributes; import io.grpc.ClientInterceptor; import io.grpc.CompressorRegistry; @@ -34,6 +32,11 @@ import io.grpc.NameResolver; import io.grpc.NameResolverProvider; import io.grpc.PickFirstBalancerFactory; +import io.opencensus.stats.Stats; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; +import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.trace.Tracing; import java.net.SocketAddress; import java.net.URI; @@ -152,7 +155,13 @@ protected final int maxInboundMessageSize() { private boolean tracingEnabled = true; @Nullable - private StatsContextFactory statsFactory; + private Tagger tagger; + + @Nullable + private TagContextBinarySerializer tagCtxSerializer; + + @Nullable + private StatsRecorder statsRecorder; protected AbstractManagedChannelImplBuilder(String target) { this.target = Preconditions.checkNotNull(target, "target"); @@ -287,8 +296,11 @@ public final T idleTimeout(long value, TimeUnit unit) { * Override the default stats implementation. */ @VisibleForTesting - protected final T statsContextFactory(StatsContextFactory statsFactory) { - this.statsFactory = statsFactory; + protected final T statsImplementation( + Tagger tagger, TagContextBinarySerializer tagCtxSerializer, StatsRecorder statsRecorder) { + this.tagger = tagger; + this.tagCtxSerializer = tagCtxSerializer; + this.statsRecorder = statsRecorder; return thisT(); } @@ -346,15 +358,28 @@ final List getEffectiveInterceptors() { List effectiveInterceptors = new ArrayList(this.interceptors); if (statsEnabled) { - StatsContextFactory statsCtxFactory = - this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory(); - if (statsCtxFactory != null) { - CensusStatsModule censusStats = - new CensusStatsModule(statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER, true, recordStats); - // First interceptor runs last (see ClientInterceptors.intercept()), so that no - // other interceptor can override the tracer factory we set in CallOptions. - effectiveInterceptors.add(0, censusStats.getClientInterceptor()); - } + Tagger tagger = this.tagger != null ? this.tagger : Tags.getTagger(); + TagContextBinarySerializer tagCtxSerializer = + this.tagCtxSerializer != null + ? this.tagCtxSerializer + : Tags.getTagPropagationComponent().getBinarySerializer(); + StatsRecorder statsRecorder = + this.statsRecorder != null ? this.statsRecorder : Stats.getStatsRecorder(); + // // TODO: How do we check whether stats is enabled, now that the StatsRecorder is always + // // non-null? Uncommenting this line causes test failures. + // if (Stats.getState() == StatsCollectionState.ENABLED) { + CensusStatsModule censusStats = + new CensusStatsModule( + tagger, + tagCtxSerializer, + statsRecorder, + GrpcUtil.STOPWATCH_SUPPLIER, + true, + recordStats); + // First interceptor runs last (see ClientInterceptors.intercept()), so that no + // other interceptor can override the tracer factory we set in CallOptions. + effectiveInterceptors.add(0, censusStats.getClientInterceptor()); + // } } if (tracingEnabled) { CensusTracingModule censusTracing = diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 11c1a9ac4adb..53576e36f4e6 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -20,8 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; -import com.google.instrumentation.stats.Stats; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.BindableService; import io.grpc.CompressorRegistry; import io.grpc.Context; @@ -36,6 +34,11 @@ import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; import io.grpc.ServerTransportFilter; +import io.opencensus.stats.Stats; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; +import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.trace.Tracing; import java.util.ArrayList; import java.util.Collections; @@ -97,7 +100,13 @@ public List getServices() { CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY; @Nullable - private StatsContextFactory statsFactory; + private Tagger tagger; + + @Nullable + private TagContextBinarySerializer tagCtxSerializer; + + @Nullable + private StatsRecorder statsRecorder; private boolean statsEnabled = true; private boolean recordStats = true; @@ -184,8 +193,13 @@ public final T compressorRegistry(CompressorRegistry registry) { * Override the default stats implementation. */ @VisibleForTesting - protected T statsContextFactory(StatsContextFactory statsFactory) { - this.statsFactory = statsFactory; + protected T statsImplementation( + final Tagger tagger, + TagContextBinarySerializer tagCtxSerializer, + StatsRecorder statsRecorder) { + this.tagger = tagger; + this.tagCtxSerializer = tagCtxSerializer; + this.statsRecorder = statsRecorder; return thisT(); } @@ -228,13 +242,26 @@ final List getTracerFactories() { ArrayList tracerFactories = new ArrayList(); if (statsEnabled) { - StatsContextFactory statsFactory = - this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory(); - if (statsFactory != null) { - CensusStatsModule censusStats = - new CensusStatsModule(statsFactory, GrpcUtil.STOPWATCH_SUPPLIER, true, recordStats); - tracerFactories.add(censusStats.getServerTracerFactory()); - } + Tagger tagger = this.tagger != null ? this.tagger : Tags.getTagger(); + TagContextBinarySerializer tagCtxSerializer = + this.tagCtxSerializer != null + ? this.tagCtxSerializer + : Tags.getTagPropagationComponent().getBinarySerializer(); + StatsRecorder statsRecorder = + this.statsRecorder != null ? this.statsRecorder : Stats.getStatsRecorder(); + // // TODO: How do we check whether stats is enabled, now that the StatsRecorder is always + // // non-null? Uncommenting this line causes test failures. + // if (Stats.getState() == StatsCollectionState.ENABLED) { + CensusStatsModule censusStats = + new CensusStatsModule( + tagger, + tagCtxSerializer, + statsRecorder, + GrpcUtil.STOPWATCH_SUPPLIER, + true, + recordStats); + tracerFactories.add(censusStats.getServerTracerFactory()); + // } } if (tracingEnabled) { CensusTracingModule censusTracing = diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index 127b83d6c1d3..a503d92eaf69 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -19,16 +19,11 @@ import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static com.google.instrumentation.stats.ContextUtils.STATS_CONTEXT_KEY; +import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; -import com.google.instrumentation.stats.MeasurementMap; -import com.google.instrumentation.stats.RpcConstants; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; -import com.google.instrumentation.stats.TagValue; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -42,9 +37,13 @@ import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.StreamTracer; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -69,44 +68,42 @@ final class CensusStatsModule { private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1); private static final ClientTracer BLANK_CLIENT_TRACER = new ClientTracer(); - private final StatsContextFactory statsCtxFactory; + private final Tagger tagger; + private final StatsRecorder statsRecorder; private final Supplier stopwatchSupplier; @VisibleForTesting - final Metadata.Key statsHeader; + final Metadata.Key statsHeader; private final StatsClientInterceptor clientInterceptor = new StatsClientInterceptor(); private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory(); private final boolean propagateTags; private final boolean recordStats; CensusStatsModule( - final StatsContextFactory statsCtxFactory, Supplier stopwatchSupplier, + final Tagger tagger, + final TagContextBinarySerializer tagCtxSerializer, + StatsRecorder statsRecorder, Supplier stopwatchSupplier, boolean propagateTags, boolean recordStats) { - this.statsCtxFactory = checkNotNull(statsCtxFactory, "statsCtxFactory"); + this.tagger = checkNotNull(tagger, "tagger"); + this.statsRecorder = checkNotNull(statsRecorder, "statsRecorder"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.propagateTags = propagateTags; this.recordStats = recordStats; this.statsHeader = - Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller() { + Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller() { @Override - public byte[] toBytes(StatsContext context) { + public byte[] toBytes(TagContext context) { // TODO(carl-mastrangelo): currently we only make sure the correctness. We may need to // optimize out the allocation and copy in the future. - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - try { - context.serialize(buffer); - } catch (IOException e) { - throw new RuntimeException(e); - } - return buffer.toByteArray(); + return tagCtxSerializer.toByteArray(context); } @Override - public StatsContext parseBytes(byte[] serialized) { + public TagContext parseBytes(byte[] serialized) { try { - return statsCtxFactory.deserialize(new ByteArrayInputStream(serialized)); + return tagCtxSerializer.fromByteArray(serialized); } catch (Exception e) { logger.log(Level.FINE, "Failed to parse stats header", e); - return statsCtxFactory.getDefault(); + return tagger.empty(); } } }); @@ -116,7 +113,7 @@ public StatsContext parseBytes(byte[] serialized) { * Creates a {@link ClientCallTracer} for a new call. */ @VisibleForTesting - ClientCallTracer newClientCallTracer(StatsContext parentCtx, String fullMethodName) { + ClientCallTracer newClientCallTracer(TagContext parentCtx, String fullMethodName) { return new ClientCallTracer(this, parentCtx, fullMethodName); } @@ -203,9 +200,9 @@ static final class ClientCallTracer extends ClientStreamTracer.Factory { private final Stopwatch stopwatch; private volatile ClientTracer streamTracer; private volatile int callEnded; - private final StatsContext parentCtx; + private final TagContext parentCtx; - ClientCallTracer(CensusStatsModule module, StatsContext parentCtx, String fullMethodName) { + ClientCallTracer(CensusStatsModule module, TagContext parentCtx, String fullMethodName) { this.module = module; this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); @@ -222,7 +219,7 @@ public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadat "Are you creating multiple streams per call? This class doesn't yet support this case."); if (module.propagateTags) { headers.discardAll(module.statsHeader); - if (parentCtx != module.statsCtxFactory.getDefault()) { + if (!module.tagger.empty().equals(parentCtx)) { headers.put(module.statsHeader, parentCtx); } } @@ -248,27 +245,29 @@ void callEnded(Status status) { if (tracer == null) { tracer = BLANK_CLIENT_TRACER; } - MeasurementMap.Builder builder = MeasurementMap.builder() + StatsRecord statsRecord = module.statsRecorder.newRecord() // The metrics are in double - .put(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI) - .put(RpcConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount) - .put(RpcConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount) - .put(RpcConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize) - .put(RpcConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize) + .put(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI) + .put(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount) + .put(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount) + .put(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize) + .put(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize) .put( - RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, + RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, tracer.outboundUncompressedSize) .put( - RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, + RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, tracer.inboundUncompressedSize); if (!status.isOk()) { - builder.put(RpcConstants.RPC_CLIENT_ERROR_COUNT, 1.0); + statsRecord.put(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT, 1); } - parentCtx - .with( - RpcConstants.RPC_CLIENT_METHOD, TagValue.create(fullMethodName), - RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) - .record(builder.build()); + statsRecord.record( + module + .tagger + .toBuilder(parentCtx) + .put(RpcMeasureConstants.RPC_CLIENT_METHOD, TagValue.create(fullMethodName)) + .put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) + .build()); } } @@ -291,10 +290,10 @@ private static final class ServerTracer extends ServerStreamTracer { private final CensusStatsModule module; private final String fullMethodName; @Nullable - private final StatsContext parentCtx; + private final TagContext parentCtx; private volatile int streamClosed; private final Stopwatch stopwatch; - private final StatsContextFactory statsCtxFactory; + private final Tagger tagger; private volatile long outboundMessageCount; private volatile long inboundMessageCount; private volatile long outboundWireSize; @@ -305,14 +304,14 @@ private static final class ServerTracer extends ServerStreamTracer { ServerTracer( CensusStatsModule module, String fullMethodName, - StatsContext parentCtx, + TagContext parentCtx, Supplier stopwatchSupplier, - StatsContextFactory statsCtxFactory) { + Tagger tagger) { this.module = module; this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.stopwatch = stopwatchSupplier.get().start(); - this.statsCtxFactory = statsCtxFactory; + this.tagger = tagger; } @Override @@ -361,28 +360,31 @@ public void streamClosed(Status status) { } stopwatch.stop(); long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); - MeasurementMap.Builder builder = MeasurementMap.builder() + StatsRecord statsRecord = module.statsRecorder.newRecord() // The metrics are in double - .put(RpcConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI) - .put(RpcConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount) - .put(RpcConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount) - .put(RpcConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize) - .put(RpcConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize) - .put(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES, outboundUncompressedSize) - .put(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES, inboundUncompressedSize); + .put(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI) + .put(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount) + .put(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount) + .put(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize) + .put(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize) + .put(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES, outboundUncompressedSize) + .put(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES, inboundUncompressedSize); if (!status.isOk()) { - builder.put(RpcConstants.RPC_SERVER_ERROR_COUNT, 1.0); + statsRecord.put(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT, 1); } - StatsContext ctx = firstNonNull(parentCtx, statsCtxFactory.getDefault()); - ctx - .with(RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) - .record(builder.build()); + TagContext ctx = firstNonNull(parentCtx, tagger.empty()); + statsRecord.record( + module + .tagger + .toBuilder(ctx) + .put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) + .build()); } @Override public Context filterContext(Context context) { - if (parentCtx != statsCtxFactory.getDefault()) { - return context.withValue(STATS_CONTEXT_KEY, parentCtx); + if (!tagger.empty().equals(parentCtx)) { + return context.withValue(TAG_CONTEXT_KEY, parentCtx); } return context; } @@ -392,13 +394,17 @@ public Context filterContext(Context context) { final class ServerTracerFactory extends ServerStreamTracer.Factory { @Override public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) { - StatsContext parentCtx = headers.get(statsHeader); + TagContext parentCtx = headers.get(statsHeader); if (parentCtx == null) { - parentCtx = statsCtxFactory.getDefault(); + parentCtx = tagger.empty(); } - parentCtx = parentCtx.with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(fullMethodName)); + parentCtx = + tagger + .toBuilder(parentCtx) + .put(RpcMeasureConstants.RPC_SERVER_METHOD, TagValue.create(fullMethodName)) + .build(); return new ServerTracer( - CensusStatsModule.this, fullMethodName, parentCtx, stopwatchSupplier, statsCtxFactory); + CensusStatsModule.this, fullMethodName, parentCtx, stopwatchSupplier, tagger); } } @@ -407,8 +413,8 @@ final class StatsClientInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { - // New RPCs on client-side inherit the stats context from the current Context. - StatsContext parentCtx = statsCtxFactory.getCurrentStatsContext(); + // New RPCs on client-side inherit the tag context from the current Context. + TagContext parentCtx = tagger.getCurrentTagContext(); final ClientCallTracer tracerFactory = newClientCallTracer(parentCtx, method.getFullMethodName()); ClientCall call = diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index a874a7b0ec31..89d9efbe04cd 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -27,8 +27,6 @@ import static org.mockito.Mockito.mock; import com.google.common.util.concurrent.MoreExecutors; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -38,7 +36,14 @@ import io.grpc.LoadBalancer; import io.grpc.MethodDescriptor; import io.grpc.NameResolver; -import java.io.InputStream; +import io.opencensus.common.Scope; +import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagContextParseException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; @@ -61,15 +66,56 @@ public ClientCall interceptCall( } }; - private static final StatsContextFactory DUMMY_STATS_FACTORY = - new StatsContextFactory() { + private static final Tagger DUMMY_TAGGER = + new Tagger() { @Override - public StatsContext deserialize(InputStream input) { + public TagContext empty() { throw new UnsupportedOperationException(); } @Override - public StatsContext getDefault() { + public TagContext getCurrentTagContext() { + throw new UnsupportedOperationException(); + } + + @Override + public TagContextBuilder emptyBuilder() { + throw new UnsupportedOperationException(); + } + + @Override + public TagContextBuilder toBuilder(TagContext tags) { + throw new UnsupportedOperationException(); + } + + @Override + public TagContextBuilder currentBuilder() { + throw new UnsupportedOperationException(); + } + + @Override + public Scope withTagContext(TagContext tags) { + throw new UnsupportedOperationException(); + } + }; + + private static final TagContextBinarySerializer DUMMY_TAG_CONTEXT_BINARY_SERIALIZER = + new TagContextBinarySerializer() { + @Override + public byte[] toByteArray(TagContext tags) { + throw new UnsupportedOperationException(); + } + + @Override + public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { + throw new UnsupportedOperationException(); + } + }; + + private static final StatsRecorder DUMMY_STATS_RECORDER = + new StatsRecorder() { + @Override + public StatsRecord newRecord() { throw new UnsupportedOperationException(); } }; @@ -346,12 +392,12 @@ public void idleTimeout() { static class Builder extends AbstractManagedChannelImplBuilder { Builder(String target) { super(target); - statsContextFactory(DUMMY_STATS_FACTORY); + statsImplementation(DUMMY_TAGGER, DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, DUMMY_STATS_RECORDER); } Builder(SocketAddress directServerAddress, String authority) { super(directServerAddress, authority); - statsContextFactory(DUMMY_STATS_FACTORY); + statsImplementation(DUMMY_TAGGER, DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, DUMMY_STATS_RECORDER); } @Override diff --git a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java index fed373a4b4bb..145764cdcb6e 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java @@ -19,12 +19,17 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.Metadata; import io.grpc.ServerStreamTracer; +import io.opencensus.common.Scope; +import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagContextParseException; import java.io.File; -import java.io.InputStream; import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; @@ -33,15 +38,57 @@ /** Unit tests for {@link AbstractServerImplBuilder}. */ @RunWith(JUnit4.class) public class AbstractServerImplBuilderTest { - private static final StatsContextFactory DUMMY_STATS_FACTORY = - new StatsContextFactory() { + + private static final Tagger DUMMY_TAGGER = + new Tagger() { + @Override + public TagContext empty() { + throw new UnsupportedOperationException(); + } + + @Override + public TagContext getCurrentTagContext() { + throw new UnsupportedOperationException(); + } + + @Override + public TagContextBuilder emptyBuilder() { + throw new UnsupportedOperationException(); + } + @Override - public StatsContext deserialize(InputStream input) { + public TagContextBuilder toBuilder(TagContext tags) { throw new UnsupportedOperationException(); } @Override - public StatsContext getDefault() { + public TagContextBuilder currentBuilder() { + throw new UnsupportedOperationException(); + } + + @Override + public Scope withTagContext(TagContext tags) { + throw new UnsupportedOperationException(); + } + }; + + private static final TagContextBinarySerializer DUMMY_TAG_CONTEXT_BINARY_SERIALIZER = + new TagContextBinarySerializer() { + @Override + public byte[] toByteArray(TagContext tags) { + throw new UnsupportedOperationException(); + } + + @Override + public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { + throw new UnsupportedOperationException(); + } + }; + + private static final StatsRecorder DUMMY_STATS_RECORDER = + new StatsRecorder() { + @Override + public StatsRecord newRecord() { throw new UnsupportedOperationException(); } }; @@ -97,7 +144,7 @@ public void getTracerFactories_disableBoth() { static class Builder extends AbstractServerImplBuilder { Builder() { - statsContextFactory(DUMMY_STATS_FACTORY); + statsImplementation(DUMMY_TAGGER, DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, DUMMY_STATS_RECORDER); } @Override diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java index c0fad718ca9d..575ce4a2344b 100644 --- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java +++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java @@ -16,7 +16,7 @@ package io.grpc.internal; -import static com.google.instrumentation.stats.ContextUtils.STATS_CONTEXT_KEY; +import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -40,9 +40,6 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.google.instrumentation.stats.RpcConstants; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.TagValue; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -58,9 +55,15 @@ import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.testing.StatsTestUtils; -import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; import io.grpc.internal.testing.StatsTestUtils.MockableSpan; import io.grpc.testing.GrpcServerRule; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tags; import io.opencensus.trace.EndSpanOptions; import io.opencensus.trace.NetworkEvent; import io.opencensus.trace.NetworkEvent.Type; @@ -71,7 +74,6 @@ import io.opencensus.trace.propagation.BinaryFormat; import io.opencensus.trace.propagation.SpanContextParseException; import io.opencensus.trace.unsafe.ContextUtils; -import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.List; import java.util.Random; @@ -134,7 +136,10 @@ public String parse(InputStream stream) { .setFullMethodName("package1.service2/method3") .build(); private final FakeClock fakeClock = new FakeClock(); - private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory(); + private final FakeTagger tagger = new FakeTagger(); + private final FakeTagContextBinarySerializer tagCtxSerializer = + new FakeTagContextBinarySerializer(); + private final FakeStatsRecorder statsRecorder = new FakeStatsRecorder(); private final Random random = new Random(1234); private final Span fakeClientParentSpan = MockableSpan.generateRandomSpan(random); private final Span spyClientSpan = spy(MockableSpan.generateRandomSpan(random)); @@ -182,13 +187,14 @@ public void setUp() throws Exception { when(mockTracingPropagationHandler.fromByteArray(any(byte[].class))) .thenReturn(fakeClientSpanContext); censusStats = - new CensusStatsModule(statsCtxFactory, fakeClock.getStopwatchSupplier(), true, true); + new CensusStatsModule( + tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), true, true); censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler); } @After public void wrapUp() { - assertNull(statsCtxFactory.pollRecord()); + assertNull(statsRecorder.pollRecord()); } @Test @@ -201,7 +207,7 @@ public void clientInterceptorCustomTag() { testClientInterceptors(true); } - // Test that Census ClientInterceptors uses the StatsContext and Span out of the current Context + // Test that Census ClientInterceptors uses the TagContext and Span out of the current Context // to create the ClientCallTracer, and that it intercepts ClientCall.Listener.onClose() to call // ClientCallTracer.callEnded(). private void testClientInterceptors(boolean nonDefaultContext) { @@ -236,9 +242,9 @@ public ClientCall interceptCall( if (nonDefaultContext) { Context ctx = Context.ROOT.withValues( - STATS_CONTEXT_KEY, - statsCtxFactory.getDefault().with( - StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")), + TAG_CONTEXT_KEY, + tagger.emptyBuilder().put( + StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")).build(), ContextUtils.CONTEXT_SPAN_KEY, fakeClientParentSpan); Context origCtx = ctx.attach(); @@ -248,7 +254,7 @@ public ClientCall interceptCall( ctx.detach(origCtx); } } else { - assertNull(STATS_CONTEXT_KEY.get()); + assertEquals(Tags.getTagger().empty(), TAG_CONTEXT_KEY.get()); assertNull(ContextUtils.CONTEXT_SPAN_KEY.get()); call = interceptedChannel.newCall(method, CALL_OPTIONS); } @@ -266,7 +272,7 @@ public ClientCall interceptCall( // Make the call Metadata headers = new Metadata(); call.start(mockClientCallListener, headers); - assertNull(statsCtxFactory.pollRecord()); + assertNull(statsRecorder.pollRecord()); if (nonDefaultContext) { verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), same(fakeClientParentSpan)); @@ -288,15 +294,15 @@ public ClientCall interceptCall( assertEquals("No you don't", status.getDescription()); // The intercepting listener calls callEnded() on ClientCallTracer, which records to Census. - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.toString()); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.asString()); if (nonDefaultContext) { TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG); - assertEquals("extra value", extraTag.toString()); + assertEquals("extra value", extraTag.asString()); } else { assertNull(record.tags.get(StatsTestUtils.EXTRA_TAG)); } @@ -312,7 +318,7 @@ public ClientCall interceptCall( @Test public void clientBasicStatsDefaultContext() { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName()); + censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName()); Metadata headers = new Metadata(); ClientStreamTracer tracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); @@ -339,24 +345,27 @@ public void clientBasicStatsDefaultContext() { tracer.streamClosed(Status.OK); callTracer.callEnded(Status.OK); - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoServerContent(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.OK.toString(), statusTag.toString()); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); - assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); - assertEquals(1128 + 865, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertEquals(33 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), statusTag.asString()); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals( + 1128 + 865, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals( + 33 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); assertEquals(67 + 552, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); assertEquals(30 + 100 + 16 + 24, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); } @Test @@ -403,29 +412,30 @@ public void clientBasicTracingDefaultSpan() { public void clientStreamNeverCreatedStillRecordStats() { CensusStatsModule.ClientCallTracer callTracer = censusStats.newClientCallTracer( - statsCtxFactory.getDefault(), method.getFullMethodName()); + tagger.empty(), method.getFullMethodName()); fakeClock.forwardTime(3000, MILLISECONDS); callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds")); - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoServerContent(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.toString()); - assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ERROR_COUNT)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); assertEquals(0, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); assertEquals(0, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); - assertEquals(3000, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals( + 3000, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); } @Test @@ -470,10 +480,16 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS // EXTRA_TAG is propagated by the FakeStatsContextFactory. Note that not all tags are // propagated. The StatsContextFactory decides which tags are to propagated. gRPC facilitates // the propagation by putting them in the headers. - StatsContext clientCtx = statsCtxFactory.getDefault().with( - StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897")); - CensusStatsModule census = new CensusStatsModule( - statsCtxFactory, fakeClock.getStopwatchSupplier(), propagate, recordStats); + TagContext clientCtx = tagger.emptyBuilder().put( + StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897")).build(); + CensusStatsModule census = + new CensusStatsModule( + tagger, + tagCtxSerializer, + statsRecorder, + fakeClock.getStopwatchSupplier(), + propagate, + recordStats); Metadata headers = new Metadata(); CensusStatsModule.ClientCallTracer callTracer = census.newClientCallTracer(clientCtx, method.getFullMethodName()); @@ -494,23 +510,25 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS Context serverContext = serverTracer.filterContext(Context.ROOT); // It also put clientCtx in the Context seen by the call handler assertEquals( - clientCtx.with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(method.getFullMethodName())), - STATS_CONTEXT_KEY.get(serverContext)); + tagger.toBuilder(clientCtx).put( + RpcMeasureConstants.RPC_SERVER_METHOD, + TagValue.create(method.getFullMethodName())).build(), + TAG_CONTEXT_KEY.get(serverContext)); // Verifies that the server tracer records the status with the propagated tag serverTracer.streamClosed(Status.OK); if (recordStats) { - StatsTestUtils.MetricsRecord serverRecord = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord serverRecord = statsRecorder.pollRecord(); assertNotNull(serverRecord); assertNoClientContent(serverRecord); - TagValue serverMethodTag = serverRecord.tags.get(RpcConstants.RPC_SERVER_METHOD); - assertEquals(method.getFullMethodName(), serverMethodTag.toString()); - TagValue serverStatusTag = serverRecord.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.OK.toString(), serverStatusTag.toString()); - assertNull(serverRecord.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT)); + TagValue serverMethodTag = serverRecord.tags.get(RpcMeasureConstants.RPC_SERVER_METHOD); + assertEquals(method.getFullMethodName(), serverMethodTag.asString()); + TagValue serverStatusTag = serverRecord.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), serverStatusTag.asString()); + assertNull(serverRecord.getMetric(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG); - assertEquals("extra-tag-value-897", serverPropagatedTag.toString()); + assertEquals("extra-tag-value-897", serverPropagatedTag.asString()); } // Verifies that the client tracer factory uses clientCtx, which includes the custom tags, to @@ -518,27 +536,27 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS callTracer.callEnded(Status.OK); if (recordStats) { - StatsTestUtils.MetricsRecord clientRecord = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord clientRecord = statsRecorder.pollRecord(); assertNotNull(clientRecord); assertNoServerContent(clientRecord); - TagValue clientMethodTag = clientRecord.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), clientMethodTag.toString()); - TagValue clientStatusTag = clientRecord.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.OK.toString(), clientStatusTag.toString()); - assertNull(clientRecord.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); + TagValue clientMethodTag = clientRecord.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), clientMethodTag.asString()); + TagValue clientStatusTag = clientRecord.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), clientStatusTag.asString()); + assertNull(clientRecord.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG); - assertEquals("extra-tag-value-897", clientPropagatedTag.toString()); + assertEquals("extra-tag-value-897", clientPropagatedTag.asString()); } if (!recordStats) { - assertNull(statsCtxFactory.pollRecord()); + assertNull(statsRecorder.pollRecord()); } } @Test public void statsHeadersNotPropagateDefaultContext() { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName()); + censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName()); Metadata headers = new Metadata(); callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); assertFalse(headers.containsKey(censusStats.statsHeader)); @@ -551,7 +569,7 @@ public void statsHeaderMalformed() { Metadata.Key arbitraryStatsHeader = Metadata.Key.of("grpc-tags-bin", Metadata.BINARY_BYTE_MARSHALLER); try { - statsCtxFactory.deserialize(new ByteArrayInputStream(statsHeaderValue)); + tagCtxSerializer.fromByteArray(statsHeaderValue); fail("Should have thrown"); } catch (Exception e) { // Expected @@ -561,7 +579,7 @@ public void statsHeaderMalformed() { Metadata headers = new Metadata(); assertNull(headers.get(censusStats.statsHeader)); headers.put(arbitraryStatsHeader, statsHeaderValue); - assertSame(statsCtxFactory.getDefault(), headers.get(censusStats.statsHeader)); + assertSame(tagger.empty(), headers.get(censusStats.statsHeader)); } @Test @@ -624,10 +642,14 @@ public void serverBasicStatsNoHeaders() { tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); Context filteredContext = tracer.filterContext(Context.ROOT); - StatsContext statsCtx = STATS_CONTEXT_KEY.get(filteredContext); + TagContext statsCtx = TAG_CONTEXT_KEY.get(filteredContext); assertEquals( - statsCtxFactory.getDefault() - .with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(method.getFullMethodName())), + tagger + .emptyBuilder() + .put( + RpcMeasureConstants.RPC_SERVER_METHOD, + TagValue.create(method.getFullMethodName())) + .build(), statsCtx); tracer.inboundMessage(0); @@ -651,24 +673,27 @@ public void serverBasicStatsNoHeaders() { tracer.streamClosed(Status.CANCELLED); - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoClientContent(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString()); - assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_ERROR_COUNT)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_COUNT)); - assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); - assertEquals(1128 + 865, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_COUNT)); - assertEquals(34 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_BYTES)); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_SERVER_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.CANCELLED.toString(), statusTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); + assertEquals( + 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); + assertEquals( + 1128 + 865, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); + assertEquals( + 34 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); assertEquals(67 + 552, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); assertEquals(100 + 16 + 24, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_SERVER_LATENCY)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); } @Test @@ -745,26 +770,26 @@ public void generateTraceSpanName() { } private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) { - assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_ELAPSED_TIME)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_SERVER_ELAPSED_TIME)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); } private static void assertNoClientContent(StatsTestUtils.MetricsRecord record) { - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index d3a25c058d41..b35c0ad9b659 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -17,8 +17,8 @@ package io.grpc.testing.integration; import static com.google.common.truth.Truth.assertThat; -import static com.google.instrumentation.stats.ContextUtils.STATS_CONTEXT_KEY; import static io.grpc.testing.integration.Messages.PayloadType.COMPRESSABLE; +import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY; import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -40,10 +40,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.util.concurrent.SettableFuture; -import com.google.instrumentation.stats.RpcConstants; -import com.google.instrumentation.stats.StatsContextFactory; -import com.google.instrumentation.stats.TagKey; -import com.google.instrumentation.stats.TagValue; import com.google.protobuf.BoolValue; import com.google.protobuf.ByteString; import com.google.protobuf.EmptyProtos.Empty; @@ -70,11 +66,13 @@ import io.grpc.auth.MoreCallCredentials; import io.grpc.internal.AbstractServerImplBuilder; import io.grpc.internal.GrpcUtil; -import io.grpc.internal.testing.StatsTestUtils.FakeStatsContext; -import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory; +import io.grpc.internal.testing.StatsTestUtils; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContext; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; import io.grpc.internal.testing.StatsTestUtils.MetricsRecord; import io.grpc.internal.testing.StatsTestUtils.MockableSpan; -import io.grpc.internal.testing.StatsTestUtils; import io.grpc.internal.testing.StreamRecorder; import io.grpc.internal.testing.TestClientStreamTracer; import io.grpc.internal.testing.TestServerStreamTracer; @@ -95,6 +93,12 @@ import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.trace.Span; import io.opencensus.trace.unsafe.ContextUtils; import java.io.IOException; @@ -148,10 +152,11 @@ public abstract class AbstractInteropTest { new AtomicReference(); private static ScheduledExecutorService testServiceExecutor; private static Server server; - private static final FakeStatsContextFactory clientStatsCtxFactory = - new FakeStatsContextFactory(); - private static final FakeStatsContextFactory serverStatsCtxFactory = - new FakeStatsContextFactory(); + private static final FakeTagger tagger = new FakeTagger(); + private static final FakeTagContextBinarySerializer tagContextBinarySerializer = + new FakeTagContextBinarySerializer(); + private static final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder(); + private static final FakeStatsRecorder serverStatsRecorder = new FakeStatsRecorder(); private static final LinkedBlockingQueue serverStreamTracers = new LinkedBlockingQueue(); @@ -207,7 +212,8 @@ protected static void startStaticServer( new TestServiceImpl(testServiceExecutor), allInterceptors)) .addStreamTracerFactory(serverStreamTracerFactory); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, serverStatsCtxFactory); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, tagger, tagContextBinarySerializer, serverStatsRecorder); try { server = builder.build().start(); } catch (IOException ex) { @@ -261,8 +267,8 @@ public void setUp() { TestServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor); asyncStub = TestServiceGrpc.newStub(channel).withInterceptors(tracerSetupInterceptor); requestHeadersCapture.set(null); - clientStatsCtxFactory.rolloverRecords(); - serverStatsCtxFactory.rolloverRecords(); + clientStatsRecorder.rolloverRecords(); + serverStatsRecorder.rolloverRecords(); serverStreamTracers.clear(); } @@ -276,8 +282,16 @@ public void tearDown() throws Exception { protected abstract ManagedChannel createChannel(); - protected final StatsContextFactory getClientStatsFactory() { - return clientStatsCtxFactory; + protected final StatsRecorder getClientStatsFactory() { + return clientStatsRecorder; + } + + protected final Tagger getTagger() { + return tagger; + } + + protected final TagContextBinarySerializer getTagContextBinarySerializer() { + return tagContextBinarySerializer; } /** @@ -706,7 +720,7 @@ public void cancelAfterBegin() throws Exception { // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be // recorded. The tracer stats rely on the stream being created, which is not always the case // in this test. Therefore we don't check the tracer stats. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/StreamingInputCall", Status.CANCELLED.getCode()); @@ -1029,7 +1043,7 @@ public void deadlineExceeded() throws Exception { if (metricsExpected()) { // Stream may not have been created before deadline is exceeded, thus we don't test the tracer // stats. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/StreamingOutputCall", Status.Code.DEADLINE_EXCEEDED); @@ -1062,7 +1076,7 @@ public void deadlineExceededServerStreaming() throws Exception { if (metricsExpected()) { // Stream may not have been created when deadline is exceeded, thus we don't check tracer // stats. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/StreamingOutputCall", Status.Code.DEADLINE_EXCEEDED); @@ -1086,7 +1100,7 @@ public void deadlineInPast() throws Exception { // recorded. The tracer stats rely on the stream being created, which is not the case if // deadline is exceeded before the call is created. Therefore we don't check the tracer stats. if (metricsExpected()) { - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode()); @@ -1104,7 +1118,7 @@ public void deadlineInPast() throws Exception { } assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); if (metricsExpected()) { - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode()); @@ -1347,9 +1361,9 @@ public void censusContextsPropagated() { Span clientParentSpan = MockableSpan.generateRandomSpan(new Random()); Context ctx = Context.ROOT.withValues( - STATS_CONTEXT_KEY, - clientStatsCtxFactory.getDefault().with( - StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")), + TAG_CONTEXT_KEY, + tagger.emptyBuilder().put( + StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")).build(), ContextUtils.CONTEXT_SPAN_KEY, clientParentSpan); Context origCtx = ctx.attach(); @@ -1358,7 +1372,7 @@ public void censusContextsPropagated() { Context serverCtx = contextCapture.get(); assertNotNull(serverCtx); - FakeStatsContext statsCtx = (FakeStatsContext) STATS_CONTEXT_KEY.get(serverCtx); + FakeTagContext statsCtx = (FakeTagContext) TAG_CONTEXT_KEY.get(serverCtx); assertNotNull(statsCtx); Map tags = statsCtx.getTags(); boolean tagFound = false; @@ -1480,7 +1494,7 @@ public void timeoutOnSleepingServer() throws Exception { // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be // recorded. The tracer stats rely on the stream being created, which is not always the case // in this test, thus we will not check that. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/FullDuplexCall", Status.DEADLINE_EXCEEDED.getCode()); @@ -1747,7 +1761,7 @@ private void assertClientStatsTrace(String method, Status.Code code, if (metricsExpected()) { // CensusStreamTracerModule records final status in interceptor, which is guaranteed to be // done before application receives status. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(); checkTags(clientRecord, false, method, code); if (requests != null && responses != null) { @@ -1779,7 +1793,7 @@ private void assertServerStatsTrace(String method, Status.Code code, try { // On the server, the stats is finalized in ServerStreamListener.closed(), which can be // run after the client receives the final status. So we use a timeout. - serverRecord = serverStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + serverRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1847,12 +1861,12 @@ private static void checkTags( MetricsRecord record, boolean server, String methodName, Status.Code status) { assertNotNull("record is not null", record); TagValue methodNameTag = record.tags.get( - server ? RpcConstants.RPC_SERVER_METHOD : RpcConstants.RPC_CLIENT_METHOD); + server ? RpcMeasureConstants.RPC_SERVER_METHOD : RpcMeasureConstants.RPC_CLIENT_METHOD); assertNotNull("method name tagged", methodNameTag); - assertEquals("method names match", methodName, methodNameTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); + assertEquals("method names match", methodName, methodNameTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); assertNotNull("status tagged", statusTag); - assertEquals(status.toString(), statusTag.toString()); + assertEquals(status.toString(), statusTag.asString()); } /** @@ -1905,33 +1919,41 @@ private void checkCensus(MetricsRecord record, boolean server, } if (server && serverInProcess()) { assertEquals( - requests.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_COUNT)); + requests.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); + assertEquals( + responses.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); + assertEquals( + uncompressedRequestsSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); assertEquals( - responses.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_COUNT)); - assertEquals(uncompressedRequestsSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(uncompressedResponsesSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY)); + uncompressedResponsesSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); // It's impossible to get the expected wire sizes because it may be compressed, so we just // check if they are recorded. - assertNotNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); } if (!server) { assertEquals( - requests.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); + requests.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + responses.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals( + uncompressedRequestsSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); assertEquals( - responses.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertEquals(uncompressedRequestsSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(uncompressedResponsesSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + uncompressedResponsesSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); // It's impossible to get the expected wire sizes because it may be compressed, so we just // check if they are recorded. - assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 64bbd9e86ef4..e67b0d63f24d 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -365,7 +365,8 @@ protected ManagedChannel createChannel() { } builder = okBuilder; } - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java index 50cedf649153..44b7bc83d9a2 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java @@ -48,7 +48,8 @@ protected ManagedChannel createChannel() { NettyChannelBuilder builder = NettyChannelBuilder.forAddress("localhost", getPort()) .negotiationType(NegotiationType.PLAINTEXT) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java index ff4f0989d8dd..975ed86a276f 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java @@ -59,7 +59,8 @@ protected ManagedChannel createChannel() { .channelType(LocalChannel.class) .flowControlWindow(65 * 1024) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java index eeb406173d5e..daefddf2df58 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java @@ -75,7 +75,8 @@ protected ManagedChannel createChannel() { .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE) .sslProvider(SslProvider.OPENSSL) .build()); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } catch (Exception ex) { throw new RuntimeException(ex); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java index b3e9a1d75dca..1c44f3b28b7b 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java @@ -105,7 +105,8 @@ private OkHttpChannelBuilder createChannelBuilder() { .build()) .overrideAuthority(GrpcUtil.authorityFromHostAndPort( TestUtils.TEST_SERVER_HOST, getPort())); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); try { builder.sslSocketFactory(TestUtils.newSslSocketFactoryForCa(Platform.get().getProvider(), TestUtils.loadCert("ca.pem"))); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java index 786ed46bdae8..54b2b8b8d7db 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java @@ -44,7 +44,8 @@ public static void stopServer() { @Override protected ManagedChannel createChannel() { InProcessChannelBuilder builder = InProcessChannelBuilder.forName(SERVER_NAME); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java index 4034d5b962d3..56268f7a33a7 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java @@ -172,7 +172,8 @@ public void onHeaders(Metadata headers) { } }) .usePlaintext(true); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } diff --git a/testing-proto/src/test/java/io/grpc/testing/protobuf/SimpleServiceTest.java b/testing-proto/src/test/java/io/grpc/testing/protobuf/SimpleServiceTest.java index a6381cccb021..b058b472f6f5 100644 --- a/testing-proto/src/test/java/io/grpc/testing/protobuf/SimpleServiceTest.java +++ b/testing-proto/src/test/java/io/grpc/testing/protobuf/SimpleServiceTest.java @@ -76,6 +76,11 @@ public void registerSampledMethodsForTracing() throws Exception { expectedSpans.add(generateTraceSpanName(true, methodName)); } + // The call to sleep is required because OpenCensus now registers span names asynchronously. It + // will be removed soon, because https://github.com/grpc/grpc-java/pull/3627 updates gRPC for + // the new asynchronous behavior and removes this test. + Thread.sleep(1000); + SampledSpanStore sampledStore = Tracing.getExportComponent().getSampledSpanStore(); Set registeredSpans = sampledStore.getRegisteredSpanNamesForCollection(); assertThat(registeredSpans).containsAllIn(expectedSpans); diff --git a/testing/src/main/java/io/grpc/internal/TestingAccessor.java b/testing/src/main/java/io/grpc/internal/TestingAccessor.java index 71f3f5d7349b..b03967e7686e 100644 --- a/testing/src/main/java/io/grpc/internal/TestingAccessor.java +++ b/testing/src/main/java/io/grpc/internal/TestingAccessor.java @@ -16,26 +16,34 @@ package io.grpc.internal; -import com.google.instrumentation.stats.StatsContextFactory; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; /** * Test helper that allows accessing package-private stuff. */ public final class TestingAccessor { /** - * Sets a custom {@link StatsContextFactory} for tests. + * Sets a custom stats implementation for tests. */ - public static void setStatsContextFactory( - AbstractManagedChannelImplBuilder builder, StatsContextFactory factory) { - builder.statsContextFactory(factory); + public static void setStatsImplementation( + AbstractManagedChannelImplBuilder builder, + Tagger tagger, + TagContextBinarySerializer tagCtxSerializer, + StatsRecorder statsRecorder) { + builder.statsImplementation(tagger, tagCtxSerializer, statsRecorder); } /** - * Sets a custom {@link StatsContextFactory} for tests. + * Sets a custom stats implementation for tests. */ - public static void setStatsContextFactory( - AbstractServerImplBuilder builder, StatsContextFactory factory) { - builder.statsContextFactory(factory); + public static void setStatsImplementation( + AbstractServerImplBuilder builder, + Tagger tagger, + TagContextBinarySerializer tagCtxSerializer, + StatsRecorder statsRecorder) { + builder.statsImplementation(tagger, tagCtxSerializer, statsRecorder); } private TestingAccessor() { diff --git a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java index 441bef3c584f..d8faa10aa3e6 100644 --- a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java +++ b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java @@ -19,15 +19,23 @@ import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; -import com.google.instrumentation.stats.MeasurementDescriptor; -import com.google.instrumentation.stats.MeasurementMap; -import com.google.instrumentation.stats.MeasurementValue; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; -import com.google.instrumentation.stats.TagKey; -import com.google.instrumentation.stats.TagValue; -import io.grpc.internal.IoUtils; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; +import io.opencensus.common.Scope; +import io.opencensus.stats.Measure; +import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.Tag; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagContextParseException; +import io.opencensus.tags.unsafe.ContextUtils; import io.opencensus.trace.Annotation; import io.opencensus.trace.AttributeValue; import io.opencensus.trace.EndSpanOptions; @@ -40,10 +48,8 @@ import io.opencensus.trace.SpanId; import io.opencensus.trace.TraceId; import io.opencensus.trace.TraceOptions; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; @@ -57,10 +63,12 @@ private StatsTestUtils() { } public static class MetricsRecord { + public final ImmutableMap tags; - public final MeasurementMap metrics; + public final ImmutableMap metrics; - private MetricsRecord(ImmutableMap tags, MeasurementMap metrics) { + private MetricsRecord( + ImmutableMap tags, ImmutableMap metrics) { this.tags = tags; this.metrics = metrics; } @@ -69,10 +77,16 @@ private MetricsRecord(ImmutableMap tags, MeasurementMap metric * Returns the value of a metric, or {@code null} if not found. */ @Nullable - public Double getMetric(MeasurementDescriptor metricName) { - for (MeasurementValue m : metrics) { - if (m.getMeasurement().equals(metricName)) { - return m.getValue(); + public Double getMetric(Measure measure) { + for (Map.Entry m : metrics.entrySet()) { + if (m.getKey().equals(measure)) { + Number value = m.getValue(); + if (value instanceof Double) { + return (Double) value; + } else if (value instanceof Long) { + return (double) (Long) value; + } + throw new AssertionError("Unexpected measure value type: " + value.getClass().getName()); } } return null; @@ -81,9 +95,9 @@ public Double getMetric(MeasurementDescriptor metricName) { /** * Returns the value of a metric converted to long, or throw if not found. */ - public long getMetricAsLongOrFail(MeasurementDescriptor metricName) { - Double doubleValue = getMetric(metricName); - checkNotNull(doubleValue, "Metric not found: %s", metricName.toString()); + public long getMetricAsLongOrFail(Measure measure) { + Double doubleValue = getMetric(measure); + checkNotNull(doubleValue, "Measure not found: %s", measure.getName()); long longValue = (long) (Math.abs(doubleValue) + 0.0001); if (doubleValue < 0) { longValue = -longValue; @@ -93,7 +107,7 @@ public long getMetricAsLongOrFail(MeasurementDescriptor metricName) { } /** - * This tag will be propagated by {@link FakeStatsContextFactory} on the wire. + * This tag will be propagated by {@link FakeTagger} on the wire. */ public static final TagKey EXTRA_TAG = TagKey.create("/rpc/test/extratag"); @@ -101,31 +115,21 @@ public long getMetricAsLongOrFail(MeasurementDescriptor metricName) { private static final String NO_EXTRA_TAG_HEADER_VALUE_PREFIX = "noextratag"; /** - * A factory that makes fake {@link StatsContext}s and saves the created contexts to be - * accessible from {@link #pollContextOrFail}. The contexts it has created would save metrics - * records to be accessible from {@link #pollRecord()} and {@link #pollRecord(long, TimeUnit)}, - * until {@link #rolloverRecords} is called. + * A {@link Tagger} implementation that saves metrics records to be accessible from {@link + * #pollRecord()} and {@link #pollRecord(long, TimeUnit)}, until {@link #rolloverRecords} is + * called. */ - public static final class FakeStatsContextFactory extends StatsContextFactory { + public static final class FakeStatsRecorder extends StatsRecorder { + private BlockingQueue records; - public final BlockingQueue contexts = - new LinkedBlockingQueue(); - private final FakeStatsContext defaultContext; - /** - * Constructor. - */ - public FakeStatsContextFactory() { - rolloverRecords(); - defaultContext = new FakeStatsContext(ImmutableMap.of(), this); - // The records on the default context is not visible from pollRecord(), just like it's - // not visible from pollContextOrFail() either. + public FakeStatsRecorder() { rolloverRecords(); } - public StatsContext pollContextOrFail() { - StatsContext cc = contexts.poll(); - return checkNotNull(cc); + @Override + public StatsRecord newRecord() { + return new FakeStatsRecord(this); } public MetricsRecord pollRecord() { @@ -136,31 +140,8 @@ public MetricsRecord pollRecord(long timeout, TimeUnit unit) throws InterruptedE return getCurrentRecordSink().poll(timeout, unit); } - @Override - public StatsContext deserialize(InputStream buffer) throws IOException { - String serializedString; - try { - serializedString = new String(IoUtils.toByteArray(buffer), UTF_8); - } catch (IOException e) { - throw new RuntimeException(e); - } - if (serializedString.startsWith(EXTRA_TAG_HEADER_VALUE_PREFIX)) { - return getDefault().with(EXTRA_TAG, - TagValue.create(serializedString.substring(EXTRA_TAG_HEADER_VALUE_PREFIX.length()))); - } else if (serializedString.startsWith(NO_EXTRA_TAG_HEADER_VALUE_PREFIX)) { - return getDefault(); - } else { - throw new IOException("Malformed value"); - } - } - - @Override - public FakeStatsContext getDefault() { - return defaultContext; - } - /** - * Disconnect this factory with the contexts it has created so far. The records from those + * Disconnect this tagger with the contexts it has created so far. The records from those * contexts will not show up in {@link #pollRecord}. Useful for isolating the records between * test cases. */ @@ -174,88 +155,171 @@ private synchronized BlockingQueue getCurrentRecordSink() { } } - public static final class FakeStatsContext extends StatsContext { - private final ImmutableMap tags; - private final FakeStatsContextFactory factory; - private final BlockingQueue recordSink; + public static final class FakeTagger extends Tagger { - private FakeStatsContext(ImmutableMap tags, - FakeStatsContextFactory factory) { - this.tags = tags; - this.factory = factory; - this.recordSink = factory.getCurrentRecordSink(); + @Override + public FakeTagContext empty() { + return FakeTagContext.EMPTY; } - public Map getTags() { - return tags; + @Override + public TagContext getCurrentTagContext() { + return ContextUtils.TAG_CONTEXT_KEY.get(); } @Override - public Builder builder() { - return new FakeStatsContextBuilder(this); + public TagContextBuilder emptyBuilder() { + return new FakeTagContextBuilder(ImmutableMap.of()); } @Override - public StatsContext record(MeasurementMap metrics) { - recordSink.add(new MetricsRecord(tags, metrics)); - return this; + public FakeTagContextBuilder toBuilder(TagContext tags) { + return new FakeTagContextBuilder(getTags(tags)); } @Override - public void serialize(OutputStream os) { - TagValue extraTagValue = tags.get(EXTRA_TAG); - try { - if (extraTagValue == null) { - os.write(NO_EXTRA_TAG_HEADER_VALUE_PREFIX.getBytes(UTF_8)); - } else { - os.write((EXTRA_TAG_HEADER_VALUE_PREFIX + extraTagValue.toString()).getBytes(UTF_8)); - } - } catch (IOException e) { - throw new RuntimeException(e); - } + public TagContextBuilder currentBuilder() { + throw new UnsupportedOperationException(); } @Override - public String toString() { - return "[tags=" + tags + "]"; + public Scope withTagContext(TagContext tags) { + throw new UnsupportedOperationException(); } + } + + public static final class FakeTagContextBinarySerializer extends TagContextBinarySerializer { + + private final FakeTagger tagger = new FakeTagger(); @Override - public boolean equals(Object other) { - if (!(other instanceof FakeStatsContext)) { - return false; + public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { + String serializedString = new String(bytes, UTF_8); + if (serializedString.startsWith(EXTRA_TAG_HEADER_VALUE_PREFIX)) { + return tagger.emptyBuilder() + .put(EXTRA_TAG, + TagValue.create(serializedString.substring(EXTRA_TAG_HEADER_VALUE_PREFIX.length()))) + .build(); + } else if (serializedString.startsWith(NO_EXTRA_TAG_HEADER_VALUE_PREFIX)) { + return tagger.empty(); + } else { + throw new TagContextParseException("Malformed value"); } - FakeStatsContext otherCtx = (FakeStatsContext) other; - return tags.equals(otherCtx.tags); } @Override - public int hashCode() { - return tags.hashCode(); + public byte[] toByteArray(TagContext tags) { + TagValue extraTagValue = getTags(tags).get(EXTRA_TAG); + if (extraTagValue == null) { + return NO_EXTRA_TAG_HEADER_VALUE_PREFIX.getBytes(UTF_8); + } else { + return (EXTRA_TAG_HEADER_VALUE_PREFIX + extraTagValue.asString()).getBytes(UTF_8); + } } } - private static class FakeStatsContextBuilder extends StatsContext.Builder { - private final ImmutableMap.Builder tagsBuilder = ImmutableMap.builder(); - private final FakeStatsContext base; + public static final class FakeStatsRecord extends StatsRecord { + + private final BlockingQueue recordSink; + public final Map metrics = Maps.newHashMap(); + + private FakeStatsRecord(FakeStatsRecorder statsRecorder) { + this.recordSink = statsRecorder.getCurrentRecordSink(); + } - private FakeStatsContextBuilder(FakeStatsContext base) { - this.base = base; - tagsBuilder.putAll(base.tags); + @Override + public StatsRecord put(Measure.MeasureDouble measure, double value) { + metrics.put(measure, value); + return this; } @Override - public StatsContext.Builder set(TagKey key, TagValue value) { + public StatsRecord put(Measure.MeasureLong measure, long value) { + metrics.put(measure, value); + return this; + } + + @Override + public void record(TagContext tags) { + recordSink.add(new MetricsRecord(getTags(tags), ImmutableMap.copyOf(metrics))); + } + + @Override + public void record() { + throw new UnsupportedOperationException(); + } + } + + public static final class FakeTagContext extends TagContext { + + private static final FakeTagContext EMPTY = + new FakeTagContext(ImmutableMap.of()); + + private final ImmutableMap tags; + + private FakeTagContext(ImmutableMap tags) { + this.tags = tags; + } + + public ImmutableMap getTags() { + return tags; + } + + @Override + public String toString() { + return "[tags=" + tags + "]"; + } + + @Override + protected Iterator getIterator() { + return Iterators.transform( + tags.entrySet().iterator(), + new Function, Tag>() { + @Override + public Tag apply(@Nullable Map.Entry entry) { + return Tag.create(entry.getKey(), entry.getValue()); + } + }); + } + } + + public static class FakeTagContextBuilder extends TagContextBuilder { + + private final Map tagsBuilder = Maps.newHashMap(); + + private FakeTagContextBuilder(Map tags) { + tagsBuilder.putAll(tags); + } + + @Override + public TagContextBuilder put(TagKey key, TagValue value) { tagsBuilder.put(key, value); return this; } @Override - public StatsContext build() { - FakeStatsContext context = new FakeStatsContext(tagsBuilder.build(), base.factory); - base.factory.contexts.add(context); + public TagContextBuilder remove(TagKey key) { + tagsBuilder.remove(key); + return this; + } + + @Override + public TagContext build() { + FakeTagContext context = new FakeTagContext(ImmutableMap.copyOf(tagsBuilder)); return context; } + + @Override + public Scope buildScoped() { + throw new UnsupportedOperationException(); + } + } + + // This method handles the default TagContext, which isn't an instance of FakeTagContext. + private static ImmutableMap getTags(TagContext tags) { + return tags instanceof FakeTagContext + ? ((FakeTagContext) tags).getTags() + : ImmutableMap.of(); } // TODO(bdrutu): Remove this class after OpenCensus releases support for this class.