
Support collective style tasks #8624

Open
trivialfis opened this issue Apr 19, 2024 · 11 comments

@trivialfis

trivialfis commented Apr 19, 2024

Hi, this is a feature request for distributed to support collective-style tasks. MPI-style programming is widely used in machine learning for sample-based parallelism; examples include gradient boosting and neural networks, both of which use some form of allreduce to aggregate gradient information.

The feature request can be divided into two parts. The first is a notion of grouped tasks, and the second is an abstraction for obtaining worker-local data without running out of memory. Collective communication requires all workers to be present in the same communication group, which means tasks should be launched and finished together. In addition, error handling needs to be synchronized: if one of the tasks fails, then all the other tasks should also be restarted. For the second part, collective tasks are usually aware of the workers, and each task processes data residing on its local worker, so it would be nice to have an abstraction in dask or distributed for obtaining local partitions as iterators with data-spilling support.

The feature request does not require distributed to implement communication algorithms like barrier or allreduce; applications are likely to have their own communication channels, such as gloo or nccl.
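To make the two parts concrete, here is a purely hypothetical sketch: neither collective_run nor local_partitions exists in dask or distributed today, and feed_to_booster stands in for application code.

```python
# Purely hypothetical sketch of the requested abstractions; collective_run,
# local_partitions and feed_to_booster do NOT exist in dask/distributed.

def train_task(rank: int, world_size: int) -> None:
    # Part 2: iterate over this worker's partitions lazily, so spilled data
    # can be paged back in chunk by chunk instead of materialized at once.
    for partition in local_partitions():
        feed_to_booster(partition)  # application-side work, e.g. allreduce via nccl/gloo

# Part 1: launch one task per worker as a single group; the tasks start
# together, and if any of them fails the whole group is restarted together.
results = collective_run(client, train_task)
```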

Alternative

Currently, XGBoost specifies a unique worker address for each task and acquires a MultiLock to ensure all workers in the group are available during execution. This has the drawback of breaking the error recovery code inside distributed.
As for local data, XGBoost simply collects it as numpy arrays or pandas dataframes, which forces all the data to be loaded into memory and disregards Dask's data spilling, leading to significant memory overhead.
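For reference, a rough sketch of the approach described above (not XGBoost's actual code; scheduler_address and run_training are placeholders for the user's cluster and per-worker training function):

```python
# Rough sketch of the MultiLock-based workaround; `scheduler_address` and
# `run_training` are placeholders, not XGBoost's actual code.
from distributed import Client, MultiLock

client = Client(scheduler_address)                    # assumed existing cluster
workers = list(client.scheduler_info()["workers"])    # worker addresses

lock = MultiLock(names=workers)
lock.acquire()                                        # hold every worker in the group
try:
    futures = [
        client.submit(run_training, rank,
                      workers=[addr], allow_other_workers=False, pure=False)
        for rank, addr in enumerate(workers)
    ]
    results = client.gather(futures)
finally:
    lock.release()
```

Since each task is pinned to a single address with allow_other_workers=False, a lost worker cannot simply be replaced elsewhere, which is presumably part of the error-recovery problem mentioned above.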

Related

@mrocklin
Member

In principle I agree that some kind of collective tasks would be useful for a variety of applications (XGBoost, distributed pytorch, MPI, dataframe shuffling, rechunking). So in principle I'd say that what you're asking for is in scope.

The thing to do here is to come up with a technical design that achieves what these use cases need, fits in nicely to the existing state machines, and doesn't significantly increase maintenance burden / complexity of other parts of Dask (or is accompanied with a suitable long-term plan to address that increased complexity / maintenance burden).

This is achievable, but hard. I think a good first step for you (or anyone) who wants to push on this would be to learn a lot more about internal Dask scheduling mechanics. Of course, there's no obligation here; that's just what I would recommend as a first step if you wanted to push on this yourself.

@fjetter
Member

fjetter commented Apr 19, 2024

FWIW I see potential in factoring out some of this from the existing P2P extension. The P2P extension is already written in a way that splits the error handling / state machine integration from the business logic. I could see this being used for a more general API. This wouldn't be trivial, of course, but I don't think we'd have to start from scratch.

@trivialfis
Author

I was doing some new tests with the Kubernetes operator and dask today and ran into this issue. The cluster can allocate new pods during task processing, which breaks XGBoost, since XGBoost needs to know the exact number of nodes being used. If a new node pops up or disappears while a fraction of the XGBoost workers are waiting for the rest of the communication group, the behavior is undefined (and difficult to define, even though I can debug what's happening).

@jacobtomlinson
Member

I can see cases where generic collective operations would be useful. In this case it sounds like this particular situation was caused by a Pending worker becoming Ready during an XGBoost operation. Perhaps as a workaround you need to run client.wait_for_workers(n) to ensure your cluster is ready before you begin processing.
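A minimal sketch of that workaround, assuming a cluster object (e.g. from the Kubernetes operator) and a target of 4 workers purely for illustration:

```python
# Minimal sketch: block until the expected number of workers has connected
# before deriving the worker set for XGBoost. `cluster` and the count of 4
# are assumptions for illustration.
from distributed import Client

client = Client(cluster)
client.wait_for_workers(4)   # returns once 4 workers are up
# ... now take a snapshot of the workers and start training
```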

@trivialfis
Author

trivialfis commented May 13, 2024

Thank you for the suggestion. XGBoost uses the workers from the data partitions, so if there's data on a worker, that worker is available. The issue is that, at some point after the available workers are known but before training starts, a new worker shows up.
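For context, a sketch (not the actual xgboost.dask code) of taking that snapshot from the data partitions; ddf and client are assumed to already exist:

```python
# Sketch of deriving the communication group from where the persisted
# partitions live; `ddf` and `client` are assumed to already exist.
from distributed import futures_of, wait

ddf = client.persist(ddf)            # keep the partitions on the workers
futures = futures_of(ddf)            # one future per partition
wait(futures)                        # let placement settle
who_has = client.who_has(futures)    # partition key -> worker addresses
workers = sorted({addr for addrs in who_has.values() for addr in addrs})
```

A worker that joins the cluster after this snapshot but before training starts is exactly the situation described above.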

@trivialfis
Author

Is there a way for a plugin (any plugin) to know when a task has aborted? I looked into the scheduler plugin; there's a transition for exception but not for abort.

@fjetter
Member

fjetter commented Nov 4, 2024

What exactly do you mean by "abort"?

@trivialfis
Author

os.abort()
sys.exit(-1)
std::terminate()

In general, things that kill the worker.

@fjetter
Member

fjetter commented Nov 4, 2024

That would be any kind of transition with start state processing and a final state that isn't memory.

A word of caution: implementing scheduler plugins that react to these kinds of things can be very complex, since there are a bazillion races one has to think through. If you share a bit more of what you want to implement, we might be able to help.
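For illustration, a minimal and deliberately naive SchedulerPlugin sketch that watches for exactly that kind of transition; the "xgboost" key prefix is an assumption, and a real implementation would need to deal with the races mentioned above.

```python
# Deliberately naive sketch: notice tasks that leave `processing` without
# reaching `memory`. The "xgboost" key prefix is an assumption for the
# example; real uses must handle the race conditions mentioned above.
from distributed.diagnostics.plugin import SchedulerPlugin

class GroupFailureWatcher(SchedulerPlugin):
    def __init__(self, prefix="xgboost"):
        self.prefix = prefix

    def transition(self, key, start, finish, *args, **kwargs):
        # Covers ordinary task errors as well as workers dying mid-task
        # (which shows up as processing -> released/erred).
        if start == "processing" and finish != "memory" and str(key).startswith(self.prefix):
            print(f"group task {key!r} left processing with state {finish!r}")

# client.register_scheduler_plugin(GroupFailureWatcher())
```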

@trivialfis
Author

trivialfis commented Nov 4, 2024

Thank you for the answer. I'm looking for a way to implement resilience support for XGBoost. As described in this issue, all workers need to fail together, and there needs to be a way to communicate the failure to the healthy workers so that they can restart gracefully.

I'm still at the drawing board at the moment, gathering what can and cannot be done. There are many potential states in which stuttering can happen, so I would love to learn as much as possible before coding.
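One illustrative possibility (not what XGBoost does today) for the "communicate the failure" part is a shared distributed.Event that the failing worker sets, so the healthy workers can stop and restart gracefully; local_step and the event name are placeholders.

```python
# Illustrative only: a shared Event lets the worker that hit an error tell
# the healthy workers to stop. `local_step` is a placeholder for one round
# of local training work; "xgboost-abort" is an arbitrary event name.
from distributed import Event

def train_on_worker(local_step, n_rounds):
    abort = Event("xgboost-abort")
    try:
        for _ in range(n_rounds):
            if abort.is_set():       # another worker has already failed
                return
            local_step()             # e.g. one boosting round + allreduce
    except Exception:
        abort.set()                  # tell the rest of the group to stop
        raise
```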

@fjetter
Member

fjetter commented Nov 4, 2024

> Thank you for the answer. I'm looking for a way to implement resilience support for XGBoost. As described in this issue, all workers need to fail together, and there needs to be a way to communicate the failure to the healthy workers so that they can restart gracefully.

Yes, that's what I thought. This is what we're doing in our P2P extension for shuffling. There are many race conditions this can trigger, and it's not just about catching that single transition on the scheduler.
