Skip to content

Commit

Permalink
Propagate trace context always (GoogleCloudPlatform#1345)
Browse files Browse the repository at this point in the history
* Propagate trace from frontend always

Enable OpenTelemetry trace context propagation regardless of environment vars. This will allow for context to be injected from another source (e.g. a service mesh) and propagated regardless of whether the frontend binary exports the trace.

This PR uses OpenTelemetry trace propagation features to extract and inject relevant headers to/from requests. By default, it looks for W3C tracecontext headers, but will also fall back to baggage when those headers are not found. It is important for context to be propagated any time downstream service calls are made, otherwise any potential traces will appear orphaned (i.e. outside the appropriate context). This can happen when another process intercepts headers and creates trace spans as mentioned above, or in case of custom tracing scenarios where not all potential spans are sampled or exported. Maintaining trace context is important in such scenarios.

For the above reason, we do not propagate traces in services that do not make any downstream service calls (e.g. paymentservice, emailservice), because there are no downstream dependencies that require trace propagation.

* Allow otel to extract context from HTTP

* Always propagate trace context in checkoutservice

* Propagate trace context always in currencyservice

* Propagate trace context in productcatalog always

* Add trace context propagation

Adding context propagation in grpc client of recommendation service

* Do not set tracer provider by default

Tracer Provider should only be initialized if traces are being emitted by the recommendation server. In other cases, we want to propagate trace context without emitting a trace (in case traces are emitted elsewhere.)
  • Loading branch information
arbrown committed Jan 18, 2023
1 parent 6f113c0 commit 079d943
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 67 deletions.
35 changes: 14 additions & 21 deletions src/checkoutservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,15 @@ func main() {
}

var srv *grpc.Server
//TODO(arbrown) Add metrics hook
if os.Getenv("ENABLE_TRACING") == "1" {
srv = grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
)
} else {
srv = grpc.NewServer()
}

// Propagate trace context always
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{}, propagation.Baggage{}))
srv = grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()),
)

pb.RegisterCheckoutServiceServer(srv, svc)
healthpb.RegisterHealthServer(srv, svc)
Expand Down Expand Up @@ -170,9 +170,7 @@ func initTracing() {
sdktrace.WithBatcher(exporter),
sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{}, propagation.Baggage{}))

}

func initProfiling(service, version string) {
Expand Down Expand Up @@ -209,15 +207,10 @@ func mustConnGRPC(ctx context.Context, conn **grpc.ClientConn, addr string) {
var err error
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
if os.Getenv("ENABLE_TRACING") == "1" {
*conn, err = grpc.DialContext(ctx, addr,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
} else {
*conn, err = grpc.DialContext(ctx, addr,
grpc.WithInsecure())
}
*conn, err = grpc.DialContext(ctx, addr,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
if err != nil {
panic(errors.Wrapf(err, "grpc: failed to connect %s", addr))
}
Expand Down
14 changes: 8 additions & 6 deletions src/currencyservice/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@ else {
});
}

// Register GRPC OTel Instrumentation for trace propagation
// regardless of whether tracing is emitted.
const { GrpcInstrumentation } = require('@opentelemetry/instrumentation-grpc');
const { registerInstrumentations } = require('@opentelemetry/instrumentation');

registerInstrumentations({
instrumentations: [new GrpcInstrumentation()]
});

if(process.env.ENABLE_TRACING == "1") {
console.log("Tracing enabled.")
const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node');
const { SimpleSpanProcessor } = require('@opentelemetry/sdk-trace-base');
const { GrpcInstrumentation } = require('@opentelemetry/instrumentation-grpc');
const { registerInstrumentations } = require('@opentelemetry/instrumentation');
const { OTLPTraceExporter } = require("@opentelemetry/exporter-otlp-grpc");

const provider = new NodeTracerProvider();
Expand All @@ -42,10 +48,6 @@ if(process.env.ENABLE_TRACING == "1") {

provider.addSpanProcessor(new SimpleSpanProcessor(new OTLPTraceExporter({url: collectorUrl})));
provider.register();

registerInstrumentations({
instrumentations: [new GrpcInstrumentation()]
});
}
else {
console.log("Tracing disabled.")
Expand Down
29 changes: 12 additions & 17 deletions src/frontend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func main() {

svc := new(frontendServer)

otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{}, propagation.Baggage{}))

if os.Getenv("ENABLE_TRACING") == "1" {
log.Info("Tracing enabled.")
initTracing(log, ctx, svc)
Expand Down Expand Up @@ -147,11 +151,9 @@ func main() {
r.HandleFunc("/_healthz", func(w http.ResponseWriter, _ *http.Request) { fmt.Fprint(w, "ok") })

var handler http.Handler = r
handler = &logHandler{log: log, next: handler} // add logging
handler = ensureSessionID(handler) // add session ID
if os.Getenv("ENABLE_TRACING") == "1" {
handler = otelhttp.NewHandler(handler, "frontend") // add OTel tracing
}
handler = &logHandler{log: log, next: handler} // add logging
handler = ensureSessionID(handler) // add session ID
handler = otelhttp.NewHandler(handler, "frontend") // add OTel tracing

log.Infof("starting server on " + addr + ":" + srvPort)
log.Fatal(http.ListenAndServe(addr+":"+srvPort, handler))
Expand All @@ -173,9 +175,7 @@ func initTracing(log logrus.FieldLogger, ctx context.Context, svc *frontendServe
sdktrace.WithBatcher(exporter),
sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{}, propagation.Baggage{}))

return tp, err
}

Expand Down Expand Up @@ -214,15 +214,10 @@ func mustConnGRPC(ctx context.Context, conn **grpc.ClientConn, addr string) {
var err error
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
if os.Getenv("ENABLE_TRACING") == "1" {
*conn, err = grpc.DialContext(ctx, addr,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
} else {
*conn, err = grpc.DialContext(ctx, addr,
grpc.WithInsecure())
}
*conn, err = grpc.DialContext(ctx, addr,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
if err != nil {
panic(errors.Wrapf(err, "grpc: failed to connect %s", addr))
}
Expand Down
30 changes: 11 additions & 19 deletions src/productcatalogservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,14 @@ func run(port string) string {
if err != nil {
log.Fatal(err)
}
// Propagate trace context
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{}, propagation.Baggage{}))
var srv *grpc.Server
if os.Getenv("ENABLE_TRACING") == "1" {
srv = grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()))
} else {
srv = grpc.NewServer()
}
srv = grpc.NewServer(
grpc.UnaryInterceptor(otelgrpc.UnaryServerInterceptor()),
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor()))

svc := &productCatalog{}

Expand Down Expand Up @@ -177,9 +177,6 @@ func initTracing() error {
sdktrace.WithBatcher(exporter),
sdktrace.WithSampler(sdktrace.AlwaysSample()))
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{}, propagation.Baggage{}))
return err
}

Expand Down Expand Up @@ -285,15 +282,10 @@ func mustConnGRPC(ctx context.Context, conn **grpc.ClientConn, addr string) {
var err error
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
if os.Getenv("ENABLE_TRACING") == "1" {
*conn, err = grpc.DialContext(ctx, addr,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
} else {
*conn, err = grpc.DialContext(ctx, addr,
grpc.WithInsecure())
}
*conn, err = grpc.DialContext(ctx, addr,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor()))
if err != nil {
panic(errors.Wrapf(err, "grpc: failed to connect %s", addr))
}
Expand Down
10 changes: 6 additions & 4 deletions src/recommendationservice/recommendation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from grpc_health.v1 import health_pb2_grpc

from opentelemetry import trace
from opentelemetry.instrumentation.grpc import GrpcInstrumentorServer
from opentelemetry.instrumentation.grpc import GrpcInstrumentorClient, GrpcInstrumentorServer
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
Expand Down Expand Up @@ -104,9 +104,13 @@ def Watch(self, request, context):
logger.info("Profiler disabled.")

try:
grpc_client_instrumentor = GrpcInstrumentorClient()
grpc_client_instrumentor.instrument()
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
if os.environ["ENABLE_TRACING"] == "1":
otel_endpoint = os.getenv("COLLECTOR_SERVICE_ADDR", "localhost:4317")
trace.set_tracer_provider(TracerProvider())
otel_endpoint = os.getenv("COLLECTOR_SERVICE_ADDR", "localhost:4317")
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(
OTLPSpanExporter(
Expand All @@ -115,8 +119,6 @@ def Watch(self, request, context):
)
)
)
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()
except (KeyError, DefaultCredentialsError):
logger.info("Tracing disabled.")
except Exception as e:
Expand Down

0 comments on commit 079d943

Please sign in to comment.