implement basic consumer push model #78098
base: hackweek-kafkatasks
Conversation
logger.exception(
    "Connection lost with worker, code: %s, details: %s",
    rpc_error.code(),
    rpc_error.details(),
)
self.pending_task_store.set_task_status(
    task_id=in_flight_activation.activation.id,
    task_status=TASK_ACTIVATION_STATUS_PENDING,
)
I'm questioning whether this is the correct behaviour. My intuition is to not use the same code paths to handle worker connection failure because this seems to be an error with the platform, so it should not consume quotas set by the Task defined by the user.
The argument can be made that the execution of the task can cause the worker to OOM, which is a user issue.
Regardless, not having a check here could be dangerous in a production environment.
If we fail to connect to a worker, the task wouldn't have been processed, and restoring the state to pending makes sense to me. We'd also need to reset the processing_deadline for the activation so that it doesn't get flagged as a worker timeout and retried later.
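A minimal sketch of that combination, assuming a clear_processing_deadline-style helper on the pending task store (the helper name and the status constant below are hypothetical stand-ins, not the PR's actual API):

```python
import logging

import grpc

logger = logging.getLogger("sentry.taskworker")

# Stand-in for the real protobuf status constant the consumer already imports.
TASK_ACTIVATION_STATUS_PENDING = "pending"


def handle_worker_connection_loss(pending_task_store, in_flight_activation, rpc_error: grpc.RpcError) -> None:
    """Return the activation to PENDING and clear its processing deadline."""
    logger.exception(
        "Connection lost with worker, code: %s, details: %s",
        rpc_error.code(),
        rpc_error.details(),
    )
    pending_task_store.set_task_status(
        task_id=in_flight_activation.activation.id,
        task_status=TASK_ACTIVATION_STATUS_PENDING,
    )
    # Hypothetical helper: without resetting processing_deadline, the stale
    # deadline would make this activation look like a worker timeout and
    # trigger an unwanted retry later.
    pending_task_store.clear_processing_deadline(
        task_id=in_flight_activation.activation.id,
    )
```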
while True:
    self.dispatch_task()
Maybe we should have a multithreading pool or async here since consumer and worker are unlikely to be 1:1. We only query the db when an ongoing connection with the worker closes. Thoughts?
I think multithreading is going to be necessary. I think we'll want to pair the threadpool count to be <= the number of workers per partition-consumer. Having more threads than workers feels like it would lead to contention on workers.
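A rough sketch of that sizing rule, with placeholder stubs standing in for the real per-worker gRPC stubs:

```python
from concurrent.futures import ThreadPoolExecutor

# Placeholder stubs, one per worker behind this partition-consumer.
worker_stubs = ["stub-worker-1", "stub-worker-2"]

# Cap the pool at the worker count: more threads than workers would only
# contend for the same workers without adding throughput.
executor = ThreadPoolExecutor(max_workers=len(worker_stubs))
```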
src/sentry/taskworker/worker.py (Outdated)

from sentry.taskworker.config import TaskNamespace, taskregistry

logger = logging.getLogger("sentry.taskworker")


-class Worker:
+class WorkerServicer(BaseWorkerServiceServicer):
It would be good if we had a way to have the push and pull models co-exist in the repository at the same time. That would help with throughput comparisons as we could run the two options close to each other.
Perhaps we can have two Worker implementations that are toggled with CLI flags?
Agree, we split the command into push and pull variants for worker and grpc.
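One way to toggle the two variants is a CLI option on the worker command; this is a hypothetical sketch with click, not the exact wiring used in the PR:

```python
import click


@click.command()
@click.option(
    "--mode",
    type=click.Choice(["push", "pull"]),
    default="pull",
    help="Run the worker as a push-model gRPC servicer or a pull-model poller.",
)
def worker(mode: str) -> None:
    # Hypothetical entry points; the PR splits the command into push/pull variants.
    if mode == "push":
        run_push_worker()
    else:
        run_pull_worker()


def run_push_worker() -> None:
    """Start the gRPC servicer and wait for the consumer to dispatch tasks."""


def run_pull_worker() -> None:
    """Poll for pending tasks and execute them."""


if __name__ == "__main__":
    worker()
```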
Force-pushed from 5e62a25 to 0ca7a36
with ThreadPoolExecutor(max_workers=len(self.available_stubs)) as executor:
    logger.info("Starting consumer grpc with %s threads", len(self.available_stubs))
    while True:
        inflight_activation = self._poll_pending_task()

        if len(self.available_stubs) == 0:
            done, not_done = wait(self.current_connections, return_when=FIRST_COMPLETED)
            self.available_stubs.extend([future.result() for future in done])
            self.current_connections = not_done
I think ideally this should be done via async gRPC instead of regular gRPC with multithreading, because with async we only perform preemption during IO, whereas with multithreading it's up to the discretion of CPython.
But I'm not sure how to replicate this behaviour (only polling the db when a stub is free) with async Python, or whether the difference is large enough to yield measurable performance benefits, so I decided to implement it this way instead.
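For comparison, the "only poll the db when a stub is free" behaviour can be approximated in asyncio with a queue of idle stubs; this is a hedged sketch with hypothetical poll_pending_task and dispatch coroutines, not the PR's implementation:

```python
import asyncio


async def dispatch_loop(stubs, poll_pending_task, dispatch):
    """Only poll the store once at least one stub is idle."""
    free_stubs: asyncio.Queue = asyncio.Queue()
    for stub in stubs:
        free_stubs.put_nowait(stub)

    async def run_one(stub, activation):
        try:
            await dispatch(stub, activation)
        finally:
            # Returning the stub is what allows the loop to poll again.
            free_stubs.put_nowait(stub)

    pending = set()
    while True:
        stub = await free_stubs.get()            # suspends until a stub is idle
        activation = await poll_pending_task()   # db is only polled here
        task = asyncio.create_task(run_one(stub, activation))
        pending.add(task)
        task.add_done_callback(pending.discard)  # keep a reference until done
```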
@@ -33,7 +33,7 @@ def get_pending_task(self) -> InflightActivation | None:
         return None

     # TODO this duration should be a tasknamespace setting, or with an option
-    deadline = task.added_at + timedelta(minutes=3)
+    deadline = datetime.now() + timedelta(minutes=3)
The processing deadline should be the timestamp at which the task was pulled out of the datastore to be sent to the worker, plus some adjustable duration. The added_at timestamp is first captured when the message is read from Kafka and inserted into the datastore. In the scenario where the worker does not pick up the task before task.added_at + <adjustable duration>, the task can never be completed; more simply, the current time will always be ahead of processing_deadline.
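A small sketch of the direction the TODO in the diff points at, with the duration coming from the task namespace instead of being hard-coded (the processing_duration attribute is hypothetical):

```python
from datetime import datetime, timedelta


def compute_processing_deadline(namespace) -> datetime:
    # Anchor the deadline to the moment the task is handed to a worker,
    # not to task.added_at (when it was first read from Kafka).
    duration = getattr(namespace, "processing_duration", timedelta(minutes=3))
    return datetime.now() + duration
```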
timeout_in_sec = inflight_activation.processing_deadline.seconds - time.time()
dispatch_task_response = stub.Dispatch(
    DispatchRequest(task_activation=inflight_activation.activation),
    timeout=timeout_in_sec,
)
Thinking about this a bit more, a timeout might be better handled on the server (worker) side instead of the client side.
Here's my reasoning:
When a task times out, it should be treated as if it had thrown an exception, because this is not a platform problem (like failing to connect to a worker) but an issue with the execution of the task itself. So it should go into the same flow in the worker that determines the next state of the activation (here), instead of requeuing the task into the store like we're doing right now.
@enochtangg what do you think?
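A hedged sketch of what worker-side enforcement could look like, treating a timeout exactly like an exception raised by the task body (helper and status names are illustrative, not the PR's API):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def execute_with_deadline(task_callable, processing_seconds: float) -> str:
    """Run the task on the worker and convert a timeout into a task failure."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(task_callable)
    try:
        future.result(timeout=processing_seconds)
        status = "complete"
    except FutureTimeout:
        # A timeout is a property of the task's execution, not the platform,
        # so it feeds the same failure/retry path as any other task exception.
        status = "failure"
    except Exception:
        status = "failure"
    finally:
        # Note: the timed-out thread keeps running; this only decides the
        # state reported back for the activation.
        pool.shutdown(wait=False)
    return status
```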
Force-pushed from 4251bb0 to 5b17c85
Overview
Verified that this works with super long-running tasks (30 minutes or more); the gRPC server and client seem to maintain an active socket connection the whole time, so if the worker dies, the client sees it immediately.
Start the arroyo consumer as before
Start the consumer GRPC with
Start the worker with
With Django shell, run
The work should be distributed pretty evenly across the 2 workers
worker 1
worker 2