Run task in separate process
john-z-yang committed Sep 28, 2024
1 parent 8e5fd62 commit 4251bb0
Showing 6 changed files with 45 additions and 16 deletions.
3 changes: 2 additions & 1 deletion src/sentry/taskdemo/__init__.py
@@ -17,7 +17,8 @@

@demotasks.register(name="demos.say_hello")
def say_hello(name):
logger.info("hello %s", name)
# logger.info("hello %s", name)  # TODO: fix logging now that this runs in a separate process
print(f"hello {name}") # noqa


@demotasks.register(name="demos.broken", retry=Retry(times=5, on=(KeyError,)))
9 changes: 5 additions & 4 deletions src/sentry/taskworker/config.py
@@ -13,7 +13,6 @@
from sentry_protos.sentry.v1alpha.taskworker_pb2 import RetryState, TaskActivation

from sentry.conf.types.kafka_definition import Topic
from sentry.taskworker.models import InflightActivationModel
from sentry.taskworker.retry import FALLBACK_RETRY, Retry
from sentry.taskworker.task import Task
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
@@ -51,6 +50,9 @@ def get(self, name: str) -> Task:
raise KeyError(f"No task registered with the name {name}. Check your imports")
return self.__registered_tasks[name]

def contains(self, name: str) -> bool:
return name in self.__registered_tasks

def register(
self,
name: str,
@@ -76,11 +78,10 @@ def wrapped(func):

return wrapped

def retry_task(self, taskdata: InflightActivationModel) -> None:
message = taskdata.to_proto()
def retry_task(self, taskdata: TaskActivation) -> None:
self.producer.produce(
ArroyoTopic(name=self.topic),
KafkaPayload(key=None, value=message.activation.SerializeToString(), headers=[]),
KafkaPayload(key=None, value=taskdata.SerializeToString(), headers=[]),
)

def send_task(self, task: Task, args, kwargs) -> None:
7 changes: 4 additions & 3 deletions src/sentry/taskworker/consumer_grpc_push.py
@@ -62,10 +62,11 @@ def _dispatch_activation(
inflight_activation: InflightActivation,
) -> WorkerServiceStub:
try:
timeout_in_sec = inflight_activation.processing_deadline.seconds - time.time()
dispatch_task_response = stub.Dispatch(
DispatchRequest(task_activation=inflight_activation.activation),
timeout=timeout_in_sec,
DispatchRequest(
activation=inflight_activation.activation,
processing_deadline=inflight_activation.processing_deadline,
),
)
self.pending_task_store.set_task_status(
task_id=inflight_activation.activation.id,
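With this change the consumer no longer turns the processing deadline into a client-side gRPC timeout; it forwards processing_deadline inside the DispatchRequest and the worker derives the remaining-time budget itself (see the worker_push.py hunk further down). A minimal sketch of that conversion, assuming a standard google.protobuf Timestamp:

# Sketch only: turning a protobuf Timestamp deadline into a timeout in seconds.
# Timestamp.ToDatetime() returns a timezone-naive UTC datetime, so this sketch
# compares it against utcnow().
from datetime import datetime, timedelta

from google.protobuf.timestamp_pb2 import Timestamp

deadline = Timestamp()
deadline.FromDatetime(datetime.utcnow() + timedelta(seconds=30))

remaining = (deadline.ToDatetime() - datetime.utcnow()).total_seconds()
print(f"{remaining:.1f}s left before the processing deadline")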
4 changes: 2 additions & 2 deletions src/sentry/taskworker/pending_task_store.py
@@ -74,8 +74,8 @@ def handle_retry_state_tasks(self) -> None:
status=InflightActivationModel.Status.RETRY
)
for item in retry_qs:
task_ns = taskregistry.get(item.task_namespace)
task_ns.retry_task(item)
task_ns = taskregistry.get(item.namespace)
task_ns.retry_task(item.to_proto().activation)
# With retries scheduled, the tasks are complete now.
retry_qs.update(status=InflightActivationModel.Status.COMPLETE)

10 changes: 10 additions & 0 deletions src/sentry/taskworker/worker_process.py
@@ -0,0 +1,10 @@
from django.conf import settings

from sentry.taskworker.config import taskregistry


def _process_activation(namespace, task_name, args, kwargs):
for module in settings.TASKWORKER_IMPORTS:
__import__(module)

taskregistry.get(namespace).get(task_name)(*args, **kwargs)
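For context, a hedged usage sketch of the new helper; the namespace and argument values below are assumptions, not taken from the commit. The worker hands it the namespace the task was registered under, the registered task name, and the already-decoded positional and keyword arguments, mirroring the apply_async call in worker_push.py below. Keeping the function at module level also matters: multiprocessing pickles the callable by reference, and re-running TASKWORKER_IMPORTS in the child presumably ensures the registry is populated even in a freshly spawned interpreter.

# Hypothetical usage sketch; namespace and argument values are assumptions.
from sentry.taskworker.worker_process import _process_activation

_process_activation(
    "demos",              # namespace the task was registered under (assumed)
    "demos.say_hello",    # task name registered in taskdemo/__init__.py
    ["world"],            # positional args decoded from activation.parameters
    {},                   # keyword args
)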
28 changes: 22 additions & 6 deletions src/sentry/taskworker/worker_push.py
@@ -3,6 +3,8 @@
import logging
import time
from concurrent import futures
from datetime import datetime
from multiprocessing.pool import Pool

import grpc
import orjson
@@ -19,6 +21,7 @@
)
from sentry_protos.sentry.v1alpha.taskworker_pb2_grpc import add_WorkerServiceServicer_to_server

from sentry.taskworker import worker_process
from sentry.taskworker.config import TaskNamespace, taskregistry

logger = logging.getLogger("sentry.taskworker")
@@ -47,10 +50,9 @@ def do_imports(self) -> None:
__import__(module)

def Dispatch(self, request: DispatchRequest, _) -> DispatchResponse:
activation = request.task_activation
try:
task_meta = self.namespace.get(activation.taskname)
except KeyError:
activation = request.activation

if not self.namespace.contains(activation.taskname):
logger.exception("Could not resolve task with name %s", activation.taskname)
return

@@ -60,12 +62,26 @@ def Dispatch(self, request: DispatchRequest, _) -> DispatchResponse:
next_state = TASK_ACTIVATION_STATUS_FAILURE
try:
task_data_parameters = orjson.loads(activation.parameters)
task_meta(*task_data_parameters["args"], **task_data_parameters["kwargs"])
with Pool(processes=1) as pool:
result = pool.apply_async(
worker_process._process_activation,
(
self.options["namespace"],
activation.taskname,
task_data_parameters["args"],
task_data_parameters["kwargs"],
),
)
result.get(
timeout=(
request.processing_deadline.ToDatetime() - datetime.now()
).total_seconds()
)
next_state = TASK_ACTIVATION_STATUS_COMPLETE
except Exception as err:
logger.info("taskworker.task_errored", extra={"error": str(err)})
# TODO check retry policy
if task_meta.should_retry(activation.retry_state, err):
if self.namespace.get(activation.taskname).should_retry(activation.retry_state, err):
logger.info("taskworker.task.retry", extra={"task": activation.taskname})
next_state = TASK_ACTIVATION_STATUS_RETRY
task_latency = execution_time - task_added_time
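Taken together, the dispatch path now runs each activation in a single-worker process pool and bounds it with the forwarded deadline. Below is a standalone sketch of that pattern with plain Python stand-ins for the registry lookup (nothing here is Sentry code): apply_async plus result.get(timeout=...) raises multiprocessing.TimeoutError when the task overruns, which falls into the same generic exception handler that decides between retry and failure.

import multiprocessing
from multiprocessing.pool import Pool


def run_task(args, kwargs):
    # stand-in for taskregistry.get(namespace).get(taskname)(*args, **kwargs)
    return sum(args)


def dispatch(args, kwargs, timeout_seconds):
    with Pool(processes=1) as pool:
        result = pool.apply_async(run_task, (args, kwargs))
        try:
            return result.get(timeout=timeout_seconds)
        except multiprocessing.TimeoutError:
            # the task exceeded its processing deadline; the caller decides
            # whether to mark the activation failed or schedule a retry
            raise


if __name__ == "__main__":
    print(dispatch([1, 2, 3], {}, timeout_seconds=5.0))  # -> 6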
