[Serve] DeploymentResponse._to_object_ref() blocks until final results from actor #46893

Open
kyle-v6x opened this issue Jul 31, 2024 · 6 comments
Labels: bug (Something that is supposed to be working; but isn't), P0 (Issues that should be fixed in short order), serve (Ray Serve Related Issue)

Comments

@kyle-v6x

kyle-v6x commented Jul 31, 2024

What happened + What you expected to happen

Since 2.10.0, DeploymentResponse._to_object_ref() awaits the final result of the task rather than returning the ObjectRef of the running task as soon as it is scheduled. This effectively prevents any async task from being computed while previous tasks are running (i.e., passing by reference).

You can find more details here.

Versions / Dependencies

This is an issue with Ray[Serve]>=2.10.0

Reproduction script

import asyncio
from ray import serve
from ray.serve.handle import DeploymentHandle
import time
from fastapi import FastAPI

app = FastAPI()


async def _to_object_ref_or_gen(
        self,
        hold=False
):
    """
    This replicates the behaviour locally for easier debugging.
    """
    obj_ref_or_gen = await asyncio.wrap_future(self._object_ref_future)
    if hold:
        obj_ref_or_gen = await obj_ref_or_gen.__anext__() # This call blocks until the result
    else:
        obj_ref_or_gen = obj_ref_or_gen.__anext__()

    self._object_ref_or_gen = obj_ref_or_gen

    return self._object_ref_or_gen

@serve.deployment(
    ray_actor_options={"num_cpus": 0},
    num_replicas=1
)
@serve.ingress(app)
class Dispatcher:
    def __init__(self, foo_handler: DeploymentHandle):
        self.foo_handler = foo_handler

    @app.post("/")
    async def entry(self, hold: bool):
        handle = None
        metrics = {}
        try:
            start = time.time()
            handle = self.foo_handler.remote()
            metrics["call_time"], start = time.time() - start, time.time()
            ref = await asyncio.wait_for(_to_object_ref_or_gen(handle, hold), timeout=30)
            metrics["scheduling_time"], start = time.time() - start, time.time()
            result = await ref
            metrics["worker_time"] = time.time() - start
        except asyncio.TimeoutError:  # raised by asyncio.wait_for; only an alias of TimeoutError on Python 3.11+
            if handle is not None:
                handle.cancel()
            raise TimeoutError("Scheduler timeout error. All workers seemingly full.")
        finally:
            print(f"\n\nMetrics: {metrics}\n\n")
        return result


@serve.deployment(
    ray_actor_options={"num_cpus": 0},
    num_replicas=1
)
class Foo:
    def __call__(self):
        time.sleep(10)
        return True


foo = Foo.bind()
service = Dispatcher.bind(foo_handler=foo)


if __name__ == "__main__":
    from ray.cluster_utils import Cluster

    cluster = Cluster(
        initialize_head=True,
        head_node_args={
            "num_cpus": 4,
            "num_gpus": 0,
            "resources": {"head": 1},
            "dashboard_host": "0.0.0.0",
        },
    )
    worker_node = cluster.add_node(
        num_cpus=4,
        num_gpus=2,
        resources={"worker": 1},
    )

    cluster.wait_for_nodes(2)
    deployment = serve.run(service)

    print("\n\nTesting programmatically...\n\n")
    deployment.entry.remote(hold=False).result()
    deployment.entry.remote(hold=True).result()

    input("\n\nTest script completed. Press Enter to shutdown.\n\n")
    serve.shutdown()

Issue Severity

High: It blocks me from completing my task.

@kyle-v6x kyle-v6x added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Jul 31, 2024
@kyle-v6x kyle-v6x changed the title from "[Serve] DeploymentResponse._to_object_ref() waits for results to return rather than scheduling" to "[Serve] DeploymentResponse._to_object_ref() blocks until final results from actor" Jul 31, 2024
@shrekris-anyscale
Contributor

shrekris-anyscale commented Jul 31, 2024

I was able to reproduce the issue too:

Repro
# File name: repro.py

import asyncio
import time


from ray import serve


@serve.deployment(max_ongoing_requests=5)
async def sleepy():
    await asyncio.sleep(3)
    return "Finished!"

app = sleepy.bind()

async def experiment():
    handle = serve.run(app)
    print("Starting experiment.")

    t0 = time.perf_counter()
    deployment_response = handle.remote()
    t1 = time.perf_counter()
    print(f"Finished remote call: {(t1 - t0):.4f}s")

    t2 = time.perf_counter()
    assignment_ref = deployment_response._to_object_ref()
    t3 = time.perf_counter()
    print(f"Created assignment ref: {(t3 - t2):.4f}s")

    t4 = time.perf_counter()
    result_ref = await assignment_ref
    t5 = time.perf_counter()
    print(f"Awaited assignment ref: {(t5 - t4):.4f}s")

    t6 = time.perf_counter()
    result = await result_ref
    t7 = time.perf_counter()
    print(f"Awaited result ref: {(t7 - t6):.4f}s")

    print(f"Final results: {result}")


if __name__ == "__main__":
    asyncio.run(experiment())
% python repro.py

...
Starting experiment.
Finished remote call: 0.0023s
Created assignment ref: 0.0000s
Awaited assignment ref: 3.0111s
Awaited result ref: 0.0005s
Final results: Finished!

Awaiting the assignment_ref in the repro seems to also wait for the request to finish. Instead, it should return as soon as the request has been assigned to a replica.
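To make the expected two-stage behavior concrete without Ray, here is a minimal sketch in plain asyncio. It is not Serve's implementation: `assign_request`, `work`, `ASSIGN_DELAY`, and `WORK_DELAY` are hypothetical names, and an `asyncio.Task` stands in for the result ObjectRef. Awaiting the outer task should only take as long as the assignment; awaiting the inner task is what waits for the request to finish.

```python
import asyncio
import time

ASSIGN_DELAY = 0.05  # hypothetical time needed to pick a replica
WORK_DELAY = 0.5     # hypothetical duration of the request itself


async def work() -> str:
    # Stands in for the replica executing the request.
    await asyncio.sleep(WORK_DELAY)
    return "Finished!"


async def assign_request() -> asyncio.Task:
    # Stands in for scheduling: once a replica is chosen, start the work
    # and hand back its handle without waiting for the work to finish.
    await asyncio.sleep(ASSIGN_DELAY)
    return asyncio.create_task(work())


async def experiment():
    assignment = asyncio.create_task(assign_request())

    t0 = time.perf_counter()
    result_task = await assignment   # resolves at assignment time
    assign_elapsed = time.perf_counter() - t0

    t1 = time.perf_counter()
    result = await result_task       # resolves when the work is done
    work_elapsed = time.perf_counter() - t1

    return assign_elapsed, work_elapsed, result


if __name__ == "__main__":
    assign_elapsed, work_elapsed, result = asyncio.run(experiment())
    print(f"Awaited assignment: {assign_elapsed:.4f}s")
    print(f"Awaited result: {work_elapsed:.4f}s")
    print(f"Final results: {result}")
```

In this sketch the first await returns after roughly `ASSIGN_DELAY` and the second carries the bulk of the time, which is the opposite of the timings in the repro output above.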

@zcin
Contributor

zcin commented Jul 31, 2024

> This effectively prevents any Async task from being computed while previous tasks are running (i.e passing by ref).

Hi @kyle-v6x can you say more about this, and what you are trying to use the _to_object_ref() API for?

@anyscalesam anyscalesam added the serve Ray Serve Related Issue label Jul 31, 2024
@kyle-v6x
Author

kyle-v6x commented Aug 1, 2024

@zcin To pass the result of a Serve replica to an actor asynchronously, we need to retrieve the ObjectRef and send it to the actor, since the actor has no concept of a DeploymentResponse. This is essentially the use case explained in the documentation here.
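As a rough illustration of the pass-by-reference pattern, the following plain-asyncio sketch hands an unresolved reference to a downstream consumer. It does not use Ray: `produce`, `consume`, and `dispatch` are hypothetical names, an `asyncio.Task` stands in for the ObjectRef, and `consume` plays the role of the actor. The point is that the dispatcher obtains the reference and hands it off while the producer is still running.

```python
import asyncio


async def produce():
    # Stands in for the Serve replica's work.
    await asyncio.sleep(0.2)
    return 21


async def consume(ref):
    # Stands in for the actor: it resolves the reference itself.
    return (await ref) * 2


async def dispatch():
    events = []

    ref = asyncio.create_task(produce())          # reference to a running task
    downstream = asyncio.create_task(consume(ref))

    # The hand-off happened without waiting for produce() to finish.
    events.append(f"producer done at hand-off: {ref.done()}")

    result = await downstream
    events.append(f"producer done at end: {ref.done()}")
    return events, result


if __name__ == "__main__":
    events, result = asyncio.run(dispatch())
    for event in events:
        print(event)
    print(f"result: {result}")
```

If obtaining `ref` required waiting for `produce()` to complete, the dispatcher would be blocked for the full duration of the upstream work before it could hand anything off.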

Further, if you want to run a task fully asynchronously (I know this is not officially supported yet), it makes sense to retrieve and return the task_id for tracking. Since we can't obtain the ObjectRef asynchronously, we can't get the task_id.
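A small sketch of what ID-based tracking could look like, again in plain asyncio rather than Ray. `TaskTracker`, `slow_job`, and the generated hex string are all hypothetical stand-ins: the string plays the role of the task_id, and it is only useful because `submit` returns it before the job has finished.

```python
import asyncio
import uuid


class TaskTracker:
    """Hypothetical registry mapping an ID to a running task."""

    def __init__(self):
        self._tasks = {}

    def submit(self, coro) -> str:
        # Schedule the work and return an ID immediately.
        task_id = uuid.uuid4().hex
        self._tasks[task_id] = asyncio.create_task(coro)
        return task_id

    def status(self, task_id: str) -> str:
        return "finished" if self._tasks[task_id].done() else "running"

    async def result(self, task_id: str):
        # Wait for the tracked task and drop it from the registry.
        return await self._tasks.pop(task_id)


async def slow_job():
    await asyncio.sleep(0.1)
    return True


async def demo():
    tracker = TaskTracker()
    task_id = tracker.submit(slow_job())
    early_status = tracker.status(task_id)  # queried before the job completes
    value = await tracker.result(task_id)
    return early_status, value


if __name__ == "__main__":
    print(asyncio.run(demo()))
```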

I'll link this one more time for visibility. I may have time next week to finally build locally and work on a solution, but I can't guarantee it!

@zcin
Contributor

zcin commented Aug 6, 2024

Hi @kyle-v6x, your use case makes sense to me, and you're correct that currently that isn't supported in Serve because the second object ref won't be retrieved until the request finishes. We are currently working on a fix!

@GeneDer
Contributor

GeneDer commented Aug 21, 2024

fixed by #47209

@GeneDer GeneDer closed this as completed Aug 21, 2024
@GeneDer
Contributor

GeneDer commented Aug 21, 2024

Sorry, not resolved yet. This depends on #46934.

@GeneDer GeneDer reopened this Aug 21, 2024
@GeneDer GeneDer added P0 Issues that should be fixed in short order and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Aug 21, 2024
@GeneDer GeneDer self-assigned this Aug 21, 2024
5 participants