-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core] Support generators for tasks with multiple return values #25247
[core] Support generators for tasks with multiple return values #25247
Conversation
There isn't really an API change since the caller and reader of the task will still be written the same way. However, this is adding support for a new set of programs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great! Could we also add a section in the docs on task returns covering this?
Also, should we document / test it for actor method calls too?
@@ -2051,6 +2062,12 @@ cdef class CoreWorker: | |||
contained_id, &task_output_inlined_bytes, | |||
&returns[0][i]) | |||
|
|||
i += 1 | |||
if i < n_returns: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great error messages
for i in range(3): | ||
yield i | ||
|
||
# NOTE: Similar to normal functions, these objects will not be available |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bit of a footgun and won't be what people will be expecting given the normal behavior of generators. Not a problem for this PR, but to do yield properly, on the receiving side it should return an iterator over ObjectRefs of the result and produce each element as soon as it has been generated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, good point! For now, I think we should use the same semantics as normal tasks with multiple returns, especially since the ObjectRefs returned to the caller are a list, not a generator. But if we decide to support the generator interface in the future, I do think we should consider providing the iterator semantics instead.
…urn values (ray-project#25247)" (ray-project#25380)" This reverts commit 80168a0.
…urn values (#25247)" (#25380)" (#25383) Duplicate for #25247. Adds a fix for Dask-on-Ray. Previously, for tasks with multiple return values, we implicitly allowed returning a dict with the return index as the key. This was used by Dask-on-Ray, but this is not documented behavior, and we now require task returns to be iterable instead.
This uses the generators introduced in #25247 to reduce memory usage during the merge stage in push-based shuffle. These tasks merge groups of map outputs, so it fits a generator pattern where we want to return merged outputs one at a time. Verified that this allows for merging more/larger objects at a time than the current list-based version. I also tried this for the map stage in random_shuffle, but it didn't seem to make a difference in memory usage for Arrow blocks. I think this is probably because Arrow is already doing some zero-copy optimizations when selecting rows? Also adds a new line to Dataset stats for memory usage. Unfortunately it's hard to get an accurate reading of physical memory usage in Python and this value will probably be an overestimate in a lot of cases. I didn't see a difference before and after this PR for the merge stage, for example. Arguably this field should be opt-in. For 100MB partitions, for example: ``` Substage 0 read->random_shuffle_map: 10/10 blocks executed * Remote wall time: 1.44s min, 3.32s max, 2.57s mean, 25.74s total * Remote cpu time: 1.42s min, 2.53s max, 2.03s mean, 20.25s total * Worker memory usage (MB): 462 min, 864 max, 552 mean * Output num rows: 12500000 min, 12500000 max, 12500000 mean, 125000000 total * Output size bytes: 101562500 min, 101562500 max, 101562500 mean, 1015625000 total * Tasks per node: 10 min, 10 max, 10 mean; 1 nodes used Substage 1 random_shuffle_reduce: 10/10 blocks executed * Remote wall time: 1.47s min, 2.94s max, 2.17s mean, 21.69s total * Remote cpu time: 1.45s min, 1.88s max, 1.71s mean, 17.09s total * Worker memory usage (MB): 462 min, 1047 max, 831 mean * Output num rows: 12500000 min, 12500000 max, 12500000 mean, 125000000 total * Output size bytes: 101562500 min, 101562500 max, 101562500 mean, 1015625000 total * Tasks per node: 10 min, 10 max, 10 mean; 1 nodes used ``` ## Checks - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Co-authored-by: Eric Liang <[email protected]>
Why are these changes needed?
Adds support for Python generators instead of just normal return functions when a task has multiple return values. This will allow developers to cut down on total memory usage for tasks, as they can free previous return values before allocating the next one on the heap.
The semantics for num_returns are about the same as usual tasks - the function will throw an error if the number of values returned by the generator does not match the number of return values specified by the user. The one difference is that if num_returns=1, the task will throw the usual Python exception that the generator cannot be pickled.
As an example, this feature will allow us to reduce memory usage in Datasets shuffle operations (see #25200 for a prototype).
Checks
scripts/format.sh
to lint the changes in this PR.