Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Add health checking for http proxy actors #34944

Merged
merged 13 commits into from
May 8, 2023
86 changes: 50 additions & 36 deletions dashboard/modules/serve/tests/test_serve_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
from ray.serve._private.constants import SERVE_NAMESPACE, MULTI_APP_MIGRATION_MESSAGE
from ray.serve.tests.conftest import * # noqa: F401 F403
from ray.serve.schema import ServeInstanceDetails
from ray.serve._private.common import ApplicationStatus, DeploymentStatus, ReplicaState
from ray.serve._private.common import (
ApplicationStatus,
DeploymentStatus,
ReplicaState,
HTTPProxyStatus,
)
from ray.serve._private.constants import (
SERVE_DEFAULT_APP_NAME,
DEPLOYMENT_NAME_PREFIX_SEPARATOR,
Expand Down Expand Up @@ -493,17 +498,29 @@ def test_get_serve_instance_details(ray_start_stop, f_deployment_options):
"applications": [
{
"name": "app1",
"route_prefix": "/app1",
"route_prefix": "/apple",
"import_path": world_import_path,
"deployments": [f_deployment_options],
},
{
"name": "app2",
"route_prefix": "/app2",
"route_prefix": "/banana",
"import_path": fastapi_import_path,
},
],
}
expected_values = {
"app1": {
"route_prefix": "/apple",
"docs_path": None,
"deployments": {"app1_f", "app1_BasicDriver"},
},
"app2": {
"route_prefix": "/banana",
"docs_path": "/my_docs",
"deployments": {"app2_FastAPIDeployment"},
},
}

deploy_config_multi_app(config1)

Expand All @@ -526,43 +543,40 @@ def applications_running():
assert serve_details.http_options.host == "127.0.0.1"
assert serve_details.http_options.port == 8005
print("Confirmed fetched proxy location, host and port metadata correct.")
# Check HTTP Proxy statuses
assert all(
proxy.status == HTTPProxyStatus.HEALTHY
for proxy in serve_details.http_proxy_details.values()
)
print("Confirmed HTTP Proxies are healthy.")

app_details = serve_details.applications
# CHECK: application details
for i, app in enumerate(["app1", "app2"]):
# CHECK: app configs are equal
assert (
app_details[app].deployed_app_config.dict(exclude_unset=True)
== config1["applications"][i]
)
print(f"Checked the deployed app config for {app} from the fetched metadata.")

# CHECK: app configs are equal
assert (
app_details["app1"].deployed_app_config.dict(exclude_unset=True)
== config1["applications"][0]
)
assert (
app_details["app2"].deployed_app_config.dict(exclude_unset=True)
== config1["applications"][1]
)
print("Confirmed the deployed app configs from the fetched metadata is correct.")

# CHECK: deployment timestamp
assert app_details["app1"].last_deployed_time_s > 0
assert app_details["app2"].last_deployed_time_s > 0
print("Confirmed deployment timestamps are nonzero.")

# CHECK: docs path
assert app_details["app1"].docs_path is None
assert app_details["app2"].docs_path == "/my_docs"
print("Confirmed docs paths are correct.")

# CHECK: all deployments are present
assert app_details["app1"].deployments.keys() == {
"app1_f",
"app1_BasicDriver",
}
assert app_details["app2"].deployments.keys() == {
"app2_FastAPIDeployment",
}
print("Metadata for all deployed deployments are present.")
# CHECK: deployment timestamp
assert app_details[app].last_deployed_time_s > 0
print(f"Confirmed deployment timestamp for {app} is nonzero.")

# CHECK: route prefix
assert app_details[app].route_prefix == expected_values[app]["route_prefix"]
print(f"Checked route prefix for {app}.")

# CHECK: docs path
assert app_details[app].docs_path == expected_values[app]["docs_path"]
print(f"Checked docs path for {app}.")

# CHECK: all deployments are present
assert (
app_details[app].deployments.keys() == expected_values[app]["deployments"]
)

# CHECK: application details
for app in ["app1", "app2"]:
assert app_details[app].route_prefix == f"/{app}"
for deployment in app_details[app].deployments.values():
assert deployment.status == DeploymentStatus.HEALTHY
# Route prefix should be app level options eventually
Expand Down
6 changes: 6 additions & 0 deletions python/ray/serve/_private/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,9 @@ class ServeDeployMode(str, Enum):
UNSET = "UNSET"
SINGLE_APP = "SINGLE_APP"
MULTI_APP = "MULTI_APP"


class HTTPProxyStatus(str, Enum):
STARTING = "STARTING"
HEALTHY = "HEALTHY"
UNHEALTHY = "UNHEALTHY"
3 changes: 3 additions & 0 deletions python/ray/serve/_private/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,6 @@ async def run(self):

self.setup_complete.set()
await server.serve(sockets=[sock])

async def check_health(self):
zcin marked this conversation as resolved.
Show resolved Hide resolved
pass
77 changes: 76 additions & 1 deletion python/ray/serve/_private/http_state.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import random
import time
from typing import Dict, List, Tuple

import ray
Expand All @@ -20,11 +21,75 @@
format_actor_name,
get_all_node_ids,
)
from ray.serve._private.common import EndpointTag, NodeId
from ray.serve._private.common import EndpointTag, NodeId, HTTPProxyStatus
from ray.serve.schema import HTTPProxyDetails

logger = logging.getLogger(SERVE_LOGGER_NAME)


PROXY_HEALTH_CHECK_PERIOD_S = 10
zcin marked this conversation as resolved.
Show resolved Hide resolved
class HTTPProxyState:
def __init__(self, actor_handle: ActorHandle, actor_name: str):
self._actor_handle = actor_handle
self._actor_name = actor_name

self._status = HTTPProxyStatus.STARTING
self._actor_started = False
self._health_check_obj_ref = self._actor_handle.check_health.remote()
self._last_health_check_time: float = time.time()

@property
def status(self) -> HTTPProxyStatus:
return self._status

def _check_health_obj_ref_result(self):
"""Check on the result of the health check.

Should only be called after confirming the object ref is ready.
Resets _health_check_obj_ref to None at the end.
"""
assert len(ray.wait([self._health_check_obj_ref], timeout=0)[0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no need to have this assert?

try:
ray.get(self._health_check_obj_ref)
self._status = HTTPProxyStatus.HEALTHY
except Exception as e:
logger.warning(
f"Health check for HTTP proxy {self._actor_name} failed: {e}"
)
self._status = HTTPProxyStatus.UNHEALTHY

self._health_check_obj_ref = None

def update(self):
# Wait for first no-op health check to finish, indicating the actor has started
if not self._actor_started:
finished, _ = ray.wait([self._health_check_obj_ref], timeout=0)
if not finished:
return
self._check_health_obj_ref_result()
self._actor_started = True

randomized_period_s = PROXY_HEALTH_CHECK_PERIOD_S * random.uniform(0.9, 1.1)

if self._health_check_obj_ref:
finished, _ = ray.wait([self._health_check_obj_ref], timeout=0)
if finished:
self._check_health_obj_ref_result()
# If there's no active in-progress health check and it has been more than 10
# seconds since the last health check, perform another health check
if time.time() - self._last_health_check_time > randomized_period_s:
# If the HTTP Proxy has been blocked for more than 5 seconds, mark unhealthy
if time.time() - self._last_health_check_time > 5:
zcin marked this conversation as resolved.
Show resolved Hide resolved
self._status = HTTPProxyStatus.UNHEALTHY
logger.warning(
f"Health check for HTTP Proxy {self._actor_name} took more than "
f"{PROXY_HEALTH_CHECK_PERIOD_S} seconds."
)

self._health_check_obj_ref = self._actor_handle.check_health.remote()
self._last_health_check_time = time.time()


class HTTPState:
"""Manages all state for HTTP proxies in the system.

Expand All @@ -50,6 +115,7 @@ def __init__(
self._config = HTTPOptions()
self._proxy_actors: Dict[NodeId, ActorHandle] = dict()
self._proxy_actor_names: Dict[NodeId, str] = dict()
self._proxy_states: Dict[NodeId, HTTPProxyState] = dict()
self._head_node_id: str = head_node_id

self._gcs_client = gcs_client
Expand All @@ -73,9 +139,17 @@ def get_http_proxy_handles(self) -> Dict[NodeId, ActorHandle]:
def get_http_proxy_names(self) -> Dict[NodeId, str]:
return self._proxy_actor_names

def get_http_proxy_details(self) -> Dict[NodeId, HTTPProxyDetails]:
return {
node_id: HTTPProxyDetails(status=state.status)
for node_id, state in self._proxy_states.items()
}

def update(self):
self._start_proxies_if_needed()
self._stop_proxies_if_needed()
for proxy_state in self._proxy_states.values():
proxy_state.update()

def _get_target_nodes(self) -> List[Tuple[str, str]]:
"""Return the list of (node_id, ip_address) to deploy HTTP servers on."""
Expand Down Expand Up @@ -155,6 +229,7 @@ def _start_proxies_if_needed(self) -> None:

self._proxy_actors[node_id] = proxy
self._proxy_actor_names[node_id] = name
self._proxy_states[node_id] = HTTPProxyState(proxy, name)

def _stop_proxies_if_needed(self) -> bool:
"""Removes proxy actors from any nodes that no longer exist."""
Expand Down
3 changes: 3 additions & 0 deletions python/ray/serve/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,9 @@ def get_serve_instance_details(self) -> Dict:
host=http_config.host,
port=http_config.port,
),
http_proxy_details=self.http_state.get_http_proxy_details()
if self.http_state
else None,
deploy_mode=self.deploy_mode,
applications=applications,
).dict(exclude_unset=True)
Expand Down
9 changes: 9 additions & 0 deletions python/ray/serve/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
DeploymentInfo,
ReplicaState,
ServeDeployMode,
HTTPProxyStatus,
)
from ray.serve.config import DeploymentMode
from ray.serve._private.utils import DEFAULT, dict_keys_snake_to_camel_case
Expand Down Expand Up @@ -758,6 +759,11 @@ def get_status_dict(self) -> Dict:
)


@PublicAPI(stability="alpha")
class HTTPProxyDetails(BaseModel):
status: HTTPProxyStatus
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add node_id, ip_address here?

@alanwguo anything else that there would be useful to show in the UI?

oh... maybe logs path?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any metrics related to HTTP Proxy today? I don't think so.

If we did, it would be useful to have some sort of identifier for the HTTP Proxy so i could link or filter the metrics down to the metrics related to a particular HTTP proxy.

For HTTPProxyStatus, do we have any sort of string that can provide more details about errors or anything?

Creation date could be useful I guess also

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we do have some metrics. The unique identifier would be the node_id, though that doesn't handle restarts if they were to happen. So maybe we should add actor_id here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



@PublicAPI(stability="alpha")
class ServeInstanceDetails(BaseModel, extra=Extra.forbid):
"""
Expand All @@ -776,6 +782,9 @@ class ServeInstanceDetails(BaseModel, extra=Extra.forbid):
),
)
http_options: Optional[HTTPOptionsSchema] = Field(description="HTTP Proxy options.")
http_proxy_details: Optional[Dict[str, HTTPProxyDetails]] = Field(
description="Info about HTTP Proxies."
)
deploy_mode: ServeDeployMode = Field(
description=(
"Whether a single-app config of format ServeApplicationSchema or multi-app "
Expand Down