Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Add initial health check before marking a replica as RUNNING #31189

Merged
merged 13 commits into from
Jan 9, 2023
24 changes: 13 additions & 11 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,10 @@ def start(self, deployment_info: DeploymentInfo, version: DeploymentVersion):
if self._is_cross_language:
self._actor_handle = JavaActorHandleProxy(self._actor_handle)
self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
self._ready_obj_ref = self._actor_handle.reconfigure.remote(user_config)
self._ready_obj_ref = self._actor_handle.is_initialized.remote(user_config)
else:
self._allocated_obj_ref = self._actor_handle.is_allocated.remote()
self._ready_obj_ref = self._actor_handle.reconfigure.remote(
self._ready_obj_ref = self._actor_handle.is_initialized.remote(
user_config,
# Ensure that `is_allocated` will execute before `reconfigure`,
# because `reconfigure` runs user code that could block the replica
Expand Down Expand Up @@ -431,22 +431,23 @@ def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[DeploymentVersion]
Check if current replica has started by making ray API calls on
relevant actor / object ref.

Replica initialization calls __init__(), reconfigure(), and check_health().

Returns:
state (ReplicaStartupStatus):
PENDING_ALLOCATION:
- replica is waiting for a worker to start
PENDING_INITIALIZATION
- replica reconfigure() haven't returned.
- replica initialization hasn't finished.
FAILED:
- replica __init__() failed.
- replica initialization failed.
SUCCEEDED:
- replica __init__() and reconfigure() succeeded.
- replica initialization succeeded.
version:
None:
- replica reconfigure() haven't returned OR
- replica __init__() failed.
- for PENDING_ALLOCATION, PENDING_INITIALIZATION, or FAILED states
version:
- replica __init__() and reconfigure() succeeded.
- for SUCCEEDED state
"""

# Check whether the replica has been allocated.
Expand Down Expand Up @@ -1406,9 +1407,10 @@ def _check_curr_status(self) -> bool:
name=self._name,
status=DeploymentStatus.UNHEALTHY,
message=(
"The Deployment constructor failed "
f"{failed_to_start_count} times in a row. See "
"logs for details."
f"The Deployment failed to start {failed_to_start_count} "
"times in a row. This may be due to a problem with the "
"deployment constructor or the initial health check failing. "
"See logs for details."
),
)
return False
Expand Down
20 changes: 16 additions & 4 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,25 @@ async def is_allocated(self) -> str:
"""
return ray.get_runtime_context().node_id

async def reconfigure(
async def is_initialized(
self, user_config: Optional[Any] = None, _after: Optional[Any] = None
) -> Tuple[DeploymentConfig, DeploymentVersion]:
):
# Unused `_after` argument is for scheduling: passing an ObjectRef
# allows delaying reconfiguration until after this call has returned.
if self.replica is None:
await self._initialize_replica()
await self._initialize_replica()

if user_config is not None:
await self.reconfigure(user_config)

# A new replica should not be considered healthy until it passes an
# initial health check. If an initial health check fails, consider
# it an initialization failure.
await self.check_health()
return self.get_metadata()

async def reconfigure(
self, user_config: Optional[Any] = None
) -> Tuple[DeploymentConfig, DeploymentVersion]:
if user_config is not None:
await self.replica.reconfigure(user_config)

Expand Down
75 changes: 74 additions & 1 deletion python/ray/serve/tests/test_healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ray.exceptions import RayError
from ray._private.test_utils import wait_for_condition
from ray import serve
from ray.serve._private.common import DeploymentStatus
from ray.serve._private.constants import REPLICA_HEALTH_CHECK_UNHEALTHY_THRESHOLD


Expand Down Expand Up @@ -162,7 +163,7 @@ def __call__(self, *args):


def test_consecutive_failures(serve_instance):
# Test that the health check must fail N times before being marked unhealthy.
# Test that the health check must fail N times before being restarted.

counter = ray.remote(Counter).remote()

Expand Down Expand Up @@ -202,6 +203,78 @@ def check_fails_3_times():
check_fails_3_times()


def test_health_check_failure_makes_deployment_unhealthy(serve_instance):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a test:
The deployment is HEALTHY -> UNHEALTHY when there is a replica failing the health check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

"""If a deployment always fails health check, the deployment should be unhealthy."""

@serve.deployment
class AlwaysUnhealthy:
def check_health(self):
raise Exception("intended to fail")

def __call__(self, *args):
return ray.get_runtime_context().current_actor

with pytest.raises(RuntimeError):
serve.run(AlwaysUnhealthy.bind())

app_status = serve_instance.get_serve_status()
assert (
app_status.deployment_statuses[0].name == "AlwaysUnhealthy"
and app_status.deployment_statuses[0].status == DeploymentStatus.UNHEALTHY
)


def test_health_check_failure_makes_deployment_unhealthy_transition(serve_instance):
"""
If a deployment transitions to unhealthy, then continues to fail health check after
being restarted, the deployment should be unhealthy.
"""

class Toggle:
def __init__(self):
self._should_fail = False

def set_should_fail(self):
self._should_fail = True

def should_fail(self):
return self._should_fail

@serve.deployment(health_check_period_s=1, health_check_timeout_s=1)
class WillBeUnhealthy:
def __init__(self, toggle):
self._toggle = toggle

def check_health(self):
if ray.get(self._toggle.should_fail.remote()):
raise Exception("intended to fail")

def __call__(self, *args):
return ray.get_runtime_context().current_actor

def check_status(expected_status: DeploymentStatus):
app_status = serve_instance.get_serve_status()
return (
app_status.deployment_statuses[0].name == "WillBeUnhealthy"
and app_status.deployment_statuses[0].status == expected_status
)

toggle = ray.remote(Toggle).remote()
serve.run(WillBeUnhealthy.bind(toggle))

# Check that deployment is healthy initially
assert check_status(DeploymentStatus.HEALTHY)

ray.get(toggle.set_should_fail.remote())

# Check that deployment is now unhealthy
wait_for_condition(check_status, expected_status=DeploymentStatus.UNHEALTHY)

# Check that deployment stays unhealthy
for _ in range(5):
assert check_status(DeploymentStatus.UNHEALTHY)


if __name__ == "__main__":
import sys

Expand Down