From 11c8de2ae2ac14a58874dcc9f180667b4537b14f Mon Sep 17 00:00:00 2001 From: Sihan Wang Date: Mon, 17 Oct 2022 09:50:35 -0700 Subject: [PATCH] [Serve][Doc] Direct Ingress documentation (#29149) (#29347) --- doc/source/_toc.yml | 1 + doc/source/serve/direct-ingress.md | 121 +++++++++++++++++ doc/source/serve/doc_code/direct_ingress.py | 89 +++++++++++++ .../direct_ingress_with_customized_schema.py | 32 +++++ doc/source/serve/doc_code/test_service_pb2.py | 85 ++++++++++++ .../serve/doc_code/test_service_pb2_grpc.py | 126 ++++++++++++++++++ doc/source/serve/user-guide.md | 1 + 7 files changed, 455 insertions(+) create mode 100644 doc/source/serve/direct-ingress.md create mode 100644 doc/source/serve/doc_code/direct_ingress.py create mode 100644 doc/source/serve/doc_code/direct_ingress_with_customized_schema.py create mode 100755 doc/source/serve/doc_code/test_service_pb2.py create mode 100755 doc/source/serve/doc_code/test_service_pb2_grpc.py diff --git a/doc/source/_toc.yml b/doc/source/_toc.yml index f2be623476fd..49c413318d58 100644 --- a/doc/source/_toc.yml +++ b/doc/source/_toc.yml @@ -194,6 +194,7 @@ parts: - file: serve/handling-dependencies - file: serve/managing-java-deployments - file: serve/migration + - file: serve/direct-ingress - file: serve/architecture - file: serve/tutorials/index sections: diff --git a/doc/source/serve/direct-ingress.md b/doc/source/serve/direct-ingress.md new file mode 100644 index 000000000000..dca8115e11e4 --- /dev/null +++ b/doc/source/serve/direct-ingress.md @@ -0,0 +1,121 @@ +# Experimental Direct Ingress + +In version 2.1, Serve provides an alpha version of [gRPC](https://grpc.io/) ingress. + +With the gRPC protocol, you get: + +* A standardized inference request/response schema shared between the client and Serve. +* A higher-performance endpoint than the HTTP protocol.
+ +In this section, you will learn how to: + +* use Serve's built-in gRPC schema to receive client traffic +* bring your own gRPC schema into your Serve application + +## Use Serve's Schema + +Serve provides a simple gRPC schema for machine learning inference workloads. It is intentionally kept simple, and you are encouraged to adapt it to your own needs. +``` +message PredictRequest { + map<string, bytes> input = 2; +} + +message PredictResponse { + bytes prediction = 1; +} + +service PredictAPIsService { + rpc Predict(PredictRequest) returns (PredictResponse); +} +``` + +Take a look at the following code samples for using `DefaultgRPCDriver` in Ray Serve. + +To serve your deployment graph over gRPC, pass it to `ray.serve.drivers.DefaultgRPCDriver` (the sample below uses `DefaultgRPCDriver.bind`). +```{literalinclude} ../serve/doc_code/direct_ingress.py +:start-after: __begin_server__ +:end-before: __end_server__ +:language: python +``` + +Client: +You can use Serve's built-in gRPC client to send queries to the model. + +```{literalinclude} ../serve/doc_code/direct_ingress.py +:start-after: __begin_client__ +:end-before: __end_client__ +:language: python +``` + +:::{note} +* `input` is a dictionary of type `map<string, bytes>`, following the schema described above. +* The user input data needs to be serialized to `bytes` and passed in `input`. +* The response is of type `bytes`, which means the user code is responsible for serializing the output into bytes. +* By default, the gRPC port is 9000. You can change it by passing a port number when calling the `DefaultgRPCDriver` bind function. +* If the serialization/deserialization cost is high and unnecessary, you can also bring your own schema! Check out the [Bring your own schema](bring-your-own-schema) section. +* Scaling configuration for your business code works the same way in the gRPC case: you can set the scaling/autoscaling config inside the `serve.deployment` decorator.
+::: + +### Client schema code generation +You can use the client by importing it from the `ray` Python package (`ray.serve.generated`). Alternatively, you can copy [Serve's protobuf file](https://github.com/ray-project/ray/blob/e16f49b327bbc1c18e8fc5d0ac4fa8c2f1144412/src/ray/protobuf/serve.proto#L214-L225) and generate the gRPC client yourself. + +* Install the gRPC code generation tools +``` +pip install grpcio-tools +``` + +* Generate gRPC code based on the schema +``` +python -m grpc_tools.protoc --proto_path=src/ray/protobuf/ --python_out=. --grpc_python_out=. src/ray/protobuf/serve.proto +``` +After the two steps above, you should have the generated `serve_pb2.py` and `serve_pb2_grpc.py` files. + +(bring-your-own-schema)= + +## Bring your own schema + +If you have a customized schema to use, Serve also supports it! + +Assume you have the following customized schema and have generated the corresponding gRPC code: + + +``` +message PingRequest { + bool no_reply = 1; +} +message PingReply { +} + +message PingTimeoutRequest {} +message PingTimeoutReply {} + +service TestService { + rpc Ping(PingRequest) returns (PingReply); + rpc PingTimeout(PingTimeoutRequest) returns (PingTimeoutReply); +} +``` + +After the code is generated, you can implement the business logic for the gRPC server by creating a subclass of the generated `TestServiceServicer`. You then need just two extra steps to adopt your schema into Ray Serve. + +* Inherit `ray.serve.drivers.gRPCIngress` in your implementation class. +* Add the `@serve.deployment(is_driver_deployment=True)` decorator. + +Server: +```{literalinclude} ../serve/doc_code/direct_ingress_with_customized_schema.py +:start-after: __begin_server__ +:end-before: __end_server__ +:language: python +``` + +Client: +You can use the generated client code directly to try it out:
+```{literalinclude} ../serve/doc_code/direct_ingress_with_customized_schema.py +:start-after: __begin_client__ +:end-before: __end_client__ +:language: python +``` + +:::{note} +* `is_driver_deployment` (experimental flag) is needed to mark the class as driver, serve will make sure the driver class deployment gets deployed one replica per node. +* `gRPCIngress` is used for starting a gRPC server. Your driver class needs to inherit from it. +::: diff --git a/doc/source/serve/doc_code/direct_ingress.py b/doc/source/serve/doc_code/direct_ingress.py new file mode 100644 index 000000000000..9ed16d73de6d --- /dev/null +++ b/doc/source/serve/doc_code/direct_ingress.py @@ -0,0 +1,89 @@ +# flake8: noqa + +# __begin_server__ +import ray +from ray import serve +from ray.serve.drivers import DefaultgRPCDriver +from ray.serve.handle import RayServeDeploymentHandle +from ray.serve.deployment_graph import InputNode +from typing import Dict, Optional +import struct + + +@serve.deployment +class FruitMarket: + def __init__( + self, + orange_stand: RayServeDeploymentHandle, + apple_stand: RayServeDeploymentHandle, + ): + self.directory = { + "ORANGE": orange_stand, + "APPLE": apple_stand, + } + + async def check_price(self, inputs: Dict[str, bytes]) -> Optional[bytearray]: + costs = 0 + for fruit, amount in inputs.items(): + if fruit not in self.directory: + return # NOTE(review): unknown fruit yields None rather than packed bytes - confirm the driver and client handle this + fruit_stand = self.directory[fruit] + ref: ray.ObjectRef = await fruit_stand.remote(int(amount)) + result = await ref + costs += result + return bytearray(struct.pack("f", costs)) + + +@serve.deployment +class OrangeStand: + def __init__(self): + self.price = 2.0 + + def __call__(self, num_oranges: int): + return num_oranges * self.price + + +@serve.deployment +class AppleStand: + def __init__(self): + self.price = 3.0 + + def __call__(self, num_oranges: int): + return num_oranges * self.price + + +with InputNode() as input: + orange_stand = OrangeStand.bind() + apple_stand = AppleStand.bind() + fruit_market = FruitMarket.bind(orange_stand,
apple_stand) + my_deployment = DefaultgRPCDriver.bind(fruit_market.check_price.bind(input)) + +serve.run(my_deployment) +# __end_server__ + +# __begin_client__ +import grpc +from ray.serve.generated import serve_pb2, serve_pb2_grpc +import asyncio +import struct + + +async def send_request(): + async with grpc.aio.insecure_channel("localhost:9000") as channel: + stub = serve_pb2_grpc.PredictAPIsServiceStub(channel) + response = await stub.Predict( + serve_pb2.PredictRequest( + input={"ORANGE": bytes("10", "utf-8"), "APPLE": bytes("3", "utf-8")} + ) + ) + return response + + +async def main(): + resp = await send_request() + print(struct.unpack("f", resp.prediction)) + + +asyncio.get_event_loop().run_until_complete(main()) + +# __end_client__ diff --git a/doc/source/serve/doc_code/direct_ingress_with_customized_schema.py b/doc/source/serve/doc_code/direct_ingress_with_customized_schema.py new file mode 100644 index 000000000000..ec2e88615794 --- /dev/null +++ b/doc/source/serve/doc_code/direct_ingress_with_customized_schema.py @@ -0,0 +1,32 @@ +# flake8: noqa + +# __begin_server__ +from ray import serve +from ray.serve.drivers import gRPCIngress +import test_service_pb2_grpc, test_service_pb2 + + +@serve.deployment(is_driver_deployment=True) +class MyDriver(test_service_pb2_grpc.TestServiceServicer, gRPCIngress): + def __init__(self): + super().__init__() + + async def Ping(self, request, context): + # play with your dag and then reply + return test_service_pb2.PingReply() + + +my_deployment = MyDriver.bind() + +serve.run(my_deployment) +# __end_server__ + + +# __begin_client__ +import grpc +import test_service_pb2_grpc, test_service_pb2 + +channel = grpc.insecure_channel("localhost:9000") # blocking channel: this module-level code cannot await a grpc.aio call +stub = test_service_pb2_grpc.TestServiceStub(channel) +response = stub.Ping(test_service_pb2.PingRequest()) +# __end_client__ diff --git a/doc/source/serve/doc_code/test_service_pb2.py b/doc/source/serve/doc_code/test_service_pb2.py new file mode 100755 index
000000000000..bb6e98e81dc9 --- /dev/null +++ b/doc/source/serve/doc_code/test_service_pb2.py @@ -0,0 +1,85 @@ +# flake8: noqa + +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: src/ray/protobuf/test_service.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n#src/ray/protobuf/test_service.proto\x12\x07ray.rpc"(\n\x0bPingRequest\x12\x19\n\x08no_reply\x18\x01 \x01(\x08R\x07noReply"\x0b\n\tPingReply"\x14\n\x12PingTimeoutRequest"\x12\n\x10PingTimeoutReply2\x86\x01\n\x0bTestService\x12\x30\n\x04Ping\x12\x14.ray.rpc.PingRequest\x1a\x12.ray.rpc.PingReply\x12\x45\n\x0bPingTimeout\x12\x1b.ray.rpc.PingTimeoutRequest\x1a\x19.ray.rpc.PingTimeoutReplyb\x06proto3' +) + + +_PINGREQUEST = DESCRIPTOR.message_types_by_name["PingRequest"] +_PINGREPLY = DESCRIPTOR.message_types_by_name["PingReply"] +_PINGTIMEOUTREQUEST = DESCRIPTOR.message_types_by_name["PingTimeoutRequest"] +_PINGTIMEOUTREPLY = DESCRIPTOR.message_types_by_name["PingTimeoutReply"] +PingRequest = _reflection.GeneratedProtocolMessageType( + "PingRequest", + (_message.Message,), + { + "DESCRIPTOR": _PINGREQUEST, + "__module__": "src.ray.protobuf.test_service_pb2" + # @@protoc_insertion_point(class_scope:ray.rpc.PingRequest) + }, +) +_sym_db.RegisterMessage(PingRequest) + +PingReply = _reflection.GeneratedProtocolMessageType( + "PingReply", + (_message.Message,), + { + "DESCRIPTOR": _PINGREPLY, + "__module__": "src.ray.protobuf.test_service_pb2" + # @@protoc_insertion_point(class_scope:ray.rpc.PingReply) + }, +) +_sym_db.RegisterMessage(PingReply) + 
+PingTimeoutRequest = _reflection.GeneratedProtocolMessageType( + "PingTimeoutRequest", + (_message.Message,), + { + "DESCRIPTOR": _PINGTIMEOUTREQUEST, + "__module__": "src.ray.protobuf.test_service_pb2" + # @@protoc_insertion_point(class_scope:ray.rpc.PingTimeoutRequest) + }, +) +_sym_db.RegisterMessage(PingTimeoutRequest) + +PingTimeoutReply = _reflection.GeneratedProtocolMessageType( + "PingTimeoutReply", + (_message.Message,), + { + "DESCRIPTOR": _PINGTIMEOUTREPLY, + "__module__": "src.ray.protobuf.test_service_pb2" + # @@protoc_insertion_point(class_scope:ray.rpc.PingTimeoutReply) + }, +) +_sym_db.RegisterMessage(PingTimeoutReply) + +_TESTSERVICE = DESCRIPTOR.services_by_name["TestService"] +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + _PINGREQUEST._serialized_start = 48 + _PINGREQUEST._serialized_end = 88 + _PINGREPLY._serialized_start = 90 + _PINGREPLY._serialized_end = 101 + _PINGTIMEOUTREQUEST._serialized_start = 103 + _PINGTIMEOUTREQUEST._serialized_end = 123 + _PINGTIMEOUTREPLY._serialized_start = 125 + _PINGTIMEOUTREPLY._serialized_end = 143 + _TESTSERVICE._serialized_start = 146 + _TESTSERVICE._serialized_end = 280 +# @@protoc_insertion_point(module_scope) diff --git a/doc/source/serve/doc_code/test_service_pb2_grpc.py b/doc/source/serve/doc_code/test_service_pb2_grpc.py new file mode 100755 index 000000000000..73d0d3e7edd8 --- /dev/null +++ b/doc/source/serve/doc_code/test_service_pb2_grpc.py @@ -0,0 +1,126 @@ +# flake8: noqa + +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import test_service_pb2 as src_dot_ray_dot_protobuf_dot_test__service__pb2 + + +class TestServiceStub(object): + """Missing associated documentation comment in .proto file.""" + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. 
+ """ + self.Ping = channel.unary_unary( + "/ray.rpc.TestService/Ping", + request_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.SerializeToString, + response_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.FromString, + ) + self.PingTimeout = channel.unary_unary( + "/ray.rpc.TestService/PingTimeout", + request_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.SerializeToString, + response_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.FromString, + ) + + +class TestServiceServicer(object): + """Missing associated documentation comment in .proto file.""" + + def Ping(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + def PingTimeout(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + + +def add_TestServiceServicer_to_server(servicer, server): + rpc_method_handlers = { + "Ping": grpc.unary_unary_rpc_method_handler( + servicer.Ping, + request_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.FromString, + response_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.SerializeToString, + ), + "PingTimeout": grpc.unary_unary_rpc_method_handler( + servicer.PingTimeout, + request_deserializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.FromString, + response_serializer=src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + "ray.rpc.TestService", rpc_method_handlers + ) + server.add_generic_rpc_handlers((generic_handler,)) + + 
+# This class is part of an EXPERIMENTAL API. +class TestService(object): + """Missing associated documentation comment in .proto file.""" + + @staticmethod + def Ping( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/ray.rpc.TestService/Ping", + src_dot_ray_dot_protobuf_dot_test__service__pb2.PingRequest.SerializeToString, + src_dot_ray_dot_protobuf_dot_test__service__pb2.PingReply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) + + @staticmethod + def PingTimeout( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/ray.rpc.TestService/PingTimeout", + src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutRequest.SerializeToString, + src_dot_ray_dot_protobuf_dot_test__service__pb2.PingTimeoutReply.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/doc/source/serve/user-guide.md b/doc/source/serve/user-guide.md index 71264b6b677f..48637a7e2354 100644 --- a/doc/source/serve/user-guide.md +++ b/doc/source/serve/user-guide.md @@ -13,3 +13,4 @@ This user guide will help you navigate the Ray Serve project and show you how to - [Handling Dependencies](handling-dependencies) - [Experimental Java API](managing-java-deployments) - [1.x to 2.x API Migration Guide](migration) +- [Experimental gRPC Support](direct-ingress)