Skip to content

Commit

Permalink
gRPC instrumentation: client additions (#269)
Browse files Browse the repository at this point in the history
The docs on metric labels suggests that they should probably be strings,
and all others I can find are strings, and so these ought to be also.
Otherwise, some of the exporters/processors have to handle things
specifically, and not all of these come out as nice as could be when you
`str()` them.

I've also made sure to use the `StatusCode` name, as that's the
interesting thing.

Finally, there's no need to report specifically that `error=false`, so
I've removed that tag.
  • Loading branch information
alertedsnake authored Feb 5, 2021
1 parent 55efeb6 commit ade29f6
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 51 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/246))
- Update TraceState to adhere to specs
([#276](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/276))
- `opentelemetry-instrumentation-grpc` Updated client attributes, added tests, fixed examples, docs
([#269](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/269))

### Removed
- Remove Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,29 +185,33 @@ class GrpcInstrumentorClient(BaseInstrumentor):
"""

# Figures out which channel type we need to wrap
def _which_channel(self, kwargs):
# handle legacy argument
if "channel_type" in kwargs:
if kwargs.get("channel_type") == "secure":
return ("secure_channel",)
return ("insecure_channel",)

# handle modern arguments
types = []
for ctype in ("secure_channel", "insecure_channel"):
if kwargs.get(ctype, True):
types.append(ctype)

return tuple(types)

def _instrument(self, **kwargs):
exporter = kwargs.get("exporter", None)
interval = kwargs.get("interval", 30)
if kwargs.get("channel_type") == "secure":
for ctype in self._which_channel(kwargs):
_wrap(
"grpc",
"secure_channel",
partial(self.wrapper_fn, exporter, interval),
)

else:
_wrap(
"grpc",
"insecure_channel",
partial(self.wrapper_fn, exporter, interval),
"grpc", ctype, partial(self.wrapper_fn, exporter, interval),
)

def _uninstrument(self, **kwargs):
if kwargs.get("channel_type") == "secure":
unwrap(grpc, "secure_channel")

else:
unwrap(grpc, "insecure_channel")
for ctype in self._which_channel(kwargs):
unwrap(grpc, ctype)

def wrapper_fn(
self, exporter, interval, original_func, instance, args, kwargs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,16 @@ def __init__(self, tracer, exporter, interval):
)

def _start_span(self, method):
service, meth = method.lstrip("/").split("/", 1)
attributes = {
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
"rpc.method": meth,
"rpc.service": service,
}

return self._tracer.start_as_current_span(
name=method, kind=trace.SpanKind.CLIENT
name=method, kind=trace.SpanKind.CLIENT, attributes=attributes
)

# pylint:disable=no-self-use
Expand Down Expand Up @@ -133,6 +141,7 @@ def _trace_result(self, guarded_span, rpc_info, result, client_info):
self._metrics_recorder.record_bytes_in(
response.ByteSize(), client_info.full_method
)

return result

def _start_guarded_span(self, *args, **kwargs):
Expand Down Expand Up @@ -175,11 +184,14 @@ def intercept_unary(self, request, metadata, client_info, invoker):

try:
result = invoker(request, metadata)
except grpc.RpcError:
except grpc.RpcError as err:
guarded_span.generated_span.set_status(
Status(StatusCode.ERROR)
)
raise
guarded_span.generated_span.set_attribute(
"rpc.grpc.status_code", err.code().value[0]
)
raise err

return self._trace_result(
guarded_span, rpc_info, result, client_info
Expand Down Expand Up @@ -230,9 +242,12 @@ def _intercept_server_stream(
response.ByteSize(), client_info.full_method
)
yield response
except grpc.RpcError:
except grpc.RpcError as err:
span.set_status(Status(StatusCode.ERROR))
raise
span.set_attribute(
"rpc.grpc.status_code", err.code().value[0]
)
raise err

def intercept_stream(
self, request_or_iterator, metadata, client_info, invoker
Expand Down Expand Up @@ -268,11 +283,14 @@ def intercept_stream(

try:
result = invoker(request_or_iterator, metadata)
except grpc.RpcError:
except grpc.RpcError as err:
guarded_span.generated_span.set_status(
Status(StatusCode.ERROR)
)
raise
guarded_span.generated_span.set_attribute(
"rpc.grpc.status_code", err.code().value[0],
)
raise err

return self._trace_result(
guarded_span, rpc_info, result, client_info
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,33 +72,32 @@ def __init__(self, meter, span_kind):

def record_bytes_in(self, bytes_in, method):
if self._meter:
labels = {"method": method}
labels = {"rpc.method": method}
self._bytes_in.add(bytes_in, labels)

def record_bytes_out(self, bytes_out, method):
if self._meter:
labels = {"method": method}
labels = {"rpc.method": method}
self._bytes_out.add(bytes_out, labels)

@contextmanager
def record_latency(self, method):
start_time = time()
labels = {
"method": method,
"status_code": grpc.StatusCode.OK, # pylint:disable=no-member
"rpc.method": method,
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.name,
}
try:
yield labels
except grpc.RpcError as exc: # pylint:disable=no-member
if self._meter:
# pylint: disable=no-member
labels["status_code"] = exc.code()
labels["rpc.grpc.status_code"] = exc.code().name
self._error_count.add(1, labels)
labels["error"] = True
labels["error"] = "true"
raise
finally:
if self._meter:
if "error" not in labels:
labels["error"] = False
elapsed_time = (time() - start_time) * 1000
self._duration.record(elapsed_time, labels)
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,21 @@ def _verify_success_records(self, num_bytes_out, num_bytes_in, method):

self.assertIsNotNone(bytes_out)
self.assertEqual(bytes_out.instrument.name, "grpcio/client/bytes_out")
self.assertEqual(bytes_out.labels, (("method", method),))
self.assertEqual(bytes_out.labels, (("rpc.method", method),))

self.assertIsNotNone(bytes_in)
self.assertEqual(bytes_in.instrument.name, "grpcio/client/bytes_in")
self.assertEqual(bytes_in.labels, (("method", method),))
self.assertEqual(bytes_in.labels, (("rpc.method", method),))

self.assertIsNotNone(duration)
self.assertEqual(duration.instrument.name, "grpcio/client/duration")
self.assertEqual(
duration.labels,
(
("error", False),
("method", method),
("status_code", grpc.StatusCode.OK),
),
self.assertSequenceEqual(
sorted(duration.labels),
[
("rpc.grpc.status_code", grpc.StatusCode.OK.name),
("rpc.method", method),
("rpc.system", "grpc"),
],
)

self.assertEqual(type(bytes_out.aggregator), SumAggregator)
Expand Down Expand Up @@ -116,6 +116,16 @@ def test_unary_unary(self):

self._verify_success_records(8, 8, "/GRPCTestServer/SimpleMethod")

self.assert_span_has_attributes(
span,
{
"rpc.method": "SimpleMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_unary_stream(self):
server_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
Expand All @@ -134,6 +144,16 @@ def test_unary_stream(self):
8, 40, "/GRPCTestServer/ServerStreamingMethod"
)

self.assert_span_has_attributes(
span,
{
"rpc.method": "ServerStreamingMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_stream_unary(self):
client_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
Expand All @@ -152,6 +172,16 @@ def test_stream_unary(self):
40, 8, "/GRPCTestServer/ClientStreamingMethod"
)

self.assert_span_has_attributes(
span,
{
"rpc.method": "ClientStreamingMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def test_stream_stream(self):
bidirectional_streaming_method(self._stub)
spans = self.memory_exporter.get_finished_spans()
Expand All @@ -172,6 +202,16 @@ def test_stream_stream(self):
40, 40, "/GRPCTestServer/BidirectionalStreamingMethod"
)

self.assert_span_has_attributes(
span,
{
"rpc.method": "BidirectionalStreamingMethod",
"rpc.service": "GRPCTestServer",
"rpc.system": "grpc",
"rpc.grpc.status_code": grpc.StatusCode.OK.value[0],
},
)

def _verify_error_records(self, method):
# pylint: disable=protected-access,no-member
self.channel._interceptor.controller.tick()
Expand All @@ -195,21 +235,33 @@ def _verify_error_records(self, method):
self.assertIsNotNone(duration)

self.assertEqual(errors.instrument.name, "grpcio/client/errors")
self.assertEqual(
errors.labels,
(
("method", method),
("status_code", grpc.StatusCode.INVALID_ARGUMENT),
self.assertSequenceEqual(
sorted(errors.labels),
sorted(
(
(
"rpc.grpc.status_code",
grpc.StatusCode.INVALID_ARGUMENT.name,
),
("rpc.method", method),
("rpc.system", "grpc"),
)
),
)
self.assertEqual(errors.aggregator.checkpoint, 1)

self.assertEqual(
duration.labels,
(
("error", True),
("method", method),
("status_code", grpc.StatusCode.INVALID_ARGUMENT),
self.assertSequenceEqual(
sorted(duration.labels),
sorted(
(
("error", "true"),
("rpc.method", method),
("rpc.system", "grpc"),
(
"rpc.grpc.status_code",
grpc.StatusCode.INVALID_ARGUMENT.name,
),
)
),
)

Expand Down

0 comments on commit ade29f6

Please sign in to comment.