[serve] split ReplicaStartupStatus.PENDING into PENDING_ALLOCATION and PENDING_INITIALIZATION (#19431)
iasoon authored Nov 2, 2021
1 parent f1eedb1 commit 7e6ea9e
Showing 6 changed files with 245 additions and 56 deletions.
9 changes: 9 additions & 0 deletions python/ray/serve/BUILD
@@ -230,6 +230,15 @@ py_test(
deps = [":serve_lib"],
)

py_test(
name = "test_warnings",
size = "small",
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)


py_test(
name = "test_fastapi",
size = "medium",
108 changes: 73 additions & 35 deletions python/ray/serve/backend_state.py
@@ -31,8 +31,8 @@ class ReplicaState(Enum):


class ReplicaStartupStatus(Enum):
PENDING = 1
PENDING_SLOW_START = 2
PENDING_ALLOCATION = 1
PENDING_INITIALIZATION = 2
SUCCEEDED = 3
FAILED = 4

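The split gives replica startup a two-phase lifecycle: PENDING_ALLOCATION (waiting for a worker slot) and PENDING_INITIALIZATION (waiting for user code). A minimal sketch of how a poller can branch on the new states (not code from this commit; `replica` stands in for a BackendReplica-like object):

def poll_replica(replica):
    status = replica.check_started()
    if status == ReplicaStartupStatus.PENDING_ALLOCATION:
        # No worker slot yet: the cluster may still be auto-scaling.
        return "waiting for resources"
    if status == ReplicaStartupStatus.PENDING_INITIALIZATION:
        # Actor is placed; user __init__()/reconfigure() still running.
        return "waiting for user code"
    if status == ReplicaStartupStatus.SUCCEEDED:
        return "running"
    return "failed"  # ReplicaStartupStatus.FAILED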
@@ -74,8 +74,10 @@ def __init__(self, actor_name: str, detached: bool, controller_name: str,
self._replica_tag = replica_tag
self._backend_tag = backend_tag

# Populated in self.start().
# Populated in either self.start() or self.recover()
self._allocated_obj_ref: ObjectRef = None
self._ready_obj_ref: ObjectRef = None

self._actor_resources: Dict[str, float] = None
self._max_concurrent_queries: int = None
self._graceful_shutdown_timeout_s: float = 0.0
@@ -180,6 +182,7 @@ def start(self, backend_info: BackendInfo, version: DeploymentVersion):
backend_info.deployment_config.to_proto_bytes(), version,
self._controller_name, self._detached)

self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
self._ready_obj_ref = self._actor_handle.reconfigure.remote(
backend_info.deployment_config.user_config)

@@ -210,6 +213,9 @@ def recover(self):
self._placement_group = self.get_placement_group(
self._placement_group_name)

# Re-fetch the allocation proof.
self._allocated_obj_ref = self._actor_handle.is_allocated.remote()

# Running actor handle already has all info needed, thus successful
# starting simply means retrieving replica version hash from actor
self._ready_obj_ref = self._actor_handle.get_metadata.remote()
@@ -222,7 +228,9 @@ def check_ready(
Returns:
state (ReplicaStartupStatus):
PENDING:
PENDING_ALLOCATION:
- replica is waiting for a worker to start
PENDING_INITIALIZATION:
- replica reconfigure() hasn't returned.
FAILED:
- replica __init__() failed.
@@ -235,11 +243,18 @@
version:
- replica __init__() and reconfigure() succeeded.
"""

# Check whether the replica has been allocated.
ready, _ = ray.wait([self._allocated_obj_ref], timeout=0)
if len(ready) == 0:
return ReplicaStartupStatus.PENDING_ALLOCATION, None

# Check whether replica initialization has completed.
ready, _ = ray.wait([self._ready_obj_ref], timeout=0)
# In case of deployment constructor failure, ray.get will help to
# surface exception to each update() cycle.
if len(ready) == 0:
return ReplicaStartupStatus.PENDING, None
return ReplicaStartupStatus.PENDING_INITIALIZATION, None
elif len(ready) > 0:
try:
deployment_config, version = ray.get(ready)[0]
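Both checks above use ray.wait with timeout=0 as a non-blocking readiness probe: the first ObjectRef resolves once the actor has been scheduled, the second once user code finishes initializing. A self-contained sketch of the same two-stage probe, using plain tasks whose names are invented for illustration:

import time
import ray

@ray.remote
def allocation_probe():
    return None  # resolves as soon as the task is scheduled

@ray.remote
def init_probe():
    time.sleep(5)  # stands in for slow user initialization
    return "metadata"

ray.init()
allocated_ref, ready_ref = allocation_probe.remote(), init_probe.remote()

# timeout=0 returns immediately; the first list holds only resolved refs.
done, _ = ray.wait([allocated_ref], timeout=0)
if not done:
    print("PENDING_ALLOCATION")
elif not ray.wait([ready_ref], timeout=0)[0]:
    print("PENDING_INITIALIZATION")
else:
    print("SUCCEEDED")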
@@ -406,11 +421,8 @@ def check_started(self) -> ReplicaStartupStatus:
"""
status, version = self._actor.check_ready()

if status == ReplicaStartupStatus.PENDING:
if time.time() - self._start_time > SLOW_STARTUP_WARNING_S:
status = ReplicaStartupStatus.PENDING_SLOW_START
elif status == ReplicaStartupStatus.SUCCEEDED:
# Re-assign DeploymentVersion if start / update / recover succeeded
if status == ReplicaStartupStatus.SUCCEEDED:
# Re-assign BackendVersion if start / update / recover succeeded
# by reading re-computed version in RayServeReplica
if version is not None:
self._version = version
@@ -1003,10 +1015,9 @@ def _check_curr_goal_status(self) -> GoalStatus:

return GoalStatus.PENDING

def _check_startup_replicas(self,
original_state: ReplicaState,
stop_on_slow=False
) -> Tuple[List[BackendReplica], bool]:
def _check_startup_replicas(
self, original_state: ReplicaState, stop_on_slow=False
) -> Tuple[List[Tuple[BackendReplica, ReplicaStartupStatus]], bool]:
"""
Common helper function for startup actions tracking and status
transition: STARTING, UPDATING and RECOVERING.
@@ -1032,18 +1043,24 @@ def _check_startup_replicas(self,

replica.stop(graceful=False)
self._replicas.add(ReplicaState.STOPPING, replica)
elif start_status == ReplicaStartupStatus.PENDING:
# Not done yet, remain at same state
self._replicas.add(original_state, replica)
else:
# Slow start, remain at same state but also add to
# slow start replicas.
if not stop_on_slow:
self._replicas.add(original_state, replica)
else:
elif start_status in [
ReplicaStartupStatus.PENDING_ALLOCATION,
ReplicaStartupStatus.PENDING_INITIALIZATION,
]:

is_slow = (time.time() - replica._start_time
           > SLOW_STARTUP_WARNING_S)

if is_slow:
slow_replicas.append((replica, start_status))

# Does it make sense to stop replicas in PENDING_ALLOCATION
# state?
if is_slow and stop_on_slow:
replica.stop(graceful=False)
self._replicas.add(ReplicaState.STOPPING, replica)
slow_replicas.append(replica)
else:
self._replicas.add(original_state, replica)

return slow_replicas, transitioned_to_running

@@ -1086,17 +1103,38 @@ def _check_and_update_replicas(self) -> bool:
if (len(slow_start_replicas)
and time.time() - self._prev_startup_warning >
SLOW_STARTUP_WARNING_PERIOD_S):
required, available = slow_start_replicas[
0].resource_requirements()
logger.warning(
f"Deployment '{self._name}' has "
f"{len(slow_start_replicas)} replicas that have taken "
f"more than {SLOW_STARTUP_WARNING_S}s to start up. This "
"may be caused by waiting for the cluster to auto-scale, "
"waiting for a runtime environment to install, or a slow "
"constructor. Resources required "
f"for each replica: {required}, resources available: "
f"{available}. component=serve deployment={self._name}")

pending_allocation = []
pending_initialization = []

for replica, startup_status in slow_start_replicas:
if startup_status == ReplicaStartupStatus.PENDING_ALLOCATION:
pending_allocation.append(replica)
if startup_status \
== ReplicaStartupStatus.PENDING_INITIALIZATION:
pending_initialization.append(replica)

if len(pending_allocation) > 0:
required, available = slow_start_replicas[0][
0].resource_requirements()
logger.warning(
f"Deployment '{self._name}' has "
f"{len(pending_allocation)} replicas that have taken "
f"more than {SLOW_STARTUP_WARNING_S}s to be scheduled. "
f"This may be caused by waiting for the cluster to "
f"auto-scale, or waiting for a runtime environment "
f"to install. "
f"Resources required for each replica: {required}, "
f"resources available: {available}. "
f"component=serve deployment={self._name}")

if len(pending_initialization) > 0:
logger.warning(
f"Deployment '{self._name}' has "
f"{len(pending_initialization)} replicas that have taken "
f"more than {SLOW_STARTUP_WARNING_S}s to initialize. This "
f"may be caused by a slow __init__ or reconfigure method."
f"component=serve deployment={self._name}")

self._prev_startup_warning = time.time()

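The warning logic partitions the (replica, status) pairs returned by _check_startup_replicas so that each startup phase gets a targeted message. The same grouping can be expressed with a dict keyed by status; a purely illustrative alternative, not what the commit does:

from collections import defaultdict

def partition_by_status(slow_start_replicas):
    """Group (replica, status) pairs by their startup status."""
    by_status = defaultdict(list)
    for replica, startup_status in slow_start_replicas:
        by_status[startup_status].append(replica)
    return (by_status[ReplicaStartupStatus.PENDING_ALLOCATION],
            by_status[ReplicaStartupStatus.PENDING_INITIALIZATION])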
67 changes: 49 additions & 18 deletions python/ray/serve/replica.py
@@ -65,30 +65,48 @@ async def __init__(self, backend_tag, replica_tag, init_args,
replica_tag,
controller_name,
servable_object=None)
if is_function:
_callable = backend
else:
# This allows backends to define an async __init__ method
# (required for FastAPI backend definition).
_callable = backend.__new__(backend)
await sync_to_async(_callable.__init__)(*init_args,
**init_kwargs)
# Setting the context again to update the servable_object.
ray.serve.api._set_internal_replica_context(
backend_tag,
replica_tag,
controller_name,
servable_object=_callable)

assert controller_name, "Must provide a valid controller_name"

controller_namespace = ray.serve.api._get_controller_namespace(
detached)
controller_handle = ray.get_actor(
controller_name, namespace=controller_namespace)
self.backend = RayServeReplica(
_callable, backend_tag, replica_tag, deployment_config,
deployment_config.user_config, version, is_function,
controller_handle)

# This closure initializes user code and finalizes replica
# startup. By splitting the initialization step like this,
# the actor is reachable before the user code has finished
# initializing. The supervising state manager can wait for
# allocation of this replica with the `is_allocated` method
# and then call `reconfigure` to trigger user code
# initialization.
async def initialize_backend():
if is_function:
_callable = backend
else:
# This allows backends to define an async __init__ method
# (required for FastAPI backend definition).
_callable = backend.__new__(backend)
await sync_to_async(_callable.__init__)(*init_args,
**init_kwargs)
# Setting the context again to update the servable_object.
ray.serve.api._set_internal_replica_context(
backend_tag,
replica_tag,
controller_name,
servable_object=_callable)

self.backend = RayServeReplica(
_callable, backend_tag, replica_tag, deployment_config,
deployment_config.user_config, version, is_function,
controller_handle)

# Is it fine that backend is None here?
# Should we add a check in all methods that use self.backend
# or, alternatively, create an async get_backend() method?
self.backend = None
self._initialize_backend = initialize_backend

# asyncio.Event used to signal that the replica is shutting down.
self.shutdown_event = asyncio.Event()
@@ -108,8 +126,21 @@ async def handle_request(
query = Query(request_args, request_kwargs, request_metadata)
return await self.backend.handle_request(query)

async def is_allocated(self):
"""poke the replica to check whether it's alive.
When calling this method on an ActorHandle, it will complete as
soon as the actor has started running. We use this mechanism to
detect when a replica has been allocated a worker slot.
At this time, the replica can transition from PENDING_ALLOCATION
to PENDING_INITIALIZATION startup state.
"""
pass

async def reconfigure(self, user_config: Optional[Any] = None
) -> Tuple[DeploymentConfig, DeploymentVersion]:
if self.backend is None:
await self._initialize_backend()
if user_config is not None:
await self.backend.reconfigure(user_config)

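The closure-based split above is an instance of a general Ray pattern: keep the actor's __init__ cheap so the actor answers calls as soon as it is scheduled, and run the expensive setup behind a later method call. A minimal standalone sketch, with all names invented for illustration:

import time
import ray

def load_big_model():
    time.sleep(2)  # stand-in for heavy setup work
    return object()

@ray.remote
class LazyReplica:
    def __init__(self):
        # Cheap: the actor is reachable as soon as it is allocated.
        self._model = None

    def ping(self):
        # Like is_allocated(): returns once the actor is running.
        pass

    def setup(self):
        # Like reconfigure(): performs the expensive initialization.
        if self._model is None:
            self._model = load_big_model()
        return "ready"

ray.init()
replica = LazyReplica.remote()
ray.get(replica.ping.remote())   # proves allocation
ray.get(replica.setup.remote())  # proves initialization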
6 changes: 3 additions & 3 deletions python/ray/serve/tests/test_backend_state.py
@@ -42,8 +42,8 @@ def __init__(self, actor_name: str, detached: bool, controller_name: str,
self.recovering = False
# Will be set when `start()` is called.
self.version = None
# Expected to be set in the test.
self.ready = False
# Initial state for a replica is PENDING_ALLOCATION.
self.ready = ReplicaStartupStatus.PENDING_ALLOCATION
# Will be set when `graceful_stop()` is called.
self.stopped = False
# Expected to be set in the test.
@@ -106,7 +106,7 @@ def recover(self):

def check_ready(self) -> ReplicaStartupStatus:
ready = self.ready
self.ready = ReplicaStartupStatus.PENDING
self.ready = ReplicaStartupStatus.PENDING_INITIALIZATION
if ready == ReplicaStartupStatus.SUCCEEDED and self.recovering:
self.recovering = False
self.started = True
47 changes: 47 additions & 0 deletions python/ray/serve/tests/test_cluster.py
@@ -9,6 +9,9 @@
from ray import serve
from ray.cluster_utils import Cluster

from ray.serve.backend_state import ReplicaStartupStatus, ReplicaState
from ray._private.test_utils import SignalActor, wait_for_condition


@pytest.fixture
def ray_cluster():
@@ -113,5 +116,49 @@ def get_pids(expected, timeout=30):
assert pids3.issubset(pids4)


@pytest.mark.skipif(sys.platform == "win32", reason="Flaky on Windows.")
def test_replica_startup_status_transitions(ray_cluster):
cluster = ray_cluster
cluster.add_node(num_cpus=1)
cluster.connect(namespace="serve")
serve_instance = serve.start()

signal = SignalActor.remote()

@serve.deployment(version="1", ray_actor_options={"num_cpus": 2})
class D:
def __init__(self):
ray.get(signal.wait.remote())

D.deploy(_blocking=False)

def get_replicas(replica_state):
controller = serve_instance._controller
replicas = ray.get(
controller._dump_replica_states_for_testing.remote(D.name))
return replicas.get([replica_state])

# wait for serve to start the replica and grab a reference to it.
wait_for_condition(lambda: len(get_replicas(ReplicaState.STARTING)) > 0)
replica = get_replicas(ReplicaState.STARTING)[0]

# declare shorthands as yapf doesn't like long lambdas
PENDING_ALLOCATION = ReplicaStartupStatus.PENDING_ALLOCATION
PENDING_INITIALIZATION = ReplicaStartupStatus.PENDING_INITIALIZATION
SUCCEEDED = ReplicaStartupStatus.SUCCEEDED

# currently there are no resources to allocate the replica
assert replica.check_started() == PENDING_ALLOCATION

# add the necessary resources to allocate the replica
cluster.add_node(num_cpus=4)
wait_for_condition(
lambda: (replica.check_started() == PENDING_INITIALIZATION))

# send signal to complete replica initialization
signal.send.remote()
wait_for_condition(lambda: replica.check_started() == SUCCEEDED)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))