-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Dask-on-Ray] Add Dask config helper, set task-based shuffle by default. #21114
[Dask-on-Ray] Add Dask config helper, set task-based shuffle by default. #21114
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. I've always wanted this override to happen by default!
@ericl Once I saw that Dask Distributed was doing it, I felt a lot better about it. 😄 Although they still have the option |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice job finding this. Is there a way to add a test?
//python/ray/util/dask:test_dask_optimization FAILED in 3 out of 3 in 9.9s |
…n't trigger auto-init.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Request change to add test cases?
@rkooo567 Sure, I guess I could remove the explicit setting of the scheduler in the scheduler tests now that it happens automatically at import time. |
Does this mean we allow to disable this in some cases? If so, I think we should have a test for that (disabling case) |
Lgtm!! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm also OK with this approach.
Dask default's to a disk-based shuffle even thought we're using a distributed scheduler, which appears to be resulting in dropped data since the filesystem isn't shared across nodes. Dask Distributed manually sets the shuffle algorithm in the global config to the task-based shuffle, which the Dask-on-Ray scheduler should probably do as well.
This PR adds a Dask config helper,
enable_dask_on_ray
, that sets Dask-on-Ray as the default scheduler along with changing the default shuffle to a task-based shuffle. The shuffle method can still be overridden by the user by manually specifyingdf.set_index(shuffle="disk")
.Related issue number
Closes #20108, closes #20992, closed #14048
Checks
scripts/format.sh
to lint the changes in this PR.