From 8e5fd62417abb8075c6528399a5fc761e25e98ad Mon Sep 17 00:00:00 2001 From: Enoch Tang Date: Thu, 26 Sep 2024 12:59:12 -0400 Subject: [PATCH] handle grpc timeout with processing deadline --- src/sentry/taskworker/consumer_grpc_push.py | 6 ++++-- src/sentry/taskworker/pending_task_store.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/sentry/taskworker/consumer_grpc_push.py b/src/sentry/taskworker/consumer_grpc_push.py index da8c6bc9e359b4..24626e206206bf 100644 --- a/src/sentry/taskworker/consumer_grpc_push.py +++ b/src/sentry/taskworker/consumer_grpc_push.py @@ -62,8 +62,10 @@ def _dispatch_activation( inflight_activation: InflightActivation, ) -> WorkerServiceStub: try: + timeout_in_sec = inflight_activation.processing_deadline.seconds - time.time() dispatch_task_response = stub.Dispatch( - DispatchRequest(task_activation=inflight_activation.activation) + DispatchRequest(task_activation=inflight_activation.activation), + timeout=timeout_in_sec, ) self.pending_task_store.set_task_status( task_id=inflight_activation.activation.id, @@ -71,7 +73,7 @@ def _dispatch_activation( ) except grpc.RpcError as rpc_error: logger.exception( - "Connection lost with worker, code: %s, details: %s", + "gRPC failed, code: %s, details: %s", rpc_error.code(), rpc_error.details(), ) diff --git a/src/sentry/taskworker/pending_task_store.py b/src/sentry/taskworker/pending_task_store.py index fab39fcd615e3a..8fbe0f45cc1a0d 100644 --- a/src/sentry/taskworker/pending_task_store.py +++ b/src/sentry/taskworker/pending_task_store.py @@ -33,7 +33,7 @@ def get_pending_task(self) -> InflightActivation | None: return None # TODO this duration should be a tasknamespace setting, or with an option - deadline = task.added_at + timedelta(minutes=3) + deadline = datetime.now() + timedelta(minutes=3) task.update( status=InflightActivationModel.Status.PROCESSING, processing_deadline=deadline