Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker state machine refactor #5046

Merged
merged 8 commits into from
Sep 27, 2021
Merged

Conversation

fjetter
Copy link
Member

@fjetter fjetter commented Jul 9, 2021

This refactors the worker state machine and copies the scheduler transition machine to a significant amount. There are a few changes in mechanics I want to highlight

  1. There are two new handlers acquire-replica and remove-replica which can be used to collect any kind of task, not merely tasks which are required as a dependency. this required some significant remodelling of the gather_dependency machinery, most notably since it can no longer be guaranteed that every gather_dep call has a cause assigned to it. Therefore, for diagnostics, we'll apply some heuristics to infer one.
  2. The data structure data_needed is no longer a simple list of keys of tasks which require data but rather a heap of keys to be fetched. They are sorted by their compute priority as a proxy to how urgently a worker needs a given task
  3. The logic around batching keys when fetching data from workers stays intact.
  4. The missing data handling was been a frequent source of instabilities. I changed the behaviour to be more deterministic using a periodic callback and straight forward transitions. No suspicious counters any longer, they have been broken for a very long time. See find_tasks. For this a dedicated state missing has been introduced with appropriate transitions
  5. The task release process has been changed such that a released task will no longer be immediately removed form Worker.tasks unless this is safe to do. This is primarily to avoid race conditions relying on state. The most notable change in this is the way we deal with flight and executing tasks. Releasing these tasks historically left either a thread or a async task dangling we lost track of. This has frequently been causing race conditions. Now, these tasks are no longer forgotten, released or removed but rather transitioned to an intermediary state cancelled. This cancelled state will block any further transitions until the thread/async task finishes and transitions the task then to the state, the most recent stimulus requested.
  6. For 'crossing' transitions, i.e. executing->fetch or flight->waiting, we similarly transition a task into an intermediary resumed state following similar semantics as the cancelled state. This idea has been more thoroughly described in Internal worker state transitions #4413
  7. Due to changed semantics in Worker.release_key I highly encourage the immediate deprecation of the WorkerPlugin.release_key hook. Very similar events can be intercepted using the WorkerPlugin.transition hook with more context information. I added the relevant deprecation warnings already

Closes #5323
Closes #4413

Old Description This is another attempt to refactor the worker state machine as drafted out in https://github.com//issues/4413

The transition mechanism is 95% copied from the scheduler side with a few refinements. The state machine, for instance, allows to concatenate states with kwargs and it adds nicer exceptions. This is not necessarily a functional enhancement but should rather help out with tracing if anything goes wrong

More importantly, as we discussed over in #4982 (comment) , our gather_dep mechanism did not work for "tasks without dependents" since every fetched task expects there to be a dependents. On main this is implemented by putting every task-to-be-executed into data_needed such that we'll fetch the dependencies for those tasks.
This PR now puts all tasks in "fetch" state into this data_needed explicitly instead of working indirectly via the dependents. The data_needed is kept sorted using the ordinary priorities which should resemble the previous "fetch dependencies for tasks we want to compute now first". I ran some performance tests on shuffle workloads and couldn't detect any difference. Will try to test a few other workloads but so far I don't see any regressions.
I will leave a few dedicated comment in the code with the relevant logic.

For the remove_replica, I would suggest to refactor the free_keys, superfluous_data, release_key methods to be less complicated in a follow up PR

cc @mrocklin @crusaderky

TODOs (ignore them; these are mental notes for myself):

  • Task stream for transfers is a bit of cheating right now since the "cause" we attached the startstops, strictly speaking, doesn't exist any longer
  • Delete dead code of the old transition system
  • Enable validate_state in self.transitions (this should not finally work; i.e. ensure that the state is valid after every transaction this was previously only partially true).
  • Cleanup all timeouts from debugging
  • Fix skipped/xfailed tests
  • Verify long-running is working as expected since I didn't implement an exit transition
  • Rescheduled missing (should be one of the skipped tests)

Comment on lines 1707 to 1851
def transition_released_fetch__recs(self, ts):
if self.validate:
assert ts.state == "released"
assert ts.runspec is None

for w in ts.who_has:
self.pending_data_per_worker[w].append(ts.key)
ts.state = "fetch"
heapq.heappush(self.data_needed, (ts.priority, ts.key))
return {}, []
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to keep track on scheduler side of "who is fetching what" we could emit a message in here

Comment on lines 2391 to 2360
skipped_worker_in_flight = list()
while self.data_needed and (
len(self.in_flight_workers) < self.total_out_connections
or self.comm_nbytes < self.total_comm_nbytes
):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic is now (subjectively) a bit simpler. self.data_needed no longer holds the dependents for which we are fetching dependencies but now it includes the actual dependencies / replicas to fetch

else:
worker = random.choice(list(workers))

to_gather, total_nbytes = self.select_keys_for_gather(worker, ts.key)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even replicas will now benefit from batched fetching, just like ordinary dependencies

Comment on lines 2626 to 2320
# FIXME: With the changed system this no longer makes any sense
# It didn't make much sense before since not all of the tasks
# fetched in this coro are even dependencies of `cause` but are
# simply co-located
if cause:
cause.startstops.append(
{
"action": "transfer",
"start": start + self.scheduler_delay,
"stop": stop + self.scheduler_delay,
"source": worker,
}
)
Copy link
Member Author

@fjetter fjetter Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit awkward. The task stream is built such that the startstops are attached to the cause. The cause / dependent is actually something I would like to remove entirely from gather_dep but with this work around everything works almost like before

@mrocklin
Copy link
Member

@fjetter are you blocked on anything here? Review?

cc'ing also @gforsyth

@fjetter
Copy link
Member Author

fjetter commented Jul 14, 2021

are you blocked on anything here? Review?

not blocked. working on my TODO list and will update shortly. I'll ping once I require a proper review

@fjetter fjetter force-pushed the worker_state_machine_refactor branch from 0980736 to d416520 Compare July 14, 2021 15:56
@fjetter
Copy link
Member Author

fjetter commented Jul 14, 2021

  • Complexity of a few methods (e.g. add_task / compute_task) is now much simpler
  • All transitions use now a similar machinery as the scheduler. This is almost a bit overkill for the worker since most transitions are trivial but it allows for much better control over the allowed transitions.
  • The new transitioning system now also drags around a stimulus_id which should always be generated if an external actor issues a state transitions, e.g. if the scheduler asks for key deletion, a task compute, etc. This ID is kept for all follow up transitions and allows to track the causality of events even across async tasks
  • To allow for replicas to be fetched more naturally, the Worker.data_needed structure no longer holds tasks which require data but rather the tasks we need to fetch. this mostly includes dependencies but also requested replicas
  • The change in semantics of data_needed allowed for simplifications in ensure_compute but made instrumentation a bit more awkward since we attach startstops of transfers to the dependent / cause. I tried a few things to attach the startstops to the tasks themselves instead of the dependent but that wasn't really successful. Now I simply pick one dendent of all tasks to be fetched and this will be the lucky one to be responsible for all transfers. imho, this is not much worse than before since we were never able to account for a direct task<->dependency connection due to the batching

Comment on lines +1859 to +2123
for w in ts.who_has:
self.pending_data_per_worker[w].append(ts.key)
ts.state = "fetch"
heapq.heappush(self.data_needed, (ts.priority, ts.key))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the important change. Every task in state fetch is now part of data_needed

@mrocklin
Copy link
Member

I look forward to reviewing this (although warning, this week is busy for me). In the meantime, two comments:

  1. It looks like there are two handlers to acquire and release replicas, correct? My understanding is that this should unblock @crusaderky . If so, can I ask that you two get together at some point to design a next step on active memory management?
  2. Given that this touches state machine internals it might be worth reaching out to some people that we know who have demanding workloads and ask for them to try things out.

@fjetter
Copy link
Member Author

fjetter commented Jul 14, 2021

It looks like there are two handlers to acquire and release replicas, correct? My understanding is that this should unblock

Yes. The logic is probably not 100%, yet, but it should be good enough to get started with

@fjetter
Copy link
Member Author

fjetter commented Jul 15, 2021

There is a failure in test_secede_balances which deals with concurrently running tasks / active threads. I could imagine some fringe connection to this PR and will look if I can reproduce and connect it to these changes.

OSX errors are

@fjetter
Copy link
Member Author

fjetter commented Jul 15, 2021

The test_secede_balances problem seems to be unrelated. It appears that we never rejoin seceded threads, regardless of whether rejoin is ever called.
I guess the check_thread_leak doesn't catch something like this because I assume the threads are cleaned up by the time we check since the executor should be closed by then. I will follow up in another PR

@mrocklin
Copy link
Member

@fjetter can you say a little bit about the current status here? Is this close to done, or is there still an indeterminate amount of work left to do?

@fjetter
Copy link
Member Author

fjetter commented Jul 30, 2021

Is this close to done, or is there still an indeterminate amount of work left to do?

I think it's good enough for an increment. There are still things I would like to do but considering the scope I will defer this to a later PR. I will give this another shake down and see if I encounter any more instabilities

@fjetter fjetter force-pushed the worker_state_machine_refactor branch from cbb1b28 to c3ceffd Compare August 4, 2021 15:43
@@ -1969,6 +1969,8 @@ def __init__(
("processing", "erred"): self.transition_processing_erred,
("no-worker", "released"): self.transition_no_worker_released,
("no-worker", "waiting"): self.transition_no_worker_waiting,
# TODO: Write a test. Worker disconnects -> no-worker -> reconnect with task to memory. Triggered every few hundred times by test_handle_superfluous_data
Copy link
Member Author

@fjetter fjetter Aug 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Add a test for the no-worker -> memory scheduler transition

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

distributed/scheduler.py Outdated Show resolved Hide resolved
Copy link
Member Author

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is stable now 🤞

Even though I claimed this to be almost done, I was facing stability issues and went all the way implementing #4413, i.e. there are a few new states but I'll leave explanatory comments once I cleaned up

I will still need to cleanup the code base so I discourage reviews until this is through. I will ping once cleaned up and leave section specific comments about new/changed behaviour to guide reviews.

distributed/tests/test_cancelled_state.py Outdated Show resolved Hide resolved
distributed/tests/test_failed_workers.py Outdated Show resolved Hide resolved
distributed/tests/test_failed_workers.py Outdated Show resolved Hide resolved
distributed/tests/test_failed_workers.py Outdated Show resolved Hide resolved
distributed/tests/test_worker.py Outdated Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
distributed/worker.py Outdated Show resolved Hide resolved
@fjetter fjetter force-pushed the worker_state_machine_refactor branch from bee8e03 to df8e7df Compare August 5, 2021 17:27
@fjetter fjetter mentioned this pull request Aug 5, 2021
5 tasks
Comment on lines 130 to 133
if isinstance(fs, list):
# Below iterator relies on this being a generator to cancel
# remaining futures
fs = (val for val in fs)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This chunk should disappear after merging from main

@fjetter fjetter marked this pull request as ready for review August 5, 2021 17:52
@fjetter
Copy link
Member Author

fjetter commented Sep 9, 2021

Edit: I already reverted 1. and 2.

  1. ValueError: I figured the repr of TaskState would be more informative (I don't have a strong opinion here)
  2. I should likely revert this one
  3. The stealing_logs.getvalue() loop only works if I update the variable stealing_logs
  4. I didn't find time to polish this graph/dot representation of the transition states and opted for removal
diff --git a/distributed/stealing.py b/distributed/stealing.py
index 0297691f..24f7ce74 100644
--- a/distributed/stealing.py
+++ b/distributed/stealing.py
@@ -264,7 +264,7 @@ class WorkStealing(SchedulerPlugin):
                     await self.scheduler.remove_worker(thief.address)
                 self.log(("confirm", key, victim.address, thief.address))
             else:
-                raise ValueError(f"Unexpected task state: {state}")
+                raise ValueError(f"Unexpected task state: {ts}")
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
diff --git a/distributed/tests/test_client_executor.py b/distributed/tests/test_client_executor.py
index f4a90297..fe23f8c7 100644
--- a/distributed/tests/test_client_executor.py
+++ b/distributed/tests/test_client_executor.py
@@ -155,7 +155,7 @@ def test_map(client):
         assert number_of_processing_tasks(client) > 0
         # Garbage collect the iterator => remaining tasks are cancelled
         del it
-        time.sleep(0.5)
+        time.sleep(0.1)
         assert number_of_processing_tasks(client) == 0
 
 
diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py
index be7f1bc5..82fe97be 100644
--- a/distributed/tests/test_steal.py
+++ b/distributed/tests/test_steal.py
@@ -703,9 +703,10 @@ async def test_dont_steal_already_released(c, s, a, b):
     with captured_logger(
         logging.getLogger("distributed.stealing"), level=logging.DEBUG
     ) as stealing_logs:
-        msg = f"Key released between request and confirm: {key}"
-        while msg not in stealing_logs.getvalue():
+        logs = stealing_logs.getvalue()
+        while f"Key released between request and confirm: {key}" not in logs:
             await asyncio.sleep(0.05)
+            logs = stealing_logs.getvalue()
 
 
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
diff --git a/distributed/worker.py b/distributed/worker.py
index c0e48f26..54152d4a 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1693,46 +1693,6 @@ class Worker(ServerNode):
 
         return recommendations, scheduler_msgs
 
-    def transition_table_to_dot(self, filename="worker-transitions", format=None):
-        import graphviz
-
-        from dask.dot import graphviz_to_file
-
-        g = graphviz.Digraph(
-            graph_attr={
-                "concentrate": "True",
-            },
-            # node_attr=node_attr,
-            # edge_attr=edge_attr
-        )
-        all_states = set()
-        for edge in self._transitions_table.keys():
-            all_states.update(set(edge))
-
-        seen = set()
-        with g.subgraph(name="cluster_0") as c:
-            c.attr(style="filled", color="lightgrey")
-            c.node_attr.update(style="filled", color="white")
-            c.attr(label="executable")
-            for state in (
-                "waiting",
-                "ready",
-                "executing",
-                "constrained",
-                "long-running",
-            ):
-                c.node(state, label=state)
-                seen.add(state)
-
-        with g.subgraph(name="cluster_1") as c:
-            for state in ["fetch", "flight", "missing"]:
-                c.attr(label="dependency")
-                c.node(state, label=state)
-                seen.add(state)
-
-        g.edges(self._transitions_table.keys())
-        return graphviz_to_file(g, filename=filename, format=format)
-
     def handle_compute_task(
         self,
         *,

@crusaderky
Copy link
Collaborator

ValueError: I figured the repr of TaskState would be more informative (I don't have a strong opinion here)

You are doing if state ==... elif state ==... else raise ValueError(). state is a function parameter that has potentially nothing to do with ts._state, so printing ts doesn't give any useful information here.

I should likely revert this one

You set that timeout to 0.5s in main in another PR, which looks like a partial backport of this one. It looks stable there, so I would advise changing it further.

The stealing_logs.getvalue() loop only works if I update the variable stealing_logs

The two algorithms are functionally equivalent

I didn't find time to polish this graph/dot representation of the transition states and opted for removal

OK

@fjetter
Copy link
Member Author

fjetter commented Sep 9, 2021

You are right, of course. Reverted everything except of the removal of the table_to_dot thing 👍

Copy link
Collaborator

@crusaderky crusaderky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's Green Seal of Approval from me 🎊

@fjetter
Copy link
Member Author

fjetter commented Sep 13, 2021

Since there is no more feedback incoming, I intend to merge this PR by tomorrow. If anybody intends to leave another review, please let me know and I can postpone the merge. If nobody raises another issue, I will merge tomorrow around 1:00 pm UTC

@mrocklin
Copy link
Member

mrocklin commented Sep 13, 2021 via email

@fjetter
Copy link
Member Author

fjetter commented Sep 14, 2021

We discussed whether or not we want to merge this in todays dask maintainer meeting. Since this Friday is an upcoming ordinary release scheduled, we settled on delaying merging this PR until after the release. The delay will give us time to address follow up issues and should help us maintain stable releases.

@crusaderky
Copy link
Collaborator

Merge conflicts from main resolved at crusaderky/worker_state_machine_refactor:
https://github.com/crusaderky/distributed/blob/296c3d64adccc77d19bcbc58777294e6e493dab5/distributed/scheduler.py#L5379-L5382

@fjetter
Copy link
Member Author

fjetter commented Sep 27, 2021

Test failures appear to be unrelated.

Thank you @crusaderky and @gforsyth for the review, I know this was a tough one.

@fjetter fjetter merged commit a8d4ffa into dask:main Sep 27, 2021
@mrocklin
Copy link
Member

mrocklin commented Sep 27, 2021 via email

@chrisroat
Copy link
Contributor

Wow. Nice work!

This seems to be a bit more than a refactor, but it's a lot for me to take in. Are there some expected gains, as well? (I regularly need to go in and to kill GKE workloads that get stuck, and wonder if this may help me.)

@fjetter
Copy link
Member Author

fjetter commented Sep 28, 2021

Are there some expected gains, as well? (I regularly need to go in and to kill GKE workloads that get stuck, and wonder if this may help me.)

That's our hope. We hope this will add to stability, in paticular with deadlocks. There are a few other benefits in here

  • prioritized dependency fetching Should Worker.data_needed be priority-ordered? #5323
  • Enablement of explicit replica fetching w/out need for dependents (requirement for active memory management)
  • There are more debugging and tracing information in case a deadlock or similar events happen
  • A few weird edge cases around "missing-data" have been resolved by reworking how missing dependencies are handled. The new system should be more robust
  • Down the line, this should enable speculative task assignment which came up recently more often Speculatively assign tasks to workers #3974

(I regularly need to go in and to kill GKE workloads that get stuck, and wonder if this may help me.)

I've been debugging such stuck workloads for the past >6 months and patched many deadlocks / race conditions. I ended up writing a script for extracting the cluster state once it is stuck, see #5068 If you are facing this situation again you might want to run that script and open an issue with the resulting dump. Best case, this helps me track down your deadlock and makes dask better for everyone.

@chrisroat
Copy link
Contributor

Thanks. I filed #5366 . My tests are showing several stalled graphs. I think it is worse than 2021.9.1, but these things are hard to pin down for certain as the cluster environments can be different (scaling up/down at different rates, or even different # of total nodes available).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Should Worker.data_needed be priority-ordered? Internal worker state transitions
5 participants