Skip to content

Commit

Permalink
Conform to OpenTelemetry connect specification (#63)
Browse files Browse the repository at this point in the history
Conforms to
open-telemetry/opentelemetry-specification#3116

Namely:
- `buf_connect` is not `connect_rpc` 
- `status_code` is now `error_code` for `connect_rpc` instances. This
means that no attribute is set for success
- introduce string consts for grpc, grpcweb connect_rpc...
  • Loading branch information
joshcarp committed Jan 20, 2023
1 parent 05f1961 commit 5c0ad51
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 40 deletions.
24 changes: 14 additions & 10 deletions attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,21 @@ func addressAttributes(address string) []attribute.KeyValue {
return []attribute.KeyValue{semconv.NetPeerNameKey.String(address)}
}

func statusCodeAttribute(protocol string, serverErr error) attribute.KeyValue {
codeKey := attribute.Key("rpc." + protocol + ".status_code")
// Following the respective specifications, use integers for gRPC codes and
// strings for Connect codes.
if strings.HasPrefix(protocol, "grpc") {
func statusCodeAttribute(protocol string, serverErr error) (attribute.KeyValue, bool) {
// Following the respective specifications, use integers and "status_code" for
// gRPC codes in contrast to strings and "error_code" for Connect codes.
switch protocol {
case grpcProtocol, grpcwebProtocol:
codeKey := attribute.Key("rpc." + protocol + ".status_code")
if serverErr != nil {
return codeKey.Int64(int64(connect.CodeOf(serverErr)))
return codeKey.Int64(int64(connect.CodeOf(serverErr))), true
}
return codeKey.Int64(0), true // gRPC uses 0 for success
case connectProtocol:
codeKey := attribute.Key("rpc." + protocol + ".error_code")
if serverErr != nil {
return codeKey.String(connect.CodeOf(serverErr).String()), true
}
return codeKey.Int64(0) // gRPC uses 0 for success
} else if serverErr != nil {
return codeKey.String(connect.CodeOf(serverErr).String())
}
return codeKey.String("success")
return attribute.KeyValue{}, false
}
24 changes: 15 additions & 9 deletions interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ func (i *Interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
),
)
response, err := next(ctx, request)
attributes = append(attributes, statusCodeAttribute(protocol, err))
if statusCode, ok := statusCodeAttribute(protocol, err); ok {
attributes = append(attributes, statusCode)
}
var responseSize int
if err == nil {
if msg, ok := response.Any().(proto.Message); ok {
Expand Down Expand Up @@ -214,7 +216,9 @@ func (i *Interceptor) WrapStreamingClient(next connect.StreamingClientFunc) conn
// If error is nil a "success" is recorded on the span and on the final duration
// metric. The "rpc.<protocol>.status_code" is not defined for any other metrics for
// streams because the error only exists when finishing the stream.
state.addAttributes(statusCodeAttribute(protocol, state.error))
if statusCode, ok := statusCodeAttribute(protocol, state.error); ok {
state.addAttributes(statusCode)
}
span.SetAttributes(state.attributes...)
span.SetStatus(spanStatus(state.error))
span.End()
Expand Down Expand Up @@ -292,7 +296,9 @@ func (i *Interceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) co
},
}
err = next(ctx, streamingHandler)
state.addAttributes(statusCodeAttribute(protocol, err))
if statusCode, ok := statusCodeAttribute(protocol, err); ok {
state.addAttributes(statusCode)
}
span.SetAttributes(state.attributes...)
span.SetStatus(spanStatus(err))
instrumentation.duration.Record(ctx, i.config.now().Sub(requestStartTime).Milliseconds(), state.attributes...)
Expand All @@ -302,12 +308,12 @@ func (i *Interceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) co

func protocolToSemConv(protocol string) string {
switch protocol {
case "grpcweb":
return "grpc_web"
case "grpc":
return "grpc"
case "connect":
return "buf_connect"
case grpcwebString:
return grpcwebProtocol
case grpcProtocol:
return grpcProtocol
case connectString:
return connectProtocol
default:
return protocol
}
Expand Down
21 changes: 2 additions & 19 deletions interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ import (

const (
messagesPerRequest = 2
successString = "success"
bufConnect = "buf_connect"
bufConnect = "connect_rpc"
CumSumMethod = "CumSum"
PingMethod = "Ping"
FailMethod = "Fail"
Expand All @@ -67,7 +66,7 @@ const (
rpcServerResponseSize = "rpc.server.response.size"
rpcServerRequestsPerRPC = "rpc.server.requests_per_rpc"
rpcServerResponsesPerRPC = "rpc.server.responses_per_rpc"
rpcBufConnectStatusCode = "rpc.buf_connect.status_code"
rpcBufConnectStatusCode = "rpc.connect_rpc.error_code"
)

func TestStreamingMetrics(t *testing.T) {
Expand Down Expand Up @@ -117,7 +116,6 @@ func TestStreamingMetrics(t *testing.T) {
Attributes: attribute.NewSet(
semconv.NetPeerNameKey.String(host),
semconv.NetPeerPortKey.Int(port),
attribute.Key(rpcBufConnectStatusCode).String(successString),
semconv.RPCSystemKey.String(bufConnect),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCMethodKey.String(CumSumMethod),
Expand Down Expand Up @@ -283,7 +281,6 @@ func TestStreamingMetricsClient(t *testing.T) {
semconv.RPCSystemKey.String(bufConnect),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCMethodKey.String(CumSumMethod),
attribute.Key(rpcBufConnectStatusCode).String(successString),
),
Count: 1,
Sum: 1000.0,
Expand Down Expand Up @@ -787,7 +784,6 @@ func TestMetrics(t *testing.T) {
Attributes: attribute.NewSet(
semconv.NetPeerNameKey.String(host),
semconv.NetPeerPortKey.Int(port),
attribute.Key(rpcBufConnectStatusCode).String(successString),
semconv.RPCMethodKey.String(PingMethod),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCSystemKey.String(bufConnect),
Expand All @@ -810,7 +806,6 @@ func TestMetrics(t *testing.T) {
Attributes: attribute.NewSet(
semconv.NetPeerNameKey.String(host),
semconv.NetPeerPortKey.Int(port),
attribute.Key(rpcBufConnectStatusCode).String(successString),
semconv.RPCMethodKey.String(PingMethod),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCSystemKey.String(bufConnect),
Expand All @@ -833,7 +828,6 @@ func TestMetrics(t *testing.T) {
Attributes: attribute.NewSet(
semconv.NetPeerNameKey.String(host),
semconv.NetPeerPortKey.Int(port),
attribute.Key(rpcBufConnectStatusCode).String(successString),
semconv.RPCMethodKey.String(PingMethod),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCSystemKey.String(bufConnect),
Expand All @@ -856,7 +850,6 @@ func TestMetrics(t *testing.T) {
Attributes: attribute.NewSet(
semconv.NetPeerNameKey.String(host),
semconv.NetPeerPortKey.Int(port),
attribute.Key(rpcBufConnectStatusCode).String(successString),
semconv.RPCMethodKey.String(PingMethod),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCSystemKey.String(bufConnect),
Expand All @@ -879,7 +872,6 @@ func TestMetrics(t *testing.T) {
Attributes: attribute.NewSet(
semconv.NetPeerNameKey.String(host),
semconv.NetPeerPortKey.Int(port),
attribute.Key(rpcBufConnectStatusCode).String(successString),
semconv.RPCMethodKey.String(PingMethod),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCSystemKey.String(bufConnect),
Expand Down Expand Up @@ -983,7 +975,6 @@ func TestClientSimple(t *testing.T) {
semconv.RPCSystemKey.String(bufConnect),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCMethodKey.String(PingMethod),
attribute.Key(rpcBufConnectStatusCode).String(successString),
},
},
}, clientSpanRecorder.Ended())
Expand Down Expand Up @@ -1084,7 +1075,6 @@ func TestClientHandlerOpts(t *testing.T) {
semconv.RPCSystemKey.String(bufConnect),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCMethodKey.String(PingMethod),
attribute.Key(rpcBufConnectStatusCode).String(successString),
},
},
}, clientSpanRecorder.Ended())
Expand Down Expand Up @@ -1156,7 +1146,6 @@ func TestFilterHeader(t *testing.T) {
semconv.RPCSystemKey.String(bufConnect),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCMethodKey.String(PingMethod),
attribute.Key(rpcBufConnectStatusCode).String(successString),
},
},
}, spanRecorder.Ended())
Expand Down Expand Up @@ -1203,7 +1192,6 @@ func TestInterceptors(t *testing.T) {
semconv.RPCSystemKey.String(bufConnect),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCMethodKey.String(PingMethod),
attribute.Key(rpcBufConnectStatusCode).String(successString),
},
},
{
Expand Down Expand Up @@ -1232,7 +1220,6 @@ func TestInterceptors(t *testing.T) {
semconv.RPCSystemKey.String(bufConnect),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCMethodKey.String(PingMethod),
attribute.Key(rpcBufConnectStatusCode).String(successString),
},
},
}, spanRecorder.Ended())
Expand Down Expand Up @@ -1549,7 +1536,6 @@ func TestStreamingHandlerTracing(t *testing.T) {
semconv.RPCSystemKey.String(bufConnect),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCMethodKey.String(CumSumMethod),
attribute.Key(rpcBufConnectStatusCode).String(successString),
},
},
}, spanRecorder.Ended())
Expand Down Expand Up @@ -1598,7 +1584,6 @@ func TestStreamingClientTracing(t *testing.T) {
semconv.RPCSystemKey.String(bufConnect),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCMethodKey.String(CumSumMethod),
attribute.Key(rpcBufConnectStatusCode).String(successString),
},
},
}, spanRecorder.Ended())
Expand Down Expand Up @@ -1654,7 +1639,6 @@ func TestWithAttributeFilter(t *testing.T) {
semconv.NetPeerPortKey.Int(port),
semconv.RPCSystemKey.String(bufConnect),
semconv.RPCMethodKey.String(CumSumMethod),
attribute.Key(rpcBufConnectStatusCode).String(successString),
},
},
}, spanRecorder.Ended())
Expand Down Expand Up @@ -1709,7 +1693,6 @@ func TestWithoutServerPeerAttributes(t *testing.T) {
semconv.RPCSystemKey.String(bufConnect),
semconv.RPCServiceKey.String(pingv1connect.PingServiceName),
semconv.RPCMethodKey.String(CumSumMethod),
attribute.Key(rpcBufConnectStatusCode).String(successString),
},
},
}, spanRecorder.Ended())
Expand Down
5 changes: 5 additions & 0 deletions otelconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ const (
version = "0.0.1-dev"
semanticVersion = "semver:" + version
instrumentationName = "connectrpc.com/otelconnect"
grpcProtocol = "grpc"
grpcwebString = "grpcweb"
grpcwebProtocol = "grpc_web"
connectString = "connect"
connectProtocol = "connect_rpc"
)

// Request is the information about each RPC available to filter functions. It
Expand Down
8 changes: 6 additions & 2 deletions streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ func (s *streamingState) receive(ctx context.Context, msg any, conn sendReceiver
s.error = err
// If error add it to the attributes because the stream is about to terminate.
// If no error don't add anything because status only exists at end of stream.
s.addAttributes(statusCodeAttribute(s.protocol, err))
if statusCode, ok := statusCodeAttribute(s.protocol, err); ok {
s.addAttributes(statusCode)
}
}
protomsg, ok := msg.(proto.Message)
size := proto.Size(protomsg)
Expand All @@ -103,7 +105,9 @@ func (s *streamingState) send(ctx context.Context, msg any, conn sendReceiver) e
s.error = err
// If error add it to the attributes because the stream is about to terminate.
// If no error don't add anything because status only exists at end of stream.
s.addAttributes(statusCodeAttribute(s.protocol, err))
if statusCode, ok := statusCodeAttribute(s.protocol, err); ok {
s.addAttributes(statusCode)
}
}
protomsg, ok := msg.(proto.Message)
size := proto.Size(protomsg)
Expand Down

0 comments on commit 5c0ad51

Please sign in to comment.