
[core][2/2] Worker resubscribe when GCS failed #24813

Merged
merged 27 commits into ray-project:master on May 17, 2022

Conversation

@fishbone (Contributor) commented May 15, 2022

Why are these changes needed?

A follow-up to PR #24628.

The previous PR fixed resubscribing for the raylet, but the core worker also needs to resubscribe.

There are two ways of doing the resubscribe:

  1. The client side detects any failure and resubscribes on its own.
  2. The server side asks the client to resubscribe.

Option 1 is the cleaner and better solution, but it is hard to implement right now for the following reasons:

  • We are using long-polling, so in some extreme cases we can't detect the failure. For example, the client receives a message, but before it sends the next request the server restarts, so the client misses its chance to detect the failure. This can happen if a standby GCS starts very quickly while the client side has a lot of traffic and is running slowly.
  • The current gRPC framework doesn't give the user a way to handle failures, so supporting this would require some refactoring.

We can switch to that approach once we have gRPC streaming.

This PR implements option 2. The overall change has three parts:

  • raylet: #24628
  • core worker: this PR
  • python: a follow-up PR

Correctness: whenever a worker starts, it registers with the raylet immediately (a sync call) before connecting to the GCS. So we only need to send the restart RPCs to the registered workers, and that is sufficient because (see the sketch after this list):

  • If the worker has just started and hasn't registered with the raylet yet, that's fine: the worker hasn't connected to the GCS, so there is nothing to resubscribe.
  • If the worker has registered with the raylet, it is covered by the code path here.
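
For illustration, here is a minimal Python sketch of that flow. All of the names (`Raylet`, `Gcs`, `Worker`, `resubscribe`, and so on) are hypothetical; the real logic lives in Ray's C++ raylet and core worker, so this only shows the registration ordering and the restart broadcast, not the actual implementation.

```python
# Hypothetical sketch only: none of these classes are Ray's real types.

class Gcs:
    """Stands in for the GCS pubsub server."""

    def __init__(self):
        self.subscriptions = {}  # channel -> set of subscribed workers

    def subscribe(self, worker, channel):
        self.subscriptions.setdefault(channel, set()).add(worker)


class Raylet:
    """Stands in for the raylet that tracks registered workers."""

    def __init__(self):
        self.registered_workers = []

    def register_worker(self, worker):
        # Synchronous registration: the worker is known to the raylet
        # before it ever talks to the GCS.
        self.registered_workers.append(worker)

    def on_gcs_restarted(self):
        # When the GCS restarts, forward the "please resubscribe" request
        # to every registered worker.
        for worker in self.registered_workers:
            worker.resubscribe()


class Worker:
    """Stands in for a core worker."""

    def __init__(self, raylet, gcs):
        self.gcs = gcs
        self.channels = set()
        raylet.register_worker(self)  # 1. register with the raylet first (sync)
        # 2. only after registration does the worker start talking to the GCS,
        #    so a worker that is not yet registered has nothing to resubscribe.

    def subscribe(self, channel):
        self.channels.add(channel)
        self.gcs.subscribe(self, channel)

    def resubscribe(self):
        # Re-issue every existing subscription against the restarted GCS.
        for channel in self.channels:
            self.gcs.subscribe(self, channel)
```

The key point the sketch captures is the ordering: a worker only subscribes after it has registered with the raylet, so broadcasting the restart RPC to registered workers covers every worker that can have subscriptions.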

Related issue number

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@fishbone fishbone changed the title [wip][core][1/2] Worker resubscribe when GCS failed [wip][core][2/2] Worker resubscribe when GCS failed May 16, 2022
@fishbone fishbone changed the title [wip][core][2/2] Worker resubscribe when GCS failed [core][2/2] Worker resubscribe when GCS failed May 16, 2022
@fishbone fishbone marked this pull request as ready for review May 16, 2022 18:13
@mwtian (Member) left a comment

Is there a plan for Python subscribers?

Review comments (outdated, resolved) on: src/ray/raylet/worker.cc, src/ray/raylet/worker.h, src/ray/raylet/node_manager.cc
@fishbone fishbone linked an issue May 16, 2022 that may be closed by this pull request
@fishbone (Contributor, Author) replied

> Is there a plan for Python subscribers?

Logging, errors, and the dashboard use pubsub from Python, and failures there are not that critical.
So for Python, I plan to do client-side resubscribe in the Python layer. I'll put up an RFC PR for feedback.

@fishbone fishbone merged commit 379fa63 into ray-project:master May 17, 2022
maxpumperla pushed a commit that referenced this pull request May 18, 2022
fishbone added a commit that referenced this pull request May 23, 2022
This is a follow-up PR to #24813 and #24628.

Unlike the change in the C++ layer, where the GCS broadcasts a resubscribe request to the raylet/core worker and the client side then does the resubscription, in the Python layer we detect the failure on the client side.

In case of a failure, the protocol is (see the sketch after this list):

1. Call subscribe.
2. If the resubscribe times out, throw an exception, which will crash the system. This is acceptable: when the GCS has been down longer than expected, we expect the whole Ray cluster to be down.
3. Once the subscribe succeeds, continue to poll.
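
For illustration, here is a minimal Python sketch of this client-side protocol, assuming a hypothetical `subscriber` object with `subscribe()` and `poll()` methods; these names and `RESUBSCRIBE_TIMEOUT_S` are illustrative, not Ray's actual Python API.

```python
import time

# Assumption: if the GCS is not back within this window, the cluster is
# considered down and we let the exception crash the process.
RESUBSCRIBE_TIMEOUT_S = 60.0


def run_subscriber_loop(subscriber, handle_message):
    subscriber.subscribe()  # 1. initial subscribe
    while True:
        try:
            message = subscriber.poll()  # long-polling request
            handle_message(message)
        except ConnectionError:
            # Failure detected on the client side: keep retrying subscribe
            # until it succeeds or the timeout elapses.
            deadline = time.monotonic() + RESUBSCRIBE_TIMEOUT_S
            while True:
                try:
                    subscriber.subscribe()  # 2. resubscribe
                    break
                except ConnectionError:
                    if time.monotonic() > deadline:
                        # GCS has been down longer than expected; crash.
                        raise
                    time.sleep(1.0)
            # 3. subscribe succeeded, go back to polling.
```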

However, there is an extreme case where things might break: the client might miss detecting a failure.

This can happen if a long-polling request has returned and the Python layer is doing its own work; before it sends the next long-polling request, the GCS restarts and recovers.

We are not going to handle this case because:
1. The GCS usually takes several seconds to come back up, while the Python layer's work is simply pushing data into a queue (in the sync version). The async version is only used by the dashboard, which is not a critical component.
2. Pubsub in the Python layer is not doing critical work: it handles logs/errors for Ray jobs.
3. The dashboard can simply be restarted to fix the issue.


A known issue here is that we might miss logs in case of a GCS failure, for the following reasons:

- The Python pubsub only does best-effort publishing. If publishing fails too many times, it skips the message (messages are lost on the producer side).
- If a message is pushed to the GCS but the worker hasn't resubscribed yet, the pushed message is lost (messages are lost on the consumer side).

We think this is reasonable and valid behavior, given that logs are not defined to be a critical component and we'd like to keep the design of pubsub in the GCS simple.
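
As a rough illustration of the producer-side behavior, here is a sketch of best-effort publishing with a bounded number of retries; `best_effort_publish`, `publish_to_gcs`, and `MAX_PUBLISH_RETRIES` are made-up names, not Ray's actual publisher API.

```python
import time

MAX_PUBLISH_RETRIES = 3  # assumption: the real retry budget may differ


def best_effort_publish(publish_to_gcs, message):
    """Try to publish; drop the message after too many failures."""
    for attempt in range(MAX_PUBLISH_RETRIES):
        try:
            publish_to_gcs(message)
            return True
        except ConnectionError:
            time.sleep(2 ** attempt)  # simple backoff before retrying
    # Too many failures: skip this message (lost on the producer side).
    return False
```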

Another thing is `run_functions_on_all_workers`. We plan to stop using it within Ray core and to deprecate it in the longer term. It won't cause problems for the current use cases because:

1. It's only set in the driver, and we don't support creating a new driver while the GCS is down.
2. When the GCS is down, we don't support starting new Ray workers.

And `run_functions_on_all_workers` is only used when we initialize the driver/workers.
Successfully merging this pull request may close these issues: [core] Resubscribe for GCS pubsub from GCS side