Root-ish tasks all schedule onto one worker #6573

Closed
gjoseph92 opened this issue Jun 14, 2022 · 6 comments

Comments

@gjoseph92
Collaborator

import time
import dask
import distributed

client = distributed.Client(n_workers=4, threads_per_worker=1)

root = dask.delayed(lambda n: "x" * n)(dask.utils.parse_bytes("1MiB"), dask_key_name="root")
results = [dask.delayed(lambda *args: None)(root, i) for i in range(10000)]
dask.compute(results)

Initially a few of the results tasks run on other workers, but after about 0.5 seconds, all tasks are running on a single worker while the other three sit idle.
[Screenshot: dashboard task stream with all tasks running on a single worker while the other three are idle]

I would have expected these tasks to be evenly assigned to all workers up front

Some variables to play with:

  • If the size of the root task is smaller, tasks will be assigned to other workers
  • If you remove dask_key_name="root", then all tasks (including the root) run on the same worker. I assume this is because they then have the same key prefix (lambda) and therefore the same task group, and some scheduling heuristics are based not on graph structure but on naming heuristics. (Both variants are sketched below.)
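
For reference, a minimal sketch of the two variants described above; the 1 kiB size in the first variant is an arbitrary choice of "smaller", everything else follows the reproducer:

import dask
import distributed

client = distributed.Client(n_workers=4, threads_per_worker=1)

# Variant 1: a much smaller root payload (1 kiB is an arbitrary "smaller" value) --
# results tasks now get assigned to other workers
small_root = dask.delayed(lambda n: "x" * n)(dask.utils.parse_bytes("1kiB"), dask_key_name="root")
results = [dask.delayed(lambda *args: None)(small_root, i) for i in range(10000)]
dask.compute(results)

# Variant 2: no explicit dask_key_name -- the root shares the "lambda" key prefix
# with the results tasks, and everything (including the root) runs on one worker
unnamed_root = dask.delayed(lambda n: "x" * n)(dask.utils.parse_bytes("1MiB"))
results = [dask.delayed(lambda *args: None)(unnamed_root, i) for i in range(10000)]
dask.compute(results)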

Distributed version: 2022.6.0

@gjoseph92 gjoseph92 added the bug and performance labels on Jun 14, 2022
@fjetter
Member

fjetter commented Jun 14, 2022

This is a work stealing problem. If we disable work stealing, it works as expected

import time
import dask
import distributed
with dask.config.set({"distributed.scheduler.work-stealing": False}):
    client = distributed.Client(n_workers=4, threads_per_worker=1)
root = dask.delayed(lambda n: "x" * n)(dask.utils.parse_bytes("1MiB"), dask_key_name="root")
results = [dask.delayed(lambda *args: None)(root, i) for i in range(10000)]
dask.compute(results)

[Screen recording: with work stealing disabled, tasks run evenly across all four workers]
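
The same toggle can also be applied globally rather than via a context manager. A minimal sketch, assuming the standard dask config API (dask.config.set / dask.config.get):

import dask
import distributed

# Set the scheduler config before the cluster is created and verify it took effect
dask.config.set({"distributed.scheduler.work-stealing": False})
assert dask.config.get("distributed.scheduler.work-stealing") is False

client = distributed.Client(n_workers=4, threads_per_worker=1)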

@fjetter
Member

fjetter commented Aug 11, 2022

I looked into this briefly today and could narrow this down to a couple of issues

  • Initially, before any execution time has been learned, unknown-task-duration is used as the compute time, which effectively enables work stealing for all tasks for a brief moment. This causes the imbalance. (A config sketch follows after this list.)
  • It is not yet clear why work stealing overcorrects that drastically in the first place; see the comment below.
  • After the first couple of tasks have finished, we learn the actual execution time and see that it is merely a couple of microseconds, i.e. work stealing will always ignore them:

        if compute_time < 0.005:  # 5ms, just give up
            return None, None

    I'm already arguing in Improve work stealing for scaling situations #4920 that this approach is wrong. It causes the initial imbalance to never be corrected. If these two lines are removed, the occupancy fluctuates but stays roughly homogeneous. Of course, in this situation, no work stealing at all would still be the better choice.
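
As a side note to the first bullet, the initial estimate is configurable. A minimal sketch, assuming the standard config key distributed.scheduler.unknown-task-duration (0.5 s by default):

import dask

# Lower the estimate used before any real task durations have been measured;
# by default it is 500ms, which makes brand-new tiny tasks look worth stealing.
dask.config.set({"distributed.scheduler.unknown-task-duration": "1ms"})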

Whether or not unknown tasks are allowed to be stolen is actually a disputed topic: #5392 disabled this behavior based on a previously written test, and #5564 reported it as a regression, which caused a revert in #5572.

@fjetter
Member

fjetter commented Aug 11, 2022

The initial imbalance is caused by work stealing selecting potential victims greedily when there are no saturated workers around.

Specifically the following lines

saturated = topk(10, s.workers.values(), key=combined_occupancy)
saturated = [
    ws
    for ws in saturated
    if combined_occupancy(ws) > 0.2 and len(ws.processing) > ws.nthreads
]
are responsible for quite astonishing stealing decisions, e.g. even between two workers that are both classified as idle.

Therefore, as soon as at least one of the workers is classified as idle due to being slightly faster than another worker, this idle worker is allowed to steal work from basically everywhere, causing all work to gravitate towards this specific worker.

Finally, this for-loop exhausts the entire stealable set of the targeted victim/"saturated" worker without accounting for any in-flight occupancy. In-flight occupancy is used for sorting and therefore for picking a victim (see combined_occupancy), but we never reevaluate whether we should stop stealing from the victim. This ultimately causes a severe overcorrection, which is very likely responsible for the aggressive fluctuations we often see in stealing (e.g. #5243).
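
To illustrate the overcorrection, a schematic sketch (not the actual balancing code; the names and numbers are made up for illustration) of the difference between exhausting the stealable set in one go and re-evaluating after each move:

# Schematic only: occupancies in seconds, all stealable tasks assumed equally long.

def drain_victim(thief_occ, victim_occ, stealable_durations):
    # Current shape: steal every task that passes the per-task filter,
    # never re-checking whether the victim is still worth stealing from.
    for duration in stealable_durations:
        thief_occ += duration
        victim_occ -= duration
    return thief_occ, victim_occ

def drain_victim_reevaluating(thief_occ, victim_occ, stealable_durations):
    # Re-evaluate after every move and stop once the two workers are balanced.
    for duration in stealable_durations:
        if victim_occ - duration < thief_occ + duration:
            break  # stealing more would just flip the imbalance
        thief_occ += duration
        victim_occ -= duration
    return thief_occ, victim_occ

print(drain_victim(0.0, 10.0, [0.5] * 20))               # (10.0, 0.0): full overcorrection
print(drain_victim_reevaluating(0.0, 10.0, [0.5] * 20))  # (5.0, 5.0): balanced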

@fjetter
Member

fjetter commented Sep 7, 2022

I investigated what's causing the initial spike of stealing events. Very early in the computation we see that slightly fewer than 7.5k stealing decisions are enacted, which causes this initial imbalance.

Assuming perfect initial task placement, this is explained by the way we update occupancies. Right after the root task finishes and all tasks are assigned, we can see that the occupancies are already biased due to #7004:

[Screenshot: per-worker occupancy right after the root task finishes, already showing a bias]

This bias is amplified in this example by a double-counting problem in the occupancies (#7003).

After an update of the task duration, _reevaluate_occupancy_worker (which runs periodically and selects one worker at a time to recompute its occupancy, assuming CPU load is not above a threshold) will reevaluate this. In this example that causes the occupancy of the selected worker to drop dramatically, since the unknown task duration defaults to 0.5 s while the actual runtime of these tasks is much smaller. This drastic drop causes the worker to be classified as idle, and it will then steal all the tasks.
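
A rough back-of-the-envelope sketch of how large that drop is (assuming perfect initial placement, the 0.5 s default estimate, and a real runtime of a few microseconds; the exact runtime value is a guess):

n_tasks = 10_000
n_workers = 4
unknown_task_duration = 0.5   # seconds, the default estimate
real_duration = 5e-6          # seconds, rough guess for these no-op tasks

print((n_tasks / n_workers) * unknown_task_duration)  # 1250.0 s estimated per worker
print((n_tasks / n_workers) * real_duration)          # 0.0125 s once real durations are learned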

Even if this round-robin recalculation is replaced with an exact "always compute all workers" function, the bias introduced by the double counting causes the worker holding the dependency to always be classified as idle, which causes heavy stealing as well.

[Screenshot: per-worker occupancy with exact recalculation; the worker holding the dependency is still classified as idle]

xref #5243

Due to how we determine whether keys are allowed to be stolen, this imbalance may never be corrected again; see:

compute_time = ws.processing[ts]
if compute_time < 0.005:  # 5ms, just give up
    return None, None
nbytes = ts.get_nbytes_deps()
transfer_time = nbytes / self.scheduler.bandwidth + LATENCY
cost_multiplier = transfer_time / compute_time
if cost_multiplier > 100:
    return None, None

In this specific case, this will never be rebalanced again because of the fast compute time of the tasks.
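
Plugging the numbers from this reproducer into the heuristic above makes that concrete. A sketch; the 100 MB/s bandwidth estimate and the 0.1 s LATENCY value are assumed defaults, not quoted in this thread:

compute_time = 5e-6           # a few microseconds per results task
nbytes = 2**20                # the 1 MiB root dependency
bandwidth = 100e6             # assumed default bandwidth estimate, bytes/s
latency = 0.1                 # assumed LATENCY constant, seconds

# First check: compute_time < 0.005 -> give up immediately, the task is never stolen
print(compute_time < 0.005)   # True

# Even without that cutoff, the cost-multiplier check would reject it as well
transfer_time = nbytes / bandwidth + latency
print(transfer_time / compute_time)  # ~22000, far above the 100 threshold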

@crusaderky
Collaborator

The reproducer no longer works after #7036.

import time
import dask
import distributed

client = distributed.Client(n_workers=4, threads_per_worker=1)

root = dask.delayed(lambda n: "x" * n)(dask.utils.parse_bytes("1MiB"), dask_key_name="root")
results = [dask.delayed(lambda *args: None)(root, i) for i in range(10000)]
r2 = dask.persist(results)
distributed.wait(r2)

for ws in client.cluster.scheduler.workers.values():
    print(ws.address, len(ws.has_what))

Before #7036:
tcp://127.0.0.1:35985 6881
tcp://127.0.0.1:38925 957
tcp://127.0.0.1:44315 1155
tcp://127.0.0.1:46187 1007

After #7036:
tcp://127.0.0.1:35657 2500
tcp://127.0.0.1:41311 2500
tcp://127.0.0.1:42593 2500
tcp://127.0.0.1:45729 2500

@hendrikmakait hendrikmakait mentioned this issue Oct 4, 2022
@fjetter
Member

fjetter commented Oct 18, 2022

After #7036 the original reproducer was timing-dependent (it hinged on when a reevaluate_occupancy run happened).

#7075 closes this. It added the original reproducer as a test case

@fjetter fjetter closed this as completed Oct 18, 2022