Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[serve] Add replica queue length caching to replica scheduler #42943

Merged
merged 39 commits into from
Feb 7, 2024

Conversation

edoakes
Copy link
Contributor

@edoakes edoakes commented Feb 2, 2024

Why are these changes needed?

Adds caching logic to avoid actively probing replicas for every request. This is integrated into the existing PowerOfTwoChoicesReplicaScheduler so it can reuse much of the same policy and mechanism (e.g., locality-aware and model multiplexing-aware candidate selection).

The benefits of this change are:

  • Enables strict enforcement of max_concurrent_queries.
  • Reduces proxy-side overhead for scheduling requests.
  • Reduces latency for scheduling requests (in the "happy path," there's no extra RTT).

The changes are as follows:

  • All calls to replicas are now streaming calls, and the first message returned is a system message. The replica uses this message to return its current queue length and reject requests if it's at capacity (max_concurrent_queries). If the replica rejects, the request scheduling procedure will be retried.
  • The replica scheduler maintains a local cache of replica queue lengths. Entries in this cache have a timeout (currently set to 10 seconds). The cache is updated by (1) actively probing replicas and (2) the system response messages mentioned above.
  • When scheduling a request, we first attempt to choose the best replica based on the queue lengths in the cache. If none of the candidates have entries in the cache that are below max_concurrent_queries, we fall back to active probing (as before this PR).

There are two feature flags introduced to control this behavior (both currently off by default):

  • RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE
  • RAY_SERVE_ENABLE_STRICT_MAX_CONCURRENT_QUERIES (implicitly set by the above)

TODOs before merging:

  • Get all existing test cases to pass.
  • Add tests for ReplicaQueueLengthCache.
  • Add general test cases for the caching logic.
  • Run a subset of tests with the feature flag turned on.
  • Add a test case for the emplace_front logic (avoid tail latencies).
  • Add testing for the with_rejection logic.
  • Maybe add a separate feature flag for strict enforcement (decoupled from the caching logic).

TODOs for subsequent PRs (should file follow-up issues):

  • Add a replica-side timestamp to the queue length information to avoid overwriting the cache with stale information.
  • De-duplicate active probes to the same replica.

Related issue number

Closes #42946
Closes #42947

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
@edoakes
Copy link
Contributor Author

edoakes commented Feb 2, 2024

Benchmark results

HTTP no-op latency

Baseline

(ray) eoakes@Edwards-MacBook-Pro-2 serve % RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE=0 python ...
Latency (ms) for noop HTTP requests (num_replicas=1,num_requests=1000):
count    1000.000000
mean        3.838556
std         1.031378
min         3.371375
50%         3.737083
90%         4.060138
95%         4.188520
99%         5.597760
max        31.916375
dtype: float64

With caching

(ray) eoakes@Edwards-MacBook-Pro-2 serve % RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE=1 python _private/benchmarks/http_noop_latency.py --num-requests 1000
...
Latency (ms) for noop HTTP requests (num_replicas=1,num_requests=1000):
count    1000.000000
mean        3.265671
std         0.964305
min         2.864250
50%         3.183500
90%         3.451258
95%         3.539712
99%         3.917807
max        30.811750
dtype: float64

HTTP throughput (using ab)

from ray import serve

@serve.deployment(num_replicas=8, max_concurrent_queries=10)
class A:
    def __call__(self, *args):
        return b"hi"

app = A.bind()

Baseline

RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE=0 serve run noop:app
...
Requests per second:    757.95 [#/sec] (mean)
Time per request:       131.934 [ms] (mean)
Time per request:       1.319 [ms] (mean, across all concurrent requests)
Transfer rate:          141.38 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    1   0.8      0       4
Processing:    27  121  20.4    117     188
Waiting:       27  118  19.9    114     186
Total:         30  121  20.4    118     188
WARNING: The median and mean for the initial connection time are not within a normal deviation
        These results are probably not that reliable.

Percentage of the requests served within a certain time (ms)
  50%    118
  66%    123
  75%    128
  80%    131
  90%    152
  95%    159
  98%    175
  99%    180
 100%    188 (longest request)

With caching

RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE=1 serve run noop:app
...
Requests per second:    990.00 [#/sec] (mean)
Time per request:       101.010 [ms] (mean)
Time per request:       1.010 [ms] (mean, across all concurrent requests)
Transfer rate:          184.66 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    1   0.7      1       3
Processing:    13   97  24.5     95     165
Waiting:       11   92  24.4     90     161
Total:         14   98  24.6     96     166

Percentage of the requests served within a certain time (ms)
  50%     96
  66%    102
  75%    106
  80%    114
  90%    140
  95%    143
  98%    155
  99%    159
 100%    166 (longest request)

HTTP streaming throughput

Baseline

(ray) eoakes@Edwards-MacBook-Pro-2 serve % RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE=0 python _private/benchmarks/streaming/streaming_http_throughput.py
...
HTTP streaming throughput (num_replicas=1, tokens_per_request=1000, batch_size=10, use_intermediate_deployment=False): 228498.48 +- 6799.28 tokens/s

With caching

(ray) eoakes@Edwards-MacBook-Pro-2 serve % RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE=1 python _private/benchmarks/streaming/streaming_http_throughput.py
...
HTTP streaming throughput (num_replicas=1, tokens_per_request=1000, batch_size=10, use_intermediate_deployment=False): 210692.81 +- 5081.87 tokens/s

Handle throughput

Baseline

(ray) eoakes@Edwards-MacBook-Pro-2 serve % RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE=0 python _private/benchmarks/handle_throughput.py
...
DeploymentHandle throughput (num_replicas=1, batch_size=100): 1830.25 +- 5.44 requests/s

With caching

(ray) eoakes@Edwards-MacBook-Pro-2 serve % RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE=1 python _private/benchmarks/handle_throughput.py
...
DeploymentHandle throughput (num_replicas=1, batch_size=100): 1840.15 +- 30.62 requests/s

Handle streaming throughput

Baseline

(ray) eoakes@Edwards-MacBook-Pro-2 serve % RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE=0 python _private/benchmarks/streaming/streaming_handle_throughput.py
...
DeploymentHandle streaming throughput (ASYNC) (num_replicas=1, tokens_per_request=1000, batch_size=10): 11267.96 +- 172.1 tokens/s
(ServeReplica:default:CallerDeployment pid=48752) Individual request quantiles:
(ServeReplica:default:CallerDeployment pid=48752)       P50=676.8099579999998
(ServeReplica:default:CallerDeployment pid=48752)       P75=843.8243122500003
(ServeReplica:default:CallerDeployment pid=48752)       P99=915.2339992500001

With caching

(ray) eoakes@Edwards-MacBook-Pro-2 serve % RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE=1 python _private/benchmarks/streaming/streaming_handle_throughput.py
...
DeploymentHandle streaming throughput (ASYNC) (num_replicas=1, tokens_per_request=1000, batch_size=10): 11671.64 +- 209.44 tokens/s
(ServeReplica:default:CallerDeployment pid=48848) Individual request quantiles:
(ServeReplica:default:CallerDeployment pid=48848)       P50=661.6651875000007
(ServeReplica:default:CallerDeployment pid=48848)       P75=806.0847917499996
(ServeReplica:default:CallerDeployment pid=48848)       P99=911.5834779999999

Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
@edoakes edoakes changed the title [WIP][serve] Add queue length cache to replica scheduler [serve] Add replica queue length caching to replica scheduler Feb 2, 2024
@edoakes edoakes marked this pull request as ready for review February 2, 2024 20:26
@edoakes edoakes requested a review from a team February 2, 2024 20:26
@edoakes
Copy link
Contributor Author

edoakes commented Feb 2, 2024

@ray-project/ray-serve the changes in this PR are ready for an initial review, but please note the TODOs in the description (most notably, I still have a lot of tests to write).

Copy link
Contributor

@GeneDer GeneDer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The approach LGTM, thanks for making this more efficient Ed!

Copy link
Contributor

@shrekris-anyscale shrekris-anyscale left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work so far. I haven't looked at the tests, but I left some comments on the implementation.

python/ray/serve/_private/proxy_state.py Outdated Show resolved Hide resolved
python/ray/serve/_private/replica.py Outdated Show resolved Hide resolved
chosen_replica_id = t.replica_id
queue_len = t.result()
result.append((t.replica, queue_len))
self._replica_queue_len_cache.update(r.replica_id, queue_len)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Nit] We calculate the timestamp for all the responses upon update rather than when we actually received the response. This means if there's one really slow replica, then all the replica's timestamps could actually be off by queue_len_response_deadline_s.

Since queue_len_response_deadline_s is pretty low, this shouldn't be a major concern, but if it becomes larger, then the timestamps may be pretty inaccurate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep you're right -- I have a follow-up item to generate all of these timestamps on the replica where possible

Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
@edoakes
Copy link
Contributor Author

edoakes commented Feb 6, 2024

@GeneDer @shrekris-anyscale addressed comments and finished all of the TODOs from the description, PTAL.

Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
Copy link
Contributor

@GeneDer GeneDer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor non-blocking comments, LGTM!

self._pending_requests_to_schedule.append(pending_request)
else:
index = 0
for pr in self._pending_requests_to_fulfill:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocker nitpick, we can probably just use enumerate and so we don't need to track and increment index separately

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, want to mention, since now that we can assume the queue is sorted. We can utilize python's bisect.insort 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh nice I hadn't heard of bisect.insort. Looks interesting. I think in this case it might not be ideal because in the common case we should be inserting at or near the front of the queue when going through this slower is_retry path.

Signed-off-by: Edward Oakes <[email protected]>
Copy link
Contributor

@shrekris-anyscale shrekris-anyscale left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm approving the code changes. I haven't had a chance to review the unit tests.

python/ray/serve/_private/constants.py Outdated Show resolved Hide resolved
python/ray/serve/_private/replica_scheduler/common.py Outdated Show resolved Hide resolved
Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
@edoakes edoakes merged commit d8b0fe9 into ray-project:master Feb 7, 2024
9 checks passed
ratnopamc pushed a commit to ratnopamc/ray that referenced this pull request Feb 11, 2024
…oject#42943)

Adds caching logic to avoid actively probing replicas for every request. This is integrated into the existing PowerOfTwoChoicesReplicaScheduler so it can reuse much of the same policy and mechanism (e.g., locality-aware and model multiplexing-aware candidate selection).

The benefits of this change are:

- Enables strict enforcement of max_concurrent_queries.
- Reduces proxy-side overhead for scheduling requests.
- Reduces latency for scheduling requests (in the "happy path," there's no extra RTT).

The changes are as follows:

- All calls to replicas are now streaming calls, and the first message returned is a system message. The replica uses this message to return its current queue length and reject requests if it's at capacity (max_concurrent_queries). If the replica rejects, the request scheduling procedure will be retried.
- The replica scheduler maintains a local cache of replica queue lengths. Entries in this cache have a timeout (currently set to 10 seconds). The cache is updated by (1) actively probing replicas and (2) the system response messages mentioned above.
- When scheduling a request, we first attempt to choose the best replica based on the queue lengths in the cache. If none of the candidates have entries in the cache that are below max_concurrent_queries, we fall back to active probing (as before this PR).

There are two feature flags introduced to control this behavior (both currently off by default):

- `RAY_SERVE_ENABLE_QUEUE_LENGTH_CACHE`
- `RAY_SERVE_ENABLE_STRICT_MAX_CONCURRENT_QUERIES` (implicitly set by the above)

---------

Signed-off-by: Edward Oakes <[email protected]>
Signed-off-by: Ratnopam Chakrabarti <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants