From f03ae781337b9022e781d8d60d3586306efe371f Mon Sep 17 00:00:00 2001 From: Kristen Kozak Date: Tue, 31 Oct 2017 14:56:04 -0700 Subject: [PATCH 1/8] core: use new OpenCensus stats/tagging API. This commit updates gRPC core to use io.opencensus:opencensus-api and io.opencensus:opencensus-contrib-grpc-metrics instead of com.google.instrumentation:instrumentation-api for stats and tagging. The gRPC Monitoring Service continues to use instrumentation-api. The main changes affecting gRPC: - The StatsContextFactory is replaced by three objects, StatsRecorder, Tagger, and TagContextBinarySerializer. - The StatsRecorder, Tagger, and TagContextBinarySerializer are never null, but the objects are no-ops when the OpenCensus implementation is not available. This commit includes changes written by @songy23 and @sebright. --- build.gradle | 1 + core/build.gradle | 6 + .../AbstractManagedChannelImplBuilder.java | 53 +++- .../internal/AbstractServerImplBuilder.java | 51 +++- .../io/grpc/internal/CensusStatsModule.java | 144 ++++----- ...AbstractManagedChannelImplBuilderTest.java | 64 +++- .../AbstractServerImplBuilderTest.java | 63 +++- .../io/grpc/internal/CensusModulesTest.java | 245 ++++++++------- .../integration/AbstractInteropTest.java | 124 ++++---- .../integration/TestServiceClient.java | 3 +- .../integration/AutoWindowSizingOnTest.java | 3 +- .../Http2NettyLocalChannelTest.java | 3 +- .../testing/integration/Http2NettyTest.java | 3 +- .../testing/integration/Http2OkHttpTest.java | 3 +- .../testing/integration/InProcessTest.java | 3 +- .../integration/TransportCompressionTest.java | 3 +- .../io/grpc/internal/TestingAccessor.java | 26 +- .../grpc/internal/testing/StatsTestUtils.java | 284 +++++++++++------- 18 files changed, 683 insertions(+), 399 deletions(-) diff --git a/build.gradle b/build.gradle index ef6d1726c73..7a39d929c1c 100644 --- a/build.gradle +++ b/build.gradle @@ -192,6 +192,7 @@ subprojects { okhttp: 'com.squareup.okhttp:okhttp:2.5.0', okio: 'com.squareup.okio:okio:1.6.0', opencensus_api: 'io.opencensus:opencensus-api:0.8.0', + opencensus_contrib_grpc_metrics: 'io.opencensus:opencensus-contrib-grpc-metrics:0.8.0', opencensus_impl: 'io.opencensus:opencensus-impl:0.8.0', instrumentation_api: 'com.google.instrumentation:instrumentation-api:0.4.3', protobuf: "com.google.protobuf:protobuf-java:${protobufVersion}", diff --git a/core/build.gradle b/core/build.gradle index b6cd5f51fac..74a31f3bdd0 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -19,6 +19,12 @@ dependencies { // we'll always be more up-to-date exclude group: 'io.grpc', module: 'grpc-context' } + compile (libraries.opencensus_contrib_grpc_metrics) { + // prefer 3.0.0 from libraries instead of 3.0.1 + exclude group: 'com.google.code.findbugs', module: 'jsr305' + // we'll always be more up-to-date + exclude group: 'io.grpc', module: 'grpc-context' + } testCompile project(':grpc-testing') diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 471f8292b9a..75dad9290ff 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -21,8 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; -import com.google.instrumentation.stats.Stats; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.Attributes; import io.grpc.ClientInterceptor; import io.grpc.CompressorRegistry; @@ -34,6 +32,11 @@ import io.grpc.NameResolver; import io.grpc.NameResolverProvider; import io.grpc.PickFirstBalancerFactory; +import io.opencensus.stats.Stats; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; +import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.trace.Tracing; import java.net.SocketAddress; import java.net.URI; @@ -152,7 +155,13 @@ protected final int maxInboundMessageSize() { private boolean tracingEnabled = true; @Nullable - private StatsContextFactory statsFactory; + private Tagger tagger; + + @Nullable + private TagContextBinarySerializer tagCtxSerializer; + + @Nullable + private StatsRecorder statsRecorder; protected AbstractManagedChannelImplBuilder(String target) { this.target = Preconditions.checkNotNull(target, "target"); @@ -287,8 +296,11 @@ public final T idleTimeout(long value, TimeUnit unit) { * Override the default stats implementation. */ @VisibleForTesting - protected final T statsContextFactory(StatsContextFactory statsFactory) { - this.statsFactory = statsFactory; + protected final T statsImplementation( + Tagger tagger, TagContextBinarySerializer tagCtxSerializer, StatsRecorder statsRecorder) { + this.tagger = tagger; + this.tagCtxSerializer = tagCtxSerializer; + this.statsRecorder = statsRecorder; return thisT(); } @@ -346,15 +358,28 @@ final List getEffectiveInterceptors() { List effectiveInterceptors = new ArrayList(this.interceptors); if (statsEnabled) { - StatsContextFactory statsCtxFactory = - this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory(); - if (statsCtxFactory != null) { - CensusStatsModule censusStats = - new CensusStatsModule(statsCtxFactory, GrpcUtil.STOPWATCH_SUPPLIER, true, recordStats); - // First interceptor runs last (see ClientInterceptors.intercept()), so that no - // other interceptor can override the tracer factory we set in CallOptions. - effectiveInterceptors.add(0, censusStats.getClientInterceptor()); - } + Tagger tagger = this.tagger != null ? this.tagger : Tags.getTagger(); + TagContextBinarySerializer tagCtxSerializer = + this.tagCtxSerializer != null + ? this.tagCtxSerializer + : Tags.getTagPropagationComponent().getBinarySerializer(); + StatsRecorder statsRecorder = + this.statsRecorder != null ? this.statsRecorder : Stats.getStatsRecorder(); + // // TODO: How do we check whether stats is enabled, now that the StatsRecorder is always + // // non-null? Uncommenting this line causes test failures. + // if (Stats.getState() == StatsCollectionState.ENABLED) { + CensusStatsModule censusStats = + new CensusStatsModule( + tagger, + tagCtxSerializer, + statsRecorder, + GrpcUtil.STOPWATCH_SUPPLIER, + true, + recordStats); + // First interceptor runs last (see ClientInterceptors.intercept()), so that no + // other interceptor can override the tracer factory we set in CallOptions. + effectiveInterceptors.add(0, censusStats.getClientInterceptor()); + // } } if (tracingEnabled) { CensusTracingModule censusTracing = diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 11c1a9ac4ad..53576e36f4e 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -20,8 +20,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; -import com.google.instrumentation.stats.Stats; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.BindableService; import io.grpc.CompressorRegistry; import io.grpc.Context; @@ -36,6 +34,11 @@ import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; import io.grpc.ServerTransportFilter; +import io.opencensus.stats.Stats; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; +import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.trace.Tracing; import java.util.ArrayList; import java.util.Collections; @@ -97,7 +100,13 @@ public List getServices() { CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY; @Nullable - private StatsContextFactory statsFactory; + private Tagger tagger; + + @Nullable + private TagContextBinarySerializer tagCtxSerializer; + + @Nullable + private StatsRecorder statsRecorder; private boolean statsEnabled = true; private boolean recordStats = true; @@ -184,8 +193,13 @@ public final T compressorRegistry(CompressorRegistry registry) { * Override the default stats implementation. */ @VisibleForTesting - protected T statsContextFactory(StatsContextFactory statsFactory) { - this.statsFactory = statsFactory; + protected T statsImplementation( + final Tagger tagger, + TagContextBinarySerializer tagCtxSerializer, + StatsRecorder statsRecorder) { + this.tagger = tagger; + this.tagCtxSerializer = tagCtxSerializer; + this.statsRecorder = statsRecorder; return thisT(); } @@ -228,13 +242,26 @@ final List getTracerFactories() { ArrayList tracerFactories = new ArrayList(); if (statsEnabled) { - StatsContextFactory statsFactory = - this.statsFactory != null ? this.statsFactory : Stats.getStatsContextFactory(); - if (statsFactory != null) { - CensusStatsModule censusStats = - new CensusStatsModule(statsFactory, GrpcUtil.STOPWATCH_SUPPLIER, true, recordStats); - tracerFactories.add(censusStats.getServerTracerFactory()); - } + Tagger tagger = this.tagger != null ? this.tagger : Tags.getTagger(); + TagContextBinarySerializer tagCtxSerializer = + this.tagCtxSerializer != null + ? this.tagCtxSerializer + : Tags.getTagPropagationComponent().getBinarySerializer(); + StatsRecorder statsRecorder = + this.statsRecorder != null ? this.statsRecorder : Stats.getStatsRecorder(); + // // TODO: How do we check whether stats is enabled, now that the StatsRecorder is always + // // non-null? Uncommenting this line causes test failures. + // if (Stats.getState() == StatsCollectionState.ENABLED) { + CensusStatsModule censusStats = + new CensusStatsModule( + tagger, + tagCtxSerializer, + statsRecorder, + GrpcUtil.STOPWATCH_SUPPLIER, + true, + recordStats); + tracerFactories.add(censusStats.getServerTracerFactory()); + // } } if (tracingEnabled) { CensusTracingModule censusTracing = diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index 127b83d6c1d..a503d92eaf6 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -19,16 +19,11 @@ import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import static com.google.instrumentation.stats.ContextUtils.STATS_CONTEXT_KEY; +import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; -import com.google.instrumentation.stats.MeasurementMap; -import com.google.instrumentation.stats.RpcConstants; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; -import com.google.instrumentation.stats.TagValue; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -42,9 +37,13 @@ import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.StreamTracer; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -69,44 +68,42 @@ final class CensusStatsModule { private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1); private static final ClientTracer BLANK_CLIENT_TRACER = new ClientTracer(); - private final StatsContextFactory statsCtxFactory; + private final Tagger tagger; + private final StatsRecorder statsRecorder; private final Supplier stopwatchSupplier; @VisibleForTesting - final Metadata.Key statsHeader; + final Metadata.Key statsHeader; private final StatsClientInterceptor clientInterceptor = new StatsClientInterceptor(); private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory(); private final boolean propagateTags; private final boolean recordStats; CensusStatsModule( - final StatsContextFactory statsCtxFactory, Supplier stopwatchSupplier, + final Tagger tagger, + final TagContextBinarySerializer tagCtxSerializer, + StatsRecorder statsRecorder, Supplier stopwatchSupplier, boolean propagateTags, boolean recordStats) { - this.statsCtxFactory = checkNotNull(statsCtxFactory, "statsCtxFactory"); + this.tagger = checkNotNull(tagger, "tagger"); + this.statsRecorder = checkNotNull(statsRecorder, "statsRecorder"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.propagateTags = propagateTags; this.recordStats = recordStats; this.statsHeader = - Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller() { + Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller() { @Override - public byte[] toBytes(StatsContext context) { + public byte[] toBytes(TagContext context) { // TODO(carl-mastrangelo): currently we only make sure the correctness. We may need to // optimize out the allocation and copy in the future. - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - try { - context.serialize(buffer); - } catch (IOException e) { - throw new RuntimeException(e); - } - return buffer.toByteArray(); + return tagCtxSerializer.toByteArray(context); } @Override - public StatsContext parseBytes(byte[] serialized) { + public TagContext parseBytes(byte[] serialized) { try { - return statsCtxFactory.deserialize(new ByteArrayInputStream(serialized)); + return tagCtxSerializer.fromByteArray(serialized); } catch (Exception e) { logger.log(Level.FINE, "Failed to parse stats header", e); - return statsCtxFactory.getDefault(); + return tagger.empty(); } } }); @@ -116,7 +113,7 @@ public StatsContext parseBytes(byte[] serialized) { * Creates a {@link ClientCallTracer} for a new call. */ @VisibleForTesting - ClientCallTracer newClientCallTracer(StatsContext parentCtx, String fullMethodName) { + ClientCallTracer newClientCallTracer(TagContext parentCtx, String fullMethodName) { return new ClientCallTracer(this, parentCtx, fullMethodName); } @@ -203,9 +200,9 @@ static final class ClientCallTracer extends ClientStreamTracer.Factory { private final Stopwatch stopwatch; private volatile ClientTracer streamTracer; private volatile int callEnded; - private final StatsContext parentCtx; + private final TagContext parentCtx; - ClientCallTracer(CensusStatsModule module, StatsContext parentCtx, String fullMethodName) { + ClientCallTracer(CensusStatsModule module, TagContext parentCtx, String fullMethodName) { this.module = module; this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); @@ -222,7 +219,7 @@ public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadat "Are you creating multiple streams per call? This class doesn't yet support this case."); if (module.propagateTags) { headers.discardAll(module.statsHeader); - if (parentCtx != module.statsCtxFactory.getDefault()) { + if (!module.tagger.empty().equals(parentCtx)) { headers.put(module.statsHeader, parentCtx); } } @@ -248,27 +245,29 @@ void callEnded(Status status) { if (tracer == null) { tracer = BLANK_CLIENT_TRACER; } - MeasurementMap.Builder builder = MeasurementMap.builder() + StatsRecord statsRecord = module.statsRecorder.newRecord() // The metrics are in double - .put(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI) - .put(RpcConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount) - .put(RpcConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount) - .put(RpcConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize) - .put(RpcConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize) + .put(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI) + .put(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount) + .put(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT, tracer.inboundMessageCount) + .put(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES, tracer.outboundWireSize) + .put(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES, tracer.inboundWireSize) .put( - RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, + RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES, tracer.outboundUncompressedSize) .put( - RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, + RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, tracer.inboundUncompressedSize); if (!status.isOk()) { - builder.put(RpcConstants.RPC_CLIENT_ERROR_COUNT, 1.0); + statsRecord.put(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT, 1); } - parentCtx - .with( - RpcConstants.RPC_CLIENT_METHOD, TagValue.create(fullMethodName), - RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) - .record(builder.build()); + statsRecord.record( + module + .tagger + .toBuilder(parentCtx) + .put(RpcMeasureConstants.RPC_CLIENT_METHOD, TagValue.create(fullMethodName)) + .put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) + .build()); } } @@ -291,10 +290,10 @@ private static final class ServerTracer extends ServerStreamTracer { private final CensusStatsModule module; private final String fullMethodName; @Nullable - private final StatsContext parentCtx; + private final TagContext parentCtx; private volatile int streamClosed; private final Stopwatch stopwatch; - private final StatsContextFactory statsCtxFactory; + private final Tagger tagger; private volatile long outboundMessageCount; private volatile long inboundMessageCount; private volatile long outboundWireSize; @@ -305,14 +304,14 @@ private static final class ServerTracer extends ServerStreamTracer { ServerTracer( CensusStatsModule module, String fullMethodName, - StatsContext parentCtx, + TagContext parentCtx, Supplier stopwatchSupplier, - StatsContextFactory statsCtxFactory) { + Tagger tagger) { this.module = module; this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.stopwatch = stopwatchSupplier.get().start(); - this.statsCtxFactory = statsCtxFactory; + this.tagger = tagger; } @Override @@ -361,28 +360,31 @@ public void streamClosed(Status status) { } stopwatch.stop(); long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); - MeasurementMap.Builder builder = MeasurementMap.builder() + StatsRecord statsRecord = module.statsRecorder.newRecord() // The metrics are in double - .put(RpcConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI) - .put(RpcConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount) - .put(RpcConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount) - .put(RpcConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize) - .put(RpcConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize) - .put(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES, outboundUncompressedSize) - .put(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES, inboundUncompressedSize); + .put(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI) + .put(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount) + .put(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT, inboundMessageCount) + .put(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES, outboundWireSize) + .put(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES, inboundWireSize) + .put(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES, outboundUncompressedSize) + .put(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES, inboundUncompressedSize); if (!status.isOk()) { - builder.put(RpcConstants.RPC_SERVER_ERROR_COUNT, 1.0); + statsRecord.put(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT, 1); } - StatsContext ctx = firstNonNull(parentCtx, statsCtxFactory.getDefault()); - ctx - .with(RpcConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) - .record(builder.build()); + TagContext ctx = firstNonNull(parentCtx, tagger.empty()); + statsRecord.record( + module + .tagger + .toBuilder(ctx) + .put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) + .build()); } @Override public Context filterContext(Context context) { - if (parentCtx != statsCtxFactory.getDefault()) { - return context.withValue(STATS_CONTEXT_KEY, parentCtx); + if (!tagger.empty().equals(parentCtx)) { + return context.withValue(TAG_CONTEXT_KEY, parentCtx); } return context; } @@ -392,13 +394,17 @@ public Context filterContext(Context context) { final class ServerTracerFactory extends ServerStreamTracer.Factory { @Override public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) { - StatsContext parentCtx = headers.get(statsHeader); + TagContext parentCtx = headers.get(statsHeader); if (parentCtx == null) { - parentCtx = statsCtxFactory.getDefault(); + parentCtx = tagger.empty(); } - parentCtx = parentCtx.with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(fullMethodName)); + parentCtx = + tagger + .toBuilder(parentCtx) + .put(RpcMeasureConstants.RPC_SERVER_METHOD, TagValue.create(fullMethodName)) + .build(); return new ServerTracer( - CensusStatsModule.this, fullMethodName, parentCtx, stopwatchSupplier, statsCtxFactory); + CensusStatsModule.this, fullMethodName, parentCtx, stopwatchSupplier, tagger); } } @@ -407,8 +413,8 @@ final class StatsClientInterceptor implements ClientInterceptor { @Override public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { - // New RPCs on client-side inherit the stats context from the current Context. - StatsContext parentCtx = statsCtxFactory.getCurrentStatsContext(); + // New RPCs on client-side inherit the tag context from the current Context. + TagContext parentCtx = tagger.getCurrentTagContext(); final ClientCallTracer tracerFactory = newClientCallTracer(parentCtx, method.getFullMethodName()); ClientCall call = diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index a874a7b0ec3..89d9efbe04c 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -27,8 +27,6 @@ import static org.mockito.Mockito.mock; import com.google.common.util.concurrent.MoreExecutors; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -38,7 +36,14 @@ import io.grpc.LoadBalancer; import io.grpc.MethodDescriptor; import io.grpc.NameResolver; -import java.io.InputStream; +import io.opencensus.common.Scope; +import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagContextParseException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; @@ -61,15 +66,56 @@ public ClientCall interceptCall( } }; - private static final StatsContextFactory DUMMY_STATS_FACTORY = - new StatsContextFactory() { + private static final Tagger DUMMY_TAGGER = + new Tagger() { @Override - public StatsContext deserialize(InputStream input) { + public TagContext empty() { throw new UnsupportedOperationException(); } @Override - public StatsContext getDefault() { + public TagContext getCurrentTagContext() { + throw new UnsupportedOperationException(); + } + + @Override + public TagContextBuilder emptyBuilder() { + throw new UnsupportedOperationException(); + } + + @Override + public TagContextBuilder toBuilder(TagContext tags) { + throw new UnsupportedOperationException(); + } + + @Override + public TagContextBuilder currentBuilder() { + throw new UnsupportedOperationException(); + } + + @Override + public Scope withTagContext(TagContext tags) { + throw new UnsupportedOperationException(); + } + }; + + private static final TagContextBinarySerializer DUMMY_TAG_CONTEXT_BINARY_SERIALIZER = + new TagContextBinarySerializer() { + @Override + public byte[] toByteArray(TagContext tags) { + throw new UnsupportedOperationException(); + } + + @Override + public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { + throw new UnsupportedOperationException(); + } + }; + + private static final StatsRecorder DUMMY_STATS_RECORDER = + new StatsRecorder() { + @Override + public StatsRecord newRecord() { throw new UnsupportedOperationException(); } }; @@ -346,12 +392,12 @@ public void idleTimeout() { static class Builder extends AbstractManagedChannelImplBuilder { Builder(String target) { super(target); - statsContextFactory(DUMMY_STATS_FACTORY); + statsImplementation(DUMMY_TAGGER, DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, DUMMY_STATS_RECORDER); } Builder(SocketAddress directServerAddress, String authority) { super(directServerAddress, authority); - statsContextFactory(DUMMY_STATS_FACTORY); + statsImplementation(DUMMY_TAGGER, DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, DUMMY_STATS_RECORDER); } @Override diff --git a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java index fed373a4b4b..145764cdcb6 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java @@ -19,12 +19,17 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; import io.grpc.Metadata; import io.grpc.ServerStreamTracer; +import io.opencensus.common.Scope; +import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagContextParseException; import java.io.File; -import java.io.InputStream; import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; @@ -33,15 +38,57 @@ /** Unit tests for {@link AbstractServerImplBuilder}. */ @RunWith(JUnit4.class) public class AbstractServerImplBuilderTest { - private static final StatsContextFactory DUMMY_STATS_FACTORY = - new StatsContextFactory() { + + private static final Tagger DUMMY_TAGGER = + new Tagger() { + @Override + public TagContext empty() { + throw new UnsupportedOperationException(); + } + + @Override + public TagContext getCurrentTagContext() { + throw new UnsupportedOperationException(); + } + + @Override + public TagContextBuilder emptyBuilder() { + throw new UnsupportedOperationException(); + } + @Override - public StatsContext deserialize(InputStream input) { + public TagContextBuilder toBuilder(TagContext tags) { throw new UnsupportedOperationException(); } @Override - public StatsContext getDefault() { + public TagContextBuilder currentBuilder() { + throw new UnsupportedOperationException(); + } + + @Override + public Scope withTagContext(TagContext tags) { + throw new UnsupportedOperationException(); + } + }; + + private static final TagContextBinarySerializer DUMMY_TAG_CONTEXT_BINARY_SERIALIZER = + new TagContextBinarySerializer() { + @Override + public byte[] toByteArray(TagContext tags) { + throw new UnsupportedOperationException(); + } + + @Override + public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { + throw new UnsupportedOperationException(); + } + }; + + private static final StatsRecorder DUMMY_STATS_RECORDER = + new StatsRecorder() { + @Override + public StatsRecord newRecord() { throw new UnsupportedOperationException(); } }; @@ -97,7 +144,7 @@ public void getTracerFactories_disableBoth() { static class Builder extends AbstractServerImplBuilder { Builder() { - statsContextFactory(DUMMY_STATS_FACTORY); + statsImplementation(DUMMY_TAGGER, DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, DUMMY_STATS_RECORDER); } @Override diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java index 8828db15999..6ee1d996d80 100644 --- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java +++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java @@ -16,7 +16,7 @@ package io.grpc.internal; -import static com.google.instrumentation.stats.ContextUtils.STATS_CONTEXT_KEY; +import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -40,9 +40,6 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -import com.google.instrumentation.stats.RpcConstants; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.TagValue; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; @@ -58,9 +55,15 @@ import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.internal.testing.StatsTestUtils; -import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; import io.grpc.internal.testing.StatsTestUtils.MockableSpan; import io.grpc.testing.GrpcServerRule; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tags; import io.opencensus.trace.EndSpanOptions; import io.opencensus.trace.NetworkEvent; import io.opencensus.trace.NetworkEvent.Type; @@ -71,7 +74,6 @@ import io.opencensus.trace.propagation.BinaryFormat; import io.opencensus.trace.propagation.SpanContextParseException; import io.opencensus.trace.unsafe.ContextUtils; -import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.List; import java.util.Random; @@ -137,7 +139,10 @@ public String parse(InputStream stream) { method.toBuilder().setSampledToLocalTracing(true).build(); private final FakeClock fakeClock = new FakeClock(); - private final FakeStatsContextFactory statsCtxFactory = new FakeStatsContextFactory(); + private final FakeTagger tagger = new FakeTagger(); + private final FakeTagContextBinarySerializer tagCtxSerializer = + new FakeTagContextBinarySerializer(); + private final FakeStatsRecorder statsRecorder = new FakeStatsRecorder(); private final Random random = new Random(1234); private final Span fakeClientParentSpan = MockableSpan.generateRandomSpan(random); private final Span spyClientSpan = spy(MockableSpan.generateRandomSpan(random)); @@ -185,13 +190,14 @@ public void setUp() throws Exception { when(mockTracingPropagationHandler.fromByteArray(any(byte[].class))) .thenReturn(fakeClientSpanContext); censusStats = - new CensusStatsModule(statsCtxFactory, fakeClock.getStopwatchSupplier(), true, true); + new CensusStatsModule( + tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), true, true); censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler); } @After public void wrapUp() { - assertNull(statsCtxFactory.pollRecord()); + assertNull(statsRecorder.pollRecord()); } @Test @@ -204,7 +210,7 @@ public void clientInterceptorCustomTag() { testClientInterceptors(true); } - // Test that Census ClientInterceptors uses the StatsContext and Span out of the current Context + // Test that Census ClientInterceptors uses the TagContext and Span out of the current Context // to create the ClientCallTracer, and that it intercepts ClientCall.Listener.onClose() to call // ClientCallTracer.callEnded(). private void testClientInterceptors(boolean nonDefaultContext) { @@ -239,9 +245,9 @@ public ClientCall interceptCall( if (nonDefaultContext) { Context ctx = Context.ROOT.withValues( - STATS_CONTEXT_KEY, - statsCtxFactory.getDefault().with( - StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")), + TAG_CONTEXT_KEY, + tagger.emptyBuilder().put( + StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")).build(), ContextUtils.CONTEXT_SPAN_KEY, fakeClientParentSpan); Context origCtx = ctx.attach(); @@ -251,7 +257,7 @@ public ClientCall interceptCall( ctx.detach(origCtx); } } else { - assertNull(STATS_CONTEXT_KEY.get()); + assertEquals(Tags.getTagger().empty(), TAG_CONTEXT_KEY.get()); assertNull(ContextUtils.CONTEXT_SPAN_KEY.get()); call = interceptedChannel.newCall(method, CALL_OPTIONS); } @@ -269,7 +275,7 @@ public ClientCall interceptCall( // Make the call Metadata headers = new Metadata(); call.start(mockClientCallListener, headers); - assertNull(statsCtxFactory.pollRecord()); + assertNull(statsRecorder.pollRecord()); if (nonDefaultContext) { verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), same(fakeClientParentSpan)); @@ -291,15 +297,15 @@ public ClientCall interceptCall( assertEquals("No you don't", status.getDescription()); // The intercepting listener calls callEnded() on ClientCallTracer, which records to Census. - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.toString()); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.asString()); if (nonDefaultContext) { TagValue extraTag = record.tags.get(StatsTestUtils.EXTRA_TAG); - assertEquals("extra value", extraTag.toString()); + assertEquals("extra value", extraTag.asString()); } else { assertNull(record.tags.get(StatsTestUtils.EXTRA_TAG)); } @@ -316,7 +322,7 @@ public ClientCall interceptCall( @Test public void clientBasicStatsDefaultContext() { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName()); + censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName()); Metadata headers = new Metadata(); ClientStreamTracer tracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); @@ -343,24 +349,27 @@ public void clientBasicStatsDefaultContext() { tracer.streamClosed(Status.OK); callTracer.callEnded(Status.OK); - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoServerContent(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.OK.toString(), statusTag.toString()); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); - assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); - assertEquals(1128 + 865, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertEquals(33 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), statusTag.asString()); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); + assertEquals( + 1128 + 865, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals( + 33 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); assertEquals(67 + 552, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); assertEquals(30 + 100 + 16 + 24, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); } @Test @@ -424,29 +433,30 @@ public void clientTracingSampledToLocalSpanStore() { public void clientStreamNeverCreatedStillRecordStats() { CensusStatsModule.ClientCallTracer callTracer = censusStats.newClientCallTracer( - statsCtxFactory.getDefault(), method.getFullMethodName()); + tagger.empty(), method.getFullMethodName()); fakeClock.forwardTime(3000, MILLISECONDS); callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds")); - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoServerContent(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.toString()); - assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ERROR_COUNT)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); assertEquals(0, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertEquals(0, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals(0, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); assertEquals(0, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); - assertEquals(3000, record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals( + 3000, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); } @Test @@ -492,10 +502,16 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS // EXTRA_TAG is propagated by the FakeStatsContextFactory. Note that not all tags are // propagated. The StatsContextFactory decides which tags are to propagated. gRPC facilitates // the propagation by putting them in the headers. - StatsContext clientCtx = statsCtxFactory.getDefault().with( - StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897")); - CensusStatsModule census = new CensusStatsModule( - statsCtxFactory, fakeClock.getStopwatchSupplier(), propagate, recordStats); + TagContext clientCtx = tagger.emptyBuilder().put( + StatsTestUtils.EXTRA_TAG, TagValue.create("extra-tag-value-897")).build(); + CensusStatsModule census = + new CensusStatsModule( + tagger, + tagCtxSerializer, + statsRecorder, + fakeClock.getStopwatchSupplier(), + propagate, + recordStats); Metadata headers = new Metadata(); CensusStatsModule.ClientCallTracer callTracer = census.newClientCallTracer(clientCtx, method.getFullMethodName()); @@ -516,23 +532,25 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS Context serverContext = serverTracer.filterContext(Context.ROOT); // It also put clientCtx in the Context seen by the call handler assertEquals( - clientCtx.with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(method.getFullMethodName())), - STATS_CONTEXT_KEY.get(serverContext)); + tagger.toBuilder(clientCtx).put( + RpcMeasureConstants.RPC_SERVER_METHOD, + TagValue.create(method.getFullMethodName())).build(), + TAG_CONTEXT_KEY.get(serverContext)); // Verifies that the server tracer records the status with the propagated tag serverTracer.streamClosed(Status.OK); if (recordStats) { - StatsTestUtils.MetricsRecord serverRecord = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord serverRecord = statsRecorder.pollRecord(); assertNotNull(serverRecord); assertNoClientContent(serverRecord); - TagValue serverMethodTag = serverRecord.tags.get(RpcConstants.RPC_SERVER_METHOD); - assertEquals(method.getFullMethodName(), serverMethodTag.toString()); - TagValue serverStatusTag = serverRecord.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.OK.toString(), serverStatusTag.toString()); - assertNull(serverRecord.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT)); + TagValue serverMethodTag = serverRecord.tags.get(RpcMeasureConstants.RPC_SERVER_METHOD); + assertEquals(method.getFullMethodName(), serverMethodTag.asString()); + TagValue serverStatusTag = serverRecord.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), serverStatusTag.asString()); + assertNull(serverRecord.getMetric(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); TagValue serverPropagatedTag = serverRecord.tags.get(StatsTestUtils.EXTRA_TAG); - assertEquals("extra-tag-value-897", serverPropagatedTag.toString()); + assertEquals("extra-tag-value-897", serverPropagatedTag.asString()); } // Verifies that the client tracer factory uses clientCtx, which includes the custom tags, to @@ -540,27 +558,27 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS callTracer.callEnded(Status.OK); if (recordStats) { - StatsTestUtils.MetricsRecord clientRecord = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord clientRecord = statsRecorder.pollRecord(); assertNotNull(clientRecord); assertNoServerContent(clientRecord); - TagValue clientMethodTag = clientRecord.tags.get(RpcConstants.RPC_CLIENT_METHOD); - assertEquals(method.getFullMethodName(), clientMethodTag.toString()); - TagValue clientStatusTag = clientRecord.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.OK.toString(), clientStatusTag.toString()); - assertNull(clientRecord.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); + TagValue clientMethodTag = clientRecord.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + assertEquals(method.getFullMethodName(), clientMethodTag.asString()); + TagValue clientStatusTag = clientRecord.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.OK.toString(), clientStatusTag.asString()); + assertNull(clientRecord.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); TagValue clientPropagatedTag = clientRecord.tags.get(StatsTestUtils.EXTRA_TAG); - assertEquals("extra-tag-value-897", clientPropagatedTag.toString()); + assertEquals("extra-tag-value-897", clientPropagatedTag.asString()); } if (!recordStats) { - assertNull(statsCtxFactory.pollRecord()); + assertNull(statsRecorder.pollRecord()); } } @Test public void statsHeadersNotPropagateDefaultContext() { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName()); + censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName()); Metadata headers = new Metadata(); callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); assertFalse(headers.containsKey(censusStats.statsHeader)); @@ -573,7 +591,7 @@ public void statsHeaderMalformed() { Metadata.Key arbitraryStatsHeader = Metadata.Key.of("grpc-tags-bin", Metadata.BINARY_BYTE_MARSHALLER); try { - statsCtxFactory.deserialize(new ByteArrayInputStream(statsHeaderValue)); + tagCtxSerializer.fromByteArray(statsHeaderValue); fail("Should have thrown"); } catch (Exception e) { // Expected @@ -583,7 +601,7 @@ public void statsHeaderMalformed() { Metadata headers = new Metadata(); assertNull(headers.get(censusStats.statsHeader)); headers.put(arbitraryStatsHeader, statsHeaderValue); - assertSame(statsCtxFactory.getDefault(), headers.get(censusStats.statsHeader)); + assertSame(tagger.empty(), headers.get(censusStats.statsHeader)); } @Test @@ -646,10 +664,14 @@ public void serverBasicStatsNoHeaders() { tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); Context filteredContext = tracer.filterContext(Context.ROOT); - StatsContext statsCtx = STATS_CONTEXT_KEY.get(filteredContext); + TagContext statsCtx = TAG_CONTEXT_KEY.get(filteredContext); assertEquals( - statsCtxFactory.getDefault() - .with(RpcConstants.RPC_SERVER_METHOD, TagValue.create(method.getFullMethodName())), + tagger + .emptyBuilder() + .put( + RpcMeasureConstants.RPC_SERVER_METHOD, + TagValue.create(method.getFullMethodName())) + .build(), statsCtx); tracer.inboundMessage(0); @@ -673,24 +695,27 @@ public void serverBasicStatsNoHeaders() { tracer.streamClosed(Status.CANCELLED); - StatsTestUtils.MetricsRecord record = statsCtxFactory.pollRecord(); + StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoClientContent(record); - TagValue methodTag = record.tags.get(RpcConstants.RPC_SERVER_METHOD); - assertEquals(method.getFullMethodName(), methodTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); - assertEquals(Status.Code.CANCELLED.toString(), statusTag.toString()); - assertEquals(1, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_ERROR_COUNT)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_COUNT)); - assertEquals(1028 + 99, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); - assertEquals(1128 + 865, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); - assertEquals(2, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_COUNT)); - assertEquals(34 + 154, record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_BYTES)); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_SERVER_METHOD); + assertEquals(method.getFullMethodName(), methodTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); + assertEquals(Status.Code.CANCELLED.toString(), statusTag.asString()); + assertEquals(1, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); + assertEquals( + 1028 + 99, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); + assertEquals( + 1128 + 865, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + assertEquals(2, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); + assertEquals( + 34 + 154, record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); assertEquals(67 + 552, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); assertEquals(100 + 16 + 24, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_SERVER_LATENCY)); + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); } @Test @@ -803,27 +828,27 @@ public void generateTraceSpanName() { } private static void assertNoServerContent(StatsTestUtils.MetricsRecord record) { - assertNull(record.getMetric(RpcConstants.RPC_SERVER_ERROR_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_ELAPSED_TIME)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_SERVER_ELAPSED_TIME)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); } private static void assertNoClientContent(StatsTestUtils.MetricsRecord record) { - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ERROR_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertNull(record.getMetric(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_SERVER_ELAPSED_TIME)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); + assertNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); } private static class FakeServerCall extends ServerCall { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index 8b760a073cb..cae2185606f 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -17,8 +17,8 @@ package io.grpc.testing.integration; import static com.google.common.truth.Truth.assertThat; -import static com.google.instrumentation.stats.ContextUtils.STATS_CONTEXT_KEY; import static io.grpc.testing.integration.Messages.PayloadType.COMPRESSABLE; +import static io.opencensus.tags.unsafe.ContextUtils.TAG_CONTEXT_KEY; import static io.opencensus.trace.unsafe.ContextUtils.CONTEXT_SPAN_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -40,10 +40,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.util.concurrent.SettableFuture; -import com.google.instrumentation.stats.RpcConstants; -import com.google.instrumentation.stats.StatsContextFactory; -import com.google.instrumentation.stats.TagKey; -import com.google.instrumentation.stats.TagValue; import com.google.protobuf.BoolValue; import com.google.protobuf.ByteString; import com.google.protobuf.EmptyProtos.Empty; @@ -70,11 +66,13 @@ import io.grpc.auth.MoreCallCredentials; import io.grpc.internal.AbstractServerImplBuilder; import io.grpc.internal.GrpcUtil; -import io.grpc.internal.testing.StatsTestUtils.FakeStatsContext; -import io.grpc.internal.testing.StatsTestUtils.FakeStatsContextFactory; +import io.grpc.internal.testing.StatsTestUtils; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContext; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; import io.grpc.internal.testing.StatsTestUtils.MetricsRecord; import io.grpc.internal.testing.StatsTestUtils.MockableSpan; -import io.grpc.internal.testing.StatsTestUtils; import io.grpc.internal.testing.StreamRecorder; import io.grpc.internal.testing.TestClientStreamTracer; import io.grpc.internal.testing.TestServerStreamTracer; @@ -95,6 +93,12 @@ import io.grpc.testing.integration.Messages.StreamingInputCallResponse; import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; +import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.trace.Span; import io.opencensus.trace.unsafe.ContextUtils; import java.io.IOException; @@ -149,10 +153,11 @@ public abstract class AbstractInteropTest { new AtomicReference(); private static ScheduledExecutorService testServiceExecutor; private static Server server; - private static final FakeStatsContextFactory clientStatsCtxFactory = - new FakeStatsContextFactory(); - private static final FakeStatsContextFactory serverStatsCtxFactory = - new FakeStatsContextFactory(); + private static final FakeTagger tagger = new FakeTagger(); + private static final FakeTagContextBinarySerializer tagContextBinarySerializer = + new FakeTagContextBinarySerializer(); + private static final FakeStatsRecorder clientStatsRecorder = new FakeStatsRecorder(); + private static final FakeStatsRecorder serverStatsRecorder = new FakeStatsRecorder(); private static final LinkedBlockingQueue serverStreamTracers = new LinkedBlockingQueue(); @@ -208,7 +213,8 @@ protected static void startStaticServer( new TestServiceImpl(testServiceExecutor), allInterceptors)) .addStreamTracerFactory(serverStreamTracerFactory); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, serverStatsCtxFactory); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, tagger, tagContextBinarySerializer, serverStatsRecorder); try { server = builder.build().start(); } catch (IOException ex) { @@ -262,8 +268,8 @@ public void setUp() { TestServiceGrpc.newBlockingStub(channel).withInterceptors(tracerSetupInterceptor); asyncStub = TestServiceGrpc.newStub(channel).withInterceptors(tracerSetupInterceptor); requestHeadersCapture.set(null); - clientStatsCtxFactory.rolloverRecords(); - serverStatsCtxFactory.rolloverRecords(); + clientStatsRecorder.rolloverRecords(); + serverStatsRecorder.rolloverRecords(); serverStreamTracers.clear(); } @@ -277,8 +283,16 @@ public void tearDown() throws Exception { protected abstract ManagedChannel createChannel(); - protected final StatsContextFactory getClientStatsFactory() { - return clientStatsCtxFactory; + protected final StatsRecorder getClientStatsFactory() { + return clientStatsRecorder; + } + + protected final Tagger getTagger() { + return tagger; + } + + protected final TagContextBinarySerializer getTagContextBinarySerializer() { + return tagContextBinarySerializer; } /** @@ -707,7 +721,7 @@ public void cancelAfterBegin() throws Exception { // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be // recorded. The tracer stats rely on the stream being created, which is not always the case // in this test. Therefore we don't check the tracer stats. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/StreamingInputCall", Status.CANCELLED.getCode()); @@ -1030,7 +1044,7 @@ public void deadlineExceeded() throws Exception { if (metricsExpected()) { // Stream may not have been created before deadline is exceeded, thus we don't test the tracer // stats. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/StreamingOutputCall", Status.Code.DEADLINE_EXCEEDED); @@ -1063,7 +1077,7 @@ public void deadlineExceededServerStreaming() throws Exception { if (metricsExpected()) { // Stream may not have been created when deadline is exceeded, thus we don't check tracer // stats. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/StreamingOutputCall", Status.Code.DEADLINE_EXCEEDED); @@ -1087,7 +1101,7 @@ public void deadlineInPast() throws Exception { // recorded. The tracer stats rely on the stream being created, which is not the case if // deadline is exceeded before the call is created. Therefore we don't check the tracer stats. if (metricsExpected()) { - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode()); @@ -1105,7 +1119,7 @@ public void deadlineInPast() throws Exception { } assertStatsTrace("grpc.testing.TestService/EmptyCall", Status.Code.OK); if (metricsExpected()) { - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode()); @@ -1348,9 +1362,9 @@ public void censusContextsPropagated() { Span clientParentSpan = MockableSpan.generateRandomSpan(new Random()); Context ctx = Context.ROOT.withValues( - STATS_CONTEXT_KEY, - clientStatsCtxFactory.getDefault().with( - StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")), + TAG_CONTEXT_KEY, + tagger.emptyBuilder().put( + StatsTestUtils.EXTRA_TAG, TagValue.create("extra value")).build(), ContextUtils.CONTEXT_SPAN_KEY, clientParentSpan); Context origCtx = ctx.attach(); @@ -1359,7 +1373,7 @@ public void censusContextsPropagated() { Context serverCtx = contextCapture.get(); assertNotNull(serverCtx); - FakeStatsContext statsCtx = (FakeStatsContext) STATS_CONTEXT_KEY.get(serverCtx); + FakeTagContext statsCtx = (FakeTagContext) TAG_CONTEXT_KEY.get(serverCtx); assertNotNull(statsCtx); Map tags = statsCtx.getTags(); boolean tagFound = false; @@ -1481,7 +1495,7 @@ public void timeoutOnSleepingServer() throws Exception { // CensusStreamTracerModule record final status in the interceptor, thus is guaranteed to be // recorded. The tracer stats rely on the stream being created, which is not always the case // in this test, thus we will not check that. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( clientRecord, false, "grpc.testing.TestService/FullDuplexCall", Status.DEADLINE_EXCEEDED.getCode()); @@ -1743,7 +1757,7 @@ private void assertClientStatsTrace(String method, Status.Code code, if (metricsExpected()) { // CensusStreamTracerModule records final status in interceptor, which is guaranteed to be // done before application receives status. - MetricsRecord clientRecord = clientStatsCtxFactory.pollRecord(); + MetricsRecord clientRecord = clientStatsRecorder.pollRecord(); checkTags(clientRecord, false, method, code); if (requests != null && responses != null) { @@ -1775,7 +1789,7 @@ private void assertServerStatsTrace(String method, Status.Code code, try { // On the server, the stats is finalized in ServerStreamListener.closed(), which can be // run after the client receives the final status. So we use a timeout. - serverRecord = serverStatsCtxFactory.pollRecord(5, TimeUnit.SECONDS); + serverRecord = serverStatsRecorder.pollRecord(5, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -1843,12 +1857,12 @@ private static void checkTags( MetricsRecord record, boolean server, String methodName, Status.Code status) { assertNotNull("record is not null", record); TagValue methodNameTag = record.tags.get( - server ? RpcConstants.RPC_SERVER_METHOD : RpcConstants.RPC_CLIENT_METHOD); + server ? RpcMeasureConstants.RPC_SERVER_METHOD : RpcMeasureConstants.RPC_CLIENT_METHOD); assertNotNull("method name tagged", methodNameTag); - assertEquals("method names match", methodName, methodNameTag.toString()); - TagValue statusTag = record.tags.get(RpcConstants.RPC_STATUS); + assertEquals("method names match", methodName, methodNameTag.asString()); + TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); assertNotNull("status tagged", statusTag); - assertEquals(status.toString(), statusTag.toString()); + assertEquals(status.toString(), statusTag.asString()); } /** @@ -1901,33 +1915,41 @@ private void checkCensus(MetricsRecord record, boolean server, } if (server && serverInProcess()) { assertEquals( - requests.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_REQUEST_COUNT)); + requests.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_REQUEST_COUNT)); + assertEquals( + responses.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT)); + assertEquals( + uncompressedRequestsSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); assertEquals( - responses.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_RESPONSE_COUNT)); - assertEquals(uncompressedRequestsSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(uncompressedResponsesSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_SERVER_SERVER_LATENCY)); + uncompressedResponsesSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY)); // It's impossible to get the expected wire sizes because it may be compressed, so we just // check if they are recorded. - assertNotNull(record.getMetric(RpcConstants.RPC_SERVER_REQUEST_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_SERVER_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_REQUEST_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_SERVER_RESPONSE_BYTES)); } if (!server) { assertEquals( - requests.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_REQUEST_COUNT)); + requests.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT)); + assertEquals( + responses.size(), + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_RESPONSE_COUNT)); + assertEquals( + uncompressedRequestsSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); assertEquals( - responses.size(), record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_RESPONSE_COUNT)); - assertEquals(uncompressedRequestsSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_REQUEST_BYTES)); - assertEquals(uncompressedResponsesSize, - record.getMetricAsLongOrFail(RpcConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); + uncompressedResponsesSize, + record.getMetricAsLongOrFail(RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY)); // It's impossible to get the expected wire sizes because it may be compressed, so we just // check if they are recorded. - assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_REQUEST_BYTES)); - assertNotNull(record.getMetric(RpcConstants.RPC_CLIENT_RESPONSE_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_REQUEST_BYTES)); + assertNotNull(record.getMetric(RpcMeasureConstants.RPC_CLIENT_RESPONSE_BYTES)); } } diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index 64bbd9e86ef..e67b0d63f24 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -365,7 +365,8 @@ protected ManagedChannel createChannel() { } builder = okBuilder; } - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java index 50cedf64915..44b7bc83d9a 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java @@ -48,7 +48,8 @@ protected ManagedChannel createChannel() { NettyChannelBuilder builder = NettyChannelBuilder.forAddress("localhost", getPort()) .negotiationType(NegotiationType.PLAINTEXT) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java index ff4f0989d8d..975ed86a276 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java @@ -59,7 +59,8 @@ protected ManagedChannel createChannel() { .channelType(LocalChannel.class) .flowControlWindow(65 * 1024) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java index f23f7e775ec..651619a1943 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java @@ -80,7 +80,8 @@ protected ManagedChannel createChannel() { .ciphers(TestUtils.preferredTestCiphers(), SupportedCipherSuiteFilter.INSTANCE) .sslProvider(SslProvider.OPENSSL) .build()); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } catch (Exception ex) { throw new RuntimeException(ex); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java index 083c6ead49d..94e6cac8775 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java @@ -105,7 +105,8 @@ private OkHttpChannelBuilder createChannelBuilder() { .build()) .overrideAuthority(GrpcUtil.authorityFromHostAndPort( TestUtils.TEST_SERVER_HOST, getPort())); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); try { builder.sslSocketFactory(TestUtils.newSslSocketFactoryForCa(Platform.get().getProvider(), TestUtils.loadCert("ca.pem"))); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java index 786ed46bdae..54b2b8b8d7d 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java @@ -44,7 +44,8 @@ public static void stopServer() { @Override protected ManagedChannel createChannel() { InProcessChannelBuilder builder = InProcessChannelBuilder.forName(SERVER_NAME); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java index 4034d5b962d..56268f7a33a 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java @@ -172,7 +172,8 @@ public void onHeaders(Metadata headers) { } }) .usePlaintext(true); - io.grpc.internal.TestingAccessor.setStatsContextFactory(builder, getClientStatsFactory()); + io.grpc.internal.TestingAccessor.setStatsImplementation( + builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); return builder.build(); } diff --git a/testing/src/main/java/io/grpc/internal/TestingAccessor.java b/testing/src/main/java/io/grpc/internal/TestingAccessor.java index 71f3f5d7349..b03967e7686 100644 --- a/testing/src/main/java/io/grpc/internal/TestingAccessor.java +++ b/testing/src/main/java/io/grpc/internal/TestingAccessor.java @@ -16,26 +16,34 @@ package io.grpc.internal; -import com.google.instrumentation.stats.StatsContextFactory; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; /** * Test helper that allows accessing package-private stuff. */ public final class TestingAccessor { /** - * Sets a custom {@link StatsContextFactory} for tests. + * Sets a custom stats implementation for tests. */ - public static void setStatsContextFactory( - AbstractManagedChannelImplBuilder builder, StatsContextFactory factory) { - builder.statsContextFactory(factory); + public static void setStatsImplementation( + AbstractManagedChannelImplBuilder builder, + Tagger tagger, + TagContextBinarySerializer tagCtxSerializer, + StatsRecorder statsRecorder) { + builder.statsImplementation(tagger, tagCtxSerializer, statsRecorder); } /** - * Sets a custom {@link StatsContextFactory} for tests. + * Sets a custom stats implementation for tests. */ - public static void setStatsContextFactory( - AbstractServerImplBuilder builder, StatsContextFactory factory) { - builder.statsContextFactory(factory); + public static void setStatsImplementation( + AbstractServerImplBuilder builder, + Tagger tagger, + TagContextBinarySerializer tagCtxSerializer, + StatsRecorder statsRecorder) { + builder.statsImplementation(tagger, tagCtxSerializer, statsRecorder); } private TestingAccessor() { diff --git a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java index 441bef3c584..d8faa10aa3e 100644 --- a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java +++ b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java @@ -19,15 +19,23 @@ import static com.google.common.base.Charsets.UTF_8; import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; -import com.google.instrumentation.stats.MeasurementDescriptor; -import com.google.instrumentation.stats.MeasurementMap; -import com.google.instrumentation.stats.MeasurementValue; -import com.google.instrumentation.stats.StatsContext; -import com.google.instrumentation.stats.StatsContextFactory; -import com.google.instrumentation.stats.TagKey; -import com.google.instrumentation.stats.TagValue; -import io.grpc.internal.IoUtils; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; +import io.opencensus.common.Scope; +import io.opencensus.stats.Measure; +import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.StatsRecorder; +import io.opencensus.tags.Tag; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagContextBuilder; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagContextParseException; +import io.opencensus.tags.unsafe.ContextUtils; import io.opencensus.trace.Annotation; import io.opencensus.trace.AttributeValue; import io.opencensus.trace.EndSpanOptions; @@ -40,10 +48,8 @@ import io.opencensus.trace.SpanId; import io.opencensus.trace.TraceId; import io.opencensus.trace.TraceOptions; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; @@ -57,10 +63,12 @@ private StatsTestUtils() { } public static class MetricsRecord { + public final ImmutableMap tags; - public final MeasurementMap metrics; + public final ImmutableMap metrics; - private MetricsRecord(ImmutableMap tags, MeasurementMap metrics) { + private MetricsRecord( + ImmutableMap tags, ImmutableMap metrics) { this.tags = tags; this.metrics = metrics; } @@ -69,10 +77,16 @@ private MetricsRecord(ImmutableMap tags, MeasurementMap metric * Returns the value of a metric, or {@code null} if not found. */ @Nullable - public Double getMetric(MeasurementDescriptor metricName) { - for (MeasurementValue m : metrics) { - if (m.getMeasurement().equals(metricName)) { - return m.getValue(); + public Double getMetric(Measure measure) { + for (Map.Entry m : metrics.entrySet()) { + if (m.getKey().equals(measure)) { + Number value = m.getValue(); + if (value instanceof Double) { + return (Double) value; + } else if (value instanceof Long) { + return (double) (Long) value; + } + throw new AssertionError("Unexpected measure value type: " + value.getClass().getName()); } } return null; @@ -81,9 +95,9 @@ public Double getMetric(MeasurementDescriptor metricName) { /** * Returns the value of a metric converted to long, or throw if not found. */ - public long getMetricAsLongOrFail(MeasurementDescriptor metricName) { - Double doubleValue = getMetric(metricName); - checkNotNull(doubleValue, "Metric not found: %s", metricName.toString()); + public long getMetricAsLongOrFail(Measure measure) { + Double doubleValue = getMetric(measure); + checkNotNull(doubleValue, "Measure not found: %s", measure.getName()); long longValue = (long) (Math.abs(doubleValue) + 0.0001); if (doubleValue < 0) { longValue = -longValue; @@ -93,7 +107,7 @@ public long getMetricAsLongOrFail(MeasurementDescriptor metricName) { } /** - * This tag will be propagated by {@link FakeStatsContextFactory} on the wire. + * This tag will be propagated by {@link FakeTagger} on the wire. */ public static final TagKey EXTRA_TAG = TagKey.create("/rpc/test/extratag"); @@ -101,31 +115,21 @@ public long getMetricAsLongOrFail(MeasurementDescriptor metricName) { private static final String NO_EXTRA_TAG_HEADER_VALUE_PREFIX = "noextratag"; /** - * A factory that makes fake {@link StatsContext}s and saves the created contexts to be - * accessible from {@link #pollContextOrFail}. The contexts it has created would save metrics - * records to be accessible from {@link #pollRecord()} and {@link #pollRecord(long, TimeUnit)}, - * until {@link #rolloverRecords} is called. + * A {@link Tagger} implementation that saves metrics records to be accessible from {@link + * #pollRecord()} and {@link #pollRecord(long, TimeUnit)}, until {@link #rolloverRecords} is + * called. */ - public static final class FakeStatsContextFactory extends StatsContextFactory { + public static final class FakeStatsRecorder extends StatsRecorder { + private BlockingQueue records; - public final BlockingQueue contexts = - new LinkedBlockingQueue(); - private final FakeStatsContext defaultContext; - /** - * Constructor. - */ - public FakeStatsContextFactory() { - rolloverRecords(); - defaultContext = new FakeStatsContext(ImmutableMap.of(), this); - // The records on the default context is not visible from pollRecord(), just like it's - // not visible from pollContextOrFail() either. + public FakeStatsRecorder() { rolloverRecords(); } - public StatsContext pollContextOrFail() { - StatsContext cc = contexts.poll(); - return checkNotNull(cc); + @Override + public StatsRecord newRecord() { + return new FakeStatsRecord(this); } public MetricsRecord pollRecord() { @@ -136,31 +140,8 @@ public MetricsRecord pollRecord(long timeout, TimeUnit unit) throws InterruptedE return getCurrentRecordSink().poll(timeout, unit); } - @Override - public StatsContext deserialize(InputStream buffer) throws IOException { - String serializedString; - try { - serializedString = new String(IoUtils.toByteArray(buffer), UTF_8); - } catch (IOException e) { - throw new RuntimeException(e); - } - if (serializedString.startsWith(EXTRA_TAG_HEADER_VALUE_PREFIX)) { - return getDefault().with(EXTRA_TAG, - TagValue.create(serializedString.substring(EXTRA_TAG_HEADER_VALUE_PREFIX.length()))); - } else if (serializedString.startsWith(NO_EXTRA_TAG_HEADER_VALUE_PREFIX)) { - return getDefault(); - } else { - throw new IOException("Malformed value"); - } - } - - @Override - public FakeStatsContext getDefault() { - return defaultContext; - } - /** - * Disconnect this factory with the contexts it has created so far. The records from those + * Disconnect this tagger with the contexts it has created so far. The records from those * contexts will not show up in {@link #pollRecord}. Useful for isolating the records between * test cases. */ @@ -174,88 +155,171 @@ private synchronized BlockingQueue getCurrentRecordSink() { } } - public static final class FakeStatsContext extends StatsContext { - private final ImmutableMap tags; - private final FakeStatsContextFactory factory; - private final BlockingQueue recordSink; + public static final class FakeTagger extends Tagger { - private FakeStatsContext(ImmutableMap tags, - FakeStatsContextFactory factory) { - this.tags = tags; - this.factory = factory; - this.recordSink = factory.getCurrentRecordSink(); + @Override + public FakeTagContext empty() { + return FakeTagContext.EMPTY; } - public Map getTags() { - return tags; + @Override + public TagContext getCurrentTagContext() { + return ContextUtils.TAG_CONTEXT_KEY.get(); } @Override - public Builder builder() { - return new FakeStatsContextBuilder(this); + public TagContextBuilder emptyBuilder() { + return new FakeTagContextBuilder(ImmutableMap.of()); } @Override - public StatsContext record(MeasurementMap metrics) { - recordSink.add(new MetricsRecord(tags, metrics)); - return this; + public FakeTagContextBuilder toBuilder(TagContext tags) { + return new FakeTagContextBuilder(getTags(tags)); } @Override - public void serialize(OutputStream os) { - TagValue extraTagValue = tags.get(EXTRA_TAG); - try { - if (extraTagValue == null) { - os.write(NO_EXTRA_TAG_HEADER_VALUE_PREFIX.getBytes(UTF_8)); - } else { - os.write((EXTRA_TAG_HEADER_VALUE_PREFIX + extraTagValue.toString()).getBytes(UTF_8)); - } - } catch (IOException e) { - throw new RuntimeException(e); - } + public TagContextBuilder currentBuilder() { + throw new UnsupportedOperationException(); } @Override - public String toString() { - return "[tags=" + tags + "]"; + public Scope withTagContext(TagContext tags) { + throw new UnsupportedOperationException(); } + } + + public static final class FakeTagContextBinarySerializer extends TagContextBinarySerializer { + + private final FakeTagger tagger = new FakeTagger(); @Override - public boolean equals(Object other) { - if (!(other instanceof FakeStatsContext)) { - return false; + public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { + String serializedString = new String(bytes, UTF_8); + if (serializedString.startsWith(EXTRA_TAG_HEADER_VALUE_PREFIX)) { + return tagger.emptyBuilder() + .put(EXTRA_TAG, + TagValue.create(serializedString.substring(EXTRA_TAG_HEADER_VALUE_PREFIX.length()))) + .build(); + } else if (serializedString.startsWith(NO_EXTRA_TAG_HEADER_VALUE_PREFIX)) { + return tagger.empty(); + } else { + throw new TagContextParseException("Malformed value"); } - FakeStatsContext otherCtx = (FakeStatsContext) other; - return tags.equals(otherCtx.tags); } @Override - public int hashCode() { - return tags.hashCode(); + public byte[] toByteArray(TagContext tags) { + TagValue extraTagValue = getTags(tags).get(EXTRA_TAG); + if (extraTagValue == null) { + return NO_EXTRA_TAG_HEADER_VALUE_PREFIX.getBytes(UTF_8); + } else { + return (EXTRA_TAG_HEADER_VALUE_PREFIX + extraTagValue.asString()).getBytes(UTF_8); + } } } - private static class FakeStatsContextBuilder extends StatsContext.Builder { - private final ImmutableMap.Builder tagsBuilder = ImmutableMap.builder(); - private final FakeStatsContext base; + public static final class FakeStatsRecord extends StatsRecord { + + private final BlockingQueue recordSink; + public final Map metrics = Maps.newHashMap(); + + private FakeStatsRecord(FakeStatsRecorder statsRecorder) { + this.recordSink = statsRecorder.getCurrentRecordSink(); + } - private FakeStatsContextBuilder(FakeStatsContext base) { - this.base = base; - tagsBuilder.putAll(base.tags); + @Override + public StatsRecord put(Measure.MeasureDouble measure, double value) { + metrics.put(measure, value); + return this; } @Override - public StatsContext.Builder set(TagKey key, TagValue value) { + public StatsRecord put(Measure.MeasureLong measure, long value) { + metrics.put(measure, value); + return this; + } + + @Override + public void record(TagContext tags) { + recordSink.add(new MetricsRecord(getTags(tags), ImmutableMap.copyOf(metrics))); + } + + @Override + public void record() { + throw new UnsupportedOperationException(); + } + } + + public static final class FakeTagContext extends TagContext { + + private static final FakeTagContext EMPTY = + new FakeTagContext(ImmutableMap.of()); + + private final ImmutableMap tags; + + private FakeTagContext(ImmutableMap tags) { + this.tags = tags; + } + + public ImmutableMap getTags() { + return tags; + } + + @Override + public String toString() { + return "[tags=" + tags + "]"; + } + + @Override + protected Iterator getIterator() { + return Iterators.transform( + tags.entrySet().iterator(), + new Function, Tag>() { + @Override + public Tag apply(@Nullable Map.Entry entry) { + return Tag.create(entry.getKey(), entry.getValue()); + } + }); + } + } + + public static class FakeTagContextBuilder extends TagContextBuilder { + + private final Map tagsBuilder = Maps.newHashMap(); + + private FakeTagContextBuilder(Map tags) { + tagsBuilder.putAll(tags); + } + + @Override + public TagContextBuilder put(TagKey key, TagValue value) { tagsBuilder.put(key, value); return this; } @Override - public StatsContext build() { - FakeStatsContext context = new FakeStatsContext(tagsBuilder.build(), base.factory); - base.factory.contexts.add(context); + public TagContextBuilder remove(TagKey key) { + tagsBuilder.remove(key); + return this; + } + + @Override + public TagContext build() { + FakeTagContext context = new FakeTagContext(ImmutableMap.copyOf(tagsBuilder)); return context; } + + @Override + public Scope buildScoped() { + throw new UnsupportedOperationException(); + } + } + + // This method handles the default TagContext, which isn't an instance of FakeTagContext. + private static ImmutableMap getTags(TagContext tags) { + return tags instanceof FakeTagContext + ? ((FakeTagContext) tags).getTags() + : ImmutableMap.of(); } // TODO(bdrutu): Remove this class after OpenCensus releases support for this class. From 4e01775a9451545c4afd47c173be483384f43037 Mon Sep 17 00:00:00 2001 From: Kristen Kozak Date: Wed, 1 Nov 2017 13:08:05 -0700 Subject: [PATCH 2/8] Update OpenCensus to 0.8.0 release. --- .../io/grpc/internal/CensusStatsModule.java | 25 ++++++++++------- ...AbstractManagedChannelImplBuilderTest.java | 11 ++++---- .../AbstractServerImplBuilderTest.java | 11 ++++---- .../io/grpc/internal/CensusModulesTest.java | 16 +++++------ .../integration/AbstractInteropTest.java | 27 +++++++++---------- .../grpc/internal/testing/StatsTestUtils.java | 16 +++++------ 6 files changed, 56 insertions(+), 50 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index a503d92eaf6..acdaf458a98 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -38,12 +38,13 @@ import io.grpc.Status; import io.grpc.StreamTracer; import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; -import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.MeasureMap; import io.opencensus.stats.StatsRecorder; import io.opencensus.tags.TagContext; import io.opencensus.tags.TagValue; import io.opencensus.tags.Tagger; import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagContextSerializationException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -94,7 +95,11 @@ final class CensusStatsModule { public byte[] toBytes(TagContext context) { // TODO(carl-mastrangelo): currently we only make sure the correctness. We may need to // optimize out the allocation and copy in the future. - return tagCtxSerializer.toByteArray(context); + try { + return tagCtxSerializer.toByteArray(context); + } catch (TagContextSerializationException e) { + throw new RuntimeException(e); + } } @Override @@ -245,7 +250,7 @@ void callEnded(Status status) { if (tracer == null) { tracer = BLANK_CLIENT_TRACER; } - StatsRecord statsRecord = module.statsRecorder.newRecord() + MeasureMap measureMap = module.statsRecorder.newMeasureMap() // The metrics are in double .put(RpcMeasureConstants.RPC_CLIENT_ROUNDTRIP_LATENCY, roundtripNanos / NANOS_PER_MILLI) .put(RpcMeasureConstants.RPC_CLIENT_REQUEST_COUNT, tracer.outboundMessageCount) @@ -259,13 +264,13 @@ void callEnded(Status status) { RpcMeasureConstants.RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES, tracer.inboundUncompressedSize); if (!status.isOk()) { - statsRecord.put(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT, 1); + measureMap.put(RpcMeasureConstants.RPC_CLIENT_ERROR_COUNT, 1); } - statsRecord.record( + measureMap.record( module .tagger .toBuilder(parentCtx) - .put(RpcMeasureConstants.RPC_CLIENT_METHOD, TagValue.create(fullMethodName)) + .put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName)) .put(RpcMeasureConstants.RPC_STATUS, TagValue.create(status.getCode().toString())) .build()); } @@ -360,7 +365,7 @@ public void streamClosed(Status status) { } stopwatch.stop(); long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS); - StatsRecord statsRecord = module.statsRecorder.newRecord() + MeasureMap measureMap = module.statsRecorder.newMeasureMap() // The metrics are in double .put(RpcMeasureConstants.RPC_SERVER_SERVER_LATENCY, elapsedTimeNanos / NANOS_PER_MILLI) .put(RpcMeasureConstants.RPC_SERVER_RESPONSE_COUNT, outboundMessageCount) @@ -370,10 +375,10 @@ public void streamClosed(Status status) { .put(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_RESPONSE_BYTES, outboundUncompressedSize) .put(RpcMeasureConstants.RPC_SERVER_UNCOMPRESSED_REQUEST_BYTES, inboundUncompressedSize); if (!status.isOk()) { - statsRecord.put(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT, 1); + measureMap.put(RpcMeasureConstants.RPC_SERVER_ERROR_COUNT, 1); } TagContext ctx = firstNonNull(parentCtx, tagger.empty()); - statsRecord.record( + measureMap.record( module .tagger .toBuilder(ctx) @@ -401,7 +406,7 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata parentCtx = tagger .toBuilder(parentCtx) - .put(RpcMeasureConstants.RPC_SERVER_METHOD, TagValue.create(fullMethodName)) + .put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName)) .build(); return new ServerTracer( CensusStatsModule.this, fullMethodName, parentCtx, stopwatchSupplier, tagger); diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index 89d9efbe04c..c9ec9110f18 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -37,13 +37,14 @@ import io.grpc.MethodDescriptor; import io.grpc.NameResolver; import io.opencensus.common.Scope; -import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.MeasureMap; import io.opencensus.stats.StatsRecorder; import io.opencensus.tags.TagContext; import io.opencensus.tags.TagContextBuilder; import io.opencensus.tags.Tagger; import io.opencensus.tags.propagation.TagContextBinarySerializer; -import io.opencensus.tags.propagation.TagContextParseException; +import io.opencensus.tags.propagation.TagContextDeserializationException; +import io.opencensus.tags.propagation.TagContextSerializationException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; @@ -102,12 +103,12 @@ public Scope withTagContext(TagContext tags) { private static final TagContextBinarySerializer DUMMY_TAG_CONTEXT_BINARY_SERIALIZER = new TagContextBinarySerializer() { @Override - public byte[] toByteArray(TagContext tags) { + public byte[] toByteArray(TagContext tags) throws TagContextSerializationException { throw new UnsupportedOperationException(); } @Override - public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { + public TagContext fromByteArray(byte[] bytes) throws TagContextDeserializationException { throw new UnsupportedOperationException(); } }; @@ -115,7 +116,7 @@ public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { private static final StatsRecorder DUMMY_STATS_RECORDER = new StatsRecorder() { @Override - public StatsRecord newRecord() { + public MeasureMap newMeasureMap() { throw new UnsupportedOperationException(); } }; diff --git a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java index 145764cdcb6..d03d0302c49 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java @@ -22,13 +22,14 @@ import io.grpc.Metadata; import io.grpc.ServerStreamTracer; import io.opencensus.common.Scope; -import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.MeasureMap; import io.opencensus.stats.StatsRecorder; import io.opencensus.tags.TagContext; import io.opencensus.tags.TagContextBuilder; import io.opencensus.tags.Tagger; import io.opencensus.tags.propagation.TagContextBinarySerializer; -import io.opencensus.tags.propagation.TagContextParseException; +import io.opencensus.tags.propagation.TagContextDeserializationException; +import io.opencensus.tags.propagation.TagContextSerializationException; import java.io.File; import java.util.List; import org.junit.Test; @@ -75,12 +76,12 @@ public Scope withTagContext(TagContext tags) { private static final TagContextBinarySerializer DUMMY_TAG_CONTEXT_BINARY_SERIALIZER = new TagContextBinarySerializer() { @Override - public byte[] toByteArray(TagContext tags) { + public byte[] toByteArray(TagContext tags) throws TagContextSerializationException { throw new UnsupportedOperationException(); } @Override - public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { + public TagContext fromByteArray(byte[] bytes) throws TagContextDeserializationException { throw new UnsupportedOperationException(); } }; @@ -88,7 +89,7 @@ public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { private static final StatsRecorder DUMMY_STATS_RECORDER = new StatsRecorder() { @Override - public StatsRecord newRecord() { + public MeasureMap newMeasureMap() { throw new UnsupportedOperationException(); } }; diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java index 6ee1d996d80..89c7896b78f 100644 --- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java +++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java @@ -299,7 +299,7 @@ public ClientCall interceptCall( // The intercepting listener calls callEnded() on ClientCallTracer, which records to Census. StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); - TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); assertEquals(method.getFullMethodName(), methodTag.asString()); TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); assertEquals(Status.Code.PERMISSION_DENIED.toString(), statusTag.asString()); @@ -352,7 +352,7 @@ public void clientBasicStatsDefaultContext() { StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoServerContent(record); - TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); assertEquals(method.getFullMethodName(), methodTag.asString()); TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); assertEquals(Status.Code.OK.toString(), statusTag.asString()); @@ -441,7 +441,7 @@ public void clientStreamNeverCreatedStillRecordStats() { StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoServerContent(record); - TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); assertEquals(method.getFullMethodName(), methodTag.asString()); TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); assertEquals(Status.Code.DEADLINE_EXCEEDED.toString(), statusTag.asString()); @@ -533,7 +533,7 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS // It also put clientCtx in the Context seen by the call handler assertEquals( tagger.toBuilder(clientCtx).put( - RpcMeasureConstants.RPC_SERVER_METHOD, + RpcMeasureConstants.RPC_METHOD, TagValue.create(method.getFullMethodName())).build(), TAG_CONTEXT_KEY.get(serverContext)); @@ -544,7 +544,7 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS StatsTestUtils.MetricsRecord serverRecord = statsRecorder.pollRecord(); assertNotNull(serverRecord); assertNoClientContent(serverRecord); - TagValue serverMethodTag = serverRecord.tags.get(RpcMeasureConstants.RPC_SERVER_METHOD); + TagValue serverMethodTag = serverRecord.tags.get(RpcMeasureConstants.RPC_METHOD); assertEquals(method.getFullMethodName(), serverMethodTag.asString()); TagValue serverStatusTag = serverRecord.tags.get(RpcMeasureConstants.RPC_STATUS); assertEquals(Status.Code.OK.toString(), serverStatusTag.asString()); @@ -561,7 +561,7 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS StatsTestUtils.MetricsRecord clientRecord = statsRecorder.pollRecord(); assertNotNull(clientRecord); assertNoServerContent(clientRecord); - TagValue clientMethodTag = clientRecord.tags.get(RpcMeasureConstants.RPC_CLIENT_METHOD); + TagValue clientMethodTag = clientRecord.tags.get(RpcMeasureConstants.RPC_METHOD); assertEquals(method.getFullMethodName(), clientMethodTag.asString()); TagValue clientStatusTag = clientRecord.tags.get(RpcMeasureConstants.RPC_STATUS); assertEquals(Status.Code.OK.toString(), clientStatusTag.asString()); @@ -669,7 +669,7 @@ public void serverBasicStatsNoHeaders() { tagger .emptyBuilder() .put( - RpcMeasureConstants.RPC_SERVER_METHOD, + RpcMeasureConstants.RPC_METHOD, TagValue.create(method.getFullMethodName())) .build(), statsCtx); @@ -698,7 +698,7 @@ public void serverBasicStatsNoHeaders() { StatsTestUtils.MetricsRecord record = statsRecorder.pollRecord(); assertNotNull(record); assertNoClientContent(record); - TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_SERVER_METHOD); + TagValue methodTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); assertEquals(method.getFullMethodName(), methodTag.asString()); TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); assertEquals(Status.Code.CANCELLED.toString(), statusTag.asString()); diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index cae2185606f..c9ae027d682 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -723,8 +723,7 @@ public void cancelAfterBegin() throws Exception { // in this test. Therefore we don't check the tracer stats. MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/StreamingInputCall", - Status.CANCELLED.getCode()); + clientRecord, "grpc.testing.TestService/StreamingInputCall", Status.CANCELLED.getCode()); // Do not check server-side metrics, because the status on the server side is undetermined. } } @@ -1046,7 +1045,8 @@ public void deadlineExceeded() throws Exception { // stats. MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/StreamingOutputCall", + clientRecord, + "grpc.testing.TestService/StreamingOutputCall", Status.Code.DEADLINE_EXCEEDED); // Do not check server-side metrics, because the status on the server side is undetermined. } @@ -1079,7 +1079,8 @@ public void deadlineExceededServerStreaming() throws Exception { // stats. MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/StreamingOutputCall", + clientRecord, + "grpc.testing.TestService/StreamingOutputCall", Status.Code.DEADLINE_EXCEEDED); // Do not check server-side metrics, because the status on the server side is undetermined. } @@ -1103,8 +1104,7 @@ public void deadlineInPast() throws Exception { if (metricsExpected()) { MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/EmptyCall", - Status.DEADLINE_EXCEEDED.getCode()); + clientRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode()); } // warm up the channel @@ -1121,8 +1121,7 @@ public void deadlineInPast() throws Exception { if (metricsExpected()) { MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/EmptyCall", - Status.DEADLINE_EXCEEDED.getCode()); + clientRecord, "grpc.testing.TestService/EmptyCall", Status.DEADLINE_EXCEEDED.getCode()); } } @@ -1497,7 +1496,8 @@ public void timeoutOnSleepingServer() throws Exception { // in this test, thus we will not check that. MetricsRecord clientRecord = clientStatsRecorder.pollRecord(5, TimeUnit.SECONDS); checkTags( - clientRecord, false, "grpc.testing.TestService/FullDuplexCall", + clientRecord, + "grpc.testing.TestService/FullDuplexCall", Status.DEADLINE_EXCEEDED.getCode()); } } @@ -1758,7 +1758,7 @@ private void assertClientStatsTrace(String method, Status.Code code, // CensusStreamTracerModule records final status in interceptor, which is guaranteed to be // done before application receives status. MetricsRecord clientRecord = clientStatsRecorder.pollRecord(); - checkTags(clientRecord, false, method, code); + checkTags(clientRecord, method, code); if (requests != null && responses != null) { checkCensus(clientRecord, false, requests, responses); @@ -1797,7 +1797,7 @@ private void assertServerStatsTrace(String method, Status.Code code, break; } try { - checkTags(serverRecord, true, method, code); + checkTags(serverRecord, method, code); if (requests != null && responses != null) { checkCensus(serverRecord, true, requests, responses); } @@ -1854,10 +1854,9 @@ private void assertServerStatsTrace(String method, Status.Code code, } private static void checkTags( - MetricsRecord record, boolean server, String methodName, Status.Code status) { + MetricsRecord record, String methodName, Status.Code status) { assertNotNull("record is not null", record); - TagValue methodNameTag = record.tags.get( - server ? RpcMeasureConstants.RPC_SERVER_METHOD : RpcMeasureConstants.RPC_CLIENT_METHOD); + TagValue methodNameTag = record.tags.get(RpcMeasureConstants.RPC_METHOD); assertNotNull("method name tagged", methodNameTag); assertEquals("method names match", methodName, methodNameTag.asString()); TagValue statusTag = record.tags.get(RpcMeasureConstants.RPC_STATUS); diff --git a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java index d8faa10aa3e..588ca898658 100644 --- a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java +++ b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java @@ -25,7 +25,7 @@ import com.google.common.collect.Maps; import io.opencensus.common.Scope; import io.opencensus.stats.Measure; -import io.opencensus.stats.StatsRecord; +import io.opencensus.stats.MeasureMap; import io.opencensus.stats.StatsRecorder; import io.opencensus.tags.Tag; import io.opencensus.tags.TagContext; @@ -34,7 +34,7 @@ import io.opencensus.tags.TagValue; import io.opencensus.tags.Tagger; import io.opencensus.tags.propagation.TagContextBinarySerializer; -import io.opencensus.tags.propagation.TagContextParseException; +import io.opencensus.tags.propagation.TagContextDeserializationException; import io.opencensus.tags.unsafe.ContextUtils; import io.opencensus.trace.Annotation; import io.opencensus.trace.AttributeValue; @@ -128,7 +128,7 @@ public FakeStatsRecorder() { } @Override - public StatsRecord newRecord() { + public MeasureMap newMeasureMap() { return new FakeStatsRecord(this); } @@ -193,7 +193,7 @@ public static final class FakeTagContextBinarySerializer extends TagContextBinar private final FakeTagger tagger = new FakeTagger(); @Override - public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { + public TagContext fromByteArray(byte[] bytes) throws TagContextDeserializationException { String serializedString = new String(bytes, UTF_8); if (serializedString.startsWith(EXTRA_TAG_HEADER_VALUE_PREFIX)) { return tagger.emptyBuilder() @@ -203,7 +203,7 @@ public TagContext fromByteArray(byte[] bytes) throws TagContextParseException { } else if (serializedString.startsWith(NO_EXTRA_TAG_HEADER_VALUE_PREFIX)) { return tagger.empty(); } else { - throw new TagContextParseException("Malformed value"); + throw new TagContextDeserializationException("Malformed value"); } } @@ -218,7 +218,7 @@ public byte[] toByteArray(TagContext tags) { } } - public static final class FakeStatsRecord extends StatsRecord { + public static final class FakeStatsRecord extends MeasureMap { private final BlockingQueue recordSink; public final Map metrics = Maps.newHashMap(); @@ -228,13 +228,13 @@ private FakeStatsRecord(FakeStatsRecorder statsRecorder) { } @Override - public StatsRecord put(Measure.MeasureDouble measure, double value) { + public MeasureMap put(Measure.MeasureDouble measure, double value) { metrics.put(measure, value); return this; } @Override - public StatsRecord put(Measure.MeasureLong measure, long value) { + public MeasureMap put(Measure.MeasureLong measure, long value) { metrics.put(measure, value); return this; } From 611878c0b535941f55b01aab05dd56a3813e5c30 Mon Sep 17 00:00:00 2001 From: Kristen Kozak Date: Wed, 1 Nov 2017 15:53:31 -0700 Subject: [PATCH 3/8] Remove TODOs about checking stats collection state. gRPC shouldn't use the state to determine whether to initialize the CensusStatsModule, because the state can be changed at runtime. --- .../io/grpc/internal/AbstractManagedChannelImplBuilder.java | 4 ---- .../main/java/io/grpc/internal/AbstractServerImplBuilder.java | 4 ---- 2 files changed, 8 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 75dad9290ff..0652cf40268 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -365,9 +365,6 @@ final List getEffectiveInterceptors() { : Tags.getTagPropagationComponent().getBinarySerializer(); StatsRecorder statsRecorder = this.statsRecorder != null ? this.statsRecorder : Stats.getStatsRecorder(); - // // TODO: How do we check whether stats is enabled, now that the StatsRecorder is always - // // non-null? Uncommenting this line causes test failures. - // if (Stats.getState() == StatsCollectionState.ENABLED) { CensusStatsModule censusStats = new CensusStatsModule( tagger, @@ -379,7 +376,6 @@ final List getEffectiveInterceptors() { // First interceptor runs last (see ClientInterceptors.intercept()), so that no // other interceptor can override the tracer factory we set in CallOptions. effectiveInterceptors.add(0, censusStats.getClientInterceptor()); - // } } if (tracingEnabled) { CensusTracingModule censusTracing = diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 53576e36f4e..2d6b00c1cc4 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -249,9 +249,6 @@ final List getTracerFactories() { : Tags.getTagPropagationComponent().getBinarySerializer(); StatsRecorder statsRecorder = this.statsRecorder != null ? this.statsRecorder : Stats.getStatsRecorder(); - // // TODO: How do we check whether stats is enabled, now that the StatsRecorder is always - // // non-null? Uncommenting this line causes test failures. - // if (Stats.getState() == StatsCollectionState.ENABLED) { CensusStatsModule censusStats = new CensusStatsModule( tagger, @@ -261,7 +258,6 @@ final List getTracerFactories() { true, recordStats); tracerFactories.add(censusStats.getServerTracerFactory()); - // } } if (tracingEnabled) { CensusTracingModule censusTracing = From d921df5a638f2d222d23237776e396ad84fe67ea Mon Sep 17 00:00:00 2001 From: Kristen Kozak Date: Wed, 1 Nov 2017 17:24:13 -0700 Subject: [PATCH 4/8] Refactor the code for overriding the Census implementation in tests. This commit adds a method to set the CensusStatsModule on AbstractServerImplBuilder and AbstractManagedChannelImplBuilder. It is simpler to set the whole CensusStatsModule than pass in all necessary OpenCensus implementation objects. --- .../AbstractManagedChannelImplBuilder.java | 41 ++-------- .../internal/AbstractServerImplBuilder.java | 43 ++--------- .../io/grpc/internal/CensusStatsModule.java | 75 ++++++++++++++----- ...AbstractManagedChannelImplBuilderTest.java | 16 +++- .../AbstractServerImplBuilderTest.java | 8 +- .../io/grpc/internal/CensusModulesTest.java | 19 +++-- .../io/grpc/internal/TestingAccessor.java | 8 +- 7 files changed, 110 insertions(+), 100 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java index 0652cf40268..91851fd2ac9 100644 --- a/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java @@ -32,11 +32,6 @@ import io.grpc.NameResolver; import io.grpc.NameResolverProvider; import io.grpc.PickFirstBalancerFactory; -import io.opencensus.stats.Stats; -import io.opencensus.stats.StatsRecorder; -import io.opencensus.tags.Tagger; -import io.opencensus.tags.Tags; -import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.trace.Tracing; import java.net.SocketAddress; import java.net.URI; @@ -155,13 +150,7 @@ protected final int maxInboundMessageSize() { private boolean tracingEnabled = true; @Nullable - private Tagger tagger; - - @Nullable - private TagContextBinarySerializer tagCtxSerializer; - - @Nullable - private StatsRecorder statsRecorder; + private CensusStatsModule censusStatsOverride; protected AbstractManagedChannelImplBuilder(String target) { this.target = Preconditions.checkNotNull(target, "target"); @@ -296,11 +285,8 @@ public final T idleTimeout(long value, TimeUnit unit) { * Override the default stats implementation. */ @VisibleForTesting - protected final T statsImplementation( - Tagger tagger, TagContextBinarySerializer tagCtxSerializer, StatsRecorder statsRecorder) { - this.tagger = tagger; - this.tagCtxSerializer = tagCtxSerializer; - this.statsRecorder = statsRecorder; + protected final T overrideCensusStatsModule(CensusStatsModule censusStats) { + this.censusStatsOverride = censusStats; return thisT(); } @@ -358,24 +344,13 @@ final List getEffectiveInterceptors() { List effectiveInterceptors = new ArrayList(this.interceptors); if (statsEnabled) { - Tagger tagger = this.tagger != null ? this.tagger : Tags.getTagger(); - TagContextBinarySerializer tagCtxSerializer = - this.tagCtxSerializer != null - ? this.tagCtxSerializer - : Tags.getTagPropagationComponent().getBinarySerializer(); - StatsRecorder statsRecorder = - this.statsRecorder != null ? this.statsRecorder : Stats.getStatsRecorder(); - CensusStatsModule censusStats = - new CensusStatsModule( - tagger, - tagCtxSerializer, - statsRecorder, - GrpcUtil.STOPWATCH_SUPPLIER, - true, - recordStats); + CensusStatsModule censusStats = this.censusStatsOverride; + if (censusStats == null) { + censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true); + } // First interceptor runs last (see ClientInterceptors.intercept()), so that no // other interceptor can override the tracer factory we set in CallOptions. - effectiveInterceptors.add(0, censusStats.getClientInterceptor()); + effectiveInterceptors.add(0, censusStats.getClientInterceptor(recordStats)); } if (tracingEnabled) { CensusTracingModule censusTracing = diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index 2d6b00c1cc4..29768d15037 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -34,11 +34,6 @@ import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; import io.grpc.ServerTransportFilter; -import io.opencensus.stats.Stats; -import io.opencensus.stats.StatsRecorder; -import io.opencensus.tags.Tagger; -import io.opencensus.tags.Tags; -import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.trace.Tracing; import java.util.ArrayList; import java.util.Collections; @@ -100,13 +95,7 @@ public List getServices() { CompressorRegistry compressorRegistry = DEFAULT_COMPRESSOR_REGISTRY; @Nullable - private Tagger tagger; - - @Nullable - private TagContextBinarySerializer tagCtxSerializer; - - @Nullable - private StatsRecorder statsRecorder; + private CensusStatsModule censusStatsOverride; private boolean statsEnabled = true; private boolean recordStats = true; @@ -193,13 +182,8 @@ public final T compressorRegistry(CompressorRegistry registry) { * Override the default stats implementation. */ @VisibleForTesting - protected T statsImplementation( - final Tagger tagger, - TagContextBinarySerializer tagCtxSerializer, - StatsRecorder statsRecorder) { - this.tagger = tagger; - this.tagCtxSerializer = tagCtxSerializer; - this.statsRecorder = statsRecorder; + protected T overrideCensusStatsModule(CensusStatsModule censusStats) { + this.censusStatsOverride = censusStats; return thisT(); } @@ -242,22 +226,11 @@ final List getTracerFactories() { ArrayList tracerFactories = new ArrayList(); if (statsEnabled) { - Tagger tagger = this.tagger != null ? this.tagger : Tags.getTagger(); - TagContextBinarySerializer tagCtxSerializer = - this.tagCtxSerializer != null - ? this.tagCtxSerializer - : Tags.getTagPropagationComponent().getBinarySerializer(); - StatsRecorder statsRecorder = - this.statsRecorder != null ? this.statsRecorder : Stats.getStatsRecorder(); - CensusStatsModule censusStats = - new CensusStatsModule( - tagger, - tagCtxSerializer, - statsRecorder, - GrpcUtil.STOPWATCH_SUPPLIER, - true, - recordStats); - tracerFactories.add(censusStats.getServerTracerFactory()); + CensusStatsModule censusStats = this.censusStatsOverride; + if (censusStats == null) { + censusStats = new CensusStatsModule(GrpcUtil.STOPWATCH_SUPPLIER, true); + } + tracerFactories.add(censusStats.getServerTracerFactory(recordStats)); } if (tracingEnabled) { CensusTracingModule censusTracing = diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index acdaf458a98..781722004cd 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -39,10 +39,12 @@ import io.grpc.StreamTracer; import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; import io.opencensus.stats.MeasureMap; +import io.opencensus.stats.Stats; import io.opencensus.stats.StatsRecorder; import io.opencensus.tags.TagContext; import io.opencensus.tags.TagValue; import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.tags.propagation.TagContextSerializationException; import java.util.concurrent.TimeUnit; @@ -74,21 +76,33 @@ final class CensusStatsModule { private final Supplier stopwatchSupplier; @VisibleForTesting final Metadata.Key statsHeader; - private final StatsClientInterceptor clientInterceptor = new StatsClientInterceptor(); - private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory(); private final boolean propagateTags; - private final boolean recordStats; + /** + * Creates a {@link CensusStatsModule} with the default OpenCensus implementation. + */ + CensusStatsModule(Supplier stopwatchSupplier, boolean propagateTags) { + this( + Tags.getTagger(), + Tags.getTagPropagationComponent().getBinarySerializer(), + Stats.getStatsRecorder(), + stopwatchSupplier, + propagateTags); + } + + /** + * Creates a {@link CensusStatsModule} with the given OpenCensus implementation. + */ CensusStatsModule( final Tagger tagger, final TagContextBinarySerializer tagCtxSerializer, StatsRecorder statsRecorder, Supplier stopwatchSupplier, - boolean propagateTags, boolean recordStats) { + boolean propagateTags) { this.tagger = checkNotNull(tagger, "tagger"); this.statsRecorder = checkNotNull(statsRecorder, "statsRecorder"); + checkNotNull(tagCtxSerializer, "tagCtxSerializer"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.propagateTags = propagateTags; - this.recordStats = recordStats; this.statsHeader = Metadata.Key.of("grpc-tags-bin", new Metadata.BinaryMarshaller() { @Override @@ -118,22 +132,23 @@ public TagContext parseBytes(byte[] serialized) { * Creates a {@link ClientCallTracer} for a new call. */ @VisibleForTesting - ClientCallTracer newClientCallTracer(TagContext parentCtx, String fullMethodName) { - return new ClientCallTracer(this, parentCtx, fullMethodName); + ClientCallTracer newClientCallTracer( + TagContext parentCtx, String fullMethodName, boolean recordStats) { + return new ClientCallTracer(this, parentCtx, fullMethodName, recordStats); } /** * Returns the server tracer factory. */ - ServerStreamTracer.Factory getServerTracerFactory() { - return serverTracerFactory; + ServerStreamTracer.Factory getServerTracerFactory(boolean recordStats) { + return new ServerTracerFactory(recordStats); } /** * Returns the client interceptor that facilitates Census-based stats reporting. */ - ClientInterceptor getClientInterceptor() { - return clientInterceptor; + ClientInterceptor getClientInterceptor(boolean recordStats) { + return new StatsClientInterceptor(recordStats); } private static final class ClientTracer extends ClientStreamTracer { @@ -206,12 +221,18 @@ static final class ClientCallTracer extends ClientStreamTracer.Factory { private volatile ClientTracer streamTracer; private volatile int callEnded; private final TagContext parentCtx; + private final boolean recordStats; - ClientCallTracer(CensusStatsModule module, TagContext parentCtx, String fullMethodName) { + ClientCallTracer( + CensusStatsModule module, + TagContext parentCtx, + String fullMethodName, + boolean recordStats) { this.module = module; this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.stopwatch = module.stopwatchSupplier.get().start(); + this.recordStats = recordStats; } @Override @@ -241,7 +262,7 @@ void callEnded(Status status) { if (callEndedUpdater.getAndSet(this, 1) != 0) { return; } - if (!module.recordStats) { + if (!recordStats) { return; } stopwatch.stop(); @@ -299,6 +320,7 @@ private static final class ServerTracer extends ServerStreamTracer { private volatile int streamClosed; private final Stopwatch stopwatch; private final Tagger tagger; + private final boolean recordStats; private volatile long outboundMessageCount; private volatile long inboundMessageCount; private volatile long outboundWireSize; @@ -311,12 +333,14 @@ private static final class ServerTracer extends ServerStreamTracer { String fullMethodName, TagContext parentCtx, Supplier stopwatchSupplier, - Tagger tagger) { + Tagger tagger, + boolean recordStats) { this.module = module; this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName"); this.parentCtx = checkNotNull(parentCtx, "parentCtx"); this.stopwatch = stopwatchSupplier.get().start(); this.tagger = tagger; + this.recordStats = recordStats; } @Override @@ -360,7 +384,7 @@ public void streamClosed(Status status) { if (streamClosedUpdater.getAndSet(this, 1) != 0) { return; } - if (!module.recordStats) { + if (!recordStats) { return; } stopwatch.stop(); @@ -397,6 +421,12 @@ public Context filterContext(Context context) { @VisibleForTesting final class ServerTracerFactory extends ServerStreamTracer.Factory { + private final boolean recordStats; + + ServerTracerFactory(boolean recordStats) { + this.recordStats = recordStats; + } + @Override public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) { TagContext parentCtx = headers.get(statsHeader); @@ -409,19 +439,30 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata .put(RpcMeasureConstants.RPC_METHOD, TagValue.create(fullMethodName)) .build(); return new ServerTracer( - CensusStatsModule.this, fullMethodName, parentCtx, stopwatchSupplier, tagger); + CensusStatsModule.this, + fullMethodName, + parentCtx, + stopwatchSupplier, + tagger, + recordStats); } } @VisibleForTesting final class StatsClientInterceptor implements ClientInterceptor { + private final boolean recordStats; + + StatsClientInterceptor(boolean recordStats) { + this.recordStats = recordStats; + } + @Override public ClientCall interceptCall( MethodDescriptor method, CallOptions callOptions, Channel next) { // New RPCs on client-side inherit the tag context from the current Context. TagContext parentCtx = tagger.getCurrentTagContext(); final ClientCallTracer tracerFactory = - newClientCallTracer(parentCtx, method.getFullMethodName()); + newClientCallTracer(parentCtx, method.getFullMethodName(), recordStats); ClientCall call = next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory)); return new SimpleForwardingClientCall(call) { diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index c9ec9110f18..58b367581c2 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -393,12 +393,24 @@ public void idleTimeout() { static class Builder extends AbstractManagedChannelImplBuilder { Builder(String target) { super(target); - statsImplementation(DUMMY_TAGGER, DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, DUMMY_STATS_RECORDER); + overrideCensusStatsModule( + new CensusStatsModule( + DUMMY_TAGGER, + DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, + DUMMY_STATS_RECORDER, + GrpcUtil.STOPWATCH_SUPPLIER, + true)); } Builder(SocketAddress directServerAddress, String authority) { super(directServerAddress, authority); - statsImplementation(DUMMY_TAGGER, DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, DUMMY_STATS_RECORDER); + overrideCensusStatsModule( + new CensusStatsModule( + DUMMY_TAGGER, + DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, + DUMMY_STATS_RECORDER, + GrpcUtil.STOPWATCH_SUPPLIER, + true)); } @Override diff --git a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java index d03d0302c49..a331ccf1986 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java @@ -145,7 +145,13 @@ public void getTracerFactories_disableBoth() { static class Builder extends AbstractServerImplBuilder { Builder() { - statsImplementation(DUMMY_TAGGER, DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, DUMMY_STATS_RECORDER); + overrideCensusStatsModule( + new CensusStatsModule( + DUMMY_TAGGER, + DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, + DUMMY_STATS_RECORDER, + GrpcUtil.STOPWATCH_SUPPLIER, + true)); } @Override diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java index 89c7896b78f..d77b268ce35 100644 --- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java +++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java @@ -191,7 +191,7 @@ public void setUp() throws Exception { .thenReturn(fakeClientSpanContext); censusStats = new CensusStatsModule( - tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), true, true); + tagger, tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), true); censusTracing = new CensusTracingModule(tracer, mockTracingPropagationHandler); } @@ -240,7 +240,7 @@ public ClientCall interceptCall( Channel interceptedChannel = ClientInterceptors.intercept( grpcServerRule.getChannel(), callOptionsCaptureInterceptor, - censusStats.getClientInterceptor(), censusTracing.getClientInterceptor()); + censusStats.getClientInterceptor(true), censusTracing.getClientInterceptor()); ClientCall call; if (nonDefaultContext) { Context ctx = @@ -322,7 +322,7 @@ public ClientCall interceptCall( @Test public void clientBasicStatsDefaultContext() { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName()); + censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), true); Metadata headers = new Metadata(); ClientStreamTracer tracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); @@ -433,7 +433,7 @@ public void clientTracingSampledToLocalSpanStore() { public void clientStreamNeverCreatedStillRecordStats() { CensusStatsModule.ClientCallTracer callTracer = censusStats.newClientCallTracer( - tagger.empty(), method.getFullMethodName()); + tagger.empty(), method.getFullMethodName(), true); fakeClock.forwardTime(3000, MILLISECONDS); callTracer.callEnded(Status.DEADLINE_EXCEEDED.withDescription("3 seconds")); @@ -510,11 +510,10 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS tagCtxSerializer, statsRecorder, fakeClock.getStopwatchSupplier(), - propagate, - recordStats); + propagate); Metadata headers = new Metadata(); CensusStatsModule.ClientCallTracer callTracer = - census.newClientCallTracer(clientCtx, method.getFullMethodName()); + census.newClientCallTracer(clientCtx, method.getFullMethodName(), recordStats); // This propagates clientCtx to headers if propagates==true callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); if (propagate) { @@ -525,7 +524,7 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS } ServerStreamTracer serverTracer = - census.getServerTracerFactory().newServerStreamTracer( + census.getServerTracerFactory(recordStats).newServerStreamTracer( method.getFullMethodName(), headers); // Server tracer deserializes clientCtx from the headers, so that it records stats with the // propagated tags. @@ -578,7 +577,7 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS @Test public void statsHeadersNotPropagateDefaultContext() { CensusStatsModule.ClientCallTracer callTracer = - censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName()); + censusStats.newClientCallTracer(tagger.empty(), method.getFullMethodName(), true); Metadata headers = new Metadata(); callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); assertFalse(headers.containsKey(censusStats.statsHeader)); @@ -659,7 +658,7 @@ public void traceHeaderMalformed() throws Exception { @Test public void serverBasicStatsNoHeaders() { - ServerStreamTracer.Factory tracerFactory = censusStats.getServerTracerFactory(); + ServerStreamTracer.Factory tracerFactory = censusStats.getServerTracerFactory(true); ServerStreamTracer tracer = tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); diff --git a/testing/src/main/java/io/grpc/internal/TestingAccessor.java b/testing/src/main/java/io/grpc/internal/TestingAccessor.java index b03967e7686..0b97081a347 100644 --- a/testing/src/main/java/io/grpc/internal/TestingAccessor.java +++ b/testing/src/main/java/io/grpc/internal/TestingAccessor.java @@ -32,7 +32,9 @@ public static void setStatsImplementation( Tagger tagger, TagContextBinarySerializer tagCtxSerializer, StatsRecorder statsRecorder) { - builder.statsImplementation(tagger, tagCtxSerializer, statsRecorder); + builder.overrideCensusStatsModule( + new CensusStatsModule( + tagger, tagCtxSerializer, statsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, true)); } /** @@ -43,7 +45,9 @@ public static void setStatsImplementation( Tagger tagger, TagContextBinarySerializer tagCtxSerializer, StatsRecorder statsRecorder) { - builder.statsImplementation(tagger, tagCtxSerializer, statsRecorder); + builder.overrideCensusStatsModule( + new CensusStatsModule( + tagger, tagCtxSerializer, statsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, true)); } private TestingAccessor() { From ca7ad9a75b47ead114dcc9e36b1365d1836eb9ec Mon Sep 17 00:00:00 2001 From: Kristen Kozak Date: Fri, 3 Nov 2017 12:00:34 -0700 Subject: [PATCH 5/8] Remove dummy OpenCensus classes, and use fake classes instead. --- ...AbstractManagedChannelImplBuilderTest.java | 78 +++---------------- .../AbstractServerImplBuilderTest.java | 72 ++--------------- 2 files changed, 15 insertions(+), 135 deletions(-) diff --git a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java index 58b367581c2..3ca5db366a3 100644 --- a/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractManagedChannelImplBuilderTest.java @@ -36,15 +36,9 @@ import io.grpc.LoadBalancer; import io.grpc.MethodDescriptor; import io.grpc.NameResolver; -import io.opencensus.common.Scope; -import io.opencensus.stats.MeasureMap; -import io.opencensus.stats.StatsRecorder; -import io.opencensus.tags.TagContext; -import io.opencensus.tags.TagContextBuilder; -import io.opencensus.tags.Tagger; -import io.opencensus.tags.propagation.TagContextBinarySerializer; -import io.opencensus.tags.propagation.TagContextDeserializationException; -import io.opencensus.tags.propagation.TagContextSerializationException; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; @@ -67,60 +61,6 @@ public ClientCall interceptCall( } }; - private static final Tagger DUMMY_TAGGER = - new Tagger() { - @Override - public TagContext empty() { - throw new UnsupportedOperationException(); - } - - @Override - public TagContext getCurrentTagContext() { - throw new UnsupportedOperationException(); - } - - @Override - public TagContextBuilder emptyBuilder() { - throw new UnsupportedOperationException(); - } - - @Override - public TagContextBuilder toBuilder(TagContext tags) { - throw new UnsupportedOperationException(); - } - - @Override - public TagContextBuilder currentBuilder() { - throw new UnsupportedOperationException(); - } - - @Override - public Scope withTagContext(TagContext tags) { - throw new UnsupportedOperationException(); - } - }; - - private static final TagContextBinarySerializer DUMMY_TAG_CONTEXT_BINARY_SERIALIZER = - new TagContextBinarySerializer() { - @Override - public byte[] toByteArray(TagContext tags) throws TagContextSerializationException { - throw new UnsupportedOperationException(); - } - - @Override - public TagContext fromByteArray(byte[] bytes) throws TagContextDeserializationException { - throw new UnsupportedOperationException(); - } - }; - - private static final StatsRecorder DUMMY_STATS_RECORDER = - new StatsRecorder() { - @Override - public MeasureMap newMeasureMap() { - throw new UnsupportedOperationException(); - } - }; - private Builder builder = new Builder("fake"); private Builder directAddressBuilder = new Builder(new SocketAddress(){}, "fake"); @@ -395,9 +335,9 @@ static class Builder extends AbstractManagedChannelImplBuilder { super(target); overrideCensusStatsModule( new CensusStatsModule( - DUMMY_TAGGER, - DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, - DUMMY_STATS_RECORDER, + new FakeTagger(), + new FakeTagContextBinarySerializer(), + new FakeStatsRecorder(), GrpcUtil.STOPWATCH_SUPPLIER, true)); } @@ -406,9 +346,9 @@ static class Builder extends AbstractManagedChannelImplBuilder { super(directServerAddress, authority); overrideCensusStatsModule( new CensusStatsModule( - DUMMY_TAGGER, - DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, - DUMMY_STATS_RECORDER, + new FakeTagger(), + new FakeTagContextBinarySerializer(), + new FakeStatsRecorder(), GrpcUtil.STOPWATCH_SUPPLIER, true)); } diff --git a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java index a331ccf1986..ed85ac377eb 100644 --- a/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractServerImplBuilderTest.java @@ -21,15 +21,9 @@ import io.grpc.Metadata; import io.grpc.ServerStreamTracer; -import io.opencensus.common.Scope; -import io.opencensus.stats.MeasureMap; -import io.opencensus.stats.StatsRecorder; -import io.opencensus.tags.TagContext; -import io.opencensus.tags.TagContextBuilder; -import io.opencensus.tags.Tagger; -import io.opencensus.tags.propagation.TagContextBinarySerializer; -import io.opencensus.tags.propagation.TagContextDeserializationException; -import io.opencensus.tags.propagation.TagContextSerializationException; +import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; +import io.grpc.internal.testing.StatsTestUtils.FakeTagContextBinarySerializer; +import io.grpc.internal.testing.StatsTestUtils.FakeTagger; import java.io.File; import java.util.List; import org.junit.Test; @@ -40,60 +34,6 @@ @RunWith(JUnit4.class) public class AbstractServerImplBuilderTest { - private static final Tagger DUMMY_TAGGER = - new Tagger() { - @Override - public TagContext empty() { - throw new UnsupportedOperationException(); - } - - @Override - public TagContext getCurrentTagContext() { - throw new UnsupportedOperationException(); - } - - @Override - public TagContextBuilder emptyBuilder() { - throw new UnsupportedOperationException(); - } - - @Override - public TagContextBuilder toBuilder(TagContext tags) { - throw new UnsupportedOperationException(); - } - - @Override - public TagContextBuilder currentBuilder() { - throw new UnsupportedOperationException(); - } - - @Override - public Scope withTagContext(TagContext tags) { - throw new UnsupportedOperationException(); - } - }; - - private static final TagContextBinarySerializer DUMMY_TAG_CONTEXT_BINARY_SERIALIZER = - new TagContextBinarySerializer() { - @Override - public byte[] toByteArray(TagContext tags) throws TagContextSerializationException { - throw new UnsupportedOperationException(); - } - - @Override - public TagContext fromByteArray(byte[] bytes) throws TagContextDeserializationException { - throw new UnsupportedOperationException(); - } - }; - - private static final StatsRecorder DUMMY_STATS_RECORDER = - new StatsRecorder() { - @Override - public MeasureMap newMeasureMap() { - throw new UnsupportedOperationException(); - } - }; - private static final ServerStreamTracer.Factory DUMMY_USER_TRACER = new ServerStreamTracer.Factory() { @Override @@ -147,9 +87,9 @@ static class Builder extends AbstractServerImplBuilder { Builder() { overrideCensusStatsModule( new CensusStatsModule( - DUMMY_TAGGER, - DUMMY_TAG_CONTEXT_BINARY_SERIALIZER, - DUMMY_STATS_RECORDER, + new FakeTagger(), + new FakeTagContextBinarySerializer(), + new FakeStatsRecorder(), GrpcUtil.STOPWATCH_SUPPLIER, true)); } From 7fe19a25f49ce42dd9a5bba2d51f6d47624aacba Mon Sep 17 00:00:00 2001 From: Kristen Kozak Date: Fri, 3 Nov 2017 13:06:51 -0700 Subject: [PATCH 6/8] Refactor TestingAccessor.setStatsImplementation methods. --- .../io/grpc/internal/CensusStatsModule.java | 4 +-- .../integration/AbstractInteropTest.java | 25 ++++++++----------- .../integration/TestServiceClient.java | 2 +- .../integration/AutoWindowSizingOnTest.java | 2 +- .../Http2NettyLocalChannelTest.java | 2 +- .../testing/integration/Http2NettyTest.java | 2 +- .../testing/integration/Http2OkHttpTest.java | 2 +- .../testing/integration/InProcessTest.java | 2 +- .../integration/TransportCompressionTest.java | 2 +- .../io/grpc/internal/TestingAccessor.java | 22 +++------------- 10 files changed, 24 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index 781722004cd..524f3a39967 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -66,7 +66,7 @@ * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call and * it's the tracer that reports the summary to Census. */ -final class CensusStatsModule { +public final class CensusStatsModule { private static final Logger logger = Logger.getLogger(CensusStatsModule.class.getName()); private static final double NANOS_PER_MILLI = TimeUnit.MILLISECONDS.toNanos(1); private static final ClientTracer BLANK_CLIENT_TRACER = new ClientTracer(); @@ -93,7 +93,7 @@ final class CensusStatsModule { /** * Creates a {@link CensusStatsModule} with the given OpenCensus implementation. */ - CensusStatsModule( + public CensusStatsModule( final Tagger tagger, final TagContextBinarySerializer tagCtxSerializer, StatsRecorder statsRecorder, Supplier stopwatchSupplier, diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index c9ae027d682..1dc8ef05fd0 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -65,6 +65,7 @@ import io.grpc.StatusRuntimeException; import io.grpc.auth.MoreCallCredentials; import io.grpc.internal.AbstractServerImplBuilder; +import io.grpc.internal.CensusStatsModule; import io.grpc.internal.GrpcUtil; import io.grpc.internal.testing.StatsTestUtils; import io.grpc.internal.testing.StatsTestUtils.FakeStatsRecorder; @@ -94,11 +95,8 @@ import io.grpc.testing.integration.Messages.StreamingOutputCallRequest; import io.grpc.testing.integration.Messages.StreamingOutputCallResponse; import io.opencensus.contrib.grpc.metrics.RpcMeasureConstants; -import io.opencensus.stats.StatsRecorder; import io.opencensus.tags.TagKey; import io.opencensus.tags.TagValue; -import io.opencensus.tags.Tagger; -import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.trace.Span; import io.opencensus.trace.unsafe.ContextUtils; import java.io.IOException; @@ -214,7 +212,13 @@ protected static void startStaticServer( allInterceptors)) .addStreamTracerFactory(serverStreamTracerFactory); io.grpc.internal.TestingAccessor.setStatsImplementation( - builder, tagger, tagContextBinarySerializer, serverStatsRecorder); + builder, + new CensusStatsModule( + tagger, + tagContextBinarySerializer, + serverStatsRecorder, + GrpcUtil.STOPWATCH_SUPPLIER, + true)); try { server = builder.build().start(); } catch (IOException ex) { @@ -283,16 +287,9 @@ public void tearDown() throws Exception { protected abstract ManagedChannel createChannel(); - protected final StatsRecorder getClientStatsFactory() { - return clientStatsRecorder; - } - - protected final Tagger getTagger() { - return tagger; - } - - protected final TagContextBinarySerializer getTagContextBinarySerializer() { - return tagContextBinarySerializer; + protected final CensusStatsModule createClientCensusStatsModule() { + return new CensusStatsModule( + tagger, tagContextBinarySerializer, clientStatsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, true); } /** diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java index e67b0d63f24..bbe60cc3ee1 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java @@ -366,7 +366,7 @@ protected ManagedChannel createChannel() { builder = okBuilder; } io.grpc.internal.TestingAccessor.setStatsImplementation( - builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); + builder, createClientCensusStatsModule()); return builder.build(); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java index 44b7bc83d9a..464c9704180 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/AutoWindowSizingOnTest.java @@ -49,7 +49,7 @@ protected ManagedChannel createChannel() { .negotiationType(NegotiationType.PLAINTEXT) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); io.grpc.internal.TestingAccessor.setStatsImplementation( - builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); + builder, createClientCensusStatsModule()); return builder.build(); } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java index 975ed86a276..8076a96cf9b 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyLocalChannelTest.java @@ -60,7 +60,7 @@ protected ManagedChannel createChannel() { .flowControlWindow(65 * 1024) .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE); io.grpc.internal.TestingAccessor.setStatsImplementation( - builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); + builder, createClientCensusStatsModule()); return builder.build(); } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java index 651619a1943..8bbaba6ec82 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2NettyTest.java @@ -81,7 +81,7 @@ protected ManagedChannel createChannel() { .sslProvider(SslProvider.OPENSSL) .build()); io.grpc.internal.TestingAccessor.setStatsImplementation( - builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); + builder, createClientCensusStatsModule()); return builder.build(); } catch (Exception ex) { throw new RuntimeException(ex); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java index 94e6cac8775..f2850a0f2d6 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/Http2OkHttpTest.java @@ -106,7 +106,7 @@ private OkHttpChannelBuilder createChannelBuilder() { .overrideAuthority(GrpcUtil.authorityFromHostAndPort( TestUtils.TEST_SERVER_HOST, getPort())); io.grpc.internal.TestingAccessor.setStatsImplementation( - builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); + builder, createClientCensusStatsModule()); try { builder.sslSocketFactory(TestUtils.newSslSocketFactoryForCa(Platform.get().getProvider(), TestUtils.loadCert("ca.pem"))); diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java index 54b2b8b8d7d..e7db3a6fccb 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/InProcessTest.java @@ -45,7 +45,7 @@ public static void stopServer() { protected ManagedChannel createChannel() { InProcessChannelBuilder builder = InProcessChannelBuilder.forName(SERVER_NAME); io.grpc.internal.TestingAccessor.setStatsImplementation( - builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); + builder, createClientCensusStatsModule()); return builder.build(); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java index 56268f7a33a..7cca4a8b06a 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java @@ -173,7 +173,7 @@ public void onHeaders(Metadata headers) { }) .usePlaintext(true); io.grpc.internal.TestingAccessor.setStatsImplementation( - builder, getTagger(), getTagContextBinarySerializer(), getClientStatsFactory()); + builder, createClientCensusStatsModule()); return builder.build(); } diff --git a/testing/src/main/java/io/grpc/internal/TestingAccessor.java b/testing/src/main/java/io/grpc/internal/TestingAccessor.java index 0b97081a347..d2df039a78e 100644 --- a/testing/src/main/java/io/grpc/internal/TestingAccessor.java +++ b/testing/src/main/java/io/grpc/internal/TestingAccessor.java @@ -16,10 +16,6 @@ package io.grpc.internal; -import io.opencensus.stats.StatsRecorder; -import io.opencensus.tags.Tagger; -import io.opencensus.tags.propagation.TagContextBinarySerializer; - /** * Test helper that allows accessing package-private stuff. */ @@ -28,26 +24,16 @@ public final class TestingAccessor { * Sets a custom stats implementation for tests. */ public static void setStatsImplementation( - AbstractManagedChannelImplBuilder builder, - Tagger tagger, - TagContextBinarySerializer tagCtxSerializer, - StatsRecorder statsRecorder) { - builder.overrideCensusStatsModule( - new CensusStatsModule( - tagger, tagCtxSerializer, statsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, true)); + AbstractManagedChannelImplBuilder builder, CensusStatsModule censusStats) { + builder.overrideCensusStatsModule(censusStats); } /** * Sets a custom stats implementation for tests. */ public static void setStatsImplementation( - AbstractServerImplBuilder builder, - Tagger tagger, - TagContextBinarySerializer tagCtxSerializer, - StatsRecorder statsRecorder) { - builder.overrideCensusStatsModule( - new CensusStatsModule( - tagger, tagCtxSerializer, statsRecorder, GrpcUtil.STOPWATCH_SUPPLIER, true)); + AbstractServerImplBuilder builder, CensusStatsModule censusStats) { + builder.overrideCensusStatsModule(censusStats); } private TestingAccessor() { From 45419c214b13caf0c1daf0ac02ea6cd7d2be26ce Mon Sep 17 00:00:00 2001 From: Kristen Kozak Date: Fri, 3 Nov 2017 12:48:19 -0700 Subject: [PATCH 7/8] Remove NO_EXTRA_TAG_HEADER_VALUE_PREFIX. --- .../java/io/grpc/internal/testing/StatsTestUtils.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java index 588ca898658..188fccf39c7 100644 --- a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java +++ b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java @@ -112,7 +112,6 @@ public long getMetricAsLongOrFail(Measure measure) { public static final TagKey EXTRA_TAG = TagKey.create("/rpc/test/extratag"); private static final String EXTRA_TAG_HEADER_VALUE_PREFIX = "extratag:"; - private static final String NO_EXTRA_TAG_HEADER_VALUE_PREFIX = "noextratag"; /** * A {@link Tagger} implementation that saves metrics records to be accessible from {@link @@ -200,8 +199,6 @@ public TagContext fromByteArray(byte[] bytes) throws TagContextDeserializationEx .put(EXTRA_TAG, TagValue.create(serializedString.substring(EXTRA_TAG_HEADER_VALUE_PREFIX.length()))) .build(); - } else if (serializedString.startsWith(NO_EXTRA_TAG_HEADER_VALUE_PREFIX)) { - return tagger.empty(); } else { throw new TagContextDeserializationException("Malformed value"); } @@ -210,11 +207,7 @@ public TagContext fromByteArray(byte[] bytes) throws TagContextDeserializationEx @Override public byte[] toByteArray(TagContext tags) { TagValue extraTagValue = getTags(tags).get(EXTRA_TAG); - if (extraTagValue == null) { - return NO_EXTRA_TAG_HEADER_VALUE_PREFIX.getBytes(UTF_8); - } else { - return (EXTRA_TAG_HEADER_VALUE_PREFIX + extraTagValue.asString()).getBytes(UTF_8); - } + return (EXTRA_TAG_HEADER_VALUE_PREFIX + extraTagValue.asString()).getBytes(UTF_8); } } From 403c6e38eb4e80ba122d5b0a1a1fe92a1d581984 Mon Sep 17 00:00:00 2001 From: Kristen Kozak Date: Sat, 4 Nov 2017 10:43:16 -0700 Subject: [PATCH 8/8] Throw exception when FakeTagContext doesn't contain expected tag. --- .../src/main/java/io/grpc/internal/testing/StatsTestUtils.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java index 188fccf39c7..77fc9405da4 100644 --- a/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java +++ b/testing/src/main/java/io/grpc/internal/testing/StatsTestUtils.java @@ -207,6 +207,9 @@ public TagContext fromByteArray(byte[] bytes) throws TagContextDeserializationEx @Override public byte[] toByteArray(TagContext tags) { TagValue extraTagValue = getTags(tags).get(EXTRA_TAG); + if (extraTagValue == null) { + throw new UnsupportedOperationException("TagContext must contain EXTRA_TAG"); + } return (EXTRA_TAG_HEADER_VALUE_PREFIX + extraTagValue.asString()).getBytes(UTF_8); } }