diff --git a/src/sentry/taskworker/consumer_grpc_push.py b/src/sentry/taskworker/consumer_grpc_push.py index da8c6bc9e359b..24626e206206b 100644 --- a/src/sentry/taskworker/consumer_grpc_push.py +++ b/src/sentry/taskworker/consumer_grpc_push.py @@ -62,8 +62,10 @@ def _dispatch_activation( inflight_activation: InflightActivation, ) -> WorkerServiceStub: try: + timeout_in_sec = inflight_activation.processing_deadline.seconds - time.time() dispatch_task_response = stub.Dispatch( - DispatchRequest(task_activation=inflight_activation.activation) + DispatchRequest(task_activation=inflight_activation.activation), + timeout=timeout_in_sec, ) self.pending_task_store.set_task_status( task_id=inflight_activation.activation.id, @@ -71,7 +73,7 @@ def _dispatch_activation( ) except grpc.RpcError as rpc_error: logger.exception( - "Connection lost with worker, code: %s, details: %s", + "gRPC failed, code: %s, details: %s", rpc_error.code(), rpc_error.details(), ) diff --git a/src/sentry/taskworker/pending_task_store.py b/src/sentry/taskworker/pending_task_store.py index fab39fcd615e3..8fbe0f45cc1a0 100644 --- a/src/sentry/taskworker/pending_task_store.py +++ b/src/sentry/taskworker/pending_task_store.py @@ -33,7 +33,7 @@ def get_pending_task(self) -> InflightActivation | None: return None # TODO this duration should be a tasknamespace setting, or with an option - deadline = task.added_at + timedelta(minutes=3) + deadline = datetime.now() + timedelta(minutes=3) task.update( status=InflightActivationModel.Status.PROCESSING, processing_deadline=deadline