implement basic consumer push model #78098

Merged: 11 commits, Oct 1, 2024
14 changes: 5 additions & 9 deletions src/sentry/runner/commands/run.py
@@ -259,24 +259,20 @@ def worker(ignore_unknown_queues: bool, **options: Any) -> None:
 @log_options()
 @configuration
 def taskworker(**options: Any) -> None:
-    from sentry.taskworker.worker import Worker
+    from sentry.taskworker.worker import serve

     with managed_bgtasks(role="taskworker"):
-        worker = Worker(
-            namespace=options.get("namespace"),
-        )
-        worker.start()
-        raise SystemExit(worker.exitcode)
+        serve(**options)


 @run.command()
 @log_options()
 @configuration
-def kafka_task_grpc_server(**options: Any) -> None:
-    from sentry.taskworker.grpc_server import serve
+def kafka_task_grpc(**options: Any) -> None:
+    from sentry.taskworker.consumer_grpc import start

     with managed_bgtasks(role="taskworker"):
-        serve()
+        start()


 @run.command()
57 changes: 57 additions & 0 deletions src/sentry/taskworker/consumer_grpc.py
@@ -0,0 +1,57 @@
import logging
import time

import grpc
from sentry_protos.sentry.v1alpha.taskworker_pb2 import (
    TASK_ACTIVATION_STATUS_PENDING,
    DispatchRequest,
)
from sentry_protos.sentry.v1alpha.taskworker_pb2_grpc import WorkerServiceStub

from sentry.taskworker.pending_task_store import PendingTaskStore

logger = logging.getLogger("sentry.taskworker.grpc_server")


class ConsumerGrpc:
    def __init__(self) -> None:
        self.pending_task_store = PendingTaskStore()
        self.host = "localhost"
        self.server_port = 50051
        self.channel = grpc.insecure_channel(f"{self.host}:{self.server_port}")
        self.stub = WorkerServiceStub(self.channel)

    def start(self):
        while True:
            self.dispatch_task()
Review comment from @john-z-yang (Member, Author), Sep 25, 2024, on the dispatch loop in start():
Maybe we should have a multithreading pool or async here, since consumer and worker are unlikely to be 1:1. We only query the db when an ongoing connection with the worker closes. Thoughts?

Reply (Member):
I think multithreading is going to be necessary. I think we'll want to pair the threadpool count to be <= the number of workers per partition-consumer. Having more threads than workers feels like it would lead to contention on workers.

(A sketch of the thread-pool idea follows this file.)

    def dispatch_task(self):
        in_flight_activation = self.pending_task_store.get_pending_task()
        if not in_flight_activation:
            logger.info("No tasks")
            time.sleep(1)
            return
        try:
            dispatch_task_response = self.stub.Dispatch(
                DispatchRequest(task_activation=in_flight_activation.activation)
            )
            self.pending_task_store.set_task_status(
                task_id=in_flight_activation.activation.id,
                task_status=dispatch_task_response.status,
            )
        except grpc.RpcError as rpc_error:
            logger.exception(
                "Connection lost with worker, code: %s, details: %s",
                rpc_error.code(),
                rpc_error.details(),
            )
            self.pending_task_store.set_task_status(
                task_id=in_flight_activation.activation.id,
                task_status=TASK_ACTIVATION_STATUS_PENDING,
            )
Review comment from @john-z-yang (Member, Author), Sep 25, 2024, on the grpc.RpcError handler:
I'm questioning whether this is the correct behaviour. My intuition is not to use the same code path to handle worker connection failure, because this seems to be an error with the platform, so it should not consume the quotas set by the Task defined by the user. The argument can be made that the execution of the task can cause the worker to OOM, which is a user issue.

Regardless, not having a check here could be dangerous in a production environment.

Reply (Member):
If we fail to connect to a worker, the task wouldn't have been processed, so restoring the state to pending makes sense to me. We'd also need to reset the processing_deadline for the activation so that it doesn't get flagged as a worker timeout and retried later.

(A sketch of this follows the file below.)
            time.sleep(1)


def start():
    consumer_grpc = ConsumerGrpc()
    consumer_grpc.start()
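
Following up on the thread-pool discussion above: a minimal sketch of a bounded dispatch pool, assuming a ConsumerGrpc variant that accepts a worker address. WORKER_ADDRESSES, make_consumer, and start_concurrent are illustrative names, not part of this PR.

# Hypothetical sketch only: a bounded pool of dispatch loops, one per worker,
# keeping the thread count <= the number of workers per partition-consumer.
from concurrent.futures import ThreadPoolExecutor

# Assumed worker addresses; the PR above hardcodes a single localhost:50051 worker.
WORKER_ADDRESSES = ["localhost:50051", "localhost:50052"]


def start_concurrent(make_consumer, addresses=WORKER_ADDRESSES):
    # make_consumer is an assumed factory returning a ConsumerGrpc-like object
    # bound to one worker address and exposing start().
    consumers = [make_consumer(address) for address in addresses]
    with ThreadPoolExecutor(max_workers=len(consumers)) as pool:
        loops = [pool.submit(consumer.start) for consumer in consumers]
        for loop in loops:
            loop.result()  # surfaces an exception if any dispatch loop crashes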
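
And for the second thread, a minimal sketch of a failure handler that restores PENDING and also clears the deadline. reset_processing_deadline is a hypothetical PendingTaskStore method; only set_task_status appears in this PR.

# Hypothetical sketch: restore PENDING and clear the processing deadline when a
# worker connection is lost, so the activation is not later flagged as a worker
# timeout and retried on that basis.
from sentry_protos.sentry.v1alpha.taskworker_pb2 import TASK_ACTIVATION_STATUS_PENDING

from sentry.taskworker.pending_task_store import PendingTaskStore


def return_task_to_pending(store: PendingTaskStore, activation_id) -> None:
    store.set_task_status(
        task_id=activation_id,
        task_status=TASK_ACTIVATION_STATUS_PENDING,
    )
    # Assumed API; the store shown in this PR does not expose this method.
    store.reset_processing_deadline(task_id=activation_id)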
49 changes: 0 additions & 49 deletions src/sentry/taskworker/grpc_server.py

This file was deleted.

59 changes: 23 additions & 36 deletions src/sentry/taskworker/worker.py
@@ -2,26 +2,35 @@

 import logging
 import time
+from concurrent import futures

+import grpc
 import orjson
 from django.conf import settings
 from sentry_protos.sentry.v1alpha.taskworker_pb2 import (
     TASK_ACTIVATION_STATUS_COMPLETE,
     TASK_ACTIVATION_STATUS_FAILURE,
     TASK_ACTIVATION_STATUS_RETRY,
+    DispatchRequest,
+    DispatchResponse,
 )
+from sentry_protos.sentry.v1alpha.taskworker_pb2_grpc import (
+    WorkerServiceServicer as BaseWorkerServiceServicer,
+)
+from sentry_protos.sentry.v1alpha.taskworker_pb2_grpc import add_WorkerServiceServicer_to_server

 from sentry.taskworker.config import TaskNamespace, taskregistry

 logger = logging.getLogger("sentry.taskworker")


-class Worker:
+class WorkerServicer(BaseWorkerServiceServicer):
Review comment (Member), on the WorkerServicer class:
It would be good if we had a way to have the push and pull models co-exist in the repository at the same time. That would help with throughput comparisons, as we could run the two options close to each other.

Perhaps we can have two Worker implementations that are toggled with CLI flags?

Reply from @john-z-yang (Member, Author), Sep 25, 2024:
Agreed, we can split the command into push and pull variants for the worker and grpc commands.

(A sketch of a CLI toggle follows this diff.)

     __namespace: TaskNamespace | None = None

-    def __init__(self, **options):
+    def __init__(self, **options) -> None:
+        super().__init__()
         self.options = options
-        self.exitcode = None
+        self.do_imports()

     @property
     def namespace(self) -> TaskNamespace:
@@ -36,25 +45,8 @@ def do_imports(self) -> None:
         for module in settings.TASKWORKER_IMPORTS:
             __import__(module)

-    def start(self) -> None:
-        self.do_imports()
-        try:
-            while True:
-                self.process_tasks(self.namespace)
-        except KeyboardInterrupt:
-            self.exitcode = 1
-        except Exception:
-            logger.exception("Worker process crashed")
-
-    def process_tasks(self, namespace: TaskNamespace) -> None:
-        from sentry.taskworker.service.client import task_client
-
-        activation = task_client.get_task(topic=namespace.topic)
-        if not activation:
-            logger.info("No tasks")
-            time.sleep(1)
-            return
-
+    def Dispatch(self, request: DispatchRequest, _) -> DispatchResponse:
+        activation = request.task_activation
         try:
             task_meta = self.namespace.get(activation.taskname)
         except KeyError:
@@ -78,17 +70,12 @@ def process_tasks(self, namespace: TaskNamespace) -> None:
         task_latency = execution_time - task_added_time
         logger.info("task.complete", extra={"latency": task_latency})

-        if next_state == TASK_ACTIVATION_STATUS_COMPLETE:
-            logger.info(
-                "taskworker.task.complete", extra={"task": activation.taskname, "id": activation.id}
-            )
-            task_client.complete_task(task_id=activation.id)
-        else:
-            logger.info(
-                "taskworker.task.change_status",
-                extra={"task": activation.taskname, "state": next_state},
-            )
-            task_client.set_task_status(
-                task_id=activation.id,
-                task_status=next_state,
-            )
+        return DispatchResponse(status=next_state)
+
+
+def serve(**options):
+    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
+    add_WorkerServiceServicer_to_server(WorkerServicer(**options), server)
+    server.add_insecure_port("[::]:50051")
+    server.start()
+    server.wait_for_termination()
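
Picking up the review suggestion above about letting the push and pull models co-exist: a minimal sketch of a CLI toggle on the taskworker command, assuming run.py's existing click imports and that the old pull-model Worker class is kept under an assumed sentry.taskworker.pull_worker path. The --model flag is illustrative, not part of this PR.

# Hypothetical sketch of a push/pull toggle on the taskworker command in run.py.
# The --model flag and the sentry.taskworker.pull_worker module are assumptions.
@run.command()
@click.option("--model", type=click.Choice(["push", "pull"]), default="push")
@log_options()
@configuration
def taskworker(model: str, **options: Any) -> None:
    with managed_bgtasks(role="taskworker"):
        if model == "push":
            # Push model (this PR): run the gRPC servicer and wait for dispatches.
            from sentry.taskworker.worker import serve

            serve(**options)
        else:
            # Pull model: poll for pending tasks, as the pre-PR Worker did,
            # assuming the old class is preserved under the path above.
            from sentry.taskworker.pull_worker import Worker

            worker = Worker(namespace=options.get("namespace"))
            worker.start()
            raise SystemExit(worker.exitcode)

Keeping both paths behind one command would make the throughput comparison the reviewer mentions easier to run side by side.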