Skip to content

Commit

Permalink
handle grpc timeout with processing deadline
Browse files Browse the repository at this point in the history
  • Loading branch information
enochtangg committed Sep 26, 2024
1 parent 61ecc8c commit 8e5fd62
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
6 changes: 4 additions & 2 deletions src/sentry/taskworker/consumer_grpc_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,18 @@ def _dispatch_activation(
inflight_activation: InflightActivation,
) -> WorkerServiceStub:
try:
timeout_in_sec = inflight_activation.processing_deadline.seconds - time.time()
dispatch_task_response = stub.Dispatch(
DispatchRequest(task_activation=inflight_activation.activation)
DispatchRequest(task_activation=inflight_activation.activation),
timeout=timeout_in_sec,
)
self.pending_task_store.set_task_status(
task_id=inflight_activation.activation.id,
task_status=dispatch_task_response.status,
)
except grpc.RpcError as rpc_error:
logger.exception(
"Connection lost with worker, code: %s, details: %s",
"gRPC failed, code: %s, details: %s",
rpc_error.code(),
rpc_error.details(),
)
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/taskworker/pending_task_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def get_pending_task(self) -> InflightActivation | None:
return None

# TODO this duration should be a tasknamespace setting, or with an option
deadline = task.added_at + timedelta(minutes=3)
deadline = datetime.now() + timedelta(minutes=3)

task.update(
status=InflightActivationModel.Status.PROCESSING, processing_deadline=deadline
Expand Down

0 comments on commit 8e5fd62

Please sign in to comment.