diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index 7c3e420abfd3..f5ceb3aa4c8e 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -54,6 +54,11 @@ from ray.serve.controller import ServeController from ray.serve.deployment import Deployment from ray.serve.exceptions import RayServeException +from ray.serve.generated.serve_pb2 import ( + DeploymentRoute, + DeploymentRouteList, + DeploymentStatusInfoList, +) from ray.experimental.dag import DAGNode from ray.serve.handle import RayServeHandle, RayServeSyncHandle from ray.serve.http_util import ASGIHTTPSender, make_fastapi_class_based_view @@ -283,7 +288,7 @@ def _wait_for_deployments_shutdown(self, timeout_s: int = 60): """ start = time.time() while time.time() - start < timeout_s: - statuses = ray.get(self._controller.get_deployment_statuses.remote()) + statuses = self.get_deployment_statuses() if len(statuses) == 0: break else: @@ -308,7 +313,7 @@ def _wait_for_deployment_healthy(self, name: str, timeout_s: int = -1): """ start = time.time() while time.time() - start < timeout_s or timeout_s < 0: - statuses = ray.get(self._controller.get_deployment_statuses.remote()) + statuses = self.get_deployment_statuses() try: status = statuses[name] except KeyError: @@ -341,7 +346,7 @@ def _wait_for_deployment_deleted(self, name: str, timeout_s: int = 60): """ start = time.time() while time.time() - start < timeout_s: - statuses = ray.get(self._controller.get_deployment_statuses.remote()) + statuses = self.get_deployment_statuses() if name not in statuses: break else: @@ -435,15 +440,38 @@ def delete_deployments(self, names: Iterable[str], blocking: bool = True) -> Non @_ensure_connected def get_deployment_info(self, name: str) -> Tuple[DeploymentInfo, str]: - return ray.get(self._controller.get_deployment_info.remote(name)) + deployment_route = DeploymentRoute.FromString( + ray.get(self._controller.get_deployment_info.remote(name)) + ) + return ( + DeploymentInfo.from_proto(deployment_route.deployment_info), + deployment_route.route if deployment_route.route != "" else None, + ) @_ensure_connected def list_deployments(self) -> Dict[str, Tuple[DeploymentInfo, str]]: - return ray.get(self._controller.list_deployments.remote()) + deployment_route_list = DeploymentRouteList.FromString( + ray.get(self._controller.list_deployments.remote()) + ) + return { + deployment_route.deployment_info.name: ( + DeploymentInfo.from_proto(deployment_route.deployment_info), + deployment_route.route if deployment_route.route != "" else None, + ) + for deployment_route in deployment_route_list.deployment_routes + } @_ensure_connected def get_deployment_statuses(self) -> Dict[str, DeploymentStatusInfo]: - return ray.get(self._controller.get_deployment_statuses.remote()) + proto = DeploymentStatusInfoList.FromString( + ray.get(self._controller.get_deployment_statuses.remote()) + ) + return { + deployment_status_info.name: DeploymentStatusInfo.from_proto( + deployment_status_info + ) + for deployment_status_info in proto.deployment_status_infos + } @_ensure_connected def get_handle( @@ -582,6 +610,9 @@ def get_deploy_args( else: raise TypeError("config must be a DeploymentConfig or a dictionary.") + deployment_config.version = version + deployment_config.prev_version = prev_version + if ( deployment_config.autoscaling_config is not None and deployment_config.max_concurrent_queries @@ -596,9 +627,7 @@ def get_deploy_args( controller_deploy_args = { "name": name, "deployment_config_proto_bytes": deployment_config.to_proto_bytes(), - "replica_config": replica_config, - "version": version, - "prev_version": prev_version, + "replica_config_proto_bytes": replica_config.to_proto_bytes(), "route_prefix": route_prefix, "deployer_job_id": ray.get_runtime_context().job_id, } diff --git a/python/ray/serve/common.py b/python/ray/serve/common.py index 49efa5fbe88e..e63469d31326 100644 --- a/python/ray/serve/common.py +++ b/python/ray/serve/common.py @@ -6,6 +6,12 @@ from ray.actor import ActorHandle from ray.serve.config import DeploymentConfig, ReplicaConfig from ray.serve.autoscaling_policy import AutoscalingPolicy +from ray.serve.generated.serve_pb2 import ( + DeploymentInfo as DeploymentInfoProto, + DeploymentStatusInfo as DeploymentStatusInfoProto, + DeploymentStatus as DeploymentStatusProto, + DeploymentLanguage, +) EndpointTag = str ReplicaTag = str @@ -29,6 +35,16 @@ class DeploymentStatusInfo: status: DeploymentStatus message: str = "" + def to_proto(self): + return DeploymentStatusInfoProto(status=self.status, message=self.message) + + @classmethod + def from_proto(cls, proto: DeploymentStatusInfoProto): + return cls( + status=DeploymentStatus(DeploymentStatusProto.Name(proto.status)), + message=proto.message, + ) + class DeploymentInfo: def __init__( @@ -95,6 +111,47 @@ def actor_def(self): return self._cached_actor_def + @classmethod + def from_proto(cls, proto: DeploymentInfoProto): + deployment_config = ( + DeploymentConfig.from_proto(proto.deployment_config) + if proto.deployment_config + else None + ) + data = { + "deployment_config": deployment_config, + "replica_config": ReplicaConfig.from_proto( + proto.replica_config, + deployment_config.deployment_language + if deployment_config + else DeploymentLanguage.PYTHON, + ), + "start_time_ms": proto.start_time_ms, + "actor_name": proto.actor_name if proto.actor_name != "" else None, + "serialized_deployment_def": proto.serialized_deployment_def + if proto.serialized_deployment_def != b"" + else None, + "version": proto.version if proto.version != "" else None, + "end_time_ms": proto.end_time_ms if proto.end_time_ms != 0 else None, + "deployer_job_id": ray.get_runtime_context().job_id, + } + + return cls(**data) + + def to_proto(self): + data = { + "start_time_ms": self.start_time_ms, + "actor_name": self.actor_name, + "serialized_deployment_def": self.serialized_deployment_def, + "version": self.version, + "end_time_ms": self.end_time_ms, + } + if self.deployment_config: + data["deployment_config"] = self.deployment_config.to_proto() + if self.replica_config: + data["replica_config"] = self.replica_config.to_proto() + return DeploymentInfoProto(**data) + @dataclass class ReplicaName: diff --git a/python/ray/serve/config.py b/python/ray/serve/config.py index e976d82f66c1..97db3bfc4473 100644 --- a/python/ray/serve/config.py +++ b/python/ray/serve/config.py @@ -1,4 +1,5 @@ import inspect +import json import pickle from enum import Enum from typing import Any, Callable, Dict, List, Optional, Tuple, Union @@ -27,7 +28,9 @@ DeploymentConfig as DeploymentConfigProto, DeploymentLanguage, AutoscalingConfig as AutoscalingConfigProto, + ReplicaConfig as ReplicaConfigProto, ) +from ray.serve.utils import ServeEncoder class AutoscalingConfig(BaseModel): @@ -129,6 +132,9 @@ class DeploymentConfig(BaseModel): # the deploymnent use. deployment_language: Any = DeploymentLanguage.PYTHON + version: Optional[str] = None + prev_version: Optional[str] = None + class Config: validate_assignment = True extra = "forbid" @@ -144,7 +150,7 @@ def set_max_queries_by_mode(cls, v, values): # noqa 805 raise ValueError("max_concurrent_queries must be >= 0") return v - def to_proto_bytes(self): + def to_proto(self): data = self.dict() if data.get("user_config"): data["user_config"] = pickle.dumps(data["user_config"]) @@ -152,11 +158,13 @@ def to_proto_bytes(self): data["autoscaling_config"] = AutoscalingConfigProto( **data["autoscaling_config"] ) - return DeploymentConfigProto(**data).SerializeToString() + return DeploymentConfigProto(**data) + + def to_proto_bytes(self): + return self.to_proto().SerializeToString() @classmethod - def from_proto_bytes(cls, proto_bytes: bytes): - proto = DeploymentConfigProto.FromString(proto_bytes) + def from_proto(cls, proto: DeploymentConfigProto): data = MessageToDict( proto, including_default_value_fields=True, @@ -170,9 +178,19 @@ def from_proto_bytes(cls, proto_bytes: bytes): data["user_config"] = None if "autoscaling_config" in data: data["autoscaling_config"] = AutoscalingConfig(**data["autoscaling_config"]) - + if "prev_version" in data: + if data["prev_version"] == "": + data["prev_version"] = None + if "version" in data: + if data["version"] == "": + data["version"] = None return cls(**data) + @classmethod + def from_proto_bytes(cls, proto_bytes: bytes): + proto = DeploymentConfigProto.FromString(proto_bytes) + return cls.from_proto(proto) + class ReplicaConfig: def __init__( @@ -286,6 +304,54 @@ def _validate(self): raise TypeError("resources in ray_actor_options must be a dictionary.") self.resource_dict.update(custom_resources) + @classmethod + def from_proto( + cls, proto: ReplicaConfigProto, deployment_language: DeploymentLanguage + ): + deployment_def = None + if proto.serialized_deployment_def != b"": + if deployment_language == DeploymentLanguage.PYTHON: + deployment_def = cloudpickle.loads(proto.serialized_deployment_def) + else: + # TODO use messagepack + deployment_def = cloudpickle.loads(proto.serialized_deployment_def) + + init_args = pickle.loads(proto.init_args) if proto.init_args != b"" else None + init_kwargs = ( + pickle.loads(proto.init_kwargs) if proto.init_kwargs != b"" else None + ) + ray_actor_options = ( + json.loads(proto.ray_actor_options) + if proto.ray_actor_options != "" + else None + ) + + return ReplicaConfig(deployment_def, init_args, init_kwargs, ray_actor_options) + + @classmethod + def from_proto_bytes( + cls, proto_bytes: bytes, deployment_language: DeploymentLanguage + ): + proto = ReplicaConfigProto.FromString(proto_bytes) + return cls.from_proto(proto, deployment_language) + + def to_proto(self): + data = { + "serialized_deployment_def": self.serialized_deployment_def, + } + if self.init_args: + data["init_args"] = pickle.dumps(self.init_args) + if self.init_kwargs: + data["init_kwargs"] = pickle.dumps(self.init_kwargs) + if self.ray_actor_options: + data["ray_actor_options"] = json.dumps( + self.ray_actor_options, cls=ServeEncoder + ) + return ReplicaConfigProto(**data) + + def to_proto_bytes(self): + return self.to_proto().SerializeToString() + class DeploymentMode(str, Enum): NoServer = "NoServer" diff --git a/python/ray/serve/controller.py b/python/ray/serve/controller.py index d7e0dbdab364..14588bf81801 100644 --- a/python/ray/serve/controller.py +++ b/python/ray/serve/controller.py @@ -12,7 +12,6 @@ from ray.serve.deployment_state import ReplicaState, DeploymentStateManager from ray.serve.common import ( DeploymentInfo, - DeploymentStatusInfo, EndpointTag, EndpointInfo, NodeId, @@ -151,12 +150,21 @@ def get_http_proxies(self) -> Dict[NodeId, ActorHandle]: """Returns a dictionary of node ID to http_proxy actor handles.""" return self.http_state.get_http_proxy_handles() + def get_http_proxy_names(self) -> bytes: + """Returns the http_proxy actor name list serialized by protobuf.""" + from ray.serve.generated.serve_pb2 import ActorNameList + + actor_name_list = ActorNameList( + names=self.http_state.get_http_proxy_names().values() + ) + return actor_name_list.SerializeToString() + def autoscale(self) -> None: """Updates autoscaling deployments with calculated num_replicas.""" for deployment_name, ( deployment_info, route_prefix, - ) in self.list_deployments().items(): + ) in self.list_deployments_internal().items(): deployment_config = deployment_info.deployment_config autoscaling_policy = deployment_info.autoscaling_policy @@ -223,9 +231,10 @@ async def run_control_loop(self) -> None: def _put_serve_snapshot(self) -> None: val = dict() - for deployment_name, (deployment_info, route_prefix) in self.list_deployments( - include_deleted=True - ).items(): + for deployment_name, ( + deployment_info, + route_prefix, + ) in self.list_deployments_internal(include_deleted=True).items(): entry = dict() entry["name"] = deployment_name entry["namespace"] = ray.get_runtime_context().namespace @@ -295,9 +304,7 @@ def deploy( self, name: str, deployment_config_proto_bytes: bytes, - replica_config: ReplicaConfig, - version: Optional[str], - prev_version: Optional[str], + replica_config_proto_bytes: bytes, route_prefix: Optional[str], deployer_job_id: "ray._raylet.JobID", ) -> bool: @@ -307,6 +314,11 @@ def deploy( deployment_config = DeploymentConfig.from_proto_bytes( deployment_config_proto_bytes ) + version = deployment_config.version + prev_version = deployment_config.prev_version + replica_config = ReplicaConfig.from_proto_bytes( + replica_config_proto_bytes, deployment_config.deployment_language + ) if prev_version is not None: existing_deployment_info = self.deployment_state_manager.get_deployment( @@ -375,14 +387,14 @@ def delete_deployments(self, names: Iterable[str]) -> None: for name in names: self.delete_deployment(name) - def get_deployment_info(self, name: str) -> Tuple[DeploymentInfo, str]: + def get_deployment_info(self, name: str) -> bytes: """Get the current information about a deployment. Args: name(str): the name of the deployment. Returns: - (DeploymentInfo, route) + DeploymentRoute's protobuf serialized bytes Raises: KeyError if the deployment doesn't exist. @@ -393,9 +405,14 @@ def get_deployment_info(self, name: str) -> Tuple[DeploymentInfo, str]: route = self.endpoint_state.get_endpoint_route(name) - return deployment_info, route + from ray.serve.generated.serve_pb2 import DeploymentRoute - def list_deployments( + deployment_route = DeploymentRoute( + deployment_info=deployment_info.to_proto(), route=route + ) + return deployment_route.SerializeToString() + + def list_deployments_internal( self, include_deleted: Optional[bool] = False ) -> Dict[str, Tuple[DeploymentInfo, str]]: """Gets the current information about all deployments. @@ -422,5 +439,48 @@ def list_deployments( ) } - def get_deployment_statuses(self) -> Dict[str, DeploymentStatusInfo]: - return self.deployment_state_manager.get_deployment_statuses() + def list_deployments(self, include_deleted: Optional[bool] = False) -> bytes: + """Gets the current information about all deployments. + + Args: + include_deleted(bool): Whether to include information about + deployments that have been deleted. + + Returns: + DeploymentRouteList's protobuf serialized bytes + """ + from ray.serve.generated.serve_pb2 import DeploymentRouteList, DeploymentRoute + + deployment_route_list = DeploymentRouteList() + for deployment_name, ( + deployment_info, + route_prefix, + ) in self.list_deployments_internal(include_deleted=include_deleted).items(): + deployment_info_proto = deployment_info.to_proto() + deployment_info_proto.name = deployment_name + deployment_route_list.deployment_routes.append( + DeploymentRoute( + deployment_info=deployment_info_proto, route=route_prefix + ) + ) + return deployment_route_list.SerializeToString() + + def get_deployment_statuses(self) -> bytes: + """Gets the current status information about all deployments. + + Returns: + DeploymentStatusInfoList's protobuf serialized bytes + """ + from ray.serve.generated.serve_pb2 import DeploymentStatusInfoList + + deployment_status_info_list = DeploymentStatusInfoList() + for ( + name, + deployment_status_info, + ) in self.deployment_state_manager.get_deployment_statuses().items(): + deployment_status_info_proto = deployment_status_info.to_proto() + deployment_status_info_proto.name = name + deployment_status_info_list.deployment_status_infos.append( + deployment_status_info_proto + ) + return deployment_status_info_list.SerializeToString() diff --git a/python/ray/serve/http_state.py b/python/ray/serve/http_state.py index 233504743922..8ac8cf715d3f 100644 --- a/python/ray/serve/http_state.py +++ b/python/ray/serve/http_state.py @@ -40,6 +40,7 @@ def __init__( self._config = config self._override_controller_namespace = _override_controller_namespace self._proxy_actors: Dict[NodeId, ActorHandle] = dict() + self._proxy_actor_names: Dict[NodeId, str] = dict() # Will populate self.proxy_actors with existing actors. if _start_proxies_on_init: @@ -55,6 +56,9 @@ def get_config(self): def get_http_proxy_handles(self) -> Dict[NodeId, ActorHandle]: return self._proxy_actors + def get_http_proxy_names(self) -> Dict[NodeId, str]: + return self._proxy_actor_names + def update(self): self._start_proxies_if_needed() self._stop_proxies_if_needed() @@ -128,6 +132,7 @@ def _start_proxies_if_needed(self) -> None: ) self._proxy_actors[node_id] = proxy + self._proxy_actor_names[node_id] = name def _stop_proxies_if_needed(self) -> bool: """Removes proxy actors from any nodes that no longer exist.""" @@ -140,6 +145,7 @@ def _stop_proxies_if_needed(self) -> bool: for node_id in to_stop: proxy = self._proxy_actors.pop(node_id) + del self._proxy_actor_names[node_id] ray.kill(proxy, no_restart=True) async def ensure_http_route_exists(self, endpoint: EndpointTag, timeout_s: float): diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py index 22dfe667fe17..09f563ffdeef 100644 --- a/python/ray/serve/tests/test_autoscaling_policy.py +++ b/python/ray/serve/tests/test_autoscaling_policy.py @@ -11,6 +11,7 @@ BasicAutoscalingPolicy, calculate_desired_num_replicas, ) +from ray.serve.common import DeploymentInfo from ray.serve.deployment_state import ReplicaState from ray.serve.config import AutoscalingConfig from ray.serve.constants import CONTROL_LOOP_PERIOD_S @@ -19,6 +20,7 @@ import ray from ray import serve +from ray.serve.generated.serve_pb2 import DeploymentRouteList class TestCalculateDesiredNumReplicas: @@ -154,7 +156,16 @@ def test_assert_no_replicas_deprovisioned(): def get_deployment_start_time(controller: ServeController, deployment: Deployment): """Return start time for given deployment""" - deployments = ray.get(controller.list_deployments.remote()) + deployment_route_list = DeploymentRouteList.FromString( + ray.get(controller.list_deployments.remote()) + ) + deployments = { + deployment_route.deployment_info.name: ( + DeploymentInfo.from_proto(deployment_route.deployment_info), + deployment_route.route if deployment_route.route != "" else None, + ) + for deployment_route in deployment_route_list.deployment_routes + } deployment_info, _route_prefix = deployments[deployment.name] return deployment_info.start_time_ms diff --git a/python/ray/serve/tests/test_controller.py b/python/ray/serve/tests/test_controller.py index b34a5917e8d0..eb46b7d7a1a8 100644 --- a/python/ray/serve/tests/test_controller.py +++ b/python/ray/serve/tests/test_controller.py @@ -3,6 +3,8 @@ import ray from ray import serve +from ray.serve.common import DeploymentInfo +from ray.serve.generated.serve_pb2 import DeploymentRoute def test_redeploy_start_time(serve_instance): @@ -15,7 +17,10 @@ def test(_): return "1" test.deploy() - deployment_info_1, route_1 = ray.get(controller.get_deployment_info.remote("test")) + deployment_route = DeploymentRoute.FromString( + ray.get(controller.get_deployment_info.remote("test")) + ) + deployment_info_1 = DeploymentInfo.from_proto(deployment_route.deployment_info) start_time_ms_1 = deployment_info_1.start_time_ms time.sleep(0.1) @@ -25,7 +30,10 @@ def test(_): return "2" test.deploy() - deployment_info_2, route_2 = ray.get(controller.get_deployment_info.remote("test")) + deployment_route = DeploymentRoute.FromString( + ray.get(controller.get_deployment_info.remote("test")) + ) + deployment_info_2 = DeploymentInfo.from_proto(deployment_route.deployment_info) start_time_ms_2 = deployment_info_2.start_time_ms assert start_time_ms_1 == start_time_ms_2 diff --git a/python/ray/serve/tests/test_cross_language.py b/python/ray/serve/tests/test_cross_language.py index 6d85f6d07f46..8a14b173930d 100644 --- a/python/ray/serve/tests/test_cross_language.py +++ b/python/ray/serve/tests/test_cross_language.py @@ -33,9 +33,7 @@ def test_controller_starts_java_replica(shutdown_only): # noqa: F811 controller.deploy.remote( name=deployment_name, deployment_config_proto_bytes=config.to_proto_bytes(), - replica_config=replica_config, - version=None, - prev_version=None, + replica_config_proto_bytes=replica_config.to_proto_bytes(), route_prefix=None, deployer_job_id=ray.get_runtime_context().job_id, ) diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index 2a8ecaea3ef5..96f1efd820ac 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -18,6 +18,7 @@ from ray.cluster_utils import Cluster, cluster_not_supported from ray.serve.constants import SERVE_ROOT_URL_ENV_KEY, SERVE_PROXY_NAME from ray.serve.exceptions import RayServeException +from ray.serve.generated.serve_pb2 import ActorNameList from ray.serve.utils import block_until_http_ready, get_all_node_ids, format_actor_name from ray.serve.config import HTTPOptions from ray.serve.api import internal_get_global_client @@ -447,6 +448,10 @@ def test_fixed_number_proxies(ray_cluster): node_to_http_actors = ray.get(controller_handle.get_http_proxies.remote()) assert len(node_to_http_actors) == 2 + proxy_names_bytes = ray.get(controller_handle.get_http_proxy_names.remote()) + proxy_names = ActorNameList.FromString(proxy_names_bytes) + assert len(proxy_names.names) == 2 + serve.shutdown() ray.shutdown() cluster.shutdown() diff --git a/src/ray/protobuf/serve.proto b/src/ray/protobuf/serve.proto index d7e800e4f558..843effae6ea1 100644 --- a/src/ray/protobuf/serve.proto +++ b/src/ray/protobuf/serve.proto @@ -86,6 +86,10 @@ message DeploymentConfig { // The deployment's autoscaling configuration. AutoscalingConfig autoscaling_config = 10; + + string version = 11; + + string prev_version = 12; } // Deployment language. @@ -131,6 +135,15 @@ message EndpointSet { map endpoints = 1; } +// Now Actor handle can be transfered across language through ray call, but the list of +// Actor handles can't. So we use this message wrapped a Actor name list to pass actor +// list across language. When Actor handle list supports across language, this message can +// be replaced. +message ActorNameList { + repeated string names = 1; +} + +// It is deprecated. Use ActorNameList instead. message ActorSet { repeated string names = 1; } @@ -139,3 +152,49 @@ message DeploymentVersion { string code_version = 1; bytes user_config = 2; } + +message ReplicaConfig { + bytes serialized_deployment_def = 1; + bytes init_args = 2; + bytes init_kwargs = 3; + string ray_actor_options = 4; +} + +message DeploymentInfo { + string name = 1; + DeploymentConfig deployment_config = 2; + ReplicaConfig replica_config = 3; + int64 start_time_ms = 4; + string actor_name = 5; + bytes serialized_deployment_def = 6; + string version = 7; + int64 end_time_ms = 8; +} + +// Wrap DeploymentInfo and route. The "" route value need to be convert to None/null. +message DeploymentRoute { + DeploymentInfo deployment_info = 1; + string route = 2; +} + +// Wrap a list for DeploymentRoute. +message DeploymentRouteList { + repeated DeploymentRoute deployment_routes = 1; +} + +enum DeploymentStatus { + UPDATING = 0; + HEALTHY = 1; + UNHEALTHY = 2; +} + +message DeploymentStatusInfo { + string name = 1; + DeploymentStatus status = 2; + string message = 3; +} + +// Wrap a list for DeploymentStatusInfo. +message DeploymentStatusInfoList { + repeated DeploymentStatusInfo deployment_status_infos = 1; +}