
[Core] Add gRPC streaming support by moving to the async callback/reactor API. #15279

Closed

Conversation

clarkzinzow
Contributor

@clarkzinzow clarkzinzow commented Apr 13, 2021

Why are these changes needed?

Our current gRPC layer only supports unary RPCs, while some of our current communication patterns (such as object location subscriptions, actor method calls, and within-lease task submissions) and future communication patterns (such as a non-Redis, GCS-based pub/sub) are best suited to streaming RPCs. Streaming RPCs take full advantage of HTTP/2 by streaming requests and/or responses within an existing HTTP/2 request/response, ridding us of the overhead of e.g. creating a new HTTP/2 request for each unary call. Moreover, streaming gives us ordering guarantees within a single RPC, sparing us the RPC reordering issues that can arise with unary RPCs and that would otherwise have to be handled in the application-level protocol.

The current async gRPC layer is implemented using the completion queue API, where both the client and the server implement RPC call abstractions that transition through a state machine representing stages of a unary RPC call. Although we could attempt to extend this completion queue state machine to support streaming RPCs, this PR adds support for gRPC streaming by creating a new gRPC layer that uses the experimental (but soon to be stable) async callback/reactor API. This provides a gRPC-native callback API for unary RPCs, and a reactor API for each of the streaming RPCs. Although there isn't a callback API for the server-side of unary RPCs, we wrap the unary reactor API with a callback API ourselves, so the common unary RPC case can be entirely implemented with callbacks at the application-level, and is defined via macros such as:

UNARY_CALLBACK_RPC_CLIENT_METHOD(NodeManagerService, RequestResourceReport, grpc_client_, )

on the client and

UNARY_CALLBACK_RPC_SERVICE_HANDLER(NodeManagerService, RequestResourceReport)

on the server, maintaining application-level API compatibility with the completion queue gRPC layer for unary RPCs. This allows us to gradually port each gRPC client and server to the new callback-based layer by only updating the gRPC client and server (i.e. macro) definitions, with no application-level changes necessary. Also note that this is entirely a layer on top of gRPC without any observable transport-level effects, meaning that the client and server for a given gRPC service can be ported independently. Beyond being simpler to implement than extending the completion queue state machine, this should eliminate a lot of technical debt in our current gRPC layer and allow us to take advantage of upstream work on this callback/reactor API (which is active!) instead of maintaining and extending our own completion queue --> callback layer.
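For reference, the underlying gRPC callback API that these macros wrap looks roughly like the following. This is a minimal sketch using an illustrative Echo service (not part of this PR), written with the de-experimentalized spellings (async(), CallbackService); the code in this PR still uses the experimental spellings (experimental_async(), ExperimentalCallbackService).

// Client side: the generated async stub takes a per-call completion callback.
grpc::ClientContext ctx;
EchoRequest request;
EchoReply reply;
stub->async()->Echo(&ctx, &request, &reply,
                    [](grpc::Status status) {
                      // Invoked on a gRPC-managed thread once the RPC completes;
                      // the wrapper layer hops back onto our event loop here.
                    });

// Server side: the callback service hands back a reactor per unary RPC.
class EchoServiceImpl final : public EchoService::CallbackService {
  grpc::ServerUnaryReactor *Echo(grpc::CallbackServerContext *ctx,
                                 const EchoRequest *request,
                                 EchoReply *reply) override {
    auto *reactor = ctx->DefaultReactor();
    reactor->Finish(grpc::Status::OK);
    return reactor;
  }
};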

Note that this PR (currently) ports the raylet gRPC client and server to the callback API. This is primarily to demonstrate the API and that there are no performance regressions. If we wish, we could merge this PR without porting any of the gRPC services to the callback API and wait until we think the layer is stable (either gating behind a feature flag or removing the port entirely for now).

Streaming Reactor API Design

See the gRFC for the async callback API design and see the de-experimentalization PR for examples. Both the client and server macros provide a thin layer that essentially exposes these APIs to the application layer. An end-to-end streaming RPC example will be implemented in a future PR.
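As a concrete illustration (not code from this PR), a client-streaming reactor under this API looks roughly like the sketch below; the EventService/Event/Summary names are made up for the example, and write queueing and error handling are elided.

// Client-streaming reactor: pushes Events to the server, gets one Summary back.
class RecordEventsReactor : public grpc::ClientWriteReactor<Event> {
 public:
  RecordEventsReactor(EventService::Stub *stub,
                      std::function<void(const grpc::Status &)> done)
      : done_(std::move(done)) {
    stub->async()->RecordEvents(&ctx_, &summary_, this);
    StartCall();
  }
  void Write(const Event &event) {
    // Real code must serialize writes: only one StartWrite() may be in flight.
    event_ = event;
    StartWrite(&event_);
  }
  void Close() { StartWritesDone(); }
  void OnWriteDone(bool ok) override {
    // Pop the next queued event and StartWrite() it, or surface the failure.
  }
  void OnDone(const grpc::Status &status) override {
    done_(status);
    // gRPC guarantees the reactor is never used after OnDone(), so the reactor
    // can own its lifetime and delete itself here.
    delete this;
  }

 private:
  grpc::ClientContext ctx_;
  Summary summary_;
  Event event_;
  std::function<void(const grpc::Status &)> done_;
};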

Risks

  1. The nexting threadpool that's used to process the underlying gRPC completion queue is configured and owned by the C++ gRPC library, and doesn't provide any hooks to e.g. configure how many threads are used (currently set to num_cores / 2, clamped to the range [2, 16]) or how many completion queues to use (currently set to 1). Compared to the completion queue implementation, where we can control both the number of completion queues and the number of nexting threads, this puts us in quite an opinionated box that could result in performance regressions in some cases (e.g., taking valuable CPU resources from the core worker). However, preliminary benchmarking (with the raylet using the new gRPC layer) has actually shown performance improvements over our existing CQ-based gRPC layer. This is possibly due to the num_cores / 2 nexting threads being a much more aggressive gRPC-level resource allocation than what is done when using the completion queue layer.
  2. Directly using the reactor APIs for streaming RPCs can introduce a good bit of transport-level complexity to the application layer. For this reason, I believe we should experiment with exposing a user-friendly reader/writer API implemented on top of reactors: the client-side API would take a callback (or expose a reader interface) to handle responses from the server and expose a writer interface to push new requests onto the stream, while the server-side API would take a callback (or expose a reader interface) to handle requests from the client and expose a writer interface to push new responses onto the stream. Any requirement for statefulness in the callbacks can be met by having the callbacks close over stateful objects and use those objects in their request/response processing. We've added an experimental client-side response handler callback + request writer API for client-side streaming RPCs (currently unused) to demonstrate this idea, see here; a simplified sketch of the idea also follows this list. If it proves difficult to provide generic callback or reader/writer APIs on top of the streaming reactors, we can isolate the reactors in shared application-level abstractions whose APIs are transport-agnostic (e.g. a pub/sub abstraction) or within application-level clients (e.g. the RayletClient or GcsClient) that would handle the reactors themselves and expose transport-agnostic APIs.
  3. Unexpected edge cases where the callback/reactor API fails due to a bug internal to gRPC. I think that the risk of this is pretty low; although it's technically still an experimental API, it has been used internally at Google, in production, for at least 2 years.
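As a sketch of the reader/writer idea from (2) — a hypothetical interface shape, not the API added in this PR:

// Hypothetical client-side wrapper: the application supplies a response handler
// and a completion handler, and gets back a writer handle; the reactor mechanics
// stay hidden inside the RPC layer.
template <typename Request>
class StreamWriter {
 public:
  virtual ~StreamWriter() = default;
  virtual void Write(const Request &request) = 0;  // Push a request onto the stream.
  virtual void WritesDone() = 0;                   // Half-close the client side.
};

// The gRPC client would then expose something along the lines of:
//   std::shared_ptr<StreamWriter<Request>> StartStream(
//       std::function<void(const Reply &)> on_response,
//       std::function<void(const grpc::Status &)> on_done);
// Stateful processing is achieved by the callbacks closing over shared state.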

Other Niceties

  • The client and server contexts are now exposed to the application layer, which means application code can tell when an RPC has been cancelled, issue out-of-band client-side and in-band server-side RPC cancellations, and use hooks for adding custom metadata to RPCs and collecting RPC performance metrics (a brief sketch of what this enables follows).
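For example, with the contexts exposed, application code can use standard gRPC context calls along these lines (a sketch, not code from this PR; the metadata key is illustrative):

// Client side: attach custom metadata, set a deadline, or cancel out-of-band.
grpc::ClientContext ctx;
ctx.AddMetadata("x-ray-trace-id", "abc123");
ctx.set_deadline(std::chrono::system_clock::now() + std::chrono::seconds(5));
ctx.TryCancel();

// Server side (callback API), given grpc::CallbackServerContext *server_ctx:
if (server_ctx->IsCancelled()) {
  // The client cancelled or went away; stop doing work for this RPC.
}
server_ctx->TryCancel();  // In-band server-side cancellation.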

TODOs

  • Add support for method timeout.
  • Get soft validation of design.
  • Update callback client to use new handle-based RPC asio stats collection pattern.
  • Harden handling of disconnection and shutdown.
  • Audit request and response object lifetimes.
  • Upgrade gRPC to stable (non-release-candidate) version.
  • Update Java gRPC layer to be compatible with the gRPC lib upgrade.
  • Fix failing ASAN test (heap-use-after-free).
  • Fix MacOS setup failure.
  • Fix shutdown segfault occurring in a few Python tests (test_global_gc on Linux, a few tests on MacOS).
  • Fix Java test failure.
  • Fix Windows Python headers missing failure.
  • Fix ASAN use-after-free.
  • Try to gate the callback gRPC layer behind a feature flag, including the porting of all components. This will most likely have to be done within each ported gRPC service's client and server implementation, since the code generation is different for each layer and I don't think that we could wedge it behind a common GrpcClient and GrpcServer interface (could explore this more).
  • Improve docs for adding a callback-based unary RPC.
  • Do further performance regression testing, especially with the core worker and GCS.
  • (Optional) Move raylet gRPC client and server back to CQ-based layer (in case we want to merge this as a WIP implementation of the callback layer and hold off on porting the raylet to it). This isn't necessary if we gate the new layer behind a feature flag, so we'd only do this if the feature flag proves difficult to implement.
  • (Optional) More focused unit testing would be ideal, although we already have pretty good coverage with the e2e Python tests.
  • (Optional) Improve docs for adding a reactor-based streaming RPC (this could, and probably should, wait until we port a unary RPC to a streaming RPC).
  • (Optional) Minimize nontrivial code contained in macros (we should prefer to keep the macros thin).
  • (Future PR) Port existing unary RPC to streaming RPC.
  • (Future PR) Add user-friendly reader/writer APIs on top of client and server streaming reactors.
  • (Future PR) Port rest of gRPC clients and servers to use callback-based layer and remove completion queue layer.

Related issue number

Closes #15219

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

url = "https://github.com/grpc/grpc/archive/4790ab6d97e634a1ede983be393f3bb3c132b2f7.tar.gz",
sha256 = "df83bd8a08975870b8b254c34afbecc94c51a55198e6e3a5aab61d62f40b7274",
# url = "https://github.com/grpc/grpc/archive/53ba4a101e80e1a67d4ec741b7e1aad6ea8d790f.tar.gz",
# url = "https://github.com/grpc/grpc/archive/refs/tags/v1.36.4.tar.gz",
Contributor Author

Will remove these alternatives before merging.

@@ -287,7 +287,7 @@ class ClientCallManager {
std::atomic<unsigned int> rr_index_;

/// The gRPC `CompletionQueue` object used to poll events.
-  std::vector<grpc::CompletionQueue> cqs_;
+  std::vector<std::unique_ptr<grpc::CompletionQueue>> cqs_;
Contributor Author

gRPC completion queues are no longer copyable.
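A minimal sketch of what the construction side looks like as a result (num_grpc_cqs is just an illustrative name):

// grpc::CompletionQueue can no longer be stored by value in the vector, so the
// call manager owns each queue through a unique_ptr instead.
std::vector<std::unique_ptr<grpc::CompletionQueue>> cqs_;
cqs_.reserve(num_grpc_cqs);
for (int i = 0; i < num_grpc_cqs; ++i) {
  cqs_.push_back(std::make_unique<grpc::CompletionQueue>());
}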

@clarkzinzow clarkzinzow force-pushed the core/feat/grpc-streaming branch 4 times, most recently from c68c6d0 to 2566cfa Compare April 13, 2021 20:41
@rkooo567 rkooo567 self-assigned this Apr 15, 2021
@rkooo567
Contributor

rkooo567 commented Apr 15, 2021

Hey @clarkzinzow, do we still need this PR (#14598) after yours?

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Apr 15, 2021
@clarkzinzow
Contributor Author

@rkooo567 That PR will still be necessary until we've ported every gRPC service to use the new gRPC layer, so I think that PR should still be merged.

@rkooo567
Contributor

Makes sense. Can you remove the tag once the tests are passing?

// capturing `this` since the gRPC-level callback may outlive this
// GrpcCallbackClient instance, while we're more confident that it will not
// outlive the event loop, which should only be destroyed on process exit.
[&io_context = io_context_, response, callback = std::move(callback), ctx,
Contributor

The callback is not safe; we need to use shared_from_this to get a shared_ptr to this and capture it in the callback to make the async callback safe.

Contributor Author

@clarkzinzow clarkzinzow Apr 22, 2021


Hmm, I'd agree if we were going to reference a method or member of this client object in the callback, but the only member we reference is io_context_, which we explicitly pass into the callback by reference. The lifetime assumption made here is that the IO context will outlive the gRPC-internal nexting thread (where this callback is invoked). I'm less sure about that, but it wouldn't be solved by capturing a shared pointer to this either.
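For context, the full call with this capture pattern looks roughly like the sketch below. The Echo names are illustrative, it uses the de-experimentalized async() accessor, and it assumes the event loop behaves like an Asio io_context (an assumption about Ray's instrumented_io_context, not something stated here); the shared_ptrs keep the context/request/response alive until gRPC invokes the callback.

#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>

void CallEcho(EchoService::Stub &stub,
              boost::asio::io_context &io_context_,
              EchoRequest request,
              std::function<void(const grpc::Status &, const EchoReply &)> callback) {
  auto ctx = std::make_shared<grpc::ClientContext>();
  auto req = std::make_shared<EchoRequest>(std::move(request));
  auto response = std::make_shared<EchoReply>();
  stub.async()->Echo(
      ctx.get(), req.get(), response.get(),
      // Capture the event loop by reference rather than `this`; only the event
      // loop is assumed to outlive the gRPC-level callback.
      [&io_context = io_context_, req, response, callback = std::move(callback),
       ctx](grpc::Status status) {
        // This runs on a gRPC nexting thread; hop back onto the application
        // event loop before invoking the application-level callback.
        boost::asio::post(io_context, [status, response, callback]() {
          callback(status, *response);
        });
      });
}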

const ClientCallback<Reply> &callback,
std::shared_ptr<grpc::ClientContext> ctx = nullptr,
const std::string call_name = "UNKNOWN_RPC") {
auto reactor = new ClientWriterReactor<GrpcService, Request, Reply>(
Contributor

@qicosmos qicosmos Apr 22, 2021


Using a shared_ptr might be better; then there's no need to delete this after the callback, which is safer.

Contributor Author

I didn't go with a shared pointer because (1) the gRPC stub method expects a raw pointer, and (2) the gRPC library guarantees that the reactor will never be used after OnDone() is called. If we used a shared pointer, (1) would still require handing gRPC a raw pointer to the reactor, which would sit outside the shared pointer's reference counting, and we would have to guarantee that the application-level shared pointer to the ClientWriterReactor outlives the raw pointer held within the gRPC library (otherwise the gRPC library would do a use-after-free), which seems impossible to guarantee.

Given that (2) gives us clear semantics for when the reactor should be deleted, it's easier to create the reactor as a raw pointer and have it explicitly delete itself at the end of OnDone() (we can document in the reactor API that it must not be used after OnDone() is called).

I know that this is still not ideal, so definitely lmk if you have any ideas on how to make this reactor lifecycle management better!

@clarkzinzow
Contributor Author

clarkzinzow commented Jun 22, 2021

FYI, the callback API is no longer considered to be experimental.

@clarkzinzow
Contributor Author

clarkzinzow commented Jun 22, 2021

Current CI failures:

  • Java tests: Double free somehow getting hit within the metric tests. Still need to debug this.
  • Python small + large tests: test_job_timestamps is failing with timestamps that differ by a millisecond. I've seen this test be flaky before with off-by-one timestamps, so maybe the slight added overhead from this PR is making it fail consistently by pushing these timestamps apart. Still need to look into this.
  • rllib two_step_game_qmix example: Not sure how to interpret the periodic updates in the test logs. I did notice that /tmp is being used for the object store instead of /dev/shm, though I doubt that's the cause. May have to ping someone who knows more about rllib for this one. Note that this test is currently pretty flaky.
  • Dask-on-Ray tests with Ray Client: Known backwards compatibility break that's been fixed in more recent master, should resolve upon another rebase.
  • Windows build: Currently appears to be failing to find Python headers within a gRPC Bazel rule. May be due to the gRPC upgrade, possibly in combination with our grpc-python patch.

@clarkzinzow clarkzinzow force-pushed the core/feat/grpc-streaming branch 7 times, most recently from 216b355 to bb81807 Compare June 25, 2021 21:10
Contributor

@rkooo567 rkooo567 left a comment


It actually looks a lot simpler than I thought, and it definitely makes the code smaller!! (which is great!). Here are a couple of thoughts:

  1. only porting Unary RPC implementation first (and test it before merging -> revert) -> 2. Port all RPCs to the Unary RPCs in this layer first -> 3. implement streaming. Ideally, we should have some gap between 2~3 to make sure stability is guaranteed. Another possible approach is to modify the macro a little bit to feature flag them. At least for Unary RPCs, it seems to have the same macro definition.

}
auto response = std::make_shared<Reply>();
auto stats_handle = io_context_.RecordStart(call_name);
(stub_->experimental_async()->*method)(
Contributor

Looks like this experimental flag is gone now?

Contributor Author

Yep, once we update gRPC again we'll be able to use the callback/reactor API from the stable namespace.
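Concretely, the change should mostly be a rename of the stub accessor (sketch; exact spellings depend on the gRPC version we upgrade to):

// Before the upgrade (experimental namespace):
(stub_->experimental_async()->*method)(ctx.get(), request.get(), response.get(), callback);
// After upgrading to a release with the de-experimentalized callback API:
(stub_->async()->*method)(ctx.get(), request.get(), response.get(), callback);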

#include "ray/common/grpc_util.h"
#include "ray/common/ray_config.h"
#include "ray/common/status.h"
#include "ray/rpc/client_call.h"
Contributor

What are we using from this file?

Contributor Author

We're only using the ClientCallback type definition. If we want to remove that dependency, we can duplicate that type definition in this file.

@@ -1,23 +1,63 @@
diff --git third_party/py/python_configure.bzl third_party/py/python_configure.bzl
--- third_party/py/python_configure.bzl
+++ third_party/py/python_configure.bzl
@@ -163,1 +163,1 @@
@@ -177,7 +177,7 @@ def _get_bash_bin(repository_ctx):
Contributor

Can you tell me why we need to change this file?

Contributor Author

We updated our gRPC dependency, so these patches were no longer applying to the correct lines, i.e. the correct code.

- name = shared_object_name,
+ name = cc_kwargs.pop("name", shared_object_name),
- srcs = [stem + ".cpp"],
+ srcs = [stem + ".cpp"] + cc_kwargs.pop("srcs", []),
Contributor

It makes sense why you added this, but do we need this change in this PR?

Contributor Author

We updated our gRPC dependency, so these patches were no longer applying to the correct lines, i.e. the correct code.

@clarkzinzow
Contributor Author

clarkzinzow commented Aug 9, 2021

It actually looks a lot simpler than I thought, and it definitely makes code smaller!! (which is great!)

Agreed! 😁

Does this use any of the existing gRPC code (e.g., client_call.h)? I am asking because you seem to include the file, but I couldn't find the usage.

It does not use any existing gRPC layer code except for the ClientCallback type definition. If we want to remove that dependency, we can duplicate that type definition in grpc_callback_client.h.

Looks like it is not experimental anymore according to https://github.com/grpc/grpc/pull/25728/files#diff-79410abf9a4d8c0b0cfb8544485bc610c5eadffb1025d1206ab7f0c177e6a635R194, but we still seem to use the experimental APIs? Any reason why we don't use the non-experimental APIs?

The non-experimental APIs are the experimental APIs moved out of the experimental namespace; we can update our gRPC layer to use the non-experimental APIs after we update our gRPC dependency (the APIs were de-experimentalized while this PR was open).

Also, I think we can definitely break this down. One thing we can do is;

  1. only porting Unary RPC implementation first (and test it before merging -> revert) -> 2. Port all RPCs to the Unary RPCs in this layer first -> 3. implement streaming. Ideally, we should have some gap between 2~3 to make sure stability is guaranteed. Another possible approach is to modify the macro a little bit to feature flag them. At least for Unary RPCs, it seems to have the same macro definition.

We can definitely break this down, but I should clarify a few things:

  1. only porting Unary RPC implementation first (and test it before merging -> revert) -> 2. Port all RPCs to the Unary RPCs in this layer first

Since the old gRPC layer only supports unary RPCs, there are only unary RPCs to port! 😄

Another possible approach is to modify the macro a little bit to feature flag them. At least for Unary RPCs, it seems to have the same macro definition.

The macro could be shared, but then application code won't have access to the ClientContext or the ServerContext, which is a big feature win for the new gRPC layer. We still won't be able to easily feature flag it, since the old gRPC client requires a ClientCallManager while the new gRPC client requires a raw io_context (no client call manager needed), and the old gRPC service doesn't derive from the same base class as the new gRPC service (see node_manager_server.h diff and new CALLBACK_SERVICE macro).

We could do something fancier for feature flagging, but it will probably require modifying the existing gRPC layer, which I was trying to avoid. Shouldn't be that difficult though.

Contributor

@richardliaw richardliaw left a comment


setup changes lgtm as codeowner

@clarkzinzow clarkzinzow force-pushed the core/feat/grpc-streaming branch 2 times, most recently from 1678ec2 to 277ff23 Compare August 20, 2021 21:44
@MissiontoMars

Switching to gRPC streaming is really cool! But there are still some corner cases we should be concerned about:

  1. Does gRPC allow the user to set timeout parameters, including connection timeout and RPC call timeout?
  2. Is reconnection supported?
    In Ant, we have encountered the above problems in our production environment. We currently use brpc instead of gRPC in our internal codebase, but would consider switching to gRPC if there is a solution to these problems.

@clarkzinzow clarkzinzow force-pushed the core/feat/grpc-streaming branch 2 times, most recently from 3d88da0 to 076aa52 Compare September 30, 2021 15:14
@qicosmos
Contributor

qicosmos commented Sep 30, 2021

Maybe we could replace the ugly macros with generic templates. I think coroutines will be the ultimate improvement over async callbacks; the async code will be much simpler and cleaner.
I will do some tests with coroutines.

@clarkzinzow
Contributor Author

Maybe we could replace the ugly macros with generic templates. I think coroutines will be the ultimate improvement over async callbacks; the async code will be much simpler and cleaner.
I will do some tests with coroutines.

Agreed on both points! I didn't deviate from the macros or the asio event loop model since I'm hoping to keep this PR small, but we're hoping to do both soon.
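Purely to illustrate the coroutine idea floated above (nothing like this is in the PR), a unary callback RPC could be adapted into a C++20 awaitable roughly like this; the task/coroutine type that would co_await it and the Echo names are assumptions:

#include <coroutine>
#include <functional>
#include <utility>
#include <grpcpp/grpcpp.h>

// Adapts a callback-style unary RPC into an awaitable.
template <typename Reply>
struct UnaryAwaitable {
  // Starts the RPC and reports completion via `on_done(status, reply)`.
  std::function<void(std::function<void(grpc::Status, Reply)> on_done)> start;
  grpc::Status status;
  Reply reply;

  bool await_ready() const noexcept { return false; }
  void await_suspend(std::coroutine_handle<> handle) {
    start([this, handle](grpc::Status s, Reply r) {
      status = std::move(s);
      reply = std::move(r);
      handle.resume();  // Note: resumes on the gRPC callback thread in this sketch.
    });
  }
  std::pair<grpc::Status, Reply> await_resume() {
    return {std::move(status), std::move(reply)};
  }
};

// Usage inside a coroutine (assuming some task<T> coroutine type exists):
//   auto [status, reply] = co_await UnaryAwaitable<EchoReply>{start_echo_rpc};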

@bveeramani
Member

‼️ ACTION REQUIRED ‼️

We've switched our code formatter from YAPF to Black (see #21311).

To prevent issues with merging your code, here's what you'll need to do:

  1. Install Black
pip install -I black==21.12b0
  2. Format changed files with Black
curl -o format-changed.sh https://gist.githubusercontent.com/bveeramani/42ef0e9e387b755a8a735b084af976f2/raw/7631276790765d555c423b8db2b679fd957b984a/format-changed.sh
chmod +x ./format-changed.sh
./format-changed.sh
rm format-changed.sh
  3. Commit your changes.
git add --all
git commit -m "Format Python code with Black"
  4. Merge master into your branch.
git pull upstream master
  5. Resolve merge conflicts (if necessary).

After running these steps, you'll have the updated format.sh.

@kfstorm
Member

kfstorm commented Mar 13, 2022

‼️ ACTION REQUIRED ‼️

We've updated our formatting configuration for C++ code. (see #22725)

This PR includes C++ code changes. To prevent issues with merging your code, here's what you'll need to do:

  1. Merge the latest changes from upstream/master branch into your branch.
git pull upstream master
git merge upstream/master
  2. Resolve merge conflicts (if necessary).

After running these steps, you'll have the updated C++ formatting configuration.

  3. Format changed files.
scripts/format.sh
  4. Commit your changes.
git add --all
git commit -m "Format C++ code"

@stale

stale bot commented Apr 16, 2022

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.

  • If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@stale stale bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Apr 16, 2022
@fishbone fishbone removed their assignment Jun 30, 2022
@stale stale bot removed the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Jun 30, 2022
@stale

stale bot commented Jul 30, 2022

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.

  • If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@stale stale bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Jul 30, 2022
@stale

stale bot commented Aug 13, 2022

Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message.

Please feel free to reopen or open a new issue if you'd still like it to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for opening the issue!

1 similar comment from the stale bot on Sep 9, 2022.

@stale stale bot closed this Sep 9, 2022