Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Unset health replica stats when replica is under STOPPING #34761

Merged
merged 5 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -1283,8 +1283,7 @@ def _stop_or_update_outdated_version_replicas(self, max_to_stop=math.inf) -> int
# normal scale-up process.
if replica.version.requires_actor_restart(self._target_state.version):
code_version_changes += 1
replica.stop()
self._replicas.add(ReplicaState.STOPPING, replica)
self._stop_replica(replica)
replicas_changed = True
# Otherwise, only lightweight options in deployment config is a mismatch, so
# we update it dynamically without restarting the replica.
Expand Down Expand Up @@ -1463,8 +1462,7 @@ def _scale_deployment_replicas(self) -> bool:
f"Adding STOPPING to replica_tag: {replica}, "
f"deployment_name: {self._name}"
)
replica.stop()
self._replicas.add(ReplicaState.STOPPING, replica)
self._stop_replica(replica)

return replicas_changed

Expand Down Expand Up @@ -1583,8 +1581,7 @@ def _check_startup_replicas(
self._replica_constructor_retry_counter += 1

replicas_failed = True
replica.stop(graceful=False)
self._replicas.add(ReplicaState.STOPPING, replica)
self._stop_replica(replica)
elif start_status in [
ReplicaStartupStatus.PENDING_ALLOCATION,
ReplicaStartupStatus.PENDING_INITIALIZATION,
Expand All @@ -1597,8 +1594,7 @@ def _check_startup_replicas(
# Does it make sense to stop replicas in PENDING_ALLOCATION
# state?
if is_slow and stop_on_slow:
replica.stop(graceful=False)
self._replicas.add(ReplicaState.STOPPING, replica)
self._stop_replica(replica, graceful_stop=False)
else:
self._replicas.add(original_state, replica)

Expand All @@ -1619,6 +1615,18 @@ def _check_startup_replicas(

return slow_replicas, transitioned_to_running

def _stop_replica(self, replica, graceful_stop: bool = True) -> None:
    """Stop a replica and record it as STOPPING.

    Performs, in order:
      1. ``replica.stop(graceful=graceful_stop)``.
      2. Registers the replica in ``self._replicas`` under
         ``ReplicaState.STOPPING``.
      3. Sets ``self.health_check_gauge`` to 0 for this deployment/replica
         tag pair.

    Args:
        replica: The replica to stop; must expose ``stop(graceful=...)``
            and ``replica_tag``.
        graceful_stop: Forwarded to ``replica.stop`` as ``graceful``.
    """
    replica.stop(graceful=graceful_stop)
    self._replicas.add(ReplicaState.STOPPING, replica)

    # Built after the state transition so attribute reads happen in the
    # same order as before.
    gauge_tags = {"deployment": self._name, "replica": replica.replica_tag}
    self.health_check_gauge.set(0, tags=gauge_tags)

def _check_and_update_replicas(self) -> bool:
"""
Check current state of all DeploymentReplica being tracked, and compare
Expand All @@ -1644,8 +1652,7 @@ def _check_and_update_replicas(self) -> bool:
self.health_check_gauge.set(
0, tags={"deployment": self._name, "replica": replica.replica_tag}
)
replica.stop(graceful=False)
self._replicas.add(ReplicaState.STOPPING, replica)
self._stop_replica(replica, graceful_stop=False)
# If this is a replica of the target version, the deployment
# enters the "UNHEALTHY" status until the replica is
# recovered or a new deploy happens.
Expand Down Expand Up @@ -1855,8 +1862,7 @@ def _stop_all_replicas(self) -> bool:
ReplicaState.RECOVERING,
]
):
replica.stop()
self._replicas.add(ReplicaState.STOPPING, replica)
self._stop_replica(replica)
replica_changed = True
return replica_changed

Expand Down
40 changes: 40 additions & 0 deletions python/ray/serve/tests/test_standalone3.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ray.exceptions import RayActorError
from ray.serve._private.constants import (
SYNC_HANDLE_IN_DAG_FEATURE_FLAG_ENV_KEY,
SERVE_DEFAULT_APP_NAME,
)
from ray.serve.context import get_global_client
from ray.tests.conftest import call_ray_stop_only # noqa: F401
Expand Down Expand Up @@ -196,6 +197,45 @@ def verify_metrics():
serve.shutdown()


def test_replica_health_metric(ray_instance):
    """Check the replica-healthy metric count as a deployment scales and is deleted.

    The metrics endpoint is scraped and every sample line that mentions
    ``serve_deployment_replica_healthy`` and contains ``1.0`` is counted.
    The count is expected to reach 2 after the initial deploy, 10 after
    scaling up, and 0 after the application is deleted.
    """

    @serve.deployment(num_replicas=2)
    def f():
        return "hello"

    serve.run(f.bind())

    def count_live_replica_metrics():
        # Scrape the metrics endpoint and count matching sample lines,
        # ignoring the "# HELP" / "# TYPE" metadata lines.
        body = requests.get("http://127.0.0.1:9999").text
        return sum(
            1
            for line in body.split("\n")
            if "# HELP" not in line
            and "# TYPE" not in line
            and "serve_deployment_replica_healthy" in line
            and "1.0" in line
        )

    def wait_for_live_replicas(expected):
        # Poll until the scraped count matches ``expected``.
        wait_for_condition(
            lambda: count_live_replica_metrics() == expected,
            timeout=120,
            retry_interval_ms=500,
        )

    wait_for_live_replicas(2)

    # Add more replicas
    serve.run(f.options(num_replicas=10).bind())
    wait_for_live_replicas(10)

    # delete the application
    serve.delete(SERVE_DEFAULT_APP_NAME)
    wait_for_live_replicas(0)
    serve.shutdown()


def test_shutdown_remote(start_and_shutdown_ray_cli_function):
"""Check that serve.shutdown() works on a remote Ray cluster."""

Expand Down