Skip to content

Commit

Permalink
[serve] Add replica info to metadata rest api (ray-project#33292)
Browse files Browse the repository at this point in the history
This adds details about the live replicas for each deployment to be fetched from the new GET endpoint.

Sample replica detail from running a test application:
```
replica_id: app2_BasicDriver#KhlXQe
state: RUNNING
pid: 25853
actor_name: SERVE_REPLICA::app2_BasicDriver#KhlXQe
actor_id: 2355af670b023966af79501501000000
node_id: 3631e75fc5312752c54b567ee66491a1e58a0420f0abc5b1c44e70cf
node_ip: 192.168.0.141
start_time_s: 1678818083.039281
```

Details:
* `is_allocated` on each replica used to return just the node id for the controller to confirm the replica has been placed on a node and started. Now, it returns a tuple of runtime-context-related info:
  * `pid`
  * `actor_id`
  * `node_id`
  * `node_ip`
* The four fields listed above that are retrieved from the replica actor may be `None` before the actor is actually scheduled, so they are marked optional in the schema. (The rest of the fields are filled in immediately when the replica is created to be tracked in the controller)
```
class ReplicaDetails(BaseModel, extra=Extra.forbid):
    replica_id: str
    state: ReplicaState
    pid: Optional[int]
    actor_name: str
    actor_id: Optional[str]
    node_id: Optional[str]
    node_ip: Optional[str]
    start_time_s: float
```

Signed-off-by: Edward Oakes <[email protected]>
  • Loading branch information
zcin authored and edoakes committed Mar 22, 2023
1 parent 7d80a34 commit 1e658a5
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 23 deletions.
20 changes: 15 additions & 5 deletions dashboard/modules/serve/tests/test_serve_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ray.serve._private.constants import SERVE_NAMESPACE
from ray.serve.tests.conftest import * # noqa: F401 F403
from ray.serve.schema import ServeInstanceDetails
from ray.serve._private.common import ApplicationStatus, DeploymentStatus
from ray.serve._private.common import ApplicationStatus, DeploymentStatus, ReplicaState

GET_OR_PUT_URL = "http://localhost:52365/api/serve/deployments/"
STATUS_URL = "http://localhost:52365/api/serve/deployments/status"
Expand Down Expand Up @@ -547,13 +547,23 @@ def applications_running():
# CHECK: application details
for app in ["app1", "app2"]:
assert app_details[app].route_prefix == f"/{app}"
for dep_details in app_details[app].deployments.values():
assert dep_details.status == DeploymentStatus.HEALTHY

for deployment in app_details[app].deployments.values():
assert deployment.status == DeploymentStatus.HEALTHY
# Route prefix should be app level options eventually
assert "route_prefix" not in dep_details.deployment_config.dict(
assert "route_prefix" not in deployment.deployment_config.dict(
exclude_unset=True
)
assert len(deployment.replicas) == deployment.deployment_config.num_replicas

for replica in deployment.replicas:
assert replica.state == ReplicaState.RUNNING
assert (
deployment.name in replica.replica_id
and deployment.name in replica.actor_name
)
assert replica.actor_id and replica.node_id and replica.node_ip
assert replica.start_time_s > app_details[app].last_deployed_time_s

print("Finished checking application details.")


Expand Down
8 changes: 8 additions & 0 deletions python/ray/serve/_private/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ class EndpointInfo:
route: str


class ReplicaState(str, Enum):
STARTING = "STARTING"
UPDATING = "UPDATING"
RECOVERING = "RECOVERING"
RUNNING = "RUNNING"
STOPPING = "STOPPING"


class ApplicationStatus(str, Enum):
NOT_STARTED = "NOT_STARTED"
DEPLOYING = "DEPLOYING"
Expand Down
58 changes: 49 additions & 9 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
ReplicaName,
ReplicaTag,
RunningReplicaInfo,
ReplicaState,
)
from ray.serve.schema import (
DeploymentDetails,
ReplicaDetails,
_deployment_info_to_schema,
)
from ray.serve.config import DeploymentConfig
Expand Down Expand Up @@ -62,14 +64,6 @@
logger = logging.getLogger(SERVE_LOGGER_NAME)


class ReplicaState(Enum):
STARTING = 1
UPDATING = 2
RECOVERING = 3
RUNNING = 4
STOPPING = 5


class ReplicaStartupStatus(Enum):
PENDING_ALLOCATION = 1
PENDING_INITIALIZATION = 2
Expand Down Expand Up @@ -217,11 +211,14 @@ def __init__(
# the non-detached case.
self._actor_handle: ActorHandle = None

self._pid: int = None
self._actor_id: str = None
if isinstance(scheduling_strategy, NodeAffinitySchedulingStrategy):
self._node_id = scheduling_strategy.node_id
else:
# Populated after replica is allocated.
self._node_id: str = None
self._node_ip: str = None

# Populated in self.stop().
self._graceful_shutdown_ref: ObjectRef = None
Expand Down Expand Up @@ -264,11 +261,26 @@ def actor_handle(self) -> Optional[ActorHandle]:
def max_concurrent_queries(self) -> int:
return self._max_concurrent_queries

@property
def pid(self) -> Optional[int]:
"""Returns the pid of the actor, None if not started."""
return self._pid

@property
def actor_id(self) -> Optional[str]:
"""Returns the actor id, None if not started."""
return self._actor_id

@property
def node_id(self) -> Optional[str]:
"""Returns the node id of the actor, None if not placed."""
return self._node_id

@property
def node_ip(self) -> Optional[str]:
"""Returns the node ip of the actor, None if not placed."""
return self._node_ip

def _check_obj_ref_ready(self, obj_ref: ObjectRef) -> bool:
ready, _ = ray.wait([obj_ref], timeout=0)
return len(ready) == 1
Expand Down Expand Up @@ -480,7 +492,9 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion]
)
self._health_check_period_s = deployment_config.health_check_period_s
self._health_check_timeout_s = deployment_config.health_check_timeout_s
self._node_id = ray.get(self._allocated_obj_ref)
self._pid, self._actor_id, self._node_id, self._node_ip = ray.get(
self._allocated_obj_ref
)
except Exception:
logger.exception(f"Exception in deployment '{self._deployment_name}'")
return ReplicaStartupStatus.FAILED, None
Expand Down Expand Up @@ -694,6 +708,24 @@ def get_running_replica_info(self) -> RunningReplicaInfo:
is_cross_language=self._actor.is_cross_language,
)

def get_replica_details(self, state: ReplicaState) -> ReplicaDetails:
"""Get replica details.
Args:
state: The state of the replica, which is not stored within a
DeploymentReplica object
"""
return ReplicaDetails(
replica_id=self.replica_tag,
state=state,
pid=self._actor.pid,
actor_name=self._actor._actor_name,
actor_id=self._actor.actor_id,
node_id=self._actor.node_id,
node_ip=self._actor.node_ip,
start_time_s=self._start_time,
)

@property
def replica_tag(self) -> ReplicaTag:
return self._replica_tag
Expand Down Expand Up @@ -1082,6 +1114,13 @@ def get_running_replica_infos(self) -> List[RunningReplicaInfo]:
for replica in self._replicas.get([ReplicaState.RUNNING])
]

def list_replica_details(self) -> List[ReplicaDetails]:
return [
replica.get_replica_details(state)
for state in ReplicaState
for replica in self._replicas.get([state])
]

def _notify_running_replicas_changed(self):
self._long_poll_host.notify_changed(
(LongPollNamespace.RUNNING_REPLICAS, self._name),
Expand Down Expand Up @@ -2034,6 +2073,7 @@ def get_deployment_details(self, deployment_name: str) -> DeploymentDetails:
deployment_config=_deployment_info_to_schema(
deployment_name, self.get_deployment(deployment_name)
),
replicas=self._deployment_states[deployment_name].list_replica_details(),
)

def get_deployment_statuses(
Expand Down
11 changes: 9 additions & 2 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from importlib import import_module
import inspect
import logging
import os
import pickle
import time
from typing import Any, Callable, Optional, Tuple, Dict
Expand Down Expand Up @@ -210,9 +211,15 @@ async def is_allocated(self) -> str:
At this time, the replica can transition from PENDING_ALLOCATION
to PENDING_INITIALIZATION startup state.
Return the NodeID of this replica
Returns:
The PID, actor ID, node ID, node IP of the replica.
"""
return ray.get_runtime_context().get_node_id()
return (
os.getpid(),
ray.get_runtime_context().get_actor_id(),
ray.get_runtime_context().get_node_id(),
ray.util.get_node_ip_address(),
)

async def is_initialized(
self, user_config: Optional[Any] = None, _after: Optional[Any] = None
Expand Down
38 changes: 35 additions & 3 deletions python/ray/serve/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
DeploymentStatus,
StatusOverview,
DeploymentInfo,
ReplicaState,
)
from ray.serve._private.constants import DEPLOYMENT_NAME_PREFIX_SEPARATOR
from ray.serve._private.utils import DEFAULT, dict_keys_snake_to_camel_case
Expand Down Expand Up @@ -546,7 +547,35 @@ def get_empty_schema_dict() -> Dict:


@PublicAPI(stability="alpha")
class DeploymentDetails(BaseModel, extra=Extra.forbid):
class ReplicaDetails(BaseModel, extra=Extra.forbid, frozen=True):
replica_id: str = Field(
description=(
"Unique ID for the replica. By default, this will be "
'"<deployment name>#<replica suffix>", where the replica suffix is a '
"randomly generated unique string."
)
)
state: ReplicaState = Field(description="Current state of the replica.")
pid: Optional[int] = Field(description="PID of the replica actor process.")
actor_name: str = Field(description="Name of the replica actor.")
actor_id: Optional[str] = Field(description="ID of the replica actor.")
node_id: Optional[str] = Field(
description="ID of the node that the replica actor is running on."
)
node_ip: Optional[str] = Field(
description="IP address of the node that the replica actor is running on."
)
start_time_s: float = Field(
description=(
"The time at which the replica actor was started. If the controller dies, "
"this is the time at which the controller recovers and retrieves replica "
"state from the running replica actor."
)
)


@PublicAPI(stability="alpha")
class DeploymentDetails(BaseModel, extra=Extra.forbid, frozen=True):
name: str = Field(description="Deployment name.")
status: DeploymentStatus = Field(
description="The current status of the deployment."
Expand All @@ -564,6 +593,9 @@ class DeploymentDetails(BaseModel, extra=Extra.forbid):
"options, or Serve default values."
)
)
replicas: List[ReplicaDetails] = Field(
description="Details about the live replicas of this deployment."
)

@validator("deployment_config")
def deployment_route_prefix_not_set(cls, v: DeploymentSchema):
Expand All @@ -580,7 +612,7 @@ def deployment_route_prefix_not_set(cls, v: DeploymentSchema):


@PublicAPI(stability="alpha")
class ApplicationDetails(BaseModel, extra=Extra.forbid):
class ApplicationDetails(BaseModel, extra=Extra.forbid, frozen=True):
name: str = Field(description="Application name.")
route_prefix: Optional[str] = Field(
...,
Expand Down Expand Up @@ -659,7 +691,7 @@ def get_status_dict(self) -> Dict:


@PublicAPI(stability="alpha")
class ServeInstanceDetails(BaseModel, extra=Extra.forbid):
class ServeInstanceDetails(BaseModel, extra=Extra.forbid, frozen=True):
host: Optional[str] = Field(
description="The host on which the HTTP server is listening for requests."
)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_autoscaling_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
calculate_desired_num_replicas,
)
from ray.serve._private.common import DeploymentInfo
from ray.serve._private.deployment_state import ReplicaState
from ray.serve._private.common import ReplicaState
from ray.serve.config import AutoscalingConfig
from ray.serve._private.constants import CONTROL_LOOP_PERIOD_S
from ray.serve.controller import ServeController
Expand Down
3 changes: 2 additions & 1 deletion python/ray/serve/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
from ray._private.test_utils import SignalActor, wait_for_condition
from ray.cluster_utils import Cluster
from ray.serve._private.constants import SERVE_NAMESPACE
from ray.serve._private.deployment_state import ReplicaStartupStatus, ReplicaState
from ray.serve._private.deployment_state import ReplicaStartupStatus
from ray.serve._private.common import ReplicaState


@pytest.fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ray import serve
from ray.serve.drivers import DAGDriver
from ray.dag.input_node import InputNode
from ray.serve._private.deployment_state import ReplicaState
from ray.serve._private.common import ReplicaState
from ray._private.test_utils import SignalActor, wait_for_condition

# Magic number to use for speed up scale from 0 replica
Expand Down
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ReplicaConfig,
ReplicaTag,
ReplicaName,
ReplicaState,
)
from ray.serve._private.deployment_state import (
DeploymentState,
Expand All @@ -22,7 +23,6 @@
DeploymentVersion,
DeploymentReplica,
ReplicaStartupStatus,
ReplicaState,
ReplicaStateContainer,
VersionedReplica,
rank_replicas_for_stopping,
Expand Down

0 comments on commit 1e658a5

Please sign in to comment.