-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Data] Yield remaining results from async map_batches
#47696
Merged
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Signed-off-by: Scott Lee <[email protected]>
Signed-off-by: Scott Lee <[email protected]>
Signed-off-by: Scott Lee <[email protected]>
scottjlee
commented
Sep 18, 2024
await asyncio.sleep(i % 5) | ||
print("yield", i) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove extraneous prints
scottjlee
requested review from
ericl,
scv119,
c21,
amogkam,
bveeramani,
raulchen,
stephanie-wang and
omatthew98
as code owners
September 18, 2024 03:42
raulchen
reviewed
Sep 18, 2024
@@ -358,6 +358,14 @@ async def process_all_batches(): | |||
_validate_batch_output(out_batch) | |||
yield out_batch | |||
|
|||
# Drain the queue to yield any remaining results. | |||
while not output_batch_queue.empty(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just update the above condition to while not future.done() or not output_batch_queue.empty()
?
raulchen
approved these changes
Sep 18, 2024
Signed-off-by: Scott Lee <[email protected]>
ujjawal-khare
pushed a commit
to ujjawal-khare-27/ray
that referenced
this pull request
Oct 15, 2024
…47696) ## Why are these changes needed? When using an async actor with `map_batches()`, there is currently an unhandled edge case, where if tasks are scheduled very closely with one another, and all remaining futures complete at the same time, some remaining items in the internal queue to yield results from the futures will not be yielded. This PR ensures that we fully drain the internal queue to get all expected results. Concretely, this issue came up while using async actors to yield results from vLLM async engine. ## Related issue number ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [x] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: Scott Lee <[email protected]> Signed-off-by: ujjawal-khare <[email protected]>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Why are these changes needed?
When using an async actor with
map_batches()
, there is currently an unhandled edge case, where if tasks are scheduled very closely with one another, and all remaining futures complete at the same time, some remaining items in the internal queue to yield results from the futures will not be yielded. This PR ensures that we fully drain the internal queue to get all expected results.Concretely, this issue came up while using async actors to yield results from vLLM async engine.
Related issue number
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.