[serve] Add ensemble model example to docs (#22771)
Added ensemble model examples to the documentation. This was needed because of a user request: there was no documentation outlining how to create higher-level ensemble models.

Co-authored-by: Jiao Dong <[email protected]>
frosk1 and jiaodong authored Mar 25, 2022
1 parent e109c13 commit f78404d
Showing 4 changed files with 107 additions and 1 deletion.
28 changes: 28 additions & 0 deletions doc/source/serve/ml-models.rst
@@ -6,6 +6,7 @@ This section should help you:

- batch requests to optimize performance
- serve multiple models by composing deployments
- serve multiple models by making ensemble deployments

.. contents::

@@ -73,6 +74,33 @@ That's it. Let's take a look at an example:

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_model_composition.py

.. _serve-model-ensemble:

Model Ensemble
=================

Ray Serve supports creating ensemble models, in which several sub models are
queried in parallel for each request.

To define an ensemble of different models you need to do three things:

1. Define your underlying sub models (the ones that make up the ensemble) as
   Ray Serve deployments.
2. Define your ensemble model, using the handles of the underlying models
   (see the example below).
3. Define a deployment representing this ensemble model and query it!
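
In outline, step 1 looks like this (a condensed sketch; the complete, runnable
example, including deployment and querying, appears further below):

.. code-block:: python

    from ray import serve


    # Each sub model is an ordinary Serve deployment and can be scaled
    # independently later by changing num_replicas.
    @serve.deployment(num_replicas=2)
    def model_one(input_data):
        return 1


    @serve.deployment(num_replicas=2)
    def model_two(input_data):
        return 2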

To avoid synchronous execution in the ensemble model, make its call method
asynchronous with ``async def``. In contrast to a composition model, an
ensemble model calls **all** of its sub models in parallel: it sends the
prediction calls to the sub model handles and awaits them together with
``asyncio.gather()``. Each Serve deployment used in an ensemble is
independently scalable by changing ``num_replicas``.

That's it. Let's take a look at the complete example:

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_model_ensemble.py


Integration with Model Registries
=================================

8 changes: 8 additions & 0 deletions python/ray/serve/BUILD
@@ -491,6 +491,14 @@ py_test(
deps = [":serve_lib"],
)

py_test(
name = "snippet_model_ensemble",
size = "small",
srcs = glob(["examples/doc/*.py"]),
tags = ["exclusive", "team:serve"],
deps = [":serve_lib"]
)

py_test(
name = "test_json_serde",
size = "medium",
2 changes: 1 addition & 1 deletion python/ray/serve/examples/doc/snippet_model_composition.py
@@ -67,7 +67,7 @@ async def __call__(self, starlette_request):
if __name__ == "__main__":

    # Start ray with 8 processes.
-    if ray.is_initialized:
+    if ray.is_initialized():
        ray.shutdown()
    ray.init(num_cpus=8)
    serve.start()
70 changes: 70 additions & 0 deletions python/ray/serve/examples/doc/snippet_model_ensemble.py
@@ -0,0 +1,70 @@
import ray
import time
import asyncio

from ray import serve


@serve.deployment(num_replicas=2)
def model_one(input_data):
print("Model 1 predict")
time.sleep(4)
return 1


@serve.deployment(num_replicas=2)
def model_two(input_data):
print("Model 2 predict")
time.sleep(4)
return 2


@serve.deployment(max_concurrent_queries=10, route_prefix="/composed")
class EnsembleModel:
    def __init__(self):
        self.model_one = model_one.get_handle()
        self.model_two = model_two.get_handle()

    async def __call__(self, input_data):
        print("Call models concurrently, wait for both to finish")
        tasks = [self.model_one.remote(input_data), self.model_two.remote(input_data)]
        print("collect models predictions (non-blocking)")
        predictions = await asyncio.gather(*tasks)
        return predictions


def send_concurrent_model_requests(num_single_model_replicas=2):
    # Send `num_single_model_replicas` concurrent requests to the ensemble
    # deployment and block until all predictions have returned.
    ensemble_model = EnsembleModel.get_handle()
    all_data = [
        ensemble_model.remote(input_data)
        for input_data in range(num_single_model_replicas)
    ]
    all_predictions = ray.get(all_data)
    print(all_predictions)


if __name__ == "__main__":
    # Start a local Ray cluster (8 CPUs) and the Serve processes.
    if ray.is_initialized():
        ray.shutdown()
    ray.init(num_cpus=8)
    serve.start()

    # Deploy all models (the sub models and the ensemble).
    model_one.deploy()
    model_two.deploy()
    EnsembleModel.deploy()

    # Send 2 concurrent requests to the ensemble model for predictions.
    # Each request calls the 2 sub models in parallel, so 4 model calls run
    # in parallel overall and the whole batch takes roughly 4 seconds.
    st = time.time()
    send_concurrent_model_requests()
    print("duration", time.time() - st)

    # Output
    # [[1, 2], [1, 2]]
    # duration 4.015406847000122
