Skip to content

Commit

Permalink
otelgrpc: Add peer attributes to spans generated by stats handlers (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
pellared authored Jan 30, 2024
1 parent bfae9a3 commit ef8063b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add the new `go.opentelemetry.io/contrib/instrgen` package to provide auto-generated source code instrumentation. (#3068, #3108)
- Add peer attributes to spans recorded by `NewClientHandler`, `NewServerHandler` in `go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc`. (#4873)

## [1.22.0/0.47.0/0.16.0/0.2.0] - 2024-01-18

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -179,6 +180,10 @@ func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool
)
}
case *stats.OutTrailer:
case *stats.OutHeader:
if p, ok := peer.FromContext(ctx); ok {
span.SetAttributes(peerAttr(p.Addr.String())...)
}
case *stats.End:
var rpcStatusAttr attribute.KeyValue

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"io"
"net"
"strconv"
"sync"
"testing"

Expand Down Expand Up @@ -74,7 +75,7 @@ func TestStatsHandler(t *testing.T) {
doCalls(client)

t.Run("ClientSpans", func(t *testing.T) {
checkClientSpans(t, clientSR.Ended())
checkClientSpans(t, clientSR.Ended(), listener.Addr().String())
})

t.Run("ClientMetrics", func(t *testing.T) {
Expand All @@ -90,9 +91,14 @@ func TestStatsHandler(t *testing.T) {
})
}

func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan, addr string) {
require.Len(t, spans, 5)

host, p, err := net.SplitHostPort(addr)
require.NoError(t, err)
port, err := strconv.Atoi(p)
require.NoError(t, err)

emptySpan := spans[0]
assert.False(t, emptySpan.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/EmptyCall", emptySpan.Name())
Expand Down Expand Up @@ -121,6 +127,8 @@ func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, emptySpan.Attributes())

largeSpan := spans[1]
Expand Down Expand Up @@ -151,6 +159,8 @@ func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, largeSpan.Attributes())

streamInput := spans[2]
Expand Down Expand Up @@ -209,6 +219,8 @@ func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, streamInput.Attributes())

streamOutput := spans[3]
Expand Down Expand Up @@ -266,6 +278,8 @@ func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, streamOutput.Attributes())

pingPong := spans[4]
Expand Down Expand Up @@ -350,6 +364,8 @@ func checkClientSpans(t *testing.T, spans []trace.ReadOnlySpan) {
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr(host),
semconv.NetSockPeerPort(port),
}, pingPong.Attributes())
}

Expand Down Expand Up @@ -379,11 +395,15 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
},
},
}, emptySpan.Events())
port, ok := findAttribute(emptySpan.Attributes(), semconv.NetSockPeerPortKey)
assert.True(t, ok)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("EmptyCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr("127.0.0.1"),
port,
}, emptySpan.Attributes())

largeSpan := spans[1]
Expand All @@ -409,11 +429,15 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
},
},
}, largeSpan.Events())
port, ok = findAttribute(largeSpan.Attributes(), semconv.NetSockPeerPortKey)
assert.True(t, ok)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("UnaryCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr("127.0.0.1"),
port,
}, largeSpan.Attributes())

streamInput := spans[2]
Expand Down Expand Up @@ -467,11 +491,15 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
},
// client does not record an event for the server response.
}, streamInput.Events())
port, ok = findAttribute(streamInput.Attributes(), semconv.NetSockPeerPortKey)
assert.True(t, ok)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("StreamingInputCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr("127.0.0.1"),
port,
}, streamInput.Attributes())

streamOutput := spans[3]
Expand Down Expand Up @@ -524,11 +552,15 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
},
},
}, streamOutput.Events())
port, ok = findAttribute(streamOutput.Attributes(), semconv.NetSockPeerPortKey)
assert.True(t, ok)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("StreamingOutputCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr("127.0.0.1"),
port,
}, streamOutput.Attributes())

pingPong := spans[4]
Expand Down Expand Up @@ -608,11 +640,15 @@ func checkServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
},
},
}, pingPong.Events())
port, ok = findAttribute(pingPong.Attributes(), semconv.NetSockPeerPortKey)
assert.True(t, ok)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethodKey.String("FullDuplexCall"),
semconv.RPCServiceKey.String("grpc.testing.TestService"),
otelgrpc.RPCSystemGRPC,
otelgrpc.GRPCStatusCodeKey.Int64(int64(codes.OK)),
semconv.NetSockPeerAddr("127.0.0.1"),
port,
}, pingPong.Attributes())
}

Expand Down

0 comments on commit ef8063b

Please sign in to comment.