
Validate stateless co-assignment algorithm #7298

Closed
fjetter opened this issue Nov 11, 2022 · 4 comments

fjetter commented Nov 11, 2022

Recent changes to the scheduling heuristics that queue tasks on the scheduler removed the ability to co-assign root tasks. Co-assignment helped reduce network load by assigning tasks that likely feed shared dependents to the same worker ahead of time.

The previous implementation was a stateful heuristic that cannot easily be ported to the queuing system and has known shortcomings for non-trivial topologies; see #6597.

An early attempt to formulate a stateless algorithm that identifies groups of tasks to co-assign (called "families" in that PR) can be reviewed in #7076, but it failed to deliver on our expectations. That approach was ultimately rejected due to complexity, runtime concerns, and lack of classification power.

An alternative approach to identifying "co-assignment groups" / "cogroups" has been proposed in #7141, with an alternative implementation of the same/similar algorithm in gjoseph92#6.

Goals

We want to validate the algorithm in #7141 and perform the necessary benchmarks to inform future work or to decide whether to enable co-assignment.

Technical changes required for proper validation

  • The algorithm should be hooked up to the queuing code path
  • The worker objective function should be tested and reevaluated
  • Logic needs to be implemented to bypass pure co-assignment logic on large clusters, so they do not suffer from too little parallelism

Benchmarking / Perf validation

  • Workloads are identified that should benefit from co-assignment
  • Metrics are identified to measure the reduction in network transfer. Proposal: dask_worker_transfer_incoming_count_total / dask_worker_transfer_outgoing_count_total plus their nbytes equivalents
  • The actual impact on runtime performance for a selected set of cluster configurations is measured and documented

Nice to have but not required

  • Up/Downscaling
  • Full lifecycle management of cogroups (clean up state properly on the scheduler)

Non-goals

  • Tune/rewrite the co-assignment algorithm. If the proposed algorithm is not suitable, we should drop this effort for now.
@gjoseph92
Collaborator

We're deciding not to move ahead with this because:

  1. The algorithm doesn't do a reliable enough job of picking reasonable cogroups.
  2. We'd have to add more infrastructure on the scheduler to deal with this than we'd want to.

Summary

  1. The core challenge of coassignment + queuing is a decide_task function: when a worker has an open thread, what's the best next task to run on it (considering co-groups)?
  2. decide_task shouldn't pick a task that's intended to run on a different worker.
  3. If a task is intended to run on a different worker, but not running there yet, we need new state on the scheduler to track this—more than we're willing to add.
  4. If the cogroup algorithm did a better job, we wouldn't have to distinguish between "intended to run" and "running", and therefore wouldn't need new state.
  5. A more trustworthy co-group algorithm would make the scheduler side much easier to implement.

The Algorithm

After a couple of tweaks, The Algorithm does an OK job at co-assignment. This looks pretty good, for example (the first number is priority, the second is the cogroup; color also indicates cogroup):

But sometimes, it puts more things in one co-group than necessary. Here's the grouping of anom_mean, a common climatology workload:

[Figure: anom-mean-graph-cogroups]

Notice how some groups are small and include just one part of a tree reduction, but others encompass multiple branches of a tree reduction.

The co-assignment here isn't bad. But it's clearly imbalanced. And not all tasks in the big co-groups should run at the same time.

Using cogroups on the scheduler

Traditionally on the scheduler, we think about our decide_worker function. We have a task that can run—which worker should it use?

With queuing, we need the inverse. When a task completes on a worker, it opens up a thread. We can pick any queued task to run on that worker. Which task is best to pick? This is effectively a decide_task function, given a worker.

Currently, we pick a task by just taking the highest-priority task off a global queue. This is so simple, we don't really think of it as decide_task. But this strategy is why we don't get co-assignment right now.
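That current strategy can be sketched in a few lines. This is an illustrative reduction, not distributed's actual internals; `decide_task` and `QueuedTask` are made-up names, assuming a single global heap ordered by priority:

```python
import heapq
from dataclasses import dataclass, field


@dataclass(order=True)
class QueuedTask:
    priority: tuple              # lower tuple sorts first = runs sooner
    key: str = field(compare=False)


def decide_task(queue, worker=None):
    """Simplified sketch of today's behavior: the worker argument is
    ignored, and we just pop the highest-priority task off the global
    queue."""
    return heapq.heappop(queue).key if queue else None


queue = []
for prio, key in [((1, 2), "b"), ((1, 0), "a"), ((2, 0), "c")]:
    heapq.heappush(queue, QueuedTask(prio, key))

print(decide_task(queue))  # -> a
```

Because the worker is never consulted, two sibling tasks from the same cogroup can easily land on different workers.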

To make co-assignment work alongside queuing, we need a more sophisticated decide_task function. Logically, something that takes into account which co-groups the worker is already processing, and picks a task from them if possible.

That's not too hard, naively (implemented in #7394). Assuming you have some basic cogroup data structure, where given a TaskState you can quickly get the other tasks in the co-group, you just pick the next queued one. The problem is, what if there are no more queued tasks in that co-group? Then it's time to move on to a new co-group.
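A naive version of that logic might look like the following sketch. All names here are hypothetical, and the real implementation in #7394 differs; it assumes a per-cogroup heap plus a global fallback heap, with a `taken` set to lazily skip entries already handed out:

```python
import heapq


def decide_task(worker_cogroups, queued_by_cogroup, global_queue, taken):
    """Prefer a queued task from a cogroup the worker is already
    processing; otherwise fall back to the global priority queue."""
    for cg in worker_cogroups:
        heap = queued_by_cogroup.get(cg, [])
        while heap:
            priority, key = heapq.heappop(heap)
            if key not in taken:
                taken.add(key)
                return key
    # No queued work left in the worker's cogroups: start a new cogroup.
    # Skip entries already handed out through a per-cogroup heap.
    while global_queue:
        priority, cg, key = heapq.heappop(global_queue)
        if key not in taken:
            taken.add(key)
            return key
    return None


# The worker is already processing cogroup 1, so it gets "x" first,
# even though "y" (cogroup 2) has higher global priority.
taken = set()
queued_by_cogroup = {1: [(5, "x")], 2: [(0, "y")]}
global_queue = [(0, 2, "y"), (5, 1, "x")]
heapq.heapify(global_queue)
print(decide_task({1}, queued_by_cogroup, global_queue, taken))  # -> x
```

The fallback branch is exactly where the trouble starts: the global front may belong to a cogroup already running elsewhere, which this sketch does nothing to detect.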

If you have the global queue of tasks, maybe you just pick the next task off the front. But what if that task belongs to a co-group, where other tasks in the co-group are already running on a different worker? We effectively need a new state for these tasks—they're not quite processing, since they haven't been sent to a worker, but we already know which worker they'll be on, so they're not quite queued either.

You need some way to quickly find tasks whose co-groups aren't associated with any worker. For speed, you probably need a global queue of CoGroups, or of tasks whose co-groups aren't running anywhere. As soon as any one task in a co-group gets assigned to a worker, you move the CoGroup / tasks off the queue, and add it/them to something on the WorkerState object.
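That bookkeeping might be sketched as follows. Class and attribute names are invented for illustration (this is not distributed's actual WorkerState or CoGroup machinery):

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class CoGroup:
    id: int
    tasks: set = field(default_factory=set)  # task keys in this cogroup
    worker: Optional[str] = None             # None until first assignment


class CoGroupQueues:
    """Global queue of cogroups not bound to any worker, plus a record
    of which worker owns which cogroups."""

    def __init__(self):
        self.unassigned = []   # cogroups no worker has started yet
        self.by_worker = {}    # worker address -> list of its cogroups

    def assign(self, cogroup, worker):
        # As soon as one task of a cogroup lands on a worker, bind the
        # whole cogroup to that worker.
        cogroup.worker = worker
        if cogroup in self.unassigned:
            self.unassigned.remove(cogroup)
        self.by_worker.setdefault(worker, []).append(cogroup)

    def worker_left(self, worker):
        # One of the typical problems: cogroups bound to a departed
        # worker must return to the unassigned queue.
        for cg in self.by_worker.pop(worker, []):
            cg.worker = None
            self.unassigned.append(cg)
```

Even this toy version already needs explicit handling for worker departure; cancellation, rebalancing, and multi-worker membership would each add more.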

This then opens up all the typical problems:

  1. Is it allowed for tasks within the same co-group to be processing on multiple workers? Planned to run on multiple workers? In memory on multiple workers?
    1. How do you decide when to do this?
    2. How do you represent it?
  2. What do you do when the worker leaves?
  3. What do you do when tasks are cancelled?
  4. If the plan for which workers will run which tasks ends up imbalanced, how do you rebalance it?

In a sense, we'd be re-creating the processing state (task is allocated to a worker), except that the worker wouldn't know about the task yet. All the infrastructure for the processing state (transitions, work stealing to rebalance, dashboard, occupancy, adaptive scaling metrics, etc.) would need equivalents for this new state.

This feels like too much infrastructure to add! Isn't there some clever shortcut?

Yeah—what if we just put the tasks in processing, instead of a new quasi-processing state? Just assign all tasks in the same co-group to processing at once, even if it oversaturates the worker a little.

You can implement this easily, no new state, smallish diff. And it works sometimes: test_basic_sum (the canonical co-assignment test) transfers 55% less data (though not the 98% less data you get with queuing disabled).
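The shortcut amounts to something like this minimal sketch (names invented; not the actual diff):

```python
class WorkerSketch:
    def __init__(self, nthreads):
        self.nthreads = nthreads
        self.processing = set()   # tasks assigned to this worker

    @property
    def oversaturation(self):
        # How many assigned tasks exceed the worker's thread count.
        return max(0, len(self.processing) - self.nthreads)


def submit_cogroup(worker, cogroup_tasks):
    """The shortcut: move every task of the cogroup into processing at
    once, accepting mild oversaturation instead of a new quasi state."""
    worker.processing.update(cogroup_tasks)


w = WorkerSketch(nthreads=2)
submit_cogroup(w, {"t1", "t2", "t3", "t4"})
print(w.oversaturation)  # -> 2
```

The oversaturation stays "mild" only if cogroups stay small, which is exactly the assumption the algorithm breaks.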

But remember how sometimes, the cogroups algorithm makes groups that are too big? If you submit those all at once, then you get root task overproduction—just what queuing was meant to prevent.

Here's a memory timeseries of anom_mean (whose graph is above, where there are a couple very big cogroups):

[Figure: anom-mean-memory]

Notice how if we queue, but submit all tasks in a co-group at once (oversaturating), it's exactly the same as on main when we just turn queuing off! And similarly, if we don't oversaturate a co-group, we get zero benefit—co-assignment effectively doesn't happen, because we don't have good decide_task logic.

Conclusion

So it comes back around to the co-group algorithm not being reliable enough.

If the co-group algorithm could offer a stronger guarantee—namely that all tasks in a co-group must be in memory at the same time, on the same worker, to compute a common dependency—then the scheduler side would be much simpler. Because if all tasks in a co-group have to be in memory at once anyway, then it's fine to submit them all to a worker at once—they can't oversaturate. (This was the core observation in #7076.) Then, we could just have queued and processing instead of a new quasi-processing state.
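That stronger guarantee could even be checked mechanically: if every task in a cogroup feeds some single common dependent, their outputs all have to be in memory together anyway, so batch-submitting them cannot make memory worse. A hedged sketch (the helper and its arguments are invented, not part of distributed):

```python
def cogroup_is_safe_to_batch(cogroup_tasks, dependents):
    """Return True if all tasks in the cogroup share at least one
    common dependent, i.e. their outputs must coexist in memory to
    compute it, so submitting them all at once is safe."""
    common = None
    for key in cogroup_tasks:
        deps = dependents.get(key, set())
        # Intersect the dependent sets across the whole cogroup.
        common = deps if common is None else common & deps
        if not common:
            return False
    return True


# "a" and "b" both feed "sum", so batching them together is safe.
print(cogroup_is_safe_to_batch({"a", "b"}, {"a": {"sum"}, "b": {"sum"}}))  # -> True
```

A grouping algorithm built around this invariant would make the property above hold by construction rather than needing a check.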

But if the co-group algorithm doesn't work that well, we have to add lots more infrastructure on the scheduler to hedge against fully committing to its recommendations.

@gjoseph92 closed this as not planned on Dec 14, 2022
@mrocklin
Member

Something to add to a blog?

@gjoseph92
Collaborator

@mrocklin sure, I'd be happy to copy it over to the Dask or Coiled blog if it might be interesting to a more general audience.

@mrocklin
Member

mrocklin commented Dec 14, 2022 via email
