
[Serve] Add a timeout parameter for scheduling ray tasks to replicas #40995

Open
kyle-v6x opened this issue Nov 7, 2023 · 11 comments
Labels
enhancement: Request for new feature and/or capability
@external-author-action-required: Alternate tag for PRs where the author doesn't have labeling permission.
P1: Issue that should be fixed within a few weeks
serve: Ray Serve Related Issue

Comments

kyle-v6x commented Nov 7, 2023

Description

Currently, requests waiting to be scheduled to a replica retry indefinitely with a given backoff sequence. While we can set an HTTP timeout in the HTTP options, this can result in a cluster falling too far behind incoming requests and critically failing all requests until it scales appropriately.

It would be nice to set an additional timeout for each deployment representing the maximum time a request can spend waiting to be assigned to a replica. We could then set this value to (HTTP_TIMEOUT - PROCESSING_TIME) and ensure that we are not wasting any time on requests that are going to time out anyway.

I'm also curious if anyone has implemented this without a separate dispatcher deployment.

class PowerOfTwoChoicesReplicaScheduler(ReplicaScheduler):
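
To make the request concrete, here is a rough sketch of what such an option might look like on a deployment. The parameter name scheduling_timeout_s is purely hypothetical (no such option exists in Ray Serve), so it is shown commented out:

from ray import serve

@serve.deployment(
    num_replicas=1,
    # Hypothetical option sketching the requested behaviour: give up on
    # assigning a request to a replica after this many seconds, roughly
    # HTTP_TIMEOUT - expected processing time.
    # scheduling_timeout_s=20,
)
class Model:
    async def __call__(self, request):
        ...  # model inference would go here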

Use case

A machine learning inference server with processing times in the seconds. There is a failure mode where load increases faster than the cold-start times can cope with, and the server gets stuck working on requests it never finishes.

@kyle-v6x kyle-v6x added enhancement Request for new feature and/or capability triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Nov 7, 2023
@shrekris-anyscale shrekris-anyscale added P1 Issue that should be fixed within a few weeks serve Ray Serve Related Issue and removed triage Needs triage (eg: priority, bug/not-bug, and owning component) labels Nov 7, 2023
@anyscalesam (Contributor)

Chatted with @akshay-anyscale > this is a pretty good idea. @kyle-v6x would you be willing to contribute back here?

@anyscalesam anyscalesam added the @external-author-action-required Alternate tag for PRs where the author doesn't have labeling permission. label May 17, 2024
kyle-v6x (Author) commented May 17, 2024

@anyscalesam Sure! My only concern is that from the 2.10 release there is a new parameter which allows us to shed the load based on a maximum replica queue size. In practice, this queue-size-based approach probably works for non-time-sensitive jobs, but is quite awkward for anything coming from an HTTP request. It can achieve the same thing in two ways, but each has a major issue:

  • Set max_ongoing_requests to 1 (or the batch size), then manage the queue size using max_queued_requests based on the processing time of max_ongoing_requests.
    • You lose any ability to autoscale, as max_ongoing_requests is not large enough for the autoscaler to balance against.
  • Set max_queued_requests to 1, and manage the deployment as normal using max_ongoing_requests. If more than one request is queued, it is immediately rejected.
    • In any real deployment there are bound to be requests coming in close together, and since max_queued_requests is so low, you end up with bursts of requests getting rejected that could probably have been processed on time.

Currently, we use the second option with max_ongoing_requests set to some low value like 3 or 4 (a rough sketch of this setup follows below). It's not hard to imagine, however, how this could fail given a sudden rise in requests and replicas.

Personally, I think it would be nice to add something like replica_scheduler_timeout to deployments as well. I'm actually quite curious how others use max_queued_requests in production deployments? (cc. @edoakes )
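
For context, a rough sketch of the second workaround described above, assuming a Ray Serve release that has max_queued_requests (2.10+); the values are illustrative, not a recommendation:

from ray import serve

@serve.deployment(
    max_ongoing_requests=4,  # each replica works on a few requests at a time
    max_queued_requests=1,   # anything beyond a single queued request is shed
)
class InferenceModel:
    async def __call__(self, request):
        ...  # seconds-long model inference would go here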

edoakes (Contributor) commented May 17, 2024

@anyscalesam Sure! My only concern is that from the 2.10 release there is a new parameter which allows us to shed the load based on a maximum replica queue size.

I don't quite understand why this is a concern for your feature request. max_queued_requests is not really designed to solve the problem you've raised here. It's a stability failsafe to avoid overloading the scheduler: better to actively shed load than end up in a bad state timing out all requests due to excessive load.

As a side note, I would not recommend setting max_queued_requests to 1.

Adding an assignment timeout makes sense. If you're open to contributing it I'd be happy to point you in the right direction.

kyle-v6x (Author) commented May 17, 2024

@edoakes Thanks for clarifying. I got a little too tunnel-visioned on our own use case, but I see how max_queued_requests could be useful when you aren't directly controlling the timeout of requests to a backend service.

And yes totally agree regarding setting max_queued_requests to 1. Just wanted to highlight that there is currently no ideal solution, and I don't want to add a whole new deployment parameter if we can somehow fix both failure cases with one.

I'll be away for a few days but happy to work on a solution as soon as I return. Would you recommend adding it as an additional deployment parameter then?

@kyle-v6x (Author)

One more note. We tried the following pattern, but I haven't dug into whether the returned reference really means that the task has been assigned to a replica.
Before:

ref = await asyncio.wait_for(handle.remote(), timeout=8)

Now:

response = handle.remote()
await asyncio.wait_for(response._to_object_ref(), timeout=8)

edoakes (Contributor) commented May 17, 2024

response = handle.remote()
await asyncio.wait_for(response._to_object_ref(), timeout=8)

I was going to suggest that you try this pattern. _to_object_ref() returning does indeed mean that the request has been scheduled.

There is one issue here though -- asyncio.wait_for will cancel the underlying task if it doesn't return within the timeout, but cancelling _to_object_ref() does not actually cancel the underlying request. You'd need to modify this to call response.cancel() upon timeout.

How about you start with that, see if it works for your use case, then we can discuss if/how to add first class support?
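
For reference, a minimal sketch of that pattern (the helper name call_with_assignment_timeout is invented here for illustration; handle is assumed to be a Serve DeploymentHandle, and _to_object_ref() is a private API, so treat this as a debugging aid rather than a supported recipe):

import asyncio

async def call_with_assignment_timeout(handle, timeout_s: float = 8.0):
    response = handle.remote()
    try:
        # Wait only for the request to be assigned to a replica.
        ref = await asyncio.wait_for(response._to_object_ref(), timeout=timeout_s)
    except (TimeoutError, asyncio.TimeoutError):
        # wait_for cancels the local awaitable, but not the Serve request itself.
        response.cancel()
        raise
    # Assignment succeeded; now wait for the actual result.
    return await ref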

@kyle-v6x (Author)

@edoakes Got it. I'll start there when I return. Thanks for the input!

kyle-v6x (Author) commented Jun 11, 2024

Finally got around to doing some testing.

I tried the following:

response_handle = None
try:
    response_handle = handle.remote()  # This is a batched method in practice

    start = time.time()
    response_ref = await asyncio.wait_for(response_handle._to_object_ref(), timeout=n)  # This should return once scheduled
    print(f"Scheduling time: {time.time() - start}")

    start = time.time()
    result = await response_ref  # This should return once completed
    print(f"Processing time: {time.time() - start}")

except (TimeoutError, asyncio.TimeoutError):
    if response_handle is not None:
        response_handle.cancel()

The issue is that the _to_object_ref call seems to wait for the final result. This is on ray==2.23.0. This doesn't seem to be intended; perhaps a bug? I'll continue looking into it, but would appreciate some advice.

@Stack-Attack

Got some time to look into this today. Turns out the above strategy is broken for ray[serve]>=2.10.

Digging into it more:

Since 2.10, Python replicas always return an ObjectRefGenerator rather than an ObjectRef. This was seemingly done so that replicas could return the status of their internal queue as the first generator item and the unary result as the second.

In order to deal with the new generator return, the _to_object_ref call was modified to conditionally retrieve the data from the second item of the generator.
This second await on __anext__() waits for the completion of the work, rather than just the ObjectRef. As I haven't built Ray locally yet, I haven't been able to track down the precise source.

Hopefully if I have time in the next few weeks I can build locally and get to the source of the issue. Removing the await from __anext__() works for most cases, but then the return value is not an ObjectRef, so it's not a real solution.

More notes:
The ObjectRef is iterated here, and created from a nearby call.

Rejections are handled here.

@Stack-Attack

Finally, a script to replicate the issue:

import asyncio
from ray import serve
from ray.serve.handle import DeploymentHandle
import time
from fastapi import FastAPI

app = FastAPI()


async def _to_object_ref_or_gen(
        self,
        hold=False
):
    """
    This replicates the behaviour locally for easier debugging.
    """
    obj_ref_or_gen = await asyncio.wrap_future(self._object_ref_future)
    if hold:
        obj_ref_or_gen = await obj_ref_or_gen.__anext__() # This call blocks until the result
    else:
        obj_ref_or_gen = obj_ref_or_gen.__anext__()

    self._object_ref_or_gen = obj_ref_or_gen

    return self._object_ref_or_gen

@serve.deployment(
    ray_actor_options={"num_cpus": 0},
    num_replicas=1
)
@serve.ingress(app)
class Dispatcher:
    def __init__(self, foo_handler: DeploymentHandle):
        self.foo_handler = foo_handler

    @app.post("/")
    async def entry(self, hold: bool):
        handle = None
        metrics = {}
        try:
            start = time.time()
            handle = self.foo_handler.remote()
            metrics["call_time"], start = time.time() - start, time.time()
            ref = await asyncio.wait_for(_to_object_ref_or_gen(handle, hold), timeout=30)
            metrics["scheduling_time"], start = time.time() - start, time.time()
            result = await ref
            metrics["worker_time"] = time.time() - start
        except TimeoutError:
            if handle is not None:
                handle.cancel()
            raise TimeoutError("Scheduler timeout error. All workers seemingly full.")
        finally:
            print(f"\n\nMetrics: {metrics}\n\n")
        return result


@serve.deployment(
    ray_actor_options={"num_cpus": 0},
    num_replicas=1
)
class Foo:
    def __call__(self):
        time.sleep(10)
        return True


foo = Foo.bind()
service = Dispatcher.bind(foo_handler=foo)


if __name__ == "__main__":
    from ray.cluster_utils import Cluster

    cluster = Cluster(
        initialize_head=True,
        head_node_args={
            "num_cpus": 4,
            "num_gpus": 0,
            "resources": {"head": 1},
            "dashboard_host": "0.0.0.0",
        },
    )
    worker_node = cluster.add_node(
        num_cpus=4,
        num_gpus=2,
        resources={"worker": 1},
    )

    cluster.wait_for_nodes(2)
    deployment = serve.run(service)

    print("\n\nTesting programmatically...\n\n")
    deployment.entry.remote(hold=False).result()
    deployment.entry.remote(hold=True).result()

    input("\n\nTest script completed. Press Enter to shutdown.\n\n")
    serve.shutdown()

(ServeReplica:default:Dispatcher pid=12543) Metrics: {'call_time': 0.008048772811889648, 'scheduling_time': 0.00822901725769043, 'worker_time': 10.042553186416626}
(ServeReplica:default:Dispatcher pid=12543) Metrics: {'call_time': 0.00012302398681640625, 'scheduling_time': 10.04919719696045, 'worker_time': 0.00021195411682128906}

@Stack-Attack

Aha. Looks like the source of the issue might be in the ObjectRefGenerator code itself, but I can't confirm until I have time to build and test locally.

# TODO(swang): Avoid fetching the value.

(cc. @stephanie-wang)
