Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Streaming generator task waits for all object report acks before finishing the task #44079

Merged
merged 2 commits into from
Mar 18, 2024

Conversation

stephanie-wang
Copy link
Contributor

@stephanie-wang stephanie-wang commented Mar 18, 2024

Why are these changes needed?

This fixes a bug in handling task (worker process) failures for streaming generator tasks. Streaming generator executors report the values of the dynamic returns to the caller asynchronously during the task execution. If a report is lost, then the caller will not be able to get the value of the yield'ed ObjectRef. Therefore, we need to wait until all in-flight object reports have been acked before we can finish the streaming generator task. Otherwise, it is possible for the caller to think that the task has finished, but then the executor dies before it can report the dynamic return object's value.

Also cleans up some generator_waiter.cc headers, etc.

Related issue number

Closes #43914 and #43798.

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Stephanie Wang <[email protected]>
@stephanie-wang stephanie-wang added release-blocker P0 Issue that blocks the release ray 2.10 labels Mar 18, 2024
Signed-off-by: Stephanie Wang <[email protected]>
@can-anyscale can-anyscale merged commit 82c26af into ray-project:master Mar 18, 2024
4 of 5 checks passed
@stephanie-wang stephanie-wang deleted the fix-43914 branch March 18, 2024 18:26
stephanie-wang added a commit to stephanie-wang/ray that referenced this pull request Mar 18, 2024
…re finishing the task (ray-project#44079)

This fixes a bug in handling task (worker process) failures for streaming generator tasks. Streaming generator executors report the values of the dynamic returns to the caller asynchronously during the task execution. If a report is lost, then the caller will not be able to get the value of the yield'ed ObjectRef. Therefore, we need to wait until all in-flight object reports have been acked before we can finish the streaming generator task. Otherwise, it is possible for the caller to think that the task has finished, but then the executor dies before it can report the dynamic return object's value.

Also cleans up some generator_waiter.cc headers, etc.

Signed-off-by: Stephanie Wang <[email protected]>
can-anyscale pushed a commit that referenced this pull request Mar 18, 2024
…re finishing the task (#44079) (#44092)

This fixes a bug in handling task (worker process) failures for streaming generator tasks. Streaming generator executors report the values of the dynamic returns to the caller asynchronously during the task execution. If a report is lost, then the caller will not be able to get the value of the yield'ed ObjectRef. Therefore, we need to wait until all in-flight object reports have been acked before we can finish the streaming generator task. Otherwise, it is possible for the caller to think that the task has finished, but then the executor dies before it can report the dynamic return object's value.

Also cleans up some generator_waiter.cc headers, etc.

Signed-off-by: Stephanie Wang <[email protected]>
# the yield'ed ObjectRef. Therefore, we must wait for all in-flight object
# reports to complete before finishing the task.
with nogil:
return_status = context.waiter.get().WaitAllObjectsReported()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephanie-wang can you help me understand why we need this one?

Provided that we apply back-pressure here, which should block until we get a response for ReportGeneratorItemReturns

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to understand why this is only applicable to sync path and not an async one

stephanie-wang added a commit that referenced this pull request Mar 26, 2024
… before continuing (#44257)

#42260 updated streaming generator tasks to asynchronously report generator returns, instead of synchronously reporting each generator return before yielding the next return. However this has a couple problems:

    If the task still has a reference to the yielded value, it may modify the value. The serialized and reported return will then have a different value than expected.

    As per [core] Streaming generator task waits for all object report acks before finishing the task #44079, we need to track the number of in-flight RPCs to report generator returns, so that we can wait for them all to reply before we return from the end of the task. If we increment the count of in-flight RPCs asynchronously, we can end up returning from the task while there are still in-flight RPCs.

So this PR reverts some of the logic in #42260 to wait for the generator return to be serialized into the protobuf sent back to the caller. Note that we do not wait for the reply (unless under backpressure).

We can later re-introduce asynchronous generator reports, but we will need to evaluate the performance benefit of a new implementation that also addresses both of the above points.

---------

Signed-off-by: Stephanie Wang <[email protected]>
stephanie-wang added a commit to stephanie-wang/ray that referenced this pull request Mar 27, 2024
… before continuing (ray-project#44257)

ray-project#42260 updated streaming generator tasks to asynchronously report generator returns, instead of synchronously reporting each generator return before yielding the next return. However this has a couple problems:

    If the task still has a reference to the yielded value, it may modify the value. The serialized and reported return will then have a different value than expected.

    As per [core] Streaming generator task waits for all object report acks before finishing the task ray-project#44079, we need to track the number of in-flight RPCs to report generator returns, so that we can wait for them all to reply before we return from the end of the task. If we increment the count of in-flight RPCs asynchronously, we can end up returning from the task while there are still in-flight RPCs.

So this PR reverts some of the logic in ray-project#42260 to wait for the generator return to be serialized into the protobuf sent back to the caller. Note that we do not wait for the reply (unless under backpressure).

We can later re-introduce asynchronous generator reports, but we will need to evaluate the performance benefit of a new implementation that also addresses both of the above points.

---------

Signed-off-by: Stephanie Wang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ray 2.10 release-blocker P0 Issue that blocks the release
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[core] streaming generator object doesn't reconstruct correctly
5 participants