From 1e658a5f9ae26c2c585847fce65a2c2b876db247 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Fri, 17 Mar 2023 13:02:17 -0700 Subject: [PATCH] [serve] Add replica info to metadata rest api (#33292) This exposes details about the live replicas of each deployment through the new GET endpoint. Sample replica detail from running a test application: ``` replica_id: app2_BasicDriver#KhlXQe state: RUNNING pid: 25853 actor_name: SERVE_REPLICA::app2_BasicDriver#KhlXQe actor_id: 2355af670b023966af79501501000000 node_id: 3631e75fc5312752c54b567ee66491a1e58a0420f0abc5b1c44e70cf node_ip: 192.168.0.141 start_time_s: 1678818083.039281 ``` Details: * `is_allocated` on each replica used to return just the node id for the controller to confirm the replica has been placed on a node and started. Now, it returns a tuple of runtime-context-related info: * `pid` * `actor_id` * `node_id` * `node_ip` * The four fields listed above that are retrieved from the replica actor may be `None` before the actor is actually scheduled, so they are marked optional in the schema. 
(The rest of the fields are filled in immediately when the replica is created to be tracked in the controller) ``` class ReplicaDetails(BaseModel, extra=Extra.forbid): replica_id: str state: ReplicaState pid: Optional[int] actor_name: str actor_id: Optional[str] node_id: Optional[str] node_ip: Optional[str] start_time_s: float ``` Signed-off-by: Edward Oakes --- .../modules/serve/tests/test_serve_agent.py | 20 +++++-- python/ray/serve/_private/common.py | 8 +++ python/ray/serve/_private/deployment_state.py | 58 ++++++++++++++++--- python/ray/serve/_private/replica.py | 11 +++- python/ray/serve/schema.py | 38 +++++++++++- .../serve/tests/test_autoscaling_policy.py | 2 +- python/ray/serve/tests/test_cluster.py | 3 +- .../test_deployment_graph_autoscaling.py | 2 +- .../ray/serve/tests/test_deployment_state.py | 2 +- 9 files changed, 121 insertions(+), 23 deletions(-) diff --git a/dashboard/modules/serve/tests/test_serve_agent.py b/dashboard/modules/serve/tests/test_serve_agent.py index c11c9eb4f4e5..8a5cc19fdacc 100644 --- a/dashboard/modules/serve/tests/test_serve_agent.py +++ b/dashboard/modules/serve/tests/test_serve_agent.py @@ -13,7 +13,7 @@ from ray.serve._private.constants import SERVE_NAMESPACE from ray.serve.tests.conftest import * # noqa: F401 F403 from ray.serve.schema import ServeInstanceDetails -from ray.serve._private.common import ApplicationStatus, DeploymentStatus +from ray.serve._private.common import ApplicationStatus, DeploymentStatus, ReplicaState GET_OR_PUT_URL = "http://localhost:52365/api/serve/deployments/" STATUS_URL = "http://localhost:52365/api/serve/deployments/status" @@ -547,13 +547,23 @@ def applications_running(): # CHECK: application details for app in ["app1", "app2"]: assert app_details[app].route_prefix == f"/{app}" - for dep_details in app_details[app].deployments.values(): - assert dep_details.status == DeploymentStatus.HEALTHY - + for deployment in app_details[app].deployments.values(): + assert deployment.status == 
DeploymentStatus.HEALTHY # Route prefix should be app level options eventually - assert "route_prefix" not in dep_details.deployment_config.dict( + assert "route_prefix" not in deployment.deployment_config.dict( exclude_unset=True ) + assert len(deployment.replicas) == deployment.deployment_config.num_replicas + + for replica in deployment.replicas: + assert replica.state == ReplicaState.RUNNING + assert ( + deployment.name in replica.replica_id + and deployment.name in replica.actor_name + ) + assert replica.actor_id and replica.node_id and replica.node_ip + assert replica.start_time_s > app_details[app].last_deployed_time_s + print("Finished checking application details.") diff --git a/python/ray/serve/_private/common.py b/python/ray/serve/_private/common.py index c14493b488d3..c5f239d3a9d1 100644 --- a/python/ray/serve/_private/common.py +++ b/python/ray/serve/_private/common.py @@ -28,6 +28,14 @@ class EndpointInfo: route: str +class ReplicaState(str, Enum): + STARTING = "STARTING" + UPDATING = "UPDATING" + RECOVERING = "RECOVERING" + RUNNING = "RUNNING" + STOPPING = "STOPPING" + + class ApplicationStatus(str, Enum): NOT_STARTED = "NOT_STARTED" DEPLOYING = "DEPLOYING" diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 6b9d4ff012b3..982b990a6687 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -29,9 +29,11 @@ ReplicaName, ReplicaTag, RunningReplicaInfo, + ReplicaState, ) from ray.serve.schema import ( DeploymentDetails, + ReplicaDetails, _deployment_info_to_schema, ) from ray.serve.config import DeploymentConfig @@ -62,14 +64,6 @@ logger = logging.getLogger(SERVE_LOGGER_NAME) -class ReplicaState(Enum): - STARTING = 1 - UPDATING = 2 - RECOVERING = 3 - RUNNING = 4 - STOPPING = 5 - - class ReplicaStartupStatus(Enum): PENDING_ALLOCATION = 1 PENDING_INITIALIZATION = 2 @@ -217,11 +211,14 @@ def __init__( # the non-detached case. 
self._actor_handle: ActorHandle = None + self._pid: int = None + self._actor_id: str = None if isinstance(scheduling_strategy, NodeAffinitySchedulingStrategy): self._node_id = scheduling_strategy.node_id else: # Populated after replica is allocated. self._node_id: str = None + self._node_ip: str = None # Populated in self.stop(). self._graceful_shutdown_ref: ObjectRef = None @@ -264,11 +261,26 @@ def actor_handle(self) -> Optional[ActorHandle]: def max_concurrent_queries(self) -> int: return self._max_concurrent_queries + @property + def pid(self) -> Optional[int]: + """Returns the pid of the actor, None if not started.""" + return self._pid + + @property + def actor_id(self) -> Optional[str]: + """Returns the actor id, None if not started.""" + return self._actor_id + @property def node_id(self) -> Optional[str]: """Returns the node id of the actor, None if not placed.""" return self._node_id + @property + def node_ip(self) -> Optional[str]: + """Returns the node ip of the actor, None if not placed.""" + return self._node_ip + def _check_obj_ref_ready(self, obj_ref: ObjectRef) -> bool: ready, _ = ray.wait([obj_ref], timeout=0) return len(ready) == 1 @@ -480,7 +492,9 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion] ) self._health_check_period_s = deployment_config.health_check_period_s self._health_check_timeout_s = deployment_config.health_check_timeout_s - self._node_id = ray.get(self._allocated_obj_ref) + self._pid, self._actor_id, self._node_id, self._node_ip = ray.get( + self._allocated_obj_ref + ) except Exception: logger.exception(f"Exception in deployment '{self._deployment_name}'") return ReplicaStartupStatus.FAILED, None @@ -694,6 +708,24 @@ def get_running_replica_info(self) -> RunningReplicaInfo: is_cross_language=self._actor.is_cross_language, ) + def get_replica_details(self, state: ReplicaState) -> ReplicaDetails: + """Get replica details. 
+ + Args: + state: The state of the replica, which is not stored within a + DeploymentReplica object + """ + return ReplicaDetails( + replica_id=self.replica_tag, + state=state, + pid=self._actor.pid, + actor_name=self._actor._actor_name, + actor_id=self._actor.actor_id, + node_id=self._actor.node_id, + node_ip=self._actor.node_ip, + start_time_s=self._start_time, + ) + @property def replica_tag(self) -> ReplicaTag: return self._replica_tag @@ -1082,6 +1114,13 @@ def get_running_replica_infos(self) -> List[RunningReplicaInfo]: for replica in self._replicas.get([ReplicaState.RUNNING]) ] + def list_replica_details(self) -> List[ReplicaDetails]: + return [ + replica.get_replica_details(state) + for state in ReplicaState + for replica in self._replicas.get([state]) + ] + def _notify_running_replicas_changed(self): self._long_poll_host.notify_changed( (LongPollNamespace.RUNNING_REPLICAS, self._name), @@ -2034,6 +2073,7 @@ def get_deployment_details(self, deployment_name: str) -> DeploymentDetails: deployment_config=_deployment_info_to_schema( deployment_name, self.get_deployment(deployment_name) ), + replicas=self._deployment_states[deployment_name].list_replica_details(), ) def get_deployment_statuses( diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index eb645330fdd5..e51fe393d661 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -3,6 +3,7 @@ from importlib import import_module import inspect import logging +import os import pickle import time from typing import Any, Callable, Optional, Tuple, Dict @@ -210,9 +211,15 @@ async def is_allocated(self) -> str: At this time, the replica can transition from PENDING_ALLOCATION to PENDING_INITIALIZATION startup state. - Return the NodeID of this replica + Returns: + The PID, actor ID, node ID, node IP of the replica. 
""" - return ray.get_runtime_context().get_node_id() + return ( + os.getpid(), + ray.get_runtime_context().get_actor_id(), + ray.get_runtime_context().get_node_id(), + ray.util.get_node_ip_address(), + ) async def is_initialized( self, user_config: Optional[Any] = None, _after: Optional[Any] = None diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index a9372ca0eb51..6df57444c327 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -9,6 +9,7 @@ DeploymentStatus, StatusOverview, DeploymentInfo, + ReplicaState, ) from ray.serve._private.constants import DEPLOYMENT_NAME_PREFIX_SEPARATOR from ray.serve._private.utils import DEFAULT, dict_keys_snake_to_camel_case @@ -546,7 +547,35 @@ def get_empty_schema_dict() -> Dict: @PublicAPI(stability="alpha") -class DeploymentDetails(BaseModel, extra=Extra.forbid): +class ReplicaDetails(BaseModel, extra=Extra.forbid, frozen=True): + replica_id: str = Field( + description=( + "Unique ID for the replica. By default, this will be " + '"#", where the replica suffix is a ' + "randomly generated unique string." + ) + ) + state: ReplicaState = Field(description="Current state of the replica.") + pid: Optional[int] = Field(description="PID of the replica actor process.") + actor_name: str = Field(description="Name of the replica actor.") + actor_id: Optional[str] = Field(description="ID of the replica actor.") + node_id: Optional[str] = Field( + description="ID of the node that the replica actor is running on." + ) + node_ip: Optional[str] = Field( + description="IP address of the node that the replica actor is running on." + ) + start_time_s: float = Field( + description=( + "The time at which the replica actor was started. If the controller dies, " + "this is the time at which the controller recovers and retrieves replica " + "state from the running replica actor." 
+ ) + ) + + +@PublicAPI(stability="alpha") +class DeploymentDetails(BaseModel, extra=Extra.forbid, frozen=True): name: str = Field(description="Deployment name.") status: DeploymentStatus = Field( description="The current status of the deployment." @@ -564,6 +593,9 @@ class DeploymentDetails(BaseModel, extra=Extra.forbid): "options, or Serve default values." ) ) + replicas: List[ReplicaDetails] = Field( + description="Details about the live replicas of this deployment." + ) @validator("deployment_config") def deployment_route_prefix_not_set(cls, v: DeploymentSchema): @@ -580,7 +612,7 @@ def deployment_route_prefix_not_set(cls, v: DeploymentSchema): @PublicAPI(stability="alpha") -class ApplicationDetails(BaseModel, extra=Extra.forbid): +class ApplicationDetails(BaseModel, extra=Extra.forbid, frozen=True): name: str = Field(description="Application name.") route_prefix: Optional[str] = Field( ..., @@ -659,7 +691,7 @@ def get_status_dict(self) -> Dict: @PublicAPI(stability="alpha") -class ServeInstanceDetails(BaseModel, extra=Extra.forbid): +class ServeInstanceDetails(BaseModel, extra=Extra.forbid, frozen=True): host: Optional[str] = Field( description="The host on which the HTTP server is listening for requests." 
) diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py index 7bfdc8050b9f..0bdcb4b7b75c 100644 --- a/python/ray/serve/tests/test_autoscaling_policy.py +++ b/python/ray/serve/tests/test_autoscaling_policy.py @@ -15,7 +15,7 @@ calculate_desired_num_replicas, ) from ray.serve._private.common import DeploymentInfo -from ray.serve._private.deployment_state import ReplicaState +from ray.serve._private.common import ReplicaState from ray.serve.config import AutoscalingConfig from ray.serve._private.constants import CONTROL_LOOP_PERIOD_S from ray.serve.controller import ServeController diff --git a/python/ray/serve/tests/test_cluster.py b/python/ray/serve/tests/test_cluster.py index bde670fb9ae9..97d8927894d1 100644 --- a/python/ray/serve/tests/test_cluster.py +++ b/python/ray/serve/tests/test_cluster.py @@ -11,7 +11,8 @@ from ray._private.test_utils import SignalActor, wait_for_condition from ray.cluster_utils import Cluster from ray.serve._private.constants import SERVE_NAMESPACE -from ray.serve._private.deployment_state import ReplicaStartupStatus, ReplicaState +from ray.serve._private.deployment_state import ReplicaStartupStatus +from ray.serve._private.common import ReplicaState @pytest.fixture diff --git a/python/ray/serve/tests/test_deployment_graph_autoscaling.py b/python/ray/serve/tests/test_deployment_graph_autoscaling.py index d75a22cd5592..662f25104bdc 100644 --- a/python/ray/serve/tests/test_deployment_graph_autoscaling.py +++ b/python/ray/serve/tests/test_deployment_graph_autoscaling.py @@ -7,7 +7,7 @@ from ray import serve from ray.serve.drivers import DAGDriver from ray.dag.input_node import InputNode -from ray.serve._private.deployment_state import ReplicaState +from ray.serve._private.common import ReplicaState from ray._private.test_utils import SignalActor, wait_for_condition # Magic number to use for speed up scale from 0 replica diff --git a/python/ray/serve/tests/test_deployment_state.py 
b/python/ray/serve/tests/test_deployment_state.py index fc8af8080d43..9082a74abed6 100644 --- a/python/ray/serve/tests/test_deployment_state.py +++ b/python/ray/serve/tests/test_deployment_state.py @@ -14,6 +14,7 @@ ReplicaConfig, ReplicaTag, ReplicaName, + ReplicaState, ) from ray.serve._private.deployment_state import ( DeploymentState, @@ -22,7 +23,6 @@ DeploymentVersion, DeploymentReplica, ReplicaStartupStatus, - ReplicaState, ReplicaStateContainer, VersionedReplica, rank_replicas_for_stopping,