diff --git a/src/sentry/taskdemo/__init__.py b/src/sentry/taskdemo/__init__.py index d67723fcfa407e..84f14fe18b58bc 100644 --- a/src/sentry/taskdemo/__init__.py +++ b/src/sentry/taskdemo/__init__.py @@ -17,7 +17,8 @@ @demotasks.register(name="demos.say_hello") def say_hello(name): - logger.info("hello %s", name) + # logger.info("hello %s", name) need to fix logging now that we are running this in another process + print(f"hello {name}") # noqa @demotasks.register(name="demos.broken", retry=Retry(times=5, on=(KeyError,))) diff --git a/src/sentry/taskworker/config.py b/src/sentry/taskworker/config.py index b96efd951388c6..293f00c4e600e0 100644 --- a/src/sentry/taskworker/config.py +++ b/src/sentry/taskworker/config.py @@ -13,7 +13,6 @@ from sentry_protos.sentry.v1alpha.taskworker_pb2 import RetryState, TaskActivation from sentry.conf.types.kafka_definition import Topic -from sentry.taskworker.models import InflightActivationModel from sentry.taskworker.retry import FALLBACK_RETRY, Retry from sentry.taskworker.task import Task from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition @@ -51,6 +50,9 @@ def get(self, name: str) -> Task: raise KeyError(f"No task registered with the name {name}. Check your imports") return self.__registered_tasks[name] + def contains(self, name: str) -> bool: + return name in self.__registered_tasks + def register( self, name: str, @@ -76,11 +78,10 @@ def wrapped(func): return wrapped - def retry_task(self, taskdata: InflightActivationModel) -> None: - message = taskdata.to_proto() + def retry_task(self, taskdata: TaskActivation) -> None: self.producer.produce( ArroyoTopic(name=self.topic), - KafkaPayload(key=None, value=message.activation.SerializeToString(), headers=[]), + KafkaPayload(key=None, value=taskdata.SerializeToString(), headers=[]), ) def send_task(self, task: Task, args, kwargs) -> None: diff --git a/src/sentry/taskworker/consumer_grpc_push.py b/src/sentry/taskworker/consumer_grpc_push.py index 24626e206206bf..6c421d8e76868d 100644 --- a/src/sentry/taskworker/consumer_grpc_push.py +++ b/src/sentry/taskworker/consumer_grpc_push.py @@ -62,10 +62,11 @@ def _dispatch_activation( inflight_activation: InflightActivation, ) -> WorkerServiceStub: try: - timeout_in_sec = inflight_activation.processing_deadline.seconds - time.time() dispatch_task_response = stub.Dispatch( - DispatchRequest(task_activation=inflight_activation.activation), - timeout=timeout_in_sec, + DispatchRequest( + activation=inflight_activation.activation, + processing_deadline=inflight_activation.processing_deadline, + ), ) self.pending_task_store.set_task_status( task_id=inflight_activation.activation.id, diff --git a/src/sentry/taskworker/pending_task_store.py b/src/sentry/taskworker/pending_task_store.py index 8fbe0f45cc1a0d..ee653c6f7a84f9 100644 --- a/src/sentry/taskworker/pending_task_store.py +++ b/src/sentry/taskworker/pending_task_store.py @@ -74,8 +74,8 @@ def handle_retry_state_tasks(self) -> None: status=InflightActivationModel.Status.RETRY ) for item in retry_qs: - task_ns = taskregistry.get(item.task_namespace) - task_ns.retry_task(item) + task_ns = taskregistry.get(item.namespace) + task_ns.retry_task(item.to_proto().activation) # With retries scheduled, the tasks are complete now. retry_qs.update(status=InflightActivationModel.Status.COMPLETE) diff --git a/src/sentry/taskworker/worker_process.py b/src/sentry/taskworker/worker_process.py new file mode 100644 index 00000000000000..ee5122713df7b0 --- /dev/null +++ b/src/sentry/taskworker/worker_process.py @@ -0,0 +1,10 @@ +from django.conf import settings + +from sentry.taskworker.config import taskregistry + + +def _process_activation(namespace, task_name, args, kwargs): + for module in settings.TASKWORKER_IMPORTS: + __import__(module) + + taskregistry.get(namespace).get(task_name)(*args, **kwargs) diff --git a/src/sentry/taskworker/worker_push.py b/src/sentry/taskworker/worker_push.py index fc03adaa3aa44a..f7ee0d1c8d828d 100644 --- a/src/sentry/taskworker/worker_push.py +++ b/src/sentry/taskworker/worker_push.py @@ -3,6 +3,8 @@ import logging import time from concurrent import futures +from datetime import datetime +from multiprocessing.pool import Pool import grpc import orjson @@ -19,6 +21,7 @@ ) from sentry_protos.sentry.v1alpha.taskworker_pb2_grpc import add_WorkerServiceServicer_to_server +from sentry.taskworker import worker_process from sentry.taskworker.config import TaskNamespace, taskregistry logger = logging.getLogger("sentry.taskworker") @@ -47,10 +50,9 @@ def do_imports(self) -> None: __import__(module) def Dispatch(self, request: DispatchRequest, _) -> DispatchResponse: - activation = request.task_activation - try: - task_meta = self.namespace.get(activation.taskname) - except KeyError: + activation = request.activation + + if not self.namespace.contains(activation.taskname): logger.exception("Could not resolve task with name %s", activation.taskname) return @@ -60,12 +62,26 @@ def Dispatch(self, request: DispatchRequest, _) -> DispatchResponse: next_state = TASK_ACTIVATION_STATUS_FAILURE try: task_data_parameters = orjson.loads(activation.parameters) - task_meta(*task_data_parameters["args"], **task_data_parameters["kwargs"]) + with Pool(processes=1) as pool: + result = pool.apply_async( + worker_process._process_activation, + ( + self.options["namespace"], + activation.taskname, + task_data_parameters["args"], + task_data_parameters["kwargs"], + ), + ) + result.get( + timeout=( + request.processing_deadline.ToDatetime() - datetime.now() + ).total_seconds() + ) next_state = TASK_ACTIVATION_STATUS_COMPLETE except Exception as err: logger.info("taskworker.task_errored", extra={"error": str(err)}) # TODO check retry policy - if task_meta.should_retry(activation.retry_state, err): + if self.namespace.get(activation.taskname).should_retry(activation.retry_state, err): logger.info("taskworker.task.retry", extra={"task": activation.taskname}) next_state = TASK_ACTIVATION_STATUS_RETRY task_latency = execution_time - task_added_time