[serve] remove support for nested DeploymentResponses #47209

Merged (8 commits) on Aug 21, 2024

22 changes: 2 additions & 20 deletions doc/source/serve/model_composition.md
@@ -128,33 +128,15 @@ Example:
:language: python
```

## Advanced: Pass a DeploymentResponse in a nested object [DEPRECATED]
## Advanced: Pass a DeploymentResponse in a nested object [FULLY DEPRECATED]

:::{warning}
Passing a `DeploymentResponse` to downstream handle calls in nested objects is deprecated and will be removed in the next release.
Ray Serve will no longer handle converting them to Ray `ObjectRef`s for you.
Passing a `DeploymentResponse` to downstream handle calls in nested objects is fully deprecated and no longer supported.
Please manually use `DeploymentResponse._to_object_ref()` instead to pass the corresponding object reference in nested objects.

Passing a `DeploymentResponse` object as a top-level argument or keyword argument is still supported.
:::
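
For example, a minimal sketch of the manual conversion could look like the following. The `Preprocessor`, `Downstream`, and `Driver` deployments are illustrative and not taken from the example code referenced in this doc, and `_to_object_ref` is a developer API:

```python
from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment
class Preprocessor:
    def __call__(self, inp: str) -> str:
        return f"preprocessed({inp})"


@serve.deployment
class Downstream:
    async def __call__(self, payload: dict) -> str:
        # The nested value arrives as a Ray ObjectRef; resolve it explicitly.
        preprocessed = await payload["preprocessed"]
        return f"downstream({preprocessed})"


@serve.deployment
class Driver:
    def __init__(self, preprocessor: DeploymentHandle, downstream: DeploymentHandle):
        self._preprocessor = preprocessor
        self._downstream = downstream

    async def __call__(self, inp: str) -> str:
        response = self._preprocessor.remote(inp)
        # Convert the DeploymentResponse to an ObjectRef before nesting it.
        obj_ref = await response._to_object_ref()
        return await self._downstream.remote({"preprocessed": obj_ref})


app = Driver.bind(Preprocessor.bind(), Downstream.bind())
# serve.run(app).remote("hi").result()  # -> "downstream(preprocessed(hi))"
```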

By default, when you pass a `DeploymentResponse` to another `DeploymentHandle` call, Ray Serve passes the result of the `DeploymentResponse` directly to the downstream method once it's ready.
However, in some cases you might want to start executing the downstream call before the result is ready. For example, to do some preprocessing or fetch a file from remote storage.
To accomplish this behavior, pass the `DeploymentResponse` embedded in another Python object, such as a list or dictionary.
When you pass responses in a nested object, Ray Serve replaces them with Ray `ObjectRef`s instead of the resulting value and they can start executing before the result is ready.

The example below has two deployments: a preprocessor and a downstream model that takes the output of the preprocessor.
The downstream model has two methods:

- `pass_by_value` takes the output of the preprocessor "by value," so it doesn't execute until the preprocessor finishes.
- `pass_by_reference` takes the output "by reference," so it gets an `ObjectRef` and executes eagerly.

```{literalinclude} doc_code/model_composition/response_by_reference_example.py
:start-after: __response_by_reference_example_start__
:end-before: __response_by_reference_example_end__
:language: python
```

## Advanced: Convert a DeploymentResponse to a Ray ObjectRef

Under the hood, each `DeploymentResponse` corresponds to a Ray `ObjectRef`, or an `ObjectRefGenerator` for streaming calls.
95 changes: 46 additions & 49 deletions python/ray/serve/_private/router.py
Expand Up @@ -9,7 +9,6 @@

import ray
from ray.actor import ActorHandle
from ray.dag.py_obj_scanner import _PyObjScanner
from ray.exceptions import ActorDiedError, ActorUnavailableError
from ray.serve._private.common import (
DeploymentHandleSource,
@@ -437,60 +436,58 @@ def update_deployment_config(self, deployment_config: DeploymentConfig):
async def _resolve_deployment_responses(
self, request_args: Tuple[Any], request_kwargs: Dict[str, Any]
) -> Tuple[Tuple[Any], Dict[str, Any]]:
"""Replaces `DeploymentResponse` objects with their resolved object refs.
"""Replaces top-level `DeploymentResponse` objects with resolved object refs.

Uses the `_PyObjScanner` to find and replace the objects. This
enables composition without explicitly calling `_to_object_ref`.
This enables composition without explicitly calling `_to_object_ref`.
"""
from ray.serve.handle import (
DeploymentResponse,
DeploymentResponseGenerator,
_DeploymentResponseBase,
)
from ray.serve.handle import DeploymentResponse, DeploymentResponseGenerator

scanner = _PyObjScanner(
source_type=(_DeploymentResponseBase, ray.ObjectRef, ray.ObjectRefGenerator)
generator_not_supported_message = (
"Streaming deployment handle results cannot be passed to "
"downstream handle calls. If you have a use case requiring "
"this feature, please file a feature request on GitHub."
)

try:
responses = []
replacement_table = {}
objs = scanner.find_nodes((request_args, request_kwargs))
for obj in objs:
if isinstance(obj, DeploymentResponseGenerator):
raise RuntimeError(
"Streaming deployment handle results cannot be passed to "
"downstream handle calls. If you have a use case requiring "
"this feature, please file a feature request on GitHub."
)
elif isinstance(obj, DeploymentResponse):
responses.append(obj)
if obj not in request_args and obj not in request_kwargs.values():
logger.warning(
"Passing `DeploymentResponse` objects in nested objects to "
"downstream handle calls is deprecated and will not be "
"supported in the future. Pass them as top-level "
"args or kwargs instead."
)

# This is no-op replacing the object with itself. The purpose is to make
# sure both object refs and object ref generator are not getting pinned
# to memory by the scanner and cause memory leak.
# See: https://github.com/ray-project/ray/issues/43248
elif isinstance(obj, (ray.ObjectRef, ray.ObjectRefGenerator)):
replacement_table[obj] = obj

# Gather `DeploymentResponse` object refs concurrently.
if len(responses) > 0:
obj_refs = await asyncio.gather(
*[r._to_object_ref() for r in responses]
)
replacement_table.update((zip(responses, obj_refs)))
new_args = [None for _ in range(len(request_args))]
new_kwargs = {}

arg_tasks = []
response_indices = []
for i, obj in enumerate(request_args):
if isinstance(obj, DeploymentResponseGenerator):
raise RuntimeError(generator_not_supported_message)
elif isinstance(obj, DeploymentResponse):
# Launch async task to convert DeploymentResponse to an object ref, and
# keep track of the argument index in the original `request_args`
response_indices.append(i)
arg_tasks.append(asyncio.create_task(obj._to_object_ref()))
else:
new_args[i] = obj

kwarg_tasks = []
response_keys = []
for k, obj in request_kwargs.items():
if isinstance(obj, DeploymentResponseGenerator):
raise RuntimeError(generator_not_supported_message)
elif isinstance(obj, DeploymentResponse):
# Launch async task to convert DeploymentResponse to an object ref, and
# keep track of the corresponding key in the original `request_kwargs`
response_keys.append(k)
kwarg_tasks.append(asyncio.create_task(obj._to_object_ref()))
else:
new_kwargs[k] = obj

return scanner.replace_nodes(replacement_table)
finally:
# Make the scanner GC-able to avoid memory leaks.
scanner.clear()
# Gather `DeploymentResponse` object refs concurrently.
arg_obj_refs = await asyncio.gather(*arg_tasks)
kwarg_obj_refs = await asyncio.gather(*kwarg_tasks)

# Update new args and new kwargs with resolved object refs
for index, obj_ref in zip(response_indices, arg_obj_refs):
new_args[index] = obj_ref
new_kwargs.update((zip(response_keys, kwarg_obj_refs)))

# Return new args and new kwargs
return new_args, new_kwargs

async def schedule_and_send_request(
self, pr: PendingRequest
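
For illustration only, the sketch below mirrors the resolution pattern implemented in `_resolve_deployment_responses` above, with a hypothetical `FakeResponse.to_ref` standing in for `DeploymentResponse._to_object_ref`: only top-level positional and keyword arguments are resolved (concurrently), while responses nested inside containers are left untouched.

```python
import asyncio
from typing import Any, Dict, List, Tuple


class FakeResponse:
    """Stand-in for DeploymentResponse in this sketch."""

    def __init__(self, value: Any):
        self._value = value

    async def to_ref(self) -> str:
        await asyncio.sleep(0)  # pretend to wait for the underlying ObjectRef
        return f"ref({self._value})"


async def resolve_top_level(
    args: Tuple[Any, ...], kwargs: Dict[str, Any]
) -> Tuple[List[Any], Dict[str, Any]]:
    new_args: List[Any] = list(args)
    arg_idxs = [i for i, a in enumerate(args) if isinstance(a, FakeResponse)]
    kw_keys = [k for k, v in kwargs.items() if isinstance(v, FakeResponse)]
    # Resolve all top-level responses concurrently, like the asyncio.gather calls above.
    arg_refs, kw_refs = await asyncio.gather(
        asyncio.gather(*[args[i].to_ref() for i in arg_idxs]),
        asyncio.gather(*[kwargs[k].to_ref() for k in kw_keys]),
    )
    for i, ref in zip(arg_idxs, arg_refs):
        new_args[i] = ref
    new_kwargs = dict(kwargs)
    new_kwargs.update(zip(kw_keys, kw_refs))
    return new_args, new_kwargs


args, kwargs = asyncio.run(
    resolve_top_level((FakeResponse("a"),), {"x": FakeResponse("b"), "y": [FakeResponse("c")]})
)
print(args, kwargs)  # -> ['ref(a)'] {'x': 'ref(b)', 'y': [<FakeResponse object ...>]}
```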
9 changes: 9 additions & 0 deletions python/ray/serve/handle.py
@@ -29,6 +29,7 @@
inside_ray_client_context,
is_running_in_asyncio_loop,
)
from ray.serve.exceptions import RayServeException
from ray.util import metrics
from ray.util.annotations import DeveloperAPI, PublicAPI

@@ -530,6 +531,14 @@ def __await__(self):
result = yield from obj_ref.__await__()
return result

def __reduce__(self):
raise RayServeException(
"`DeploymentResponse` is not serializable. If you are passing the "
"`DeploymentResponse` in a nested object (e.g. a list or dictionary) to a "
"downstream deployment handle call, that is no longer supported. Please "
"only pass `DeploymentResponse` objects as top level arguments."
)
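# Illustrative usage note (hypothetical handles, not part of the original file):
# serializing request arguments invokes `__reduce__`, so nesting a response in a
# container -- for example `handle.remote([other_handle.remote("hi")])` -- now
# raises this `RayServeException` instead of silently resolving the inner call.
# If an object reference really must be nested, convert it first with
# `await response._to_object_ref()` (or `response._to_object_ref_sync()`) and
# nest the resulting `ObjectRef` instead.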

def result(self, *, timeout_s: Optional[float] = None) -> Any:
"""Fetch the result of the handle call synchronously.

88 changes: 84 additions & 4 deletions python/ray/serve/tests/test_handle_api.py
@@ -9,6 +9,7 @@
from ray._private.test_utils import SignalActor, async_wait_for_condition
from ray._private.utils import get_or_create_event_loop
from ray.serve._private.constants import RAY_SERVE_ENABLE_STRICT_MAX_ONGOING_REQUESTS
from ray.serve.exceptions import RayServeException
from ray.serve.handle import (
DeploymentHandle,
DeploymentResponse,
@@ -97,7 +98,8 @@ async def check_get_deployment_handle(self):
assert handle.remote().result() == "hello"


def test_compose_deployments_in_app(serve_instance):
@pytest.mark.parametrize("arg_type", ["args", "kwargs"])
def test_compose_deployments_in_app(serve_instance, arg_type: str):
"""Test composing deployment handle refs within a deployment."""

@serve.deployment
@@ -115,7 +117,10 @@ def __init__(self, handle1: DeploymentHandle, handle2: DeploymentHandle):
self._handle2 = handle2

async def __call__(self):
result = await self._handle1.remote(self._handle2.remote("hi"))
if arg_type == "args":
result = await self._handle1.remote(self._handle2.remote("hi"))
else:
result = await self._handle1.remote(inp=self._handle2.remote(inp="hi"))
return f"driver|{result}"

handle = serve.run(
@@ -127,7 +132,8 @@ async def __call__(self):
assert handle.remote().result() == "driver|downstream1|downstream2|hi"


def test_compose_apps(serve_instance):
@pytest.mark.parametrize("arg_type", ["args", "kwargs"])
def test_compose_apps(serve_instance, arg_type):
"""Test composing deployment handle refs outside of a deployment."""

@serve.deployment
@@ -141,7 +147,81 @@ def __call__(self, inp: str):
handle1 = serve.run(Deployment.bind("app1"), name="app1", route_prefix="/app1")
handle2 = serve.run(Deployment.bind("app2"), name="app2", route_prefix="/app2")

assert handle1.remote(handle2.remote("hi")).result() == "app1|app2|hi"
if arg_type == "args":
assert handle1.remote(handle2.remote("hi")).result() == "app1|app2|hi"
else:
assert handle1.remote(inp=handle2.remote(inp="hi")).result() == "app1|app2|hi"


def test_compose_args_and_kwargs(serve_instance):
"""Test composing deployment handle refs outside of a deployment."""

@serve.deployment
class Downstream:
def __init__(self, msg: str):
self._msg = msg

def __call__(self, *args, **kwargs):
return {"args": args, "kwargs": kwargs}

@serve.deployment
class Deployment:
def __init__(self, handle1: DeploymentHandle, handle2: DeploymentHandle):
self._handle1 = handle1
self._handle2 = handle2

async def __call__(self):
return await self._handle1.remote(
"0",
"100",
self._handle2.remote("1", "2", a="a", b="b"),
x="x",
y=self._handle2.remote("3", "4", c="c", d="d"),
z="z",
)

handle = serve.run(
Deployment.bind(
Downstream.bind("downstream1"),
Downstream.bind("downstream2"),
),
)

result = handle.remote().result()
assert result["args"] == (
"0",
"100",
{
"args": ("1", "2"),
"kwargs": {"a": "a", "b": "b"},
},
)
assert result["kwargs"] == {
"x": "x",
"y": {
"args": ("3", "4"),
"kwargs": {"c": "c", "d": "d"},
},
"z": "z",
}


def test_nested_deployment_response_error(serve_instance):
"""Test that passing a deployment response in a nested object to a downstream
handle call errors, and with an informative error message."""

@serve.deployment
class Deployment:
def __call__(self, inp: str):
return inp

handle1 = serve.run(Deployment.bind(), name="app1", route_prefix="/app1")
handle2 = serve.run(Deployment.bind(), name="app2", route_prefix="/app2")

with pytest.raises(
RayServeException, match="`DeploymentResponse` is not serializable"
):
handle1.remote([handle2.remote("hi")]).result()


def test_convert_to_object_ref(serve_instance):