diff --git a/dashboard/modules/serve/tests/test_serve_agent.py b/dashboard/modules/serve/tests/test_serve_agent.py index c11c9eb4f4e5..8a5cc19fdacc 100644 --- a/dashboard/modules/serve/tests/test_serve_agent.py +++ b/dashboard/modules/serve/tests/test_serve_agent.py @@ -13,7 +13,7 @@ from ray.serve._private.constants import SERVE_NAMESPACE from ray.serve.tests.conftest import * # noqa: F401 F403 from ray.serve.schema import ServeInstanceDetails -from ray.serve._private.common import ApplicationStatus, DeploymentStatus +from ray.serve._private.common import ApplicationStatus, DeploymentStatus, ReplicaState GET_OR_PUT_URL = "http://localhost:52365/api/serve/deployments/" STATUS_URL = "http://localhost:52365/api/serve/deployments/status" @@ -547,13 +547,23 @@ def applications_running(): # CHECK: application details for app in ["app1", "app2"]: assert app_details[app].route_prefix == f"/{app}" - for dep_details in app_details[app].deployments.values(): - assert dep_details.status == DeploymentStatus.HEALTHY - + for deployment in app_details[app].deployments.values(): + assert deployment.status == DeploymentStatus.HEALTHY # Route prefix should be app level options eventually - assert "route_prefix" not in dep_details.deployment_config.dict( + assert "route_prefix" not in deployment.deployment_config.dict( exclude_unset=True ) + assert len(deployment.replicas) == deployment.deployment_config.num_replicas + + for replica in deployment.replicas: + assert replica.state == ReplicaState.RUNNING + assert ( + deployment.name in replica.replica_id + and deployment.name in replica.actor_name + ) + assert replica.actor_id and replica.node_id and replica.node_ip + assert replica.start_time_s > app_details[app].last_deployed_time_s + print("Finished checking application details.") diff --git a/python/ray/serve/_private/common.py b/python/ray/serve/_private/common.py index c14493b488d3..c5f239d3a9d1 100644 --- a/python/ray/serve/_private/common.py +++ b/python/ray/serve/_private/common.py @@ -28,6 +28,14 @@ class EndpointInfo: route: str +class ReplicaState(str, Enum): + STARTING = "STARTING" + UPDATING = "UPDATING" + RECOVERING = "RECOVERING" + RUNNING = "RUNNING" + STOPPING = "STOPPING" + + class ApplicationStatus(str, Enum): NOT_STARTED = "NOT_STARTED" DEPLOYING = "DEPLOYING" diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 6b9d4ff012b3..982b990a6687 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -29,9 +29,11 @@ ReplicaName, ReplicaTag, RunningReplicaInfo, + ReplicaState, ) from ray.serve.schema import ( DeploymentDetails, + ReplicaDetails, _deployment_info_to_schema, ) from ray.serve.config import DeploymentConfig @@ -62,14 +64,6 @@ logger = logging.getLogger(SERVE_LOGGER_NAME) -class ReplicaState(Enum): - STARTING = 1 - UPDATING = 2 - RECOVERING = 3 - RUNNING = 4 - STOPPING = 5 - - class ReplicaStartupStatus(Enum): PENDING_ALLOCATION = 1 PENDING_INITIALIZATION = 2 @@ -217,11 +211,14 @@ def __init__( # the non-detached case. self._actor_handle: ActorHandle = None + self._pid: int = None + self._actor_id: str = None if isinstance(scheduling_strategy, NodeAffinitySchedulingStrategy): self._node_id = scheduling_strategy.node_id else: # Populated after replica is allocated. self._node_id: str = None + self._node_ip: str = None # Populated in self.stop(). self._graceful_shutdown_ref: ObjectRef = None @@ -264,11 +261,26 @@ def actor_handle(self) -> Optional[ActorHandle]: def max_concurrent_queries(self) -> int: return self._max_concurrent_queries + @property + def pid(self) -> Optional[int]: + """Returns the pid of the actor, None if not started.""" + return self._pid + + @property + def actor_id(self) -> Optional[str]: + """Returns the actor id, None if not started.""" + return self._actor_id + @property def node_id(self) -> Optional[str]: """Returns the node id of the actor, None if not placed.""" return self._node_id + @property + def node_ip(self) -> Optional[str]: + """Returns the node ip of the actor, None if not placed.""" + return self._node_ip + def _check_obj_ref_ready(self, obj_ref: ObjectRef) -> bool: ready, _ = ray.wait([obj_ref], timeout=0) return len(ready) == 1 @@ -480,7 +492,9 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion] ) self._health_check_period_s = deployment_config.health_check_period_s self._health_check_timeout_s = deployment_config.health_check_timeout_s - self._node_id = ray.get(self._allocated_obj_ref) + self._pid, self._actor_id, self._node_id, self._node_ip = ray.get( + self._allocated_obj_ref + ) except Exception: logger.exception(f"Exception in deployment '{self._deployment_name}'") return ReplicaStartupStatus.FAILED, None @@ -694,6 +708,24 @@ def get_running_replica_info(self) -> RunningReplicaInfo: is_cross_language=self._actor.is_cross_language, ) + def get_replica_details(self, state: ReplicaState) -> ReplicaDetails: + """Get replica details. + + Args: + state: The state of the replica, which is not stored within a + DeploymentReplica object + """ + return ReplicaDetails( + replica_id=self.replica_tag, + state=state, + pid=self._actor.pid, + actor_name=self._actor._actor_name, + actor_id=self._actor.actor_id, + node_id=self._actor.node_id, + node_ip=self._actor.node_ip, + start_time_s=self._start_time, + ) + @property def replica_tag(self) -> ReplicaTag: return self._replica_tag @@ -1082,6 +1114,13 @@ def get_running_replica_infos(self) -> List[RunningReplicaInfo]: for replica in self._replicas.get([ReplicaState.RUNNING]) ] + def list_replica_details(self) -> List[ReplicaDetails]: + return [ + replica.get_replica_details(state) + for state in ReplicaState + for replica in self._replicas.get([state]) + ] + def _notify_running_replicas_changed(self): self._long_poll_host.notify_changed( (LongPollNamespace.RUNNING_REPLICAS, self._name), @@ -2034,6 +2073,7 @@ def get_deployment_details(self, deployment_name: str) -> DeploymentDetails: deployment_config=_deployment_info_to_schema( deployment_name, self.get_deployment(deployment_name) ), + replicas=self._deployment_states[deployment_name].list_replica_details(), ) def get_deployment_statuses( diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index eb645330fdd5..e51fe393d661 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -3,6 +3,7 @@ from importlib import import_module import inspect import logging +import os import pickle import time from typing import Any, Callable, Optional, Tuple, Dict @@ -210,9 +211,15 @@ async def is_allocated(self) -> str: At this time, the replica can transition from PENDING_ALLOCATION to PENDING_INITIALIZATION startup state. - Return the NodeID of this replica + Returns: + The PID, actor ID, node ID, node IP of the replica. """ - return ray.get_runtime_context().get_node_id() + return ( + os.getpid(), + ray.get_runtime_context().get_actor_id(), + ray.get_runtime_context().get_node_id(), + ray.util.get_node_ip_address(), + ) async def is_initialized( self, user_config: Optional[Any] = None, _after: Optional[Any] = None diff --git a/python/ray/serve/schema.py b/python/ray/serve/schema.py index a9372ca0eb51..6df57444c327 100644 --- a/python/ray/serve/schema.py +++ b/python/ray/serve/schema.py @@ -9,6 +9,7 @@ DeploymentStatus, StatusOverview, DeploymentInfo, + ReplicaState, ) from ray.serve._private.constants import DEPLOYMENT_NAME_PREFIX_SEPARATOR from ray.serve._private.utils import DEFAULT, dict_keys_snake_to_camel_case @@ -546,7 +547,35 @@ def get_empty_schema_dict() -> Dict: @PublicAPI(stability="alpha") -class DeploymentDetails(BaseModel, extra=Extra.forbid): +class ReplicaDetails(BaseModel, extra=Extra.forbid, frozen=True): + replica_id: str = Field( + description=( + "Unique ID for the replica. By default, this will be " + '"#", where the replica suffix is a ' + "randomly generated unique string." + ) + ) + state: ReplicaState = Field(description="Current state of the replica.") + pid: Optional[int] = Field(description="PID of the replica actor process.") + actor_name: str = Field(description="Name of the replica actor.") + actor_id: Optional[str] = Field(description="ID of the replica actor.") + node_id: Optional[str] = Field( + description="ID of the node that the replica actor is running on." + ) + node_ip: Optional[str] = Field( + description="IP address of the node that the replica actor is running on." + ) + start_time_s: float = Field( + description=( + "The time at which the replica actor was started. If the controller dies, " + "this is the time at which the controller recovers and retrieves replica " + "state from the running replica actor." + ) + ) + + +@PublicAPI(stability="alpha") +class DeploymentDetails(BaseModel, extra=Extra.forbid, frozen=True): name: str = Field(description="Deployment name.") status: DeploymentStatus = Field( description="The current status of the deployment." @@ -564,6 +593,9 @@ class DeploymentDetails(BaseModel, extra=Extra.forbid): "options, or Serve default values." ) ) + replicas: List[ReplicaDetails] = Field( + description="Details about the live replicas of this deployment." + ) @validator("deployment_config") def deployment_route_prefix_not_set(cls, v: DeploymentSchema): @@ -580,7 +612,7 @@ def deployment_route_prefix_not_set(cls, v: DeploymentSchema): @PublicAPI(stability="alpha") -class ApplicationDetails(BaseModel, extra=Extra.forbid): +class ApplicationDetails(BaseModel, extra=Extra.forbid, frozen=True): name: str = Field(description="Application name.") route_prefix: Optional[str] = Field( ..., @@ -659,7 +691,7 @@ def get_status_dict(self) -> Dict: @PublicAPI(stability="alpha") -class ServeInstanceDetails(BaseModel, extra=Extra.forbid): +class ServeInstanceDetails(BaseModel, extra=Extra.forbid, frozen=True): host: Optional[str] = Field( description="The host on which the HTTP server is listening for requests." ) diff --git a/python/ray/serve/tests/test_autoscaling_policy.py b/python/ray/serve/tests/test_autoscaling_policy.py index 7bfdc8050b9f..0bdcb4b7b75c 100644 --- a/python/ray/serve/tests/test_autoscaling_policy.py +++ b/python/ray/serve/tests/test_autoscaling_policy.py @@ -15,7 +15,7 @@ calculate_desired_num_replicas, ) from ray.serve._private.common import DeploymentInfo -from ray.serve._private.deployment_state import ReplicaState +from ray.serve._private.common import ReplicaState from ray.serve.config import AutoscalingConfig from ray.serve._private.constants import CONTROL_LOOP_PERIOD_S from ray.serve.controller import ServeController diff --git a/python/ray/serve/tests/test_cluster.py b/python/ray/serve/tests/test_cluster.py index bde670fb9ae9..97d8927894d1 100644 --- a/python/ray/serve/tests/test_cluster.py +++ b/python/ray/serve/tests/test_cluster.py @@ -11,7 +11,8 @@ from ray._private.test_utils import SignalActor, wait_for_condition from ray.cluster_utils import Cluster from ray.serve._private.constants import SERVE_NAMESPACE -from ray.serve._private.deployment_state import ReplicaStartupStatus, ReplicaState +from ray.serve._private.deployment_state import ReplicaStartupStatus +from ray.serve._private.common import ReplicaState @pytest.fixture diff --git a/python/ray/serve/tests/test_deployment_graph_autoscaling.py b/python/ray/serve/tests/test_deployment_graph_autoscaling.py index d75a22cd5592..662f25104bdc 100644 --- a/python/ray/serve/tests/test_deployment_graph_autoscaling.py +++ b/python/ray/serve/tests/test_deployment_graph_autoscaling.py @@ -7,7 +7,7 @@ from ray import serve from ray.serve.drivers import DAGDriver from ray.dag.input_node import InputNode -from ray.serve._private.deployment_state import ReplicaState +from ray.serve._private.common import ReplicaState from ray._private.test_utils import SignalActor, wait_for_condition # Magic number to use for speed up scale from 0 replica diff --git a/python/ray/serve/tests/test_deployment_state.py b/python/ray/serve/tests/test_deployment_state.py index fc8af8080d43..9082a74abed6 100644 --- a/python/ray/serve/tests/test_deployment_state.py +++ b/python/ray/serve/tests/test_deployment_state.py @@ -14,6 +14,7 @@ ReplicaConfig, ReplicaTag, ReplicaName, + ReplicaState, ) from ray.serve._private.deployment_state import ( DeploymentState, @@ -22,7 +23,6 @@ DeploymentVersion, DeploymentReplica, ReplicaStartupStatus, - ReplicaState, ReplicaStateContainer, VersionedReplica, rank_replicas_for_stopping,