added ensemble model to docs #22771

Merged
merged 11 commits into from
Mar 25, 2022
28 changes: 28 additions & 0 deletions doc/source/serve/ml-models.rst
@@ -6,6 +6,7 @@ This section should help you:

- batch requests to optimize performance
- serve multiple models by composing deployments
- serve multiple models by making ensemble deployments

.. contents::

@@ -73,6 +74,33 @@ That's it. Let's take a look at an example:

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_model_composition.py

.. _serve-model-ensemble:

Model Ensemble
=================

Ray Serve supports creating ensemble models, in which a single request is sent to
several sub models in parallel and their individual predictions are combined into one
response.

To define an ensemble of different models, you need to do three things:

1. Define your underlying sub models (the ones that make up the ensemble) as
   Ray Serve deployments.
2. Define your ensemble model, using the handles of the underlying models
   (see the example below).
3. Define a deployment representing this ensemble model and query it!

To avoid sequential execution inside the ensemble model, you need to make its call
method asynchronous by defining it with ``async def``. In contrast to a composition model,
an ensemble model calls **all** of its sub models in parallel: the prediction calls are
sent to the sub models asynchronously and their results are collected with
``asyncio.gather()``. Each Serve deployment used in an ensemble is independently
scalable by changing its ``num_replicas``.
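
A minimal sketch of this pattern (the sub model deployments ``model_a`` and ``model_b``
are illustrative placeholders, not part of the full example on this page) could look
like this:

.. code-block:: python

    import asyncio

    from ray import serve


    @serve.deployment
    def model_a(request):
        return "a"


    @serve.deployment
    def model_b(request):
        return "b"


    @serve.deployment
    class Ensemble:
        def __init__(self):
            # Handles let the ensemble call the sub model deployments directly.
            self.handles = [model_a.get_handle(), model_b.get_handle()]

        async def __call__(self, request):
            # Start all sub model calls at once, then await every prediction.
            refs = [handle.remote(request) for handle in self.handles]
            return await asyncio.gather(*refs)

The deployments still need to be started with ``serve.start()`` and ``.deploy()`` before
handles can be obtained, as the complete runnable example below shows.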

That's it. Let's take a look at an example:

.. literalinclude:: ../../../python/ray/serve/examples/doc/snippet_model_ensemble.py


Integration with Model Registries
=================================

8 changes: 8 additions & 0 deletions python/ray/serve/BUILD
@@ -491,6 +491,14 @@ py_test(
    deps = [":serve_lib"],
)

py_test(
    name = "snippet_model_ensemble",
    size = "small",
    srcs = glob(["examples/doc/*.py"]),
    tags = ["exclusive", "team:serve"],
    deps = [":serve_lib"],
)

py_test(
    name = "test_json_serde",
    size = "medium",
2 changes: 1 addition & 1 deletion python/ray/serve/examples/doc/snippet_model_composition.py
@@ -67,7 +67,7 @@ async def __call__(self, starlette_request):
if __name__ == "__main__":

    # Start ray with 8 processes.
    if ray.is_initialized:
    if ray.is_initialized():
        ray.shutdown()
    ray.init(num_cpus=8)
    serve.start()
70 changes: 70 additions & 0 deletions python/ray/serve/examples/doc/snippet_model_ensemble.py
@@ -0,0 +1,70 @@
import ray
import time
import asyncio

from ray import serve


@serve.deployment(num_replicas=2)
def model_one(input_data):
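    # The 4 second sleep stands in for real, slow model inference work.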
print("Model 1 predict")
time.sleep(4)
return 1


@serve.deployment(num_replicas=2)
def model_two(input_data):
print("Model 2 predict")
time.sleep(4)
return 2


@serve.deployment(max_concurrent_queries=10, route_prefix="/ensemble")
class EnsembleModel:
    def __init__(self):
        self.model_one = model_one.get_handle()
        self.model_two = model_two.get_handle()

    async def __call__(self, input_data):
        print("Call models concurrently, wait for both to finish")
        tasks = [self.model_one.remote(input_data), self.model_two.remote(input_data)]
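        # Each .remote() call returns immediately with an awaitable reference,
        # so both sub models start predicting at the same time.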
print("collect models predictions (non-blocking)")
predictions = await asyncio.gather(*tasks)
return predictions


def send_concurrent_model_requests(num_requests=2):
    ensemble_model = EnsembleModel.get_handle()
    all_data = [
        ensemble_model.remote(input_data) for input_data in range(num_requests)
    ]
    all_predictions = ray.get(all_data)
    print(all_predictions)


if __name__ == "__main__":
    # Start a local Ray cluster (8 CPUs) and the Serve instance.
    if ray.is_initialized():
        ray.shutdown()
    ray.init(num_cpus=8)
    serve.start()

    # Deploy the sub models and the ensemble model.
    model_one.deploy()
    model_two.deploy()
    EnsembleModel.deploy()

    # Send 2 concurrent requests to the ensemble model. Each request calls
    # both sub models in parallel, so 4 predictions (each taking ~4 seconds)
    # run concurrently across the 4 replicas, and the whole run takes roughly
    # 4 seconds instead of 16.
    st = time.time()
    send_concurrent_model_requests()
    print("duration", time.time() - st)

    # Expected output:
    # [[1, 2], [1, 2]]
    # duration 4.015406847000122