Skip to content

Commit

Permalink
[Serve] Recover PENDING_INITIALIZATION status actor (ray-project#33890)
Browse files Browse the repository at this point in the history
When a replica is in the initializing state and the controller dies, the user will see
```
(ServeController pid=458318) ERROR 2023-03-29 08:44:42,547 controller 458318 deployment_state.py:500 - Exception in deployment 'xqHWInctmH'
(ServeController pid=458318) Traceback (most recent call last):
(ServeController pid=458318)   File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/serve/_private/deployment_state.py", line 489, in check_ready
(ServeController pid=458318)     deployment_config, version = ray.get(self._ready_obj_ref)
(ServeController pid=458318)   File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
(ServeController pid=458318)     return func(*args, **kwargs)
(ServeController pid=458318)   File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/worker.py", line 2508, in get
(ServeController pid=458318)     raise value.as_instanceof_cause()
(ServeController pid=458318) ray.exceptions.RayTaskError(AttributeError): ray::ServeReplica:xqHWInctmH.get_metadata() (pid=458148, ip=172.31.126.222, repr=<ray.serve._private.replica.ServeReplica:xqHWInctmH object at 0x7f57ae0f0110>)
(ServeController pid=458318)   File "/home/ray/anaconda3/lib/python3.7/concurrent/futures/_base.py", line 428, in result
(ServeController pid=458318)     return self.__get_result()
(ServeController pid=458318)   File "/home/ray/anaconda3/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
(ServeController pid=458318)     raise self._exception
(ServeController pid=458318)   File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/serve/_private/replica.py", line 249, in get_metadata
(ServeController pid=458318)     return self.replica.deployment_config, self.replica.version
(ServeController pid=458318) AttributeError: 'NoneType' object has no attribute 'deployment_config'
```

- Fix the NoneType bug that occurs during recovery: regardless of the actor's state, the user will no longer see the traceback. Instead the user will see:
1. A slow-startup warning; then the replica will be terminated and a new replica will be provisioned.
2. If the actor succeeds while in `PENDING_INITIALIZATION`, no error is raised.

This is observed from long_running_serve_failure: https://console.anyscale-staging.com/o/anyscale-internal/projects/prj_qC3ZfndQWYYjx2cz8KWGNUL4/clusters/ses_u7xeve33e2djg9grgr9qcs9l4x?command-history-section=command_history

**Note**: If the user's constructor simply needs a very long time to finish initializing, it is recommended to increase `SERVE_SLOW_STARTUP_WARNING_S`. (If the user doesn't change it, the deployment manager will terminate the old replica and start a new replica after the timeout.)

Co-authored-by: shrekris-anyscale <[email protected]>
Signed-off-by: elliottower <[email protected]>
  • Loading branch information
2 people authored and elliottower committed Apr 22, 2023
1 parent 84a880c commit 717d403
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 3 deletions.
5 changes: 4 additions & 1 deletion python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,10 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion]
self._allocated_obj_ref
)
except Exception:
logger.exception(f"Exception in deployment '{self._deployment_name}'")
logger.exception(
f"Exception in replica '{self._replica_tag}', "
"the replica will be stopped."
)
return ReplicaStartupStatus.FAILED, None
if self._deployment_is_cross_language:
# todo: The replica's userconfig whitch java client created
Expand Down
12 changes: 10 additions & 2 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ async def __init__(
controller_name, namespace=SERVE_NAMESPACE
)

# Indicates whether the replica has finished initializing.
self._init_finish_event = asyncio.Event()

# This closure initializes user code and finalizes replica
# startup. By splitting the initialization step like this,
# we can already access this actor before the user code
Expand Down Expand Up @@ -164,6 +167,7 @@ async def initialize_replica():
is_function,
controller_handle,
)
self._init_finish_event.set()

# Is it fine that replica is None here?
# Should we add a check in all methods that use self.replica
Expand Down Expand Up @@ -243,9 +247,13 @@ async def reconfigure(
if user_config is not None:
await self.replica.reconfigure(user_config)

return self.get_metadata()
return await self.get_metadata()

def get_metadata(self) -> Tuple[DeploymentConfig, DeploymentVersion]:
async def get_metadata(
self,
) -> Tuple[DeploymentConfig, DeploymentVersion]:
# Wait for replica initialization to finish
await self._init_finish_event.wait()
return self.replica.deployment_config, self.replica.version

async def prepare_for_shutdown(self):
Expand Down
48 changes: 48 additions & 0 deletions python/ray/serve/tests/test_controller_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
import pytest
from collections import defaultdict
from ray._private.test_utils import wait_for_condition

import ray
from ray._private.test_utils import SignalActor
Expand Down Expand Up @@ -200,5 +201,52 @@ def make_nonblocking_calls(expected, expect_blocking=False, num_returns=1):
make_nonblocking_calls({"2": 2}, num_returns=2)


def test_controller_recover_initializing_actor(serve_instance):
    """Recover a replica actor that is still in PENDING_INITIALIZATION.

    Regression test for ray-project#33890: a controller restarted while a
    replica's constructor was still running would hit an AttributeError
    (``self.replica`` was still None) when calling ``get_metadata``.
    This test blocks V1's constructor on a signal, kills the controller
    mid-initialization, then releases the signal and verifies the
    original replica actor survives controller recovery.
    """

    # signal gates V1's constructor; signal2 tells the test that the
    # constructor has started (i.e. the replica is PENDING_INITIALIZATION).
    signal = SignalActor.remote()
    signal2 = SignalActor.remote()
    client = serve_instance

    @ray.remote
    def pending_init_indicator():
        # Resolves only after V1.__init__ has begun executing.
        ray.get(signal2.wait.remote())
        return True

    @serve.deployment
    class V1:
        async def __init__(self):
            # Announce that initialization started, then block until the
            # test releases `signal` — keeping this replica in
            # PENDING_INITIALIZATION while the controller is killed.
            ray.get(signal2.send.remote())
            await signal.wait.remote()

        def __call__(self, request):
            return f"1|{os.getpid()}"

    # Non-blocking run: the deployment stays un-healthy while __init__ waits.
    serve.run(V1.bind(), _blocking=False)
    ray.get(pending_init_indicator.remote())

    def get_actor_info(name: str):
        # Return (actor_name, pid) of the first ALIVE non-proxy actor whose
        # name contains `name`; implicitly returns None when absent, which
        # lets wait_for_condition poll until the actor exists.
        all_current_actors = list_actors(filters=[("state", "=", "ALIVE")])
        for actor in all_current_actors:
            if SERVE_PROXY_NAME in actor["name"]:
                continue
            if name in actor["name"]:
                print(actor)
                return actor["name"], actor["pid"]

    actor_tag, _ = get_actor_info(V1.name)
    _, controller1_pid = get_actor_info(SERVE_CONTROLLER_NAME)
    # Kill the controller while the replica is mid-initialization;
    # no_restart=False lets Ray restart it so recovery kicks in.
    ray.kill(serve.context._global_client._controller, no_restart=False)
    # Wait until the controller is alive again.
    wait_for_condition(get_actor_info, name=SERVE_CONTROLLER_NAME)
    # A new pid proves the controller actually restarted.
    assert controller1_pid != get_actor_info(SERVE_CONTROLLER_NAME)[1]

    # Let the replica finish its constructor.
    ray.get(signal.send.remote())
    client._wait_for_deployment_healthy(V1.name)
    # The replica that existed before the controller died must still be the
    # same actor — recovery must not have torn it down.
    assert actor_tag == get_actor_info(V1.name)[0]


# Allow running this test file directly (outside the pytest CLI).
if __name__ == "__main__":
    sys.exit(pytest.main(["-v", "-s", __file__]))

0 comments on commit 717d403

Please sign in to comment.