Skip to content

Commit

Permalink
[serve] Better surfacing of errors in serve status (ray-project#34773)
Browse files Browse the repository at this point in the history
1. Surface tracebacks from constructor failures.
Output from `serve status`:
```
name: default
app_status:
  status: DEPLOY_FAILED
  message: |-
    Deploying app 'default' failed:
    ray::deploy_serve_application() (pid=15878, ip=192.168.1.14)
      File "/Users/cindyz/ray/python/ray/serve/controller.py", line 947, in deploy_serve_application
        serve.run(app, name=name, route_prefix=route_prefix)
      File "/Users/cindyz/ray/python/ray/serve/api.py", line 539, in run
        client.deploy_application(
      File "/Users/cindyz/ray/python/ray/serve/_private/client.py", line 43, in check
        return f(self, *args, **kwargs)
      File "/Users/cindyz/ray/python/ray/serve/_private/client.py", line 299, in deploy_application
        self._wait_for_deployment_healthy(deployment_name)
      File "/Users/cindyz/ray/python/ray/serve/_private/client.py", line 183, in _wait_for_deployment_healthy
        raise RuntimeError(
    RuntimeError: Deployment default_Fail is UNHEALTHY: The Deployment failed to start 3 times in a row. This may be due to a problem with the deployment constructor or the initial health check failing. See controller logs for details. Retrying after 1 seconds. Error:
    ray::ServeReplica:default_Fail.is_initialized() (pid=15919, ip=192.168.1.14, repr=<ray.serve._private.replica.ServeReplica:default_Fail object at 0x1257772e0>)
      File "/Users/cindyz/miniforge3/envs/ray/lib/python3.8/concurrent/futures/_base.py", line 437, in result
        return self.__get_result()
      File "/Users/cindyz/miniforge3/envs/ray/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
        raise self._exception
      File "/Users/cindyz/ray/python/ray/serve/_private/replica.py", line 234, in is_initialized
        await self._initialize_replica()
      File "/Users/cindyz/ray/python/ray/serve/_private/replica.py", line 150, in initialize_replica
        await sync_to_async(_callable.__init__)(*init_args, **init_kwargs)
      File "/Users/cindyz/Desktop/constructor_fail.py", line 16, in __init__
        raise Exception("I need to know about this!")
    Exception: I need to know about this!
  deployment_timestamp: 1682476137.8513532
deployment_statuses:
- name: default_Fail
  status: UNHEALTHY
  message: |-
    The Deployment failed to start 3 times in a row. This may be due to a problem with the deployment constructor or the initial health check failing. See controller logs for details. Retrying after 1 seconds. Error:
    ray::ServeReplica:default_Fail.is_initialized() (pid=15919, ip=192.168.1.14, repr=<ray.serve._private.replica.ServeReplica:default_Fail object at 0x1257772e0>)
      File "/Users/cindyz/miniforge3/envs/ray/lib/python3.8/concurrent/futures/_base.py", line 437, in result
        return self.__get_result()
      File "/Users/cindyz/miniforge3/envs/ray/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
        raise self._exception
      File "/Users/cindyz/ray/python/ray/serve/_private/replica.py", line 234, in is_initialized
        await self._initialize_replica()
      File "/Users/cindyz/ray/python/ray/serve/_private/replica.py", line 150, in initialize_replica
        await sync_to_async(_callable.__init__)(*init_args, **init_kwargs)
      File "/Users/cindyz/Desktop/constructor_fail.py", line 16, in __init__
        raise Exception("I need to know about this!")
    Exception: I need to know about this!
```

2. Serialize exceptions from the replica actor, so that they are displayed properly when surfaced through the controller.
  • Loading branch information
zcin authored and architkulkarni committed May 16, 2023
1 parent 3139e84 commit 26f3c35
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 38 deletions.
42 changes: 30 additions & 12 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
record_extra_usage_tag,
)
from ray.actor import ActorHandle
from ray.exceptions import RayActorError, RayError
from ray.exceptions import RayActorError, RayError, RayTaskError

from ray.serve._private.autoscaling_metrics import InMemoryMetricsStore
from ray.serve._private.common import (
Expand Down Expand Up @@ -482,7 +482,7 @@ def recover(self):
else:
self._ready_obj_ref = self._actor_handle.get_metadata.remote()

def check_ready(self) -> ReplicaStartupStatus:
def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
"""
Check if current replica has started by making ray API calls on
relevant actor / object ref.
Expand All @@ -499,23 +499,28 @@ def check_ready(self) -> ReplicaStartupStatus:
- replica initialization failed.
SUCCEEDED:
- replica initialization succeeded.
error_msg:
None:
- for PENDING_ALLOCATION, PENDING_INITIALIZATION or SUCCEEDED states
str:
- for FAILED state
"""

# Check whether the replica has been allocated.
if not self._check_obj_ref_ready(self._allocated_obj_ref):
return ReplicaStartupStatus.PENDING_ALLOCATION
return ReplicaStartupStatus.PENDING_ALLOCATION, None

# Check whether replica initialization has completed.
replica_ready = self._check_obj_ref_ready(self._ready_obj_ref)
# In case of deployment constructor failure, ray.get will help to
# surface exception to each update() cycle.
if not replica_ready:
return ReplicaStartupStatus.PENDING_INITIALIZATION
return ReplicaStartupStatus.PENDING_INITIALIZATION, None
else:
try:
# TODO(simon): fully implement reconfigure for Java replicas.
if self._is_cross_language:
return ReplicaStartupStatus.SUCCEEDED
return ReplicaStartupStatus.SUCCEEDED, None

# todo: The replica's userconfig which java client created
# is different from the controller's userconfig
Expand All @@ -525,14 +530,23 @@ def check_ready(self) -> ReplicaStartupStatus:
self._pid, self._actor_id, self._node_id, self._node_ip = ray.get(
self._allocated_obj_ref
)
except Exception:
except RayTaskError as e:
logger.exception(
f"Exception in replica '{self._replica_tag}', "
"the replica will be stopped."
)
return ReplicaStartupStatus.FAILED
# NOTE(zcin): we should use str(e) instead of traceback.format_exc()
# here because the full details of the error are not displayed properly
# with traceback.format_exc().
return ReplicaStartupStatus.FAILED, str(e.as_instanceof_cause())
except Exception as e:
logger.exception(
f"Exception in replica '{self._replica_tag}', "
"the replica will be stopped."
)
return ReplicaStartupStatus.FAILED, repr(e)

return ReplicaStartupStatus.SUCCEEDED
return ReplicaStartupStatus.SUCCEEDED, None

@property
def actor_resources(self) -> Optional[Dict[str, float]]:
Expand Down Expand Up @@ -804,7 +818,7 @@ def recover(self):
# Replica version is fetched from recovered replica dynamically in
# check_started() below

def check_started(self) -> ReplicaStartupStatus:
def check_started(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
"""Check if the replica has started. If so, transition to RUNNING.
Should handle the case where the replica has already stopped.
Expand Down Expand Up @@ -1046,6 +1060,7 @@ def __init__(
self._last_retry: float = 0.0
self._backoff_time_s: int = 1
self._replica_constructor_retry_counter: int = 0
self._replica_constructor_error_msg: Optional[str] = None
self._replicas: ReplicaStateContainer = ReplicaStateContainer()
self._curr_status_info: DeploymentStatusInfo = DeploymentStatusInfo(
self._name, DeploymentStatus.UPDATING
Expand Down Expand Up @@ -1543,8 +1558,10 @@ def _check_curr_status(self) -> Tuple[bool, bool]:
message=(
f"The Deployment failed to start {failed_to_start_count} times "
"in a row. This may be due to a problem with the deployment "
"constructor or the initial health check failing. See logs for "
f"details. Retrying after {self._backoff_time_s} seconds."
"constructor or the initial health check failing. See "
"controller logs for details. Retrying after "
f"{self._backoff_time_s} seconds. Error:\n"
f"{self._replica_constructor_error_msg}"
),
)
return False, any_replicas_recovering
Expand Down Expand Up @@ -1589,7 +1606,7 @@ def _check_startup_replicas(
transitioned_to_running = False
replicas_failed = False
for replica in self._replicas.pop(states=[original_state]):
start_status = replica.check_started()
start_status, error_msg = replica.check_started()
if start_status == ReplicaStartupStatus.SUCCEEDED:
# This replica should now be added to handle's replica
# set.
Expand All @@ -1604,6 +1621,7 @@ def _check_startup_replicas(
if self._replica_constructor_retry_counter >= 0:
# Increase startup failure counter if we're tracking it
self._replica_constructor_retry_counter += 1
self._replica_constructor_error_msg = error_msg

replicas_failed = True
self._stop_replica(replica)
Expand Down
28 changes: 17 additions & 11 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pickle
import time
from typing import Any, Callable, Optional, Tuple, Dict
import traceback

import starlette.responses

Expand Down Expand Up @@ -236,21 +237,26 @@ async def is_initialized(
):
# Unused `_after` argument is for scheduling: passing an ObjectRef
# allows delaying reconfiguration until after this call has returned.
await self._initialize_replica()

metadata = await self.reconfigure(deployment_config)

# A new replica should not be considered healthy until it passes an
# initial health check. If an initial health check fails, consider
# it an initialization failure.
await self.check_health()
return metadata
try:
await self._initialize_replica()
metadata = await self.reconfigure(deployment_config)

# A new replica should not be considered healthy until it passes an
# initial health check. If an initial health check fails, consider
# it an initialization failure.
await self.check_health()
return metadata
except Exception:
raise RuntimeError(traceback.format_exc()) from None

async def reconfigure(
self, deployment_config: DeploymentConfig
) -> Tuple[DeploymentConfig, DeploymentVersion]:
await self.replica.reconfigure(deployment_config)
return await self.get_metadata()
try:
await self.replica.reconfigure(deployment_config)
return await self.get_metadata()
except Exception:
raise RuntimeError(traceback.format_exc()) from None

async def get_metadata(
self,
Expand Down
2 changes: 2 additions & 0 deletions python/ray/serve/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def process_dict_for_yaml_dump(data):
for k, v in data.items():
if isinstance(v, dict):
data[k] = process_dict_for_yaml_dump(v)
if isinstance(v, list):
data[k] = [process_dict_for_yaml_dump(item) for item in v]
elif isinstance(v, str):
data[k] = remove_ansi_escape_sequences(v)

Expand Down
66 changes: 57 additions & 9 deletions python/ray/serve/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,22 +517,20 @@ def test_status_error_msg_format(ray_start_stop):

subprocess.check_output(["serve", "deploy", config_file_name])

status_response = subprocess.check_output(
["serve", "status", "-a", "http://localhost:52365/"]
)
serve_status = yaml.safe_load(status_response)
print("serve_status", serve_status)

def check_for_failed_deployment():
serve_status = yaml.safe_load(
subprocess.check_output(
["serve", "status", "-a", "http://localhost:52365/"]
)
)
app_status = ServeSubmissionClient("http://localhost:52365").get_status()
return (
len(serve_status["deployment_statuses"]) == 0
and serve_status["app_status"]["status"] == "DEPLOY_FAILED"
serve_status["app_status"]["status"] == "DEPLOY_FAILED"
and remove_ansi_escape_sequences(app_status["app_status"]["message"])
in serve_status["app_status"]["message"]
)

wait_for_condition(check_for_failed_deployment, timeout=2)
wait_for_condition(check_for_failed_deployment)


@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
Expand Down Expand Up @@ -579,6 +577,56 @@ def check_for_failed_deployment():
wait_for_condition(check_for_failed_deployment)


@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_status_constructor_error(ray_start_stop):
"""Deploys Serve deployment that errors out in constructor, checks that the
traceback is surfaced.
"""

config_file_name = os.path.join(
os.path.dirname(__file__), "test_config_files", "deployment_fail.yaml"
)

subprocess.check_output(["serve", "deploy", config_file_name])

def check_for_failed_deployment():
status_response = subprocess.check_output(
["serve", "status", "-a", "http://localhost:52365/"]
)
serve_status = yaml.safe_load(status_response)
return (
serve_status["app_status"]["status"] == "DEPLOY_FAILED"
and "ZeroDivisionError" in serve_status["deployment_statuses"][0]["message"]
)

wait_for_condition(check_for_failed_deployment)


@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_status_package_unavailable_in_controller(ray_start_stop):
"""Test that exceptions raised from packages that are installed on deployment actors
but not on the controller are serialized and surfaced properly.
"""

config_file_name = os.path.join(
os.path.dirname(__file__), "test_config_files", "sqlalchemy.yaml"
)

subprocess.check_output(["serve", "deploy", config_file_name])

def check_for_failed_deployment():
status_response = subprocess.check_output(
["serve", "status", "-a", "http://localhost:52365/"]
)
serve_status = yaml.safe_load(status_response)
return (
serve_status["app_status"]["status"] == "DEPLOY_FAILED"
and "some_wrong_url" in serve_status["deployment_statuses"][0]["message"]
)

wait_for_condition(check_for_failed_deployment, timeout=15)


@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_status_multi_app(ray_start_stop):
"""Deploys a multi-app config file and checks their status."""
Expand Down
6 changes: 3 additions & 3 deletions python/ray/serve/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,15 @@ def get_replicas(replica_state):
replica = get_replicas(ReplicaState.STARTING)[0]

# currently there are no resources to allocate the replica
assert replica.check_started() == ReplicaStartupStatus.PENDING_ALLOCATION
assert replica.check_started()[0] == ReplicaStartupStatus.PENDING_ALLOCATION

# add the necessary resources to allocate the replica
cluster.add_node(num_cpus=4)
wait_for_condition(lambda: (ray.cluster_resources().get("CPU", 0) >= 4))
wait_for_condition(lambda: (ray.available_resources().get("CPU", 0) >= 2))

def is_replica_pending_initialization():
status = replica.check_started()
status, _ = replica.check_started()
print(status)
return status == ReplicaStartupStatus.PENDING_INITIALIZATION

Expand All @@ -169,7 +169,7 @@ def is_replica_pending_initialization():
# send signal to complete replica initialization
signal.send.remote()
wait_for_condition(
lambda: replica.check_started() == ReplicaStartupStatus.SUCCEEDED
lambda: replica.check_started()[0] == ReplicaStartupStatus.SUCCEEDED
)


Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
import_path: fail.node
import_path: ray.serve.tests.test_config_files.fail.node
11 changes: 10 additions & 1 deletion python/ray/serve/tests/test_config_files/fail.py
Original file line number Diff line number Diff line change
@@ -1 +1,10 @@
1 / 0
from ray import serve


@serve.deployment
class A:
def __init__(self):
1 / 0


node = A.bind()
15 changes: 15 additions & 0 deletions python/ray/serve/tests/test_config_files/sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from ray import serve


@serve.deployment
class TestDeployment:
def __init__(self):
from sqlalchemy import create_engine
import pymysql

pymysql.install_as_MySQLdb()

create_engine("mysql://some_wrong_url:3306").connect()


app = TestDeployment.bind()
13 changes: 13 additions & 0 deletions python/ray/serve/tests/test_config_files/sqlalchemy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import_path: ray.serve.tests.test_config_files.sqlalchemy.app

host: 127.0.0.1
port: 8000

deployments:
- name: TestDeployment
num_replicas: 1
ray_actor_options:
runtime_env:
pip:
- PyMySQL
- sqlalchemy==1.3.19
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def check_ready(self) -> ReplicaStartupStatus:
self.recovering = False
self.started = True
self.version = self.starting_version
return ready
return ready, None

def resource_requirements(self) -> Tuple[str, str]:
assert self.started
Expand Down

0 comments on commit 26f3c35

Please sign in to comment.