Skip to content

Commit

Permalink
[python] Add observability instrumentation to asyncio stack (#33992)
Browse files Browse the repository at this point in the history
This is already present in the grpc python sync stack and has been
missing from aio stack.

CC @gnossen           

<!--

If you know who should review your pull request, please assign it to
that
person, otherwise the pull request would get assigned randomly.

If your pull request is for a specific language, please add the
appropriate
lang label.

-->
  • Loading branch information
himasajeev authored Nov 9, 2023
1 parent e1cb290 commit 90af0a1
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 8 deletions.
6 changes: 6 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/_hooks.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ cdef object _custom_op_on_c_call(int op, grpc_call *call):
def install_context_from_request_call_event(RequestCallEvent event):
maybe_save_server_trace_context(event)

def install_context_from_request_call_event_aio(GrpcCallWrapper event):
pass

def uninstall_context():
pass

Expand All @@ -31,5 +34,8 @@ cdef class CensusContext:
def set_census_context_on_call(_CallState call_state, CensusContext census_ctx):
pass

def set_instrumentation_context_on_call_aio(GrpcCallWrapper call_state, CensusContext census_ctx):
pass

def get_deadline_from_context():
return None
23 changes: 19 additions & 4 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,14 @@ cdef class _AioCall(GrpcCallWrapper):

async def unary_unary(self,
bytes request,
tuple outbound_initial_metadata):
tuple outbound_initial_metadata,
object context = None):
"""Performs a unary unary RPC.
Args:
request: the serialized requests in bytes.
outbound_initial_metadata: optional outbound metadata.
context: instrumentation context.
"""
cdef tuple ops

Expand All @@ -313,6 +315,8 @@ cdef class _AioCall(GrpcCallWrapper):
cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)

if context is not None:
set_instrumentation_context_on_call_aio(self, context)
ops = (initial_metadata_op, send_message_op, send_close_op,
receive_initial_metadata_op, receive_message_op,
receive_status_on_client_op)
Expand Down Expand Up @@ -390,7 +394,8 @@ cdef class _AioCall(GrpcCallWrapper):

async def initiate_unary_stream(self,
bytes request,
tuple outbound_initial_metadata):
tuple outbound_initial_metadata,
object context = None):
"""Implementation of the start of a unary-stream call."""
# Peer may prematurely end this RPC at any point. We need a corutine
# that watches if the server sends the final status.
Expand All @@ -406,6 +411,8 @@ cdef class _AioCall(GrpcCallWrapper):
cdef Operation send_close_op = SendCloseFromClientOperation(
_EMPTY_FLAGS)

if context is not None:
set_instrumentation_context_on_call_aio(self, context)
outbound_ops = (
initial_metadata_op,
send_message_op,
Expand All @@ -429,7 +436,8 @@ cdef class _AioCall(GrpcCallWrapper):

async def stream_unary(self,
tuple outbound_initial_metadata,
object metadata_sent_observer):
object metadata_sent_observer,
object context = None):
"""Actual implementation of the complete unary-stream call.
Needs to pay extra attention to the raise mechanism. If we want to
Expand Down Expand Up @@ -460,6 +468,9 @@ cdef class _AioCall(GrpcCallWrapper):
cdef tuple inbound_ops
cdef ReceiveMessageOperation receive_message_op = ReceiveMessageOperation(_EMPTY_FLAGS)
cdef ReceiveStatusOnClientOperation receive_status_on_client_op = ReceiveStatusOnClientOperation(_EMPTY_FLAGS)

if context is not None:
set_instrumentation_context_on_call_aio(self, context)
inbound_ops = (receive_message_op, receive_status_on_client_op)

# Executes all operations in one batch.
Expand All @@ -484,7 +495,8 @@ cdef class _AioCall(GrpcCallWrapper):

async def initiate_stream_stream(self,
tuple outbound_initial_metadata,
object metadata_sent_observer):
object metadata_sent_observer,
object context = None):
"""Actual implementation of the complete stream-stream call.
Needs to pay extra attention to the raise mechanism. If we want to
Expand All @@ -495,6 +507,9 @@ cdef class _AioCall(GrpcCallWrapper):
# that watches if the server sends the final status.
status_task = self._loop.create_task(self._handle_status_once_received())

if context is not None:
set_instrumentation_context_on_call_aio(self, context)

try:
# Sends out initial_metadata ASAP.
await _send_initial_metadata(self,
Expand Down
2 changes: 2 additions & 0 deletions src/python/grpcio/grpc/_cython/_cygrpc/aio/server.pyx.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ async def _finish_handler_with_unary_response(RPCState rpc_state,
# Executes application logic
cdef object response_message
cdef _SyncServicerContext sync_servicer_context
install_context_from_request_call_event_aio(rpc_state)

if _is_async_handler(unary_handler):
# Run async method handlers in this coroutine
Expand Down Expand Up @@ -453,6 +454,7 @@ async def _finish_handler_with_unary_response(RPCState rpc_state,
rpc_state.metadata_sent = True
rpc_state.status_sent = True
await execute_batch(rpc_state, finish_ops, loop)
uninstall_context()


async def _finish_handler_with_stream_responses(RPCState rpc_state,
Expand Down
12 changes: 8 additions & 4 deletions src/python/grpcio/grpc/aio/_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ def __init__(
loop,
)
self._request = request
self._context = cygrpc.build_census_context()
self._invocation_task = loop.create_task(self._invoke())
self._init_unary_response_mixin(self._invocation_task)

Expand All @@ -574,7 +575,7 @@ async def _invoke(self) -> ResponseType:
# https://github.com/python/cpython/blob/edad4d89e357c92f70c0324b937845d652b20afd/Lib/asyncio/tasks.py#L785
try:
serialized_response = await self._cython_call.unary_unary(
serialized_request, self._metadata
serialized_request, self._metadata, self._context
)
except asyncio.CancelledError:
if not self.cancelled():
Expand Down Expand Up @@ -624,6 +625,7 @@ def __init__(
loop,
)
self._request = request
self._context = cygrpc.build_census_context()
self._send_unary_request_task = loop.create_task(
self._send_unary_request()
)
Expand All @@ -635,7 +637,7 @@ async def _send_unary_request(self) -> ResponseType:
)
try:
await self._cython_call.initiate_unary_stream(
serialized_request, self._metadata
serialized_request, self._metadata, self._context
)
except asyncio.CancelledError:
if not self.cancelled():
Expand Down Expand Up @@ -679,13 +681,14 @@ def __init__(
loop,
)

self._context = cygrpc.build_census_context()
self._init_stream_request_mixin(request_iterator)
self._init_unary_response_mixin(loop.create_task(self._conduct_rpc()))

async def _conduct_rpc(self) -> ResponseType:
try:
serialized_response = await self._cython_call.stream_unary(
self._metadata, self._metadata_sent_observer
self._metadata, self._metadata_sent_observer, self._context
)
except asyncio.CancelledError:
if not self.cancelled():
Expand Down Expand Up @@ -731,6 +734,7 @@ def __init__(
response_deserializer,
loop,
)
self._context = cygrpc.build_census_context()
self._initializer = self._loop.create_task(self._prepare_rpc())
self._init_stream_request_mixin(request_iterator)
self._init_stream_response_mixin(self._initializer)
Expand All @@ -743,7 +747,7 @@ async def _prepare_rpc(self):
"""
try:
await self._cython_call.initiate_stream_stream(
self._metadata, self._metadata_sent_observer
self._metadata, self._metadata_sent_observer, self._context
)
except asyncio.CancelledError:
if not self.cancelled():
Expand Down

0 comments on commit 90af0a1

Please sign in to comment.