diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index b432d2eb2992..7b12e4db82d6 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -375,10 +375,10 @@ def start(self, deployment_info: DeploymentInfo, version: DeploymentVersion): if self._is_cross_language: self._actor_handle = JavaActorHandleProxy(self._actor_handle) self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - self._ready_obj_ref = self._actor_handle.reconfigure.remote(user_config) + self._ready_obj_ref = self._actor_handle.is_initialized.remote(user_config) else: self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - self._ready_obj_ref = self._actor_handle.reconfigure.remote( + self._ready_obj_ref = self._actor_handle.is_initialized.remote( user_config, # Ensure that `is_allocated` will execute before `reconfigure`, # because `reconfigure` runs user code that could block the replica @@ -431,22 +431,23 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion] Check if current replica has started by making ray API calls on relevant actor / object ref. + Replica initialization calls __init__(), reconfigure(), and check_health(). + Returns: state (ReplicaStartupStatus): PENDING_ALLOCATION: - replica is waiting for a worker to start PENDING_INITIALIZATION - - replica reconfigure() haven't returned. + - replica initialization hasn't finished. FAILED: - - replica __init__() failed. + - replica initialization failed. SUCCEEDED: - - replica __init__() and reconfigure() succeeded. + - replica initialization succeeded. version: None: - - replica reconfigure() haven't returned OR - - replica __init__() failed. + - for PENDING_ALLOCATION, PENDING_INITIALIZATION, or FAILED states version: - - replica __init__() and reconfigure() succeeded. + - for SUCCEEDED state """ # Check whether the replica has been allocated. @@ -1406,9 +1407,10 @@ def _check_curr_status(self) -> bool: name=self._name, status=DeploymentStatus.UNHEALTHY, message=( - "The Deployment constructor failed " - f"{failed_to_start_count} times in a row. See " - "logs for details." + f"The Deployment failed to start {failed_to_start_count} " + "times in a row. This may be due to a problem with the " + "deployment constructor or the initial health check failing. " + "See logs for details." ), ) return False diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index 2cf5c77b4183..22f22b9220b7 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -214,13 +214,25 @@ async def is_allocated(self) -> str: """ return ray.get_runtime_context().node_id - async def reconfigure( + async def is_initialized( self, user_config: Optional[Any] = None, _after: Optional[Any] = None - ) -> Tuple[DeploymentConfig, DeploymentVersion]: + ): # Unused `_after` argument is for scheduling: passing an ObjectRef # allows delaying reconfiguration until after this call has returned. - if self.replica is None: - await self._initialize_replica() + await self._initialize_replica() + + if user_config is not None: + await self.reconfigure(user_config) + + # A new replica should not be considered healthy until it passes an + # initial health check. If an initial health check fails, consider + # it an initialization failure. + await self.check_health() + return self.get_metadata() + + async def reconfigure( + self, user_config: Optional[Any] = None + ) -> Tuple[DeploymentConfig, DeploymentVersion]: if user_config is not None: await self.replica.reconfigure(user_config) diff --git a/python/ray/serve/tests/test_healthcheck.py b/python/ray/serve/tests/test_healthcheck.py index a0899548a805..03c6dbf5fe5c 100644 --- a/python/ray/serve/tests/test_healthcheck.py +++ b/python/ray/serve/tests/test_healthcheck.py @@ -4,6 +4,7 @@ from ray.exceptions import RayError from ray._private.test_utils import wait_for_condition from ray import serve +from ray.serve._private.common import DeploymentStatus from ray.serve._private.constants import REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD @@ -162,7 +163,7 @@ def __call__(self, *args): def test_consecutive_failures(serve_instance): - # Test that the health check must fail N times before being marked unhealthy. + # Test that the health check must fail N times before being restarted. counter = ray.remote(Counter).remote() @@ -202,6 +203,78 @@ def check_fails_3_times(): check_fails_3_times() +def test_health_check_failure_makes_deployment_unhealthy(serve_instance): + """If a deployment always fails health check, the deployment should be unhealthy.""" + + @serve.deployment + class AlwaysUnhealthy: + def check_health(self): + raise Exception("intended to fail") + + def __call__(self, *args): + return ray.get_runtime_context().current_actor + + with pytest.raises(RuntimeError): + serve.run(AlwaysUnhealthy.bind()) + + app_status = serve_instance.get_serve_status() + assert ( + app_status.deployment_statuses[0].name == "AlwaysUnhealthy" + and app_status.deployment_statuses[0].status == DeploymentStatus.UNHEALTHY + ) + + +def test_health_check_failure_makes_deployment_unhealthy_transition(serve_instance): + """ + If a deployment transitions to unhealthy, then continues to fail health check after + being restarted, the deployment should be unhealthy. + """ + + class Toggle: + def __init__(self): + self._should_fail = False + + def set_should_fail(self): + self._should_fail = True + + def should_fail(self): + return self._should_fail + + @serve.deployment(health_check_period_s=1, health_check_timeout_s=1) + class WillBeUnhealthy: + def __init__(self, toggle): + self._toggle = toggle + + def check_health(self): + if ray.get(self._toggle.should_fail.remote()): + raise Exception("intended to fail") + + def __call__(self, *args): + return ray.get_runtime_context().current_actor + + def check_status(expected_status: DeploymentStatus): + app_status = serve_instance.get_serve_status() + return ( + app_status.deployment_statuses[0].name == "WillBeUnhealthy" + and app_status.deployment_statuses[0].status == expected_status + ) + + toggle = ray.remote(Toggle).remote() + serve.run(WillBeUnhealthy.bind(toggle)) + + # Check that deployment is healthy initially + assert check_status(DeploymentStatus.HEALTHY) + + ray.get(toggle.set_should_fail.remote()) + + # Check that deployment is now unhealthy + wait_for_condition(check_status, expected_status=DeploymentStatus.UNHEALTHY) + + # Check that deployment stays unhealthy + for _ in range(5): + assert check_status(DeploymentStatus.UNHEALTHY) + + if __name__ == "__main__": import sys