
Queuing inhibits releasing of tasks #7396

Closed
fjetter opened this issue Dec 13, 2022 · 4 comments · Fixed by #7402

fjetter (Member) commented Dec 13, 2022

The transition logic for queued tasks is unstable and ordering-dependent when releasing tasks.

Specifically, _exit_processing_common pops tasks from the worker queues whenever something leaves the processing state, which includes the transition processing->released.

for qts in self._next_queued_tasks_for_worker(ws):
    if self.validate:
        assert qts.key not in recommendations, recommendations[qts.key]
    recommendations[qts.key] = "processing"

Recommendations are generated for a key to be transitioned into processing.

This new recommendation will then overwrite an earlier recommendation to release/forget this task

recommendations.update(new_recs)

such that the task is never forgotten but ends up in state processing instead.
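
For illustration, a toy sketch (not the scheduler code) of how the dict update silently drops the earlier recommendation:

# Toy example only: dict.update() overwrites an earlier recommendation for the same key.
recommendations = {"task-0": "released"}   # earlier decision: forget the task
new_recs = {"task-0": "processing"}        # produced while a task exits processing
recommendations.update(new_recs)
assert recommendations == {"task-0": "processing"}  # the "released" recommendation is lost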

Note: These tasks are then technically in a corrupt state because they have no TaskState.who_needs and no dependent with a who_needs. We don't check this in validate_state, which I believe is why this issue didn't pop up earlier (and I suggest not introducing such a check: walking the entire graph for every task is prohibitively expensive and would slow down our tests).

from distributed import as_completed
from distributed.utils_test import gen_cluster, inc


# Minimal reproducer
@gen_cluster(client=True)
async def test_forget_tasks_while_processing(c, s, a, b):
    futures = c.map(inc, range(1000))
    await futures[0]
    await c.close()
    assert not s.tasks


# Original reproducer
@gen_cluster(client=True)
async def test_large_map_first_work(c, s, a, b):
    futures = c.map(inc, range(1000))
    async for _ in as_completed(futures):
        break
    await c.restart()

cc @gjoseph92

fjetter (Member Author) commented Dec 13, 2022

The impact is relatively mild. We are obviously recomputing some tasks even though we shouldn't. Once the workers complete them, the scheduler will tell them to release the tasks again. We end up with "zombie" tasks in state released on the scheduler, but the workers will be clean.

When using restart, this situation can produce other follow-up failures since we have already cleaned up some other state, e.g. TaskPrefixes; see coiled/benchmarks#521 (comment)

gjoseph92 (Collaborator) commented

This makes sense, but I'm curious why assert qts.key not in recommendations is not triggering in these cases. When I run test_forget_tasks_while_processing, I don't see that assertion fail. I'll look into it more.

I assume this means that there's a previous batch of recommendations which recommends a task to released, but while processing that batch (with a new, empty recommendations dict) we recommend the task to processing and overwrite the old batch. I'm wondering how _exit_processing_common can identify this situation if state hasn't yet been changed for the task in question.
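
A purely illustrative sketch of that hypothesis (toy dicts only, not the actual transition machinery): the assertion only inspects the dict built for the current batch, so it cannot see the earlier release recommendation.

# Toy illustration of the hypothesis above, not the real transition engine.
earlier_batch = {"task-0": "released"}  # already being processed elsewhere

# _exit_processing_common builds its recommendations into a fresh dict ...
recommendations = {}
# ... so the guard only sees this empty dict and passes:
assert "task-0" not in recommendations
recommendations["task-0"] = "processing"
# When both batches are applied, "processing" wins and the release
# recommendation from earlier_batch is effectively undone.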

gjoseph92 self-assigned this Dec 13, 2022

fjetter (Member Author) commented Dec 13, 2022

I'm concerned this is a more deeply rooted issue caused by us just updating the recommendation dict here

recommendations.update(new_recs)

Having key collisions here will inevitably cause problems. I guess we were lucky so far?

gjoseph92 (Collaborator) commented

Yeah, I was also thinking that. You could argue recommendations might make more sense as a stack. I think we may implicitly rely on this behavior in too many places though.
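
As an illustration only, a minimal sketch of the stack idea (hypothetical structure; the current implementation uses a plain dict):

# Hypothetical sketch: recommendations kept as a stack of (key, state) pairs,
# so a later recommendation for the same key does not silently replace an
# earlier one before it has been applied.
recommendations = [("task-0", "released"), ("task-0", "processing")]
while recommendations:
    key, finish = recommendations.pop()
    # Every recommendation is handled explicitly; a conflicting earlier
    # recommendation still gets processed instead of being overwritten.
    print(f"transition {key} -> {finish}")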


Another thing is that maybe _next_queued_tasks_for_worker shouldn't be part of _exit_processing_common. That is, maybe it shouldn't run in response to a task transitioning (because there's really no relationship between the task that exits processing and the task we pop off the queue). Instead, maybe we should do it in response to stimuli, as a separate transitions cycle:

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index dbaa7cfa..153d6c80 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -5291,12 +5291,19 @@ class Scheduler(SchedulerState, ServerNode):
         recommendations, client_msgs, worker_msgs = r
         self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
 
+        recommendations = self.stimulus_task_slot_opened(stimulus_id=stimulus_id)
+        self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
+
         self.send_all(client_msgs, worker_msgs)
 
     def handle_task_erred(self, key: str, stimulus_id: str, **msg) -> None:
         r: tuple = self.stimulus_task_erred(key=key, stimulus_id=stimulus_id, **msg)
         recommendations, client_msgs, worker_msgs = r
         self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
+
+        recommendations = self.stimulus_task_slot_opened(stimulus_id=stimulus_id)
+        self._transitions(recommendations, client_msgs, worker_msgs, stimulus_id)
+
         self.send_all(client_msgs, worker_msgs)
 
     def release_worker_data(self, key: str, worker: str, stimulus_id: str) -> None:

Lastly, by just adding this assertion we at least fail validation:

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index dbaa7cfa..7c6b763f 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3088,6 +3088,7 @@ class SchedulerState:
         assert ts not in self.unrunnable
         assert ts not in self.queued
         assert all(dts.who_has for dts in ts.dependencies)
+        assert ts.who_wants or ts.waiters
 
     def _add_to_processing(self, ts: TaskState, ws: WorkerState) -> Msgs:
         """Set a task as processing on a worker and return the worker messages to send"""

So perhaps either:

  1. We should check in _next_queued_tasks_for_worker that the task is actually valid to run (ts.who_wants or ts.waiters). If it's not, leave it alone; another transition should deal with it imminently (a rough sketch follows this list).
  2. Check in _add_to_processing that the task should actually be run; if not, return None or something. All the transition_*_processing functions would then need to handle this case and recommend the key to released if so.
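
A rough sketch of option 1 (simplified; _queued_candidates_for is a hypothetical helper and the real method's selection logic differs):

# Simplified sketch of option 1 only; not the actual scheduler method.
def _next_queued_tasks_for_worker(self, ws):
    for qts in self._queued_candidates_for(ws):  # hypothetical helper
        if not (qts.who_wants or qts.waiters):
            # Nothing needs this task anymore: leave it on the queue and let
            # the pending release/forget transition deal with it.
            continue
        yield qts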

Currently, I think I'm most in favor of the option of calling _next_queued_tasks_for_worker in response to stimuli. It feels kind of logical that scheduling another queued task happens in response to a stimulus, not as a side effect of a transition.
