[serve] Add experimental support for StreamingResponse using RayObjectRefGenerator #35720

Merged 108 commits on May 26, 2023

Commits
452ed1f
initial version
rkooo567 May 12, 2023
3ebe327
in progress.
rkooo567 May 12, 2023
c140a5c
finished basics.
rkooo567 May 12, 2023
b83af80
fix cpp error
rkooo567 May 13, 2023
509b311
working now.
rkooo567 May 13, 2023
d0795e5
Merge branch 'master' into streaming-generator-1
rkooo567 May 13, 2023
f8a90f6
fix a bug
rkooo567 May 13, 2023
0a9169d
Basic version finished.
rkooo567 May 14, 2023
05f468a
[Please Revert] Work e2e.
rkooo567 May 14, 2023
122b705
[Revert Please] Support core worker APIs and a generator.
rkooo567 May 14, 2023
7a8fe2c
fix a bug
rkooo567 May 14, 2023
d880763
Revert "[Revert Please] Support core worker APIs and a generator."
rkooo567 May 14, 2023
f501c22
Revert "[Please Revert] Work e2e."
rkooo567 May 14, 2023
1942394
Merge branch 'master' into streaming-generator-1
rkooo567 May 15, 2023
3e0212e
Fix failing tests.
rkooo567 May 15, 2023
c9a932e
Merge branch 'master' into streaming-generator-2
rkooo567 May 15, 2023
ffe20fd
Merge branch 'streaming-generator-1' into streaming-generator-2
rkooo567 May 15, 2023
0e89ad7
Merge branch 'master' into streaming-generator-3
rkooo567 May 15, 2023
d520e47
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 15, 2023
7610474
Fix
rkooo567 May 15, 2023
aaa0582
Fix a broken test.
rkooo567 May 15, 2023
a52f74b
Merge branch 'streaming-generator-1' into streaming-generator-2
rkooo567 May 15, 2023
37c3bdd
Merge branch 'master' into streaming-generator-1
rkooo567 May 15, 2023
fd83edd
Merge branch 'streaming-generator-1' into streaming-generator-2
rkooo567 May 15, 2023
ef08b64
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 15, 2023
74a2e31
Finished async actor.
rkooo567 May 16, 2023
8b9ba39
Add a unit test.
rkooo567 May 16, 2023
a4b62ac
done
rkooo567 May 16, 2023
d350b5d
Merge branch 'master' into streaming-generator-1
rkooo567 May 16, 2023
9ed05d9
Addressed code review.
rkooo567 May 16, 2023
e2f1980
removed a test file
rkooo567 May 16, 2023
de41fbe
Merge branch 'streaming-generator-1' into streaming-generator-2
rkooo567 May 16, 2023
177ff88
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 16, 2023
805c7bb
Updated
rkooo567 May 16, 2023
fba4a5d
Merge branch 'streaming-generator-3' into streaming-generator-4
rkooo567 May 16, 2023
fa7fe24
done
rkooo567 May 16, 2023
5dc6b98
Fixed a unit test.
rkooo567 May 16, 2023
6e51a5e
Merge branch 'streaming-generator-1' into streaming-generator-2
rkooo567 May 16, 2023
7c449be
fix apis
rkooo567 May 17, 2023
5afd081
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 17, 2023
0e87d73
Merge branch 'streaming-generator-3' into streaming-generator-4
rkooo567 May 17, 2023
d7ebad1
lint.
rkooo567 May 17, 2023
2b046b6
Ready for a benchmark.
rkooo567 May 17, 2023
c726484
Made it work.
rkooo567 May 18, 2023
1151a28
done.
rkooo567 May 18, 2023
f5d3956
Merge branch 'master' into streaming-generator-2
rkooo567 May 18, 2023
1f79ad8
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 18, 2023
5e09c4c
Merge branch 'streaming-generator-3' into streaming-generator-4
rkooo567 May 18, 2023
7113e3d
Merge branch 'streaming-generator-4' into streaming-generator-5
rkooo567 May 18, 2023
c72dc03
Merge branch 'master' into streaming-generator-2
rkooo567 May 19, 2023
b7be576
Addressed code review.
rkooo567 May 19, 2023
e0e74cb
Merge branch 'streaming-generator-2' into streaming-generator-3
rkooo567 May 19, 2023
11686d4
Addressed code review.
rkooo567 May 19, 2023
98d7292
lint
rkooo567 May 19, 2023
4d55f33
Merge branch 'streaming-generator-3' into streaming-generator-4
rkooo567 May 20, 2023
5397b78
Merge branch 'streaming-generator-4' into streaming-generator-5
rkooo567 May 20, 2023
391eb0f
addressed
rkooo567 May 20, 2023
f4415d1
Merge branch 'master' into streaming-generator-5
rkooo567 May 22, 2023
e49d1a5
working
rkooo567 May 22, 2023
ce1e79d
[Revert] Add more complicated tests
rkooo567 May 22, 2023
6c0448b
Addressed code review.
rkooo567 May 23, 2023
9450a07
Merge branch 'master' into streaming-generator-5
rkooo567 May 24, 2023
a827426
WIP
edoakes May 24, 2023
fe6c650
Merge remote-tracking branch 'sang/streaming-generator-5' into stream-fr
edoakes May 24, 2023
e4106b0
fix typing
edoakes May 24, 2023
2d58c66
stuff
edoakes May 24, 2023
a3b5397
add basic tests
edoakes May 24, 2023
ff16f54
nit
edoakes May 24, 2023
a789ecf
Merge branch 'master' of https://github.com/ray-project/ray into stre…
edoakes May 24, 2023
6402eb0
no signoff
edoakes May 24, 2023
aa8823a
replace _event_loop
edoakes May 24, 2023
f21f89c
add sender test
edoakes May 24, 2023
afa8699
more comments
edoakes May 24, 2023
4c3442f
weeee
edoakes May 24, 2023
3d48b01
more tests
edoakes May 24, 2023
88781fb
async/sync
edoakes May 24, 2023
4d6eace
more tests
edoakes May 24, 2023
7a050a7
fix fastapi test
edoakes May 24, 2023
5bde9fd
batch
edoakes May 24, 2023
d16c620
Merge branch 'master' of https://github.com/ray-project/ray into stre…
edoakes May 25, 2023
3aa69b9
fix
edoakes May 25, 2023
61acf41
fix tests
edoakes May 25, 2023
86b741a
fix doc import error
edoakes May 25, 2023
c16f437
skip windows
edoakes May 25, 2023
7c066f8
fix
edoakes May 25, 2023
9db27af
docstring
edoakes May 25, 2023
7f04906
nit
edoakes May 25, 2023
6bfa34f
s/_use_ray_streaming/_stream
edoakes May 25, 2023
5c08172
.format change
edoakes May 25, 2023
31c7820
fix
edoakes May 25, 2023
cc31c93
Merge branch 'master' of https://github.com/ray-project/ray into stre…
edoakes May 25, 2023
3751f89
add docs
edoakes May 25, 2023
8b2df0d
fix fmt
edoakes May 25, 2023
b047feb
fix
edoakes May 25, 2023
34f585e
fix
edoakes May 25, 2023
12271c6
revert
edoakes May 25, 2023
4efcf7b
fix BUILD
edoakes May 25, 2023
97611c5
clearer
edoakes May 25, 2023
5fdc3bc
fix
edoakes May 25, 2023
8617ad5
fix?
edoakes May 25, 2023
1cc5772
comment
edoakes May 25, 2023
d039bb4
fix
edoakes May 25, 2023
8dc379c
Update doc/source/serve/http-guide.md
edoakes May 25, 2023
d5ccabb
Update doc/source/serve/http-guide.md
edoakes May 25, 2023
2d21721
angelina's suggestions
edoakes May 25, 2023
e35fb13
angelina 2
edoakes May 25, 2023
8b5f9be
angelina3
edoakes May 25, 2023
dca76c6
angelina4
edoakes May 25, 2023
13 changes: 12 additions & 1 deletion doc/BUILD
@@ -131,12 +131,22 @@ py_test_run_all_subdirectory(
exclude = [
"source/serve/doc_code/distilbert.py",
"source/serve/doc_code/stable_diffusion.py",
"source/serve/doc_code/object_detection.py",
"source/serve/doc_code/object_detection.py",
"source/serve/doc_code/streaming_example.py",
],
extra_srcs = [],
tags = ["exclusive", "team:serve"],
)

py_test_run_all_subdirectory(
size = "medium",
include = ["source/serve/doc_code/streaming_example.py"],
exclude = [],
extra_srcs = [],
tags = ["exclusive", "team:serve"],
env = {"RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING": "1"},
)

py_test_run_all_subdirectory(
size = "medium",
include = [
@@ -151,6 +161,7 @@ py_test_run_all_subdirectory(




# --------------------------------------------------------------------
# Test all doc/source/tune/doc_code code included in rst/md files.
# --------------------------------------------------------------------
40 changes: 40 additions & 0 deletions doc/source/serve/doc_code/streaming_example.py
@@ -0,0 +1,40 @@
# flake8: noqa
Contributor: How about using a real model as the example here?

Author: Rather not do that here; it complicates communicating the feature. We can add a full example later.


# __begin_example__
import time
from typing import Generator

import requests
from starlette.responses import StreamingResponse
from starlette.requests import Request

from ray import serve


@serve.deployment
class StreamingResponder:
def generate_numbers(self, max: int) -> Generator[str, None, None]:
for i in range(max):
yield str(i)
time.sleep(0.1)

def __call__(self, request: Request) -> StreamingResponse:
max = request.query_params.get("max", "25")
gen = self.generate_numbers(int(max))
return StreamingResponse(gen, status_code=200, media_type="text/plain")


serve.run(StreamingResponder.bind())

r = requests.get("http://localhost:8000?max=10", stream=True)
start = time.time()
r.raise_for_status()
for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
print(f"Got result {round(time.time()-start, 1)}s after start: '{chunk}'")
# __end_example__


r = requests.get("http://localhost:8000?max=10", stream=True)
r.raise_for_status()
for i, chunk in enumerate(r.iter_content(chunk_size=None, decode_unicode=True)):
assert chunk == str(i)
49 changes: 47 additions & 2 deletions doc/source/serve/http-guide.md
@@ -3,7 +3,7 @@
This section helps you understand how to:
- send HTTP requests to Serve deployments
- use Ray Serve to integrate with FastAPI
- use customized HTTP Adapters
- use customized HTTP adapters
- choose which feature to use for your use case

## Choosing the right HTTP feature
@@ -74,6 +74,51 @@ Existing middlewares, **automatic OpenAPI documentation generation**, and other
Serve currently does not support WebSockets. If you have a use case that requires it, please [let us know](https://github.com/ray-project/ray/issues/new/choose)!
```

(serve-http-streaming-response)=
Contributor: Should we pull this into a separate page for discoverability (or rename this to something better than "HTTP Guide")?

Author: We already have a lot of user guides, and this feels like the natural place for it (where we explain Starlette & FastAPI behaviors). No preference on the naming; if you have a better one, suggest it. Once we add batching and a real-world example, we can have a separate page for that. Not enough time before branch cut for those, though.

Contributor: That sounds good. We have more time for docs changes, by the way, so we can do it in a follow-up PR.

Author: Ah, OK. I'll work on it first thing next week then.

## Streaming Responses

```{warning}
Support for HTTP streaming responses is experimental. To enable this feature, set `RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING=1` on the cluster before starting Ray. If you encounter any issues, [file an issue on GitHub](https://github.com/ray-project/ray/issues/new/choose).
```

Some applications must stream incremental results back to the caller.
This is common for text generation using large language models (LLMs) or video processing applications.
The full forward pass may take multiple seconds, so returning incremental results as they become available makes for a much better user experience.

To use HTTP response streaming, return a [StreamingResponse](https://www.starlette.io/responses/#streamingresponse) that wraps a generator from your HTTP handler.
Contributor: Also include setting the env variable and how you would do it in YAML.

Author: This is covered just above. It must be set on the cluster; it can't be set in the YAML.

Contributor: How would you set it up on Anyscale Services?

Author: Set the env variable in the cluster env.

Contributor: Got it; we definitely need to improve this in 2.6. Let's document it in the Anyscale docs too once you're done with the Ray changes.


Streaming is supported for basic HTTP ingress deployments that use a `__call__` method, as well as with the [FastAPI integration](serve-fastapi-http).

The code below defines a Serve application that incrementally streams numbers up to a provided `max`.
The client-side code handles the streaming output using the `stream=True` option of the [requests](https://requests.readthedocs.io/en/latest/user/advanced/#streaming-requests) library.

```{literalinclude} ../serve/doc_code/streaming_example.py
:start-after: __begin_example__
:end-before: __end_example__
:language: python
```

Save this code in `stream.py` and run it:

```bash
$ RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING=1 python stream.py
[2023-05-25 10:44:23] INFO ray._private.worker::Started a local Ray instance. View the dashboard at http://127.0.0.1:8265
(ServeController pid=40401) INFO 2023-05-25 10:44:25,296 controller 40401 deployment_state.py:1259 - Deploying new version of deployment default_StreamingResponder.
(HTTPProxyActor pid=40403) INFO: Started server process [40403]
(ServeController pid=40401) INFO 2023-05-25 10:44:25,333 controller 40401 deployment_state.py:1498 - Adding 1 replica to deployment default_StreamingResponder.
Got result 0.0s after start: '0'
Got result 0.1s after start: '1'
Got result 0.2s after start: '2'
Got result 0.3s after start: '3'
Got result 0.4s after start: '4'
Got result 0.5s after start: '5'
Got result 0.6s after start: '6'
Got result 0.7s after start: '7'
Got result 0.8s after start: '8'
Got result 0.9s after start: '9'
(ServeReplica:default_StreamingResponder pid=41052) INFO 2023-05-25 10:49:52,230 default_StreamingResponder default_StreamingResponder#qlZFCa yomKnJifNJ / default replica.py:634 - __CALL__ OK 1017.6ms
```

(serve-http-adapters)=

## HTTP Adapters
@@ -190,7 +235,7 @@ PredictorDeployment.deploy(..., http_adapter=User)
DAGDriver.bind(other_node, http_adapter=User)

```
### List of Built-in Adapters
### List of built-in adapters

Here is a list of adapters; please feel free to [contribute more](https://github.com/ray-project/ray/issues/new/choose)!

34 changes: 33 additions & 1 deletion python/ray/serve/BUILD
@@ -105,6 +105,14 @@ py_test(
deps = [":serve_lib"],
)

py_test(
name = "test_http_util",
size = "small",
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)

py_test(
name = "test_advanced",
size = "small",
@@ -448,6 +456,7 @@
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)

# Runs test_api and test_failure with injected failures in the controller.
py_test(
name = "test_controller_crashes",
@@ -460,6 +469,29 @@
deps = [":serve_lib"],
)

# Runs test_api, test_fastapi, and test_http_adapters with experimental streaming turned on.
py_test(
name = "test_experimental_streaming",
size = "large",
srcs = glob(["tests/test_experimental_streaming.py",
"tests/test_api.py",
"tests/test_fastapi.py",
"tests/test_http_adapters.py",
"**/conftest.py"]),
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
env = {"RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING": "1"},
)

py_test(
name = "test_streaming_response",
size = "large",
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
env = {"RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING": "1"},
)

py_test(
name = "test_controller_recovery",
size = "medium",
@@ -581,4 +613,4 @@
srcs = serve_tests_srcs,
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"],
)
7 changes: 7 additions & 0 deletions python/ray/serve/_private/client.py
@@ -440,6 +440,7 @@ def get_handle(
missing_ok: Optional[bool] = False,
sync: bool = True,
_internal_pickled_http_request: bool = False,
_stream: bool = False,
) -> Union[RayServeHandle, RayServeSyncHandle]:
"""Retrieve RayServeHandle for service deployment to invoke it from Python.

@@ -450,6 +451,10 @@
sync: If true, then Serve will return a ServeHandle that
works everywhere. Otherwise, Serve will return a ServeHandle
that's only usable in asyncio loop.
_internal_pickled_http_request: Indicates that this handle will be used
to send HTTP requests from the proxy to ingress deployment replicas.
_stream: Indicates that this handle should use
`num_returns="streaming"`.

Returns:
RayServeHandle
@@ -469,12 +474,14 @@
self._controller,
deployment_name,
_internal_pickled_http_request=_internal_pickled_http_request,
_stream=_stream,
)
else:
handle = RayServeHandle(
self._controller,
deployment_name,
_internal_pickled_http_request=_internal_pickled_http_request,
_stream=_stream,
)

self.handle_cache[cache_key] = handle
6 changes: 6 additions & 0 deletions python/ray/serve/_private/constants.py
@@ -191,3 +191,9 @@ class ServeHandleType(str, Enum):

# Serve HTTP request header key for routing requests.
SERVE_MULTIPLEXED_MODEL_ID = "serve_multiplexed_model_id"

# Feature flag to enable StreamingResponse support.
# When turned on, *all* HTTP responses will use Ray streaming object refs.
RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING = (
os.environ.get("RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING", "0") == "1"
)
74 changes: 69 additions & 5 deletions python/ray/serve/_private/http_proxy.py
@@ -6,12 +6,13 @@
import pickle
import socket
import time
from typing import Callable, List, Dict, Optional, Tuple
from typing import Any, Callable, Dict, List, Optional, Tuple
from ray._private.utils import get_or_create_event_loop

import uvicorn
import starlette.responses
import starlette.routing
from starlette.types import Receive, Scope, Send

import ray
from ray.exceptions import RayActorError, RayTaskError
@@ -29,9 +30,10 @@
from ray.serve._private.common import EndpointInfo, EndpointTag, ApplicationName
from ray.serve._private.constants import (
SERVE_LOGGER_NAME,
SERVE_MULTIPLEXED_MODEL_ID,
SERVE_NAMESPACE,
DEFAULT_LATENCY_BUCKET_MS,
SERVE_MULTIPLEXED_MODEL_ID,
RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING,
)
from ray.serve._private.long_poll import LongPollClient, LongPollNamespace
from ray.serve._private.logging_utils import (
@@ -72,6 +74,63 @@
)


async def _handle_streaming_response(
asgi_response_generator: "ray._raylet.StreamingObjectRefGenerator",
scope: Scope,
receive: Receive,
send: Send,
) -> str:
"""Consumes the `asgi_response_generator` and sends its data over `send`.

This function is a proxy for a downstream ASGI response. The passed
generator is expected to return a stream of pickled ASGI messages
(dictionaries) that are sent using the provided ASGI interface.

Exception handling depends on whether the first message has already been sent:
- if an exception happens *before* the first message, a 500 status is sent.
- if an exception happens *after* the first message, the response stream is
terminated.

The difference in behavior is because once the first message has been sent, the
client has already received the status code so we cannot send a `500` (internal
server error).

Returns:
status_code
"""

status_code = ""
try:
async for obj_ref in asgi_response_generator:
asgi_messages: List[Dict[str, Any]] = pickle.loads(await obj_ref)
for asgi_message in asgi_messages:
# There must be exactly one "http.response.start" message that
# always contains the "status" field.
if not status_code:
assert asgi_message["type"] == "http.response.start", (
"First response message must be 'http.response.start'",
)
assert "status" in asgi_message, (
"'http.response.start' message must contain 'status'",
)
status_code = str(asgi_message["status"])

await send(asgi_message)
except Exception as e:
error_message = f"Unexpected error, traceback: {e}."
logger.warning(error_message)

if status_code == "":
# If first message hasn't been sent, return 500 status.
await Response(error_message, status_code=500).send(scope, receive, send)
return "500"
else:
# If first message has been sent, terminate the response stream.
return status_code

return status_code
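
The docstring above says the generator yields pickled lists of ASGI messages. As a reference point, here is a minimal hand-written sketch of what one such batch could contain (this is illustrative data following the ASGI spec, not code or values from this PR):

```python
import pickle

# One streamed HTTP response as ASGI messages: exactly one
# "http.response.start" (carrying the status), then one or more
# "http.response.body" chunks, the last with more_body=False.
messages = [
    {"type": "http.response.start", "status": 200, "headers": []},
    {"type": "http.response.body", "body": b"0", "more_body": True},
    {"type": "http.response.body", "body": b"1", "more_body": False},
]

# A replica would send a batch like this pickled over the object store;
# the proxy unpickles it and forwards each message over `send`.
batch = pickle.dumps(messages)
unpickled = pickle.loads(batch)
assert unpickled[0]["type"] == "http.response.start"
assert str(unpickled[0]["status"]) == "200"
```

This is why the proxy can extract the status code from the first message before relaying the rest of the stream.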


async def _send_request_to_handle(handle, scope, receive, send) -> str:
http_body_bytes = await receive_http_body(scope, receive, send)

@@ -112,6 +171,11 @@ async def _send_request_to_handle(handle, scope, receive, send) -> str:
try:
object_ref = await assignment_task

if isinstance(object_ref, ray._raylet.StreamingObjectRefGenerator):
return await _handle_streaming_response(
object_ref, scope, receive, send
)

# NOTE (shrekris-anyscale): when the gcs, Serve controller, and
# some replicas crash simultaneously (e.g. if the head node crashes),
# requests to the dead replicas hang until the gcs recovers.
@@ -139,8 +203,8 @@ async def _send_request_to_handle(handle, scope, receive, send) -> str:
# Here because the client disconnected, we will return a custom
# error code for metric tracking.
return DISCONNECT_ERROR_CODE
except RayTaskError as error:
error_message = "Task Error. Traceback: {}.".format(error)
except RayTaskError as e:
error_message = f"Unexpected error, traceback: {e}."
await Response(error_message, status_code=500).send(scope, receive, send)
return "500"
except RayActorError:
@@ -277,6 +341,7 @@ def get_handle(name):
sync=False,
missing_ok=True,
_internal_pickled_http_request=True,
_stream=RAY_SERVE_ENABLE_EXPERIMENTAL_STREAMING,
)

self.prefix_router = LongestPrefixRouter(get_handle)
@@ -439,7 +504,6 @@ async def __call__(self, scope, receive, send):
ray.serve.context.RequestContext(**request_context_info)
)
status_code = await _send_request_to_handle(handle, scope, receive, send)

self.request_counter.inc(
tags={
"route": route_path,