Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Better surfacing of errors in serve status #34773

Merged
merged 13 commits into from
May 4, 2023
42 changes: 30 additions & 12 deletions python/ray/serve/_private/deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
record_extra_usage_tag,
)
from ray.actor import ActorHandle
from ray.exceptions import RayActorError, RayError
from ray.exceptions import RayActorError, RayError, RayTaskError

from ray.serve._private.autoscaling_metrics import InMemoryMetricsStore
from ray.serve._private.common import (
Expand Down Expand Up @@ -482,7 +482,7 @@ def recover(self):
else:
self._ready_obj_ref = self._actor_handle.get_metadata.remote()

def check_ready(self) -> ReplicaStartupStatus:
def check_ready(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
"""
Check if current replica has started by making ray API calls on
relevant actor / object ref.
Expand All @@ -499,23 +499,28 @@ def check_ready(self) -> ReplicaStartupStatus:
- replica initialization failed.
SUCCEEDED:
- replica initialization succeeded.
error_msg:
None:
- for PENDING_ALLOCATION, PENDING_INITIALIZATION or SUCCEEDED states
str:
- for FAILED state
"""

# Check whether the replica has been allocated.
if not self._check_obj_ref_ready(self._allocated_obj_ref):
return ReplicaStartupStatus.PENDING_ALLOCATION
return ReplicaStartupStatus.PENDING_ALLOCATION, None

# Check whether relica initialization has completed.
replica_ready = self._check_obj_ref_ready(self._ready_obj_ref)
# In case of deployment constructor failure, ray.get will help to
# surface exception to each update() cycle.
if not replica_ready:
return ReplicaStartupStatus.PENDING_INITIALIZATION
return ReplicaStartupStatus.PENDING_INITIALIZATION, None
else:
try:
# TODO(simon): fully implement reconfigure for Java replicas.
if self._is_cross_language:
return ReplicaStartupStatus.SUCCEEDED
return ReplicaStartupStatus.SUCCEEDED, None

# todo: The replica's userconfig whitch java client created
# is different from the controller's userconfig
Expand All @@ -525,14 +530,23 @@ def check_ready(self) -> ReplicaStartupStatus:
self._pid, self._actor_id, self._node_id, self._node_ip = ray.get(
self._allocated_obj_ref
)
except Exception:
except RayTaskError as e:
logger.exception(
f"Exception in replica '{self._replica_tag}', "
"the replica will be stopped."
)
return ReplicaStartupStatus.FAILED
# NOTE(zcin): we should use str(e) instead of traceback.format_exc()
# here because the full details of the error is not displayed properly
# with traceback.format_exc().
return ReplicaStartupStatus.FAILED, str(e.as_instanceof_cause())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice :)

except Exception as e:
logger.exception(
f"Exception in replica '{self._replica_tag}', "
"the replica will be stopped."
)
return ReplicaStartupStatus.FAILED, repr(e)

return ReplicaStartupStatus.SUCCEEDED
return ReplicaStartupStatus.SUCCEEDED, None

@property
def actor_resources(self) -> Optional[Dict[str, float]]:
Expand Down Expand Up @@ -804,7 +818,7 @@ def recover(self):
# Replica version is fetched from recovered replica dynamically in
# check_started() below

def check_started(self) -> ReplicaStartupStatus:
def check_started(self) -> Tuple[ReplicaStartupStatus, Optional[str]]:
"""Check if the replica has started. If so, transition to RUNNING.

Should handle the case where the replica has already stopped.
Expand Down Expand Up @@ -1046,6 +1060,7 @@ def __init__(
self._last_retry: float = 0.0
self._backoff_time_s: int = 1
self._replica_constructor_retry_counter: int = 0
self._replica_constructor_error_msg: Optional[str] = None
self._replicas: ReplicaStateContainer = ReplicaStateContainer()
self._curr_status_info: DeploymentStatusInfo = DeploymentStatusInfo(
self._name, DeploymentStatus.UPDATING
Expand Down Expand Up @@ -1543,8 +1558,10 @@ def _check_curr_status(self) -> Tuple[bool, bool]:
message=(
f"The Deployment failed to start {failed_to_start_count} times "
"in a row. This may be due to a problem with the deployment "
"constructor or the initial health check failing. See logs for "
f"details. Retrying after {self._backoff_time_s} seconds."
"constructor or the initial health check failing. See "
"controller logs for details. Retrying after "
f"{self._backoff_time_s} seconds. Error:\n"
f"{self._replica_constructor_error_msg}"
),
)
return False, any_replicas_recovering
Expand Down Expand Up @@ -1589,7 +1606,7 @@ def _check_startup_replicas(
transitioned_to_running = False
replicas_failed = False
for replica in self._replicas.pop(states=[original_state]):
start_status = replica.check_started()
start_status, error_msg = replica.check_started()
if start_status == ReplicaStartupStatus.SUCCEEDED:
# This replica should be now be added to handle's replica
# set.
Expand All @@ -1604,6 +1621,7 @@ def _check_startup_replicas(
if self._replica_constructor_retry_counter >= 0:
# Increase startup failure counter if we're tracking it
self._replica_constructor_retry_counter += 1
self._replica_constructor_error_msg = error_msg

replicas_failed = True
self._stop_replica(replica)
Expand Down
28 changes: 17 additions & 11 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pickle
import time
from typing import Any, Callable, Optional, Tuple, Dict
import traceback

import starlette.responses

Expand Down Expand Up @@ -236,21 +237,26 @@ async def is_initialized(
):
# Unused `_after` argument is for scheduling: passing an ObjectRef
# allows delaying reconfiguration until after this call has returned.
await self._initialize_replica()

metadata = await self.reconfigure(deployment_config)

# A new replica should not be considered healthy until it passes an
# initial health check. If an initial health check fails, consider
# it an initialization failure.
await self.check_health()
return metadata
try:
await self._initialize_replica()
metadata = await self.reconfigure(deployment_config)

# A new replica should not be considered healthy until it passes an
# initial health check. If an initial health check fails, consider
# it an initialization failure.
await self.check_health()
return metadata
except Exception:
raise RuntimeError(traceback.format_exc()) from None

async def reconfigure(
self, deployment_config: DeploymentConfig
) -> Tuple[DeploymentConfig, DeploymentVersion]:
await self.replica.reconfigure(deployment_config)
return await self.get_metadata()
try:
await self.replica.reconfigure(deployment_config)
return await self.get_metadata()
except Exception:
raise RuntimeError(traceback.format_exc()) from None

async def get_metadata(
self,
Expand Down
2 changes: 2 additions & 0 deletions python/ray/serve/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ def process_dict_for_yaml_dump(data):
for k, v in data.items():
if isinstance(v, dict):
data[k] = process_dict_for_yaml_dump(v)
if isinstance(v, list):
data[k] = [process_dict_for_yaml_dump(item) for item in v]
elif isinstance(v, str):
data[k] = remove_ansi_escape_sequences(v)

Expand Down
66 changes: 57 additions & 9 deletions python/ray/serve/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,22 +517,20 @@ def test_status_error_msg_format(ray_start_stop):

subprocess.check_output(["serve", "deploy", config_file_name])

status_response = subprocess.check_output(
["serve", "status", "-a", "http://localhost:52365/"]
)
serve_status = yaml.safe_load(status_response)
print("serve_status", serve_status)

def check_for_failed_deployment():
serve_status = yaml.safe_load(
subprocess.check_output(
["serve", "status", "-a", "http://localhost:52365/"]
)
)
app_status = ServeSubmissionClient("http://localhost:52365").get_status()
return (
len(serve_status["deployment_statuses"]) == 0
and serve_status["app_status"]["status"] == "DEPLOY_FAILED"
serve_status["app_status"]["status"] == "DEPLOY_FAILED"
and remove_ansi_escape_sequences(app_status["app_status"]["message"])
in serve_status["app_status"]["message"]
)

wait_for_condition(check_for_failed_deployment, timeout=2)
wait_for_condition(check_for_failed_deployment)


@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
Expand Down Expand Up @@ -579,6 +577,56 @@ def check_for_failed_deployment():
wait_for_condition(check_for_failed_deployment)


@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_status_constructor_error(ray_start_stop):
"""Deploys Serve deployment that errors out in constructor, checks that the
traceback is surfaced.
"""

config_file_name = os.path.join(
os.path.dirname(__file__), "test_config_files", "deployment_fail.yaml"
)

subprocess.check_output(["serve", "deploy", config_file_name])

def check_for_failed_deployment():
status_response = subprocess.check_output(
["serve", "status", "-a", "http://localhost:52365/"]
)
serve_status = yaml.safe_load(status_response)
return (
serve_status["app_status"]["status"] == "DEPLOY_FAILED"
and "ZeroDivisionError" in serve_status["deployment_statuses"][0]["message"]
)

wait_for_condition(check_for_failed_deployment)


@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_status_package_unavailable_in_controller(ray_start_stop):
"""Test that exceptions raised from packages that are installed on deployment actors
but not on controller is serialized and surfaced properly.
"""

config_file_name = os.path.join(
os.path.dirname(__file__), "test_config_files", "sqlalchemy.yaml"
)

subprocess.check_output(["serve", "deploy", config_file_name])

def check_for_failed_deployment():
status_response = subprocess.check_output(
["serve", "status", "-a", "http://localhost:52365/"]
)
serve_status = yaml.safe_load(status_response)
return (
serve_status["app_status"]["status"] == "DEPLOY_FAILED"
and "some_wrong_url" in serve_status["deployment_statuses"][0]["message"]
)

wait_for_condition(check_for_failed_deployment, timeout=15)


@pytest.mark.skipif(sys.platform == "win32", reason="File path incorrect on Windows.")
def test_status_multi_app(ray_start_stop):
"""Deploys a multi-app config file and checks their status."""
Expand Down
6 changes: 3 additions & 3 deletions python/ray/serve/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,15 @@ def get_replicas(replica_state):
replica = get_replicas(ReplicaState.STARTING)[0]

# currently there are no resources to allocate the replica
assert replica.check_started() == ReplicaStartupStatus.PENDING_ALLOCATION
assert replica.check_started()[0] == ReplicaStartupStatus.PENDING_ALLOCATION

# add the necessary resources to allocate the replica
cluster.add_node(num_cpus=4)
wait_for_condition(lambda: (ray.cluster_resources().get("CPU", 0) >= 4))
wait_for_condition(lambda: (ray.available_resources().get("CPU", 0) >= 2))

def is_replica_pending_initialization():
status = replica.check_started()
status, _ = replica.check_started()
print(status)
return status == ReplicaStartupStatus.PENDING_INITIALIZATION

Expand All @@ -169,7 +169,7 @@ def is_replica_pending_initialization():
# send signal to complete replica intialization
signal.send.remote()
wait_for_condition(
lambda: replica.check_started() == ReplicaStartupStatus.SUCCEEDED
lambda: replica.check_started()[0] == ReplicaStartupStatus.SUCCEEDED
)


Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
import_path: fail.node
import_path: ray.serve.tests.test_config_files.fail.node
11 changes: 10 additions & 1 deletion python/ray/serve/tests/test_config_files/fail.py
Original file line number Diff line number Diff line change
@@ -1 +1,10 @@
1 / 0
from ray import serve


@serve.deployment
class A:
def __init__(self):
1 / 0


node = A.bind()
15 changes: 15 additions & 0 deletions python/ray/serve/tests/test_config_files/sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from ray import serve


@serve.deployment
class TestDeployment:
def __init__(self):
from sqlalchemy import create_engine
import pymysql

pymysql.install_as_MySQLdb()

create_engine("mysql://some_wrong_url:3306").connect()


app = TestDeployment.bind()
13 changes: 13 additions & 0 deletions python/ray/serve/tests/test_config_files/sqlalchemy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import_path: ray.serve.tests.test_config_files.sqlalchemy.app

host: 127.0.0.1
port: 8000

deployments:
- name: TestDeployment
num_replicas: 1
ray_actor_options:
runtime_env:
pip:
- PyMySQL
- sqlalchemy==1.3.19
2 changes: 1 addition & 1 deletion python/ray/serve/tests/test_deployment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def check_ready(self) -> ReplicaStartupStatus:
self.recovering = False
self.started = True
self.version = self.starting_version
return ready
return ready, None

def resource_requirements(self) -> Tuple[str, str]:
assert self.started
Expand Down