From 669043a30568880fa8b2e7bc72fa159813b8af52 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Mon, 19 Dec 2022 10:47:16 -0800 Subject: [PATCH 1/9] add initial health check Signed-off-by: Cindy Zhang --- python/ray/serve/_private/deployment_state.py | 86 ++++++++++++++----- 1 file changed, 64 insertions(+), 22 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index b432d2eb2992..afe31d4a22a1 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -69,8 +69,9 @@ class ReplicaState(Enum): class ReplicaStartupStatus(Enum): PENDING_ALLOCATION = 1 PENDING_INITIALIZATION = 2 - SUCCEEDED = 3 - FAILED = 4 + PENDING_INITIAL_HEALTH_CHECK = 3 + SUCCEEDED = 4 + FAILED = 5 class ReplicaHealthCheckResponse(Enum): @@ -459,23 +460,43 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion] # surface exception to each update() cycle. if not replica_ready: return ReplicaStartupStatus.PENDING_INITIALIZATION, None - else: - try: - # TODO(simon): fully implement reconfigure for Java replicas. - if self._is_cross_language: - return ReplicaStartupStatus.SUCCEEDED, None - - deployment_config, version = ray.get(self._ready_obj_ref) - self._max_concurrent_queries = deployment_config.max_concurrent_queries - self._graceful_shutdown_timeout_s = ( - deployment_config.graceful_shutdown_timeout_s - ) - self._health_check_period_s = deployment_config.health_check_period_s - self._health_check_timeout_s = deployment_config.health_check_timeout_s - self._node_id = ray.get(self._allocated_obj_ref).hex() - except Exception: - logger.exception(f"Exception in deployment '{self._deployment_name}'") - return ReplicaStartupStatus.FAILED, None + + # Once replica initialization completed, start initial health check + if self._health_check_ref is None: + self._health_check_ref = self._actor_handle.check_health.remote() + # Wait for health check to complete without blocking + if not self._check_obj_ref_ready(self._health_check_ref): + return ReplicaStartupStatus.PENDING_INITIAL_HEALTH_CHECK, None + + try: + # TODO(simon): fully implement reconfigure for Java replicas. + if self._is_cross_language: + return ReplicaStartupStatus.SUCCEEDED, None + + deployment_config, version = ray.get(self._ready_obj_ref) + self._max_concurrent_queries = deployment_config.max_concurrent_queries + self._graceful_shutdown_timeout_s = ( + deployment_config.graceful_shutdown_timeout_s + ) + self._health_check_period_s = deployment_config.health_check_period_s + self._health_check_timeout_s = deployment_config.health_check_timeout_s + self._node_id = ray.get(self._allocated_obj_ref).hex() + except Exception: + logger.exception(f"Exception in deployment '{self._deployment_name}'") + return ReplicaStartupStatus.FAILED, None + + # A new replica should not be considered healthy until it passes an initial + # health check. Before marking a replica as RUNNING, if an initial health check + # fails, mark deployment as UNHEALTHY and restart replica. + try: + ray.get(self._health_check_ref) + self._health_check_ref = None + except Exception: + logger.exception( + f"Deployment {self._deployment_name} failed initial health check." + ) + return ReplicaStartupStatus.FAILED, None + if self._deployment_is_cross_language: # todo: The replica's userconfig whitch java client created # is different from the controller's userconfig @@ -1406,9 +1427,10 @@ def _check_curr_status(self) -> bool: name=self._name, status=DeploymentStatus.UNHEALTHY, message=( - "The Deployment constructor failed " - f"{failed_to_start_count} times in a row. See " - "logs for details." + f"The Deployment failed to start {failed_to_start_count} " + "times in a row. This may be due to a problem with the " + "deployment constructor or the initial health check failing. " + "See logs for details." ), ) return False @@ -1469,6 +1491,7 @@ def _check_startup_replicas( elif start_status in [ ReplicaStartupStatus.PENDING_ALLOCATION, ReplicaStartupStatus.PENDING_INITIALIZATION, + ReplicaStartupStatus.PENDING_INITIAL_HEALTH_CHECK, ]: is_slow = time.time() - replica._start_time > SLOW_STARTUP_WARNING_S @@ -1546,12 +1569,15 @@ def _check_and_update_replicas(self) -> bool: pending_allocation = [] pending_initialization = [] + pending_initial_health_check = [] for replica, startup_status in slow_start_replicas: if startup_status == ReplicaStartupStatus.PENDING_ALLOCATION: pending_allocation.append(replica) if startup_status == ReplicaStartupStatus.PENDING_INITIALIZATION: pending_initialization.append(replica) + if startup_status == ReplicaStartupStatus.PENDING_INITIAL_HEALTH_CHECK: + pending_initial_health_check.append(replica) if len(pending_allocation) > 0: required, available = pending_allocation[0].resource_requirements() @@ -1595,6 +1621,22 @@ def _check_and_update_replicas(self) -> bool: status=DeploymentStatus.UPDATING, message=message, ) + + if len(pending_initial_health_check) > 0: + message = ( + f"Deployment {self._name} has {len(pending_initial_health_check)} " + f"replicas that have taken more than {SLOW_STARTUP_WARNING_S}s" + f"to receive a health check response. This may be caused by a slow " + f"check_health method." + ) + logger.warning(message) + if self._curr_status_info.status != DeploymentStatus.UNHEALTHY: + self._curr_status_info = DeploymentStatusInfo( + name=self._name, + status=DeploymentStatus.UPDATING, + message=message, + ) + self._prev_startup_warning = time.time() From 83d18f5d4c898928ab46fdf9c651ad5962b890c9 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Mon, 19 Dec 2022 14:33:15 -0800 Subject: [PATCH 2/9] add test Signed-off-by: Cindy Zhang --- python/ray/serve/_private/deployment_state.py | 3 +-- python/ray/serve/tests/test_healthcheck.py | 22 +++++++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index afe31d4a22a1..60318d727d11 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -1621,7 +1621,7 @@ def _check_and_update_replicas(self) -> bool: status=DeploymentStatus.UPDATING, message=message, ) - + if len(pending_initial_health_check) > 0: message = ( f"Deployment {self._name} has {len(pending_initial_health_check)} " @@ -1636,7 +1636,6 @@ def _check_and_update_replicas(self) -> bool: status=DeploymentStatus.UPDATING, message=message, ) - self._prev_startup_warning = time.time() diff --git a/python/ray/serve/tests/test_healthcheck.py b/python/ray/serve/tests/test_healthcheck.py index a0899548a805..45ce20084324 100644 --- a/python/ray/serve/tests/test_healthcheck.py +++ b/python/ray/serve/tests/test_healthcheck.py @@ -4,6 +4,7 @@ from ray.exceptions import RayError from ray._private.test_utils import wait_for_condition from ray import serve +from ray.dashboard.modules.serve.sdk import ServeSubmissionClient from ray.serve._private.constants import REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD @@ -202,6 +203,27 @@ def check_fails_3_times(): check_fails_3_times() +def test_health_check_failure_makes_deployment_unhealthy(serve_instance): + """If a deployment always fails health check, the deployment should be unhealthy.""" + + @serve.deployment + class AlwaysUnhealthy: + def check_health(self): + raise Exception("intended to fail") + + def __call__(self, *args): + return ray.get_runtime_context().current_actor + + with pytest.raises(RuntimeError): + serve.run(AlwaysUnhealthy.bind()) + + app_status = ServeSubmissionClient("http://localhost:52365").get_status() + assert ( + app_status["deployment_statuses"][0]["name"] == "AlwaysUnhealthy" + and app_status["deployment_statuses"][0]["status"] == "UNHEALTHY" + ) + + if __name__ == "__main__": import sys From b9fd82f0545ccba14eee7a8564342650de0dcca8 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Tue, 3 Jan 2023 14:43:59 -0800 Subject: [PATCH 3/9] unify health check with initialization Signed-off-by: Cindy Zhang --- python/ray/serve/_private/deployment_state.py | 47 ++----------------- python/ray/serve/_private/replica.py | 20 ++++++-- 2 files changed, 19 insertions(+), 48 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 60318d727d11..290f0701e35f 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -69,9 +69,8 @@ class ReplicaState(Enum): class ReplicaStartupStatus(Enum): PENDING_ALLOCATION = 1 PENDING_INITIALIZATION = 2 - PENDING_INITIAL_HEALTH_CHECK = 3 - SUCCEEDED = 4 - FAILED = 5 + SUCCEEDED = 3 + FAILED = 4 class ReplicaHealthCheckResponse(Enum): @@ -376,10 +375,10 @@ def start(self, deployment_info: DeploymentInfo, version: DeploymentVersion): if self._is_cross_language: self._actor_handle = JavaActorHandleProxy(self._actor_handle) self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - self._ready_obj_ref = self._actor_handle.reconfigure.remote(user_config) + self._ready_obj_ref = self._actor_handle.is_ready.remote(user_config) else: self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - self._ready_obj_ref = self._actor_handle.reconfigure.remote( + self._ready_obj_ref = self._actor_handle.is_ready.remote( user_config, # Ensure that `is_allocated` will execute before `reconfigure`, # because `reconfigure` runs user code that could block the replica @@ -461,13 +460,6 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion] if not replica_ready: return ReplicaStartupStatus.PENDING_INITIALIZATION, None - # Once replica initialization completed, start initial health check - if self._health_check_ref is None: - self._health_check_ref = self._actor_handle.check_health.remote() - # Wait for health check to complete without blocking - if not self._check_obj_ref_ready(self._health_check_ref): - return ReplicaStartupStatus.PENDING_INITIAL_HEALTH_CHECK, None - try: # TODO(simon): fully implement reconfigure for Java replicas. if self._is_cross_language: @@ -485,18 +477,6 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion] logger.exception(f"Exception in deployment '{self._deployment_name}'") return ReplicaStartupStatus.FAILED, None - # A new replica should not be considered healthy until it passes an initial - # health check. Before marking a replica as RUNNING, if an initial health check - # fails, mark deployment as UNHEALTHY and restart replica. - try: - ray.get(self._health_check_ref) - self._health_check_ref = None - except Exception: - logger.exception( - f"Deployment {self._deployment_name} failed initial health check." - ) - return ReplicaStartupStatus.FAILED, None - if self._deployment_is_cross_language: # todo: The replica's userconfig whitch java client created # is different from the controller's userconfig @@ -1491,7 +1471,6 @@ def _check_startup_replicas( elif start_status in [ ReplicaStartupStatus.PENDING_ALLOCATION, ReplicaStartupStatus.PENDING_INITIALIZATION, - ReplicaStartupStatus.PENDING_INITIAL_HEALTH_CHECK, ]: is_slow = time.time() - replica._start_time > SLOW_STARTUP_WARNING_S @@ -1569,15 +1548,12 @@ def _check_and_update_replicas(self) -> bool: pending_allocation = [] pending_initialization = [] - pending_initial_health_check = [] for replica, startup_status in slow_start_replicas: if startup_status == ReplicaStartupStatus.PENDING_ALLOCATION: pending_allocation.append(replica) if startup_status == ReplicaStartupStatus.PENDING_INITIALIZATION: pending_initialization.append(replica) - if startup_status == ReplicaStartupStatus.PENDING_INITIAL_HEALTH_CHECK: - pending_initial_health_check.append(replica) if len(pending_allocation) > 0: required, available = pending_allocation[0].resource_requirements() @@ -1622,21 +1598,6 @@ def _check_and_update_replicas(self) -> bool: message=message, ) - if len(pending_initial_health_check) > 0: - message = ( - f"Deployment {self._name} has {len(pending_initial_health_check)} " - f"replicas that have taken more than {SLOW_STARTUP_WARNING_S}s" - f"to receive a health check response. This may be caused by a slow " - f"check_health method." - ) - logger.warning(message) - if self._curr_status_info.status != DeploymentStatus.UNHEALTHY: - self._curr_status_info = DeploymentStatusInfo( - name=self._name, - status=DeploymentStatus.UPDATING, - message=message, - ) - self._prev_startup_warning = time.time() for replica in self._replicas.pop(states=[ReplicaState.STOPPING]): diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index ddedaaf166a0..34c04acce9ee 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -136,7 +136,7 @@ async def __init__( # for allocation of this replica by using the `is_allocated` # method. After that, it calls `reconfigure` to trigger # user code initialization. - async def initialize_replica(): + async def initialize_replica(user_config): if is_function: _callable = deployment_def else: @@ -158,7 +158,7 @@ async def initialize_replica(): deployment_name, replica_tag, deployment_config, - deployment_config.user_config, + user_config, version, is_function, controller_handle, @@ -215,13 +215,20 @@ async def is_allocated(self) -> str: """ return ray.get_runtime_context().node_id + async def is_ready(self, user_config, _after: Optional[Any] = None): + await self._initialize_replica(user_config) + + # A new replica should not be considered healthy until it passes an + # initial health check. If an initial health check fails, consider + # it an initialization failure. + await self.check_health() + return self.get_metadata() + async def reconfigure( - self, user_config: Optional[Any] = None, _after: Optional[Any] = None + self, user_config: Optional[Any] = None ) -> Tuple[DeploymentConfig, DeploymentVersion]: # Unused `_after` argument is for scheduling: passing an ObjectRef # allows delaying reconfiguration until after this call has returned. - if self.replica is None: - await self._initialize_replica() if user_config is not None: await self.replica.reconfigure(user_config) @@ -345,6 +352,9 @@ def user_health_check(): metrics_process_func=process_remote_func, ) + if self.user_config is not None: + self.reconfigure(self.user_config) + # NOTE(edoakes): we used to recommend that users use the "ray" logger # and tagged the logs with metadata as below. We now recommend using # the "ray.serve" 'component logger' (as of Ray 1.13). This is left to From 3e1d8d6b209f82e5b97f3e6e741a59352f963b07 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Tue, 3 Jan 2023 15:30:25 -0800 Subject: [PATCH 4/9] fix Signed-off-by: Cindy Zhang --- python/ray/serve/_private/replica.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index c4e958918e07..d009d23dfd5d 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -135,7 +135,7 @@ async def __init__( # for allocation of this replica by using the `is_allocated` # method. After that, it calls `reconfigure` to trigger # user code initialization. - async def initialize_replica(user_config): + async def initialize_replica(): if is_function: _callable = deployment_def else: @@ -157,7 +157,7 @@ async def initialize_replica(user_config): deployment_name, replica_tag, deployment_config, - user_config, + deployment_config.user_config, version, is_function, controller_handle, @@ -215,7 +215,10 @@ async def is_allocated(self) -> str: return ray.get_runtime_context().node_id async def is_ready(self, user_config, _after: Optional[Any] = None): - await self._initialize_replica(user_config) + await self._initialize_replica() + + if user_config is not None: + await self.reconfigure(user_config) # A new replica should not be considered healthy until it passes an # initial health check. If an initial health check fails, consider @@ -351,9 +354,6 @@ def user_health_check(): metrics_process_func=process_remote_func, ) - if self.user_config is not None: - self.reconfigure(self.user_config) - # NOTE(edoakes): we used to recommend that users use the "ray" logger # and tagged the logs with metadata as below. We now recommend using # the "ray.serve" 'component logger' (as of Ray 1.13). This is left to From a8c8d46855c1f80ea64fec3fe929e06d425ff1b8 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Tue, 3 Jan 2023 17:53:43 -0800 Subject: [PATCH 5/9] add test + update comments Signed-off-by: Cindy Zhang --- python/ray/serve/_private/deployment_state.py | 14 +++-- python/ray/serve/tests/test_healthcheck.py | 55 +++++++++++++++++-- 2 files changed, 59 insertions(+), 10 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 290f0701e35f..3b6c392e2f05 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -431,22 +431,24 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion] Check if current replica has started by making ray API calls on relevant actor / object ref. + Replica initialization calls __init__(), reconfigure(), and check_health(). + Returns: state (ReplicaStartupStatus): PENDING_ALLOCATION: - replica is waiting for a worker to start PENDING_INITIALIZATION - - replica reconfigure() haven't returned. + - replica initialization hasn't finished. FAILED: - - replica __init__() failed. + - replica initialization failed. SUCCEEDED: - - replica __init__() and reconfigure() succeeded. + - replica initialization succeeded. version: None: - - replica reconfigure() haven't returned OR - - replica __init__() failed. + - replica is waiting for a worker to start, or replica + initialization hasn't finished, or replica initialization failed. version: - - replica __init__() and reconfigure() succeeded. + - replica initialization succeeded. """ # Check whether the replica has been allocated. diff --git a/python/ray/serve/tests/test_healthcheck.py b/python/ray/serve/tests/test_healthcheck.py index 45ce20084324..f9952df5ea2f 100644 --- a/python/ray/serve/tests/test_healthcheck.py +++ b/python/ray/serve/tests/test_healthcheck.py @@ -163,7 +163,7 @@ def __call__(self, *args): def test_consecutive_failures(serve_instance): - # Test that the health check must fail N times before being marked unhealthy. + # Test that the health check must fail N times before being restarted. counter = ray.remote(Counter).remote() @@ -217,13 +217,60 @@ def __call__(self, *args): with pytest.raises(RuntimeError): serve.run(AlwaysUnhealthy.bind()) - app_status = ServeSubmissionClient("http://localhost:52365").get_status() + app_status = serve_instance.get_serve_status() assert ( - app_status["deployment_statuses"][0]["name"] == "AlwaysUnhealthy" - and app_status["deployment_statuses"][0]["status"] == "UNHEALTHY" + app_status.deployment_statuses[0].name == "AlwaysUnhealthy" + and app_status.deployment_statuses[0].status == "UNHEALTHY" ) +def test_health_check_failure_makes_deployment_unhealthy2(serve_instance): + """If a deployment always fails health check, the deployment should be unhealthy.""" + class Toggle: + def __init__(self): + self._should_fail = False + + def set_should_fail(self): + self._should_fail = True + + def should_fail(self): + return self._should_fail + + @serve.deployment(health_check_period_s=1, health_check_timeout_s=1) + class WillBeUnhealthy: + def __init__(self, toggle): + self._toggle = toggle + + def check_health(self): + if ray.get(self._toggle.should_fail.remote()): + raise Exception("intended to fail") + + def __call__(self, *args): + return ray.get_runtime_context().current_actor + + def check_status(expected_status): + app_status = serve_instance.get_serve_status() + return ( + app_status.deployment_statuses[0].name == "WillBeUnhealthy" + and app_status.deployment_statuses[0].status == expected_status + ) + + toggle = ray.remote(Toggle).remote() + serve.run(WillBeUnhealthy.bind(toggle)) + + # Check that deployment is healthy initially + assert check_status("HEALTHY") + + ray.get(toggle.set_should_fail.remote()) + + # Check that deployment is now unhealthy + wait_for_condition(check_status, expected_status="UNHEALTHY", timeout=5) + + # Check that deployment stays unhealthy + for _ in range(5): + assert check_status("UNHEALTHY") + + if __name__ == "__main__": import sys From c25e0241d2065c91ea7d35259338e89e2867ee5a Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Wed, 4 Jan 2023 09:45:48 -0800 Subject: [PATCH 6/9] small changes Signed-off-by: Cindy Zhang --- python/ray/serve/_private/deployment_state.py | 37 +++++++++---------- python/ray/serve/_private/replica.py | 4 +- python/ray/serve/tests/test_healthcheck.py | 9 +++-- 3 files changed, 27 insertions(+), 23 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 3b6c392e2f05..a6ca21a252ef 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -445,7 +445,7 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion] - replica initialization succeeded. version: None: - - replica is waiting for a worker to start, or replica + - replica is waiting for a worker to start, or replica initialization hasn't finished, or replica initialization failed. version: - replica initialization succeeded. @@ -461,24 +461,23 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion] # surface exception to each update() cycle. if not replica_ready: return ReplicaStartupStatus.PENDING_INITIALIZATION, None - - try: - # TODO(simon): fully implement reconfigure for Java replicas. - if self._is_cross_language: - return ReplicaStartupStatus.SUCCEEDED, None - - deployment_config, version = ray.get(self._ready_obj_ref) - self._max_concurrent_queries = deployment_config.max_concurrent_queries - self._graceful_shutdown_timeout_s = ( - deployment_config.graceful_shutdown_timeout_s - ) - self._health_check_period_s = deployment_config.health_check_period_s - self._health_check_timeout_s = deployment_config.health_check_timeout_s - self._node_id = ray.get(self._allocated_obj_ref).hex() - except Exception: - logger.exception(f"Exception in deployment '{self._deployment_name}'") - return ReplicaStartupStatus.FAILED, None - + else: + try: + # TODO(simon): fully implement reconfigure for Java replicas. + if self._is_cross_language: + return ReplicaStartupStatus.SUCCEEDED, None + + deployment_config, version = ray.get(self._ready_obj_ref) + self._max_concurrent_queries = deployment_config.max_concurrent_queries + self._graceful_shutdown_timeout_s = ( + deployment_config.graceful_shutdown_timeout_s + ) + self._health_check_period_s = deployment_config.health_check_period_s + self._health_check_timeout_s = deployment_config.health_check_timeout_s + self._node_id = ray.get(self._allocated_obj_ref).hex() + except Exception: + logger.exception(f"Exception in deployment '{self._deployment_name}'") + return ReplicaStartupStatus.FAILED, None if self._deployment_is_cross_language: # todo: The replica's userconfig whitch java client created # is different from the controller's userconfig diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index d009d23dfd5d..e2c8e1ddb42a 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -214,7 +214,9 @@ async def is_allocated(self) -> str: """ return ray.get_runtime_context().node_id - async def is_ready(self, user_config, _after: Optional[Any] = None): + async def is_ready( + self, user_config: Optional[Any] = None, _after: Optional[Any] = None + ): await self._initialize_replica() if user_config is not None: diff --git a/python/ray/serve/tests/test_healthcheck.py b/python/ray/serve/tests/test_healthcheck.py index f9952df5ea2f..d1e68fb726d6 100644 --- a/python/ray/serve/tests/test_healthcheck.py +++ b/python/ray/serve/tests/test_healthcheck.py @@ -4,7 +4,6 @@ from ray.exceptions import RayError from ray._private.test_utils import wait_for_condition from ray import serve -from ray.dashboard.modules.serve.sdk import ServeSubmissionClient from ray.serve._private.constants import REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD @@ -225,7 +224,11 @@ def __call__(self, *args): def test_health_check_failure_makes_deployment_unhealthy2(serve_instance): - """If a deployment always fails health check, the deployment should be unhealthy.""" + """ + If a deployment continues to fail health check after being restarted, the + deployment should be unhealthy. + """ + class Toggle: def __init__(self): self._should_fail = False @@ -247,7 +250,7 @@ def check_health(self): def __call__(self, *args): return ray.get_runtime_context().current_actor - + def check_status(expected_status): app_status = serve_instance.get_serve_status() return ( From 31ac90bf41480ef24b8f48d82a177792311c2038 Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Wed, 4 Jan 2023 09:50:05 -0800 Subject: [PATCH 7/9] small changes Signed-off-by: Cindy Zhang --- python/ray/serve/_private/deployment_state.py | 5 ++--- python/ray/serve/tests/test_healthcheck.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index a6ca21a252ef..318905d380e4 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -445,10 +445,9 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion] - replica initialization succeeded. version: None: - - replica is waiting for a worker to start, or replica - initialization hasn't finished, or replica initialization failed. + - for PENDING_ALLOCATION, PENDING_INITIALIZATION, or FAILED states version: - - replica initialization succeeded. + - for SUCCEEDED state """ # Check whether the replica has been allocated. diff --git a/python/ray/serve/tests/test_healthcheck.py b/python/ray/serve/tests/test_healthcheck.py index d1e68fb726d6..142b2862cd26 100644 --- a/python/ray/serve/tests/test_healthcheck.py +++ b/python/ray/serve/tests/test_healthcheck.py @@ -267,7 +267,7 @@ def check_status(expected_status): ray.get(toggle.set_should_fail.remote()) # Check that deployment is now unhealthy - wait_for_condition(check_status, expected_status="UNHEALTHY", timeout=5) + wait_for_condition(check_status, expected_status="UNHEALTHY") # Check that deployment stays unhealthy for _ in range(5): From 00b19cec90a9d657e70ef1efa5fbf9432f31e62f Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Thu, 5 Jan 2023 13:31:05 -0800 Subject: [PATCH 8/9] apply changes Signed-off-by: Cindy Zhang --- python/ray/serve/_private/deployment_state.py | 4 ++-- python/ray/serve/_private/replica.py | 6 +++--- python/ray/serve/tests/test_healthcheck.py | 11 ++++++----- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/python/ray/serve/_private/deployment_state.py b/python/ray/serve/_private/deployment_state.py index 318905d380e4..7b12e4db82d6 100644 --- a/python/ray/serve/_private/deployment_state.py +++ b/python/ray/serve/_private/deployment_state.py @@ -375,10 +375,10 @@ def start(self, deployment_info: DeploymentInfo, version: DeploymentVersion): if self._is_cross_language: self._actor_handle = JavaActorHandleProxy(self._actor_handle) self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - self._ready_obj_ref = self._actor_handle.is_ready.remote(user_config) + self._ready_obj_ref = self._actor_handle.is_initialized.remote(user_config) else: self._allocated_obj_ref = self._actor_handle.is_allocated.remote() - self._ready_obj_ref = self._actor_handle.is_ready.remote( + self._ready_obj_ref = self._actor_handle.is_initialized.remote( user_config, # Ensure that `is_allocated` will execute before `reconfigure`, # because `reconfigure` runs user code that could block the replica diff --git a/python/ray/serve/_private/replica.py b/python/ray/serve/_private/replica.py index e2c8e1ddb42a..22f22b9220b7 100644 --- a/python/ray/serve/_private/replica.py +++ b/python/ray/serve/_private/replica.py @@ -214,9 +214,11 @@ async def is_allocated(self) -> str: """ return ray.get_runtime_context().node_id - async def is_ready( + async def is_initialized( self, user_config: Optional[Any] = None, _after: Optional[Any] = None ): + # Unused `_after` argument is for scheduling: passing an ObjectRef + # allows delaying reconfiguration until after this call has returned. await self._initialize_replica() if user_config is not None: @@ -231,8 +233,6 @@ async def is_ready( async def reconfigure( self, user_config: Optional[Any] = None ) -> Tuple[DeploymentConfig, DeploymentVersion]: - # Unused `_after` argument is for scheduling: passing an ObjectRef - # allows delaying reconfiguration until after this call has returned. if user_config is not None: await self.replica.reconfigure(user_config) diff --git a/python/ray/serve/tests/test_healthcheck.py b/python/ray/serve/tests/test_healthcheck.py index 142b2862cd26..a99719c05610 100644 --- a/python/ray/serve/tests/test_healthcheck.py +++ b/python/ray/serve/tests/test_healthcheck.py @@ -4,6 +4,7 @@ from ray.exceptions import RayError from ray._private.test_utils import wait_for_condition from ray import serve +from ray.serve._private.common import DeploymentStatus from ray.serve._private.constants import REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD @@ -219,7 +220,7 @@ def __call__(self, *args): app_status = serve_instance.get_serve_status() assert ( app_status.deployment_statuses[0].name == "AlwaysUnhealthy" - and app_status.deployment_statuses[0].status == "UNHEALTHY" + and app_status.deployment_statuses[0].status == DeploymentStatus.UNHEALTHY ) @@ -251,7 +252,7 @@ def check_health(self): def __call__(self, *args): return ray.get_runtime_context().current_actor - def check_status(expected_status): + def check_status(expected_status: DeploymentStatus): app_status = serve_instance.get_serve_status() return ( app_status.deployment_statuses[0].name == "WillBeUnhealthy" @@ -262,16 +263,16 @@ def check_status(expected_status): serve.run(WillBeUnhealthy.bind(toggle)) # Check that deployment is healthy initially - assert check_status("HEALTHY") + assert check_status(DeploymentStatus.HEALTHY) ray.get(toggle.set_should_fail.remote()) # Check that deployment is now unhealthy - wait_for_condition(check_status, expected_status="UNHEALTHY") + wait_for_condition(check_status, expected_status=DeploymentStatus.UNHEALTHY) # Check that deployment stays unhealthy for _ in range(5): - assert check_status("UNHEALTHY") + assert check_status(DeploymentStatus.UNHEALTHY) if __name__ == "__main__": From c46e7e9b17bad2c2ae858aa4013aa0d9ffe9e5ae Mon Sep 17 00:00:00 2001 From: Cindy Zhang Date: Thu, 5 Jan 2023 13:35:06 -0800 Subject: [PATCH 9/9] improve test comment Signed-off-by: Cindy Zhang --- python/ray/serve/tests/test_healthcheck.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/serve/tests/test_healthcheck.py b/python/ray/serve/tests/test_healthcheck.py index a99719c05610..03c6dbf5fe5c 100644 --- a/python/ray/serve/tests/test_healthcheck.py +++ b/python/ray/serve/tests/test_healthcheck.py @@ -224,10 +224,10 @@ def __call__(self, *args): ) -def test_health_check_failure_makes_deployment_unhealthy2(serve_instance): +def test_health_check_failure_makes_deployment_unhealthy_transition(serve_instance): """ - If a deployment continues to fail health check after being restarted, the - deployment should be unhealthy. + If a deployment transitions to unhealthy, then continues to fail health check after + being restarted, the deployment should be unhealthy. """ class Toggle: