[serve] Multi & single deployment large scale test (ray-project#17310)
jiaodong authored and stephanie-wang committed Jul 31, 2021
1 parent bd3fb32 commit a359b46
Showing 6 changed files with 504 additions and 183 deletions.
4 changes: 2 additions & 2 deletions release/serve_tests/compute_tpl_8_cpu.yaml
@@ -1,7 +1,7 @@
cloud_id: {{env["ANYSCALE_CLOUD_ID"]}}
region: us-west-2

# 1k max replicas
# 1k max replicas (1000 / 8 = 125 containers needed)
max_workers: 130

head_node_type:
@@ -11,7 +11,7 @@ head_node_type:

worker_node_types:
- name: worker_node
# 8 cpus, x86, 32G mem, 10Gb NIC, $0.384/hr on demand
# 8 cpus, x86, 32G mem, 10Gb NIC, $0.384/hr on demand
instance_type: m5.2xlarge
min_workers: 130
# 1k max replicas
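
A quick back-of-the-envelope check of the max_workers value, as a minimal Python sketch; it assumes one CPU per no-op replica on an 8-vCPU m5.2xlarge, and the 5 extra nodes of headroom over the 125 computed above is an assumption, not something stated in the config:

import math

TARGET_REPLICAS = 1000   # "1k max replicas"
CPUS_PER_NODE = 8        # m5.2xlarge provides 8 vCPUs
HEADROOM_NODES = 5       # assumed slack for Serve controller/proxy processes

nodes_needed = math.ceil(TARGET_REPLICAS / CPUS_PER_NODE)  # 125
max_workers = nodes_needed + HEADROOM_NODES                # 130
print(nodes_needed, max_workers)
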
17 changes: 15 additions & 2 deletions release/serve_tests/serve_tests.yaml
@@ -4,9 +4,22 @@
compute_template: compute_tpl_8_cpu.yaml

run:
timeout: 10800
timeout: 7200
long_running: False
script: python workloads/single_deployment_1k_noop_replica.py --run-locally=False
script: python workloads/single_deployment_1k_noop_replica.py

smoke_test:
timeout: 600

- name: multi_deployment_1k_noop_replica
cluster:
app_config: app_config.yaml
compute_template: compute_tpl_8_cpu.yaml

run:
timeout: 7200
long_running: False
script: python workloads/multi_deployment_1k_noop_replica.py

smoke_test:
timeout: 600
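
As a rough illustration of how these run/smoke_test fields map to an actual invocation, here is a minimal sketch; run_release_test is a hypothetical helper, not the releaser's e2e.py, though IS_SMOKE_TEST matches the environment variable the workload scripts below read:

import os
import subprocess

def run_release_test(script: str, timeout_s: int, smoke_test: bool = True) -> int:
    # Workload scripts fall back to smoke-test defaults when IS_SMOKE_TEST == "1".
    env = dict(os.environ, IS_SMOKE_TEST="1" if smoke_test else "0")
    proc = subprocess.run(script.split(), env=env, timeout=timeout_s, check=False)
    return proc.returncode

# Smoke-test run of the new workload with the 600s timeout configured above.
rc = run_release_test("python workloads/multi_deployment_1k_noop_replica.py",
                      timeout_s=600)
print(f"exit code: {rc}")
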
178 changes: 178 additions & 0 deletions release/serve_tests/workloads/multi_deployment_1k_noop_replica.py
@@ -0,0 +1,178 @@
#!/usr/bin/env python3
"""
Benchmark test for multi deployment at 1k no-op replica scale.
1) Start with a single head node.
2) Start 100 deployments, each with 10 no-op replicas.
3) Launch wrk on each running node to simulate load-balanced requests.
4) Recursively send queries to random deployments, up to depth=5.
5) Run a 10-minute wrk trial on each node, then aggregate results.
Report:
per_thread_latency_avg_ms
per_thread_latency_max_ms
per_thread_avg_tps
per_thread_max_tps
per_node_avg_tps
per_node_avg_transfer_per_sec_KB
cluster_total_throughput
cluster_total_transfer_KB
cluster_max_P50_latency_ms
cluster_max_P75_latency_ms
cluster_max_P90_latency_ms
cluster_max_P99_latency_ms
"""

import click
import math
import os
import random
import ray

from ray import serve
from ray.serve.utils import logger
from serve_test_utils import (
aggregate_all_metrics,
run_wrk_on_all_nodes,
save_test_results,
)
from serve_test_cluster_utils import (
setup_local_single_node_cluster,
setup_anyscale_cluster,
warm_up_one_cluster,
NUM_CPU_PER_NODE,
NUM_CONNECTIONS,
)
from typing import Optional

# Experiment configs
DEFAULT_SMOKE_TEST_NUM_REPLICA = 8
DEFAULT_SMOKE_TEST_NUM_DEPLOYMENTS = 4 # 2 replicas each

# TODO(jiaodong): We should investigate and change this back to 1k.
# For now we won't get valid latency numbers from wrk at 1k replicas,
# likely due to request timeouts.
DEFAULT_FULL_TEST_NUM_REPLICA = 1000
DEFAULT_FULL_TEST_NUM_DEPLOYMENTS = 100 # 10 replicas each

# Experiment configs - wrk specific
DEFAULT_SMOKE_TEST_TRIAL_LENGTH = "5s"
DEFAULT_FULL_TEST_TRIAL_LENGTH = "10m"


def setup_multi_deployment_replicas(num_replicas, num_deployments):
num_replica_per_deployment = num_replicas // num_deployments
all_deployment_names = [f"Echo_{i+1}" for i in range(num_deployments)]

@serve.deployment(num_replicas=num_replica_per_deployment)
class Echo:
def __init__(self):
self.all_deployment_async_handles = []

def get_random_async_handle(self):
            # Sync get_handle() is expected to be called only a few times
            # during deployment warmup, so that each deployment holds references
            # to all other handles and can send recursive inference calls
if len(self.all_deployment_async_handles) < len(
all_deployment_names):
deployments = list(serve.list_deployments().values())
self.all_deployment_async_handles = [
deployment.get_handle(sync=False)
for deployment in deployments
]

return random.choice(self.all_deployment_async_handles)

async def handle_request(self, request, depth: int):
# Max recursive call depth reached
if depth > 4:
return "hi"

next_async_handle = self.get_random_async_handle()
obj_ref = await next_async_handle.handle_request.remote(
request, depth + 1)

return await obj_ref

async def __call__(self, request):
return await self.handle_request(request, 0)

for deployment in all_deployment_names:
Echo.options(name=deployment).deploy()


@click.command()
@click.option("--num-replicas", type=int)
@click.option("--num-deployments", type=int)
@click.option("--trial-length", type=str)
def main(num_replicas: Optional[int], num_deployments: Optional[int],
trial_length: Optional[str]):
    # Give default cluster parameter values based on the smoke_test config;
    # if the user provided values explicitly, use those instead.
    # IS_SMOKE_TEST is set by the args of releaser's e2e.py
smoke_test = os.environ.get("IS_SMOKE_TEST", "1")
if smoke_test == "1":
num_replicas = num_replicas or DEFAULT_SMOKE_TEST_NUM_REPLICA
num_deployments = num_deployments or DEFAULT_SMOKE_TEST_NUM_DEPLOYMENTS
trial_length = trial_length or DEFAULT_SMOKE_TEST_TRIAL_LENGTH
logger.info(f"Running smoke test with {num_replicas} replicas, "
f"{num_deployments} deployments .. \n")
        # Choose cluster setup based on user config. Local tests use Cluster()
        # to mock actors, which requires the number of nodes to be specified,
        # but the ray client doesn't.
num_nodes = int(math.ceil(num_replicas / NUM_CPU_PER_NODE))
logger.info(
f"Setting up local ray cluster with {num_nodes} nodes .. \n")
serve_client = setup_local_single_node_cluster(num_nodes)
else:
num_replicas = num_replicas or DEFAULT_FULL_TEST_NUM_REPLICA
num_deployments = num_deployments or DEFAULT_FULL_TEST_NUM_DEPLOYMENTS
trial_length = trial_length or DEFAULT_FULL_TEST_TRIAL_LENGTH
logger.info(f"Running full test with {num_replicas} replicas, "
f"{num_deployments} deployments .. \n")
logger.info("Setting up anyscale ray cluster .. \n")
serve_client = setup_anyscale_cluster()

http_host = str(serve_client._http_config.host)
http_port = str(serve_client._http_config.port)
logger.info(f"Ray serve http_host: {http_host}, http_port: {http_port}")

logger.info(f"Deploying with {num_replicas} target replicas ....\n")
setup_multi_deployment_replicas(num_replicas, num_deployments)

logger.info("Warming up cluster ....\n")
rst_ray_refs = []
for endpoint in serve.list_endpoints().keys():
rst_ray_refs.append(
warm_up_one_cluster.options(num_cpus=0.1).remote(
10, http_host, http_port, endpoint))
for endpoint in ray.get(rst_ray_refs):
logger.info(f"Finished warming up {endpoint}")

logger.info(f"Starting wrk trial on all nodes for {trial_length} ....\n")
# For detailed discussion, see https://github.com/wg/wrk/issues/205
    # TODO(jiaodong): What's the best number to use here?
all_endpoints = list(serve.list_endpoints().keys())
all_metrics, all_wrk_stdout = run_wrk_on_all_nodes(
trial_length,
NUM_CONNECTIONS,
http_host,
http_port,
all_endpoints=all_endpoints)

aggregated_metrics = aggregate_all_metrics(all_metrics)
logger.info("Wrk stdout on each node: ")
for wrk_stdout in all_wrk_stdout:
logger.info(wrk_stdout)
logger.info("Final aggregated metrics: ")
for key, val in aggregated_metrics.items():
logger.info(f"{key}: {val}")
save_test_results(
aggregated_metrics,
default_output_file="/tmp/multi_deployment_1k_noop_replica.json")


if __name__ == "__main__":
main()
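
For a quick local sanity check of the recursive Echo chain outside the releaser, a minimal sketch along these lines should work; it reuses the helpers from serve_test_cluster_utils.py below, and the tiny two-deployment scale, the 127.0.0.1:8000 address, and the /Echo_1 route (Serve's default routing by deployment name) are assumptions for illustration only:

import requests

from serve_test_cluster_utils import setup_local_single_node_cluster
from multi_deployment_1k_noop_replica import setup_multi_deployment_replicas

# Tiny local cluster: one node, two deployments with one replica each.
setup_local_single_node_cluster(num_nodes=1)
setup_multi_deployment_replicas(num_replicas=2, num_deployments=2)

# Each deployment is served at /<deployment_name>; the recursive calls bottom
# out once depth > 4 and should ultimately return "hi".
print(requests.get("http://127.0.0.1:8000/Echo_1").text)
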
63 changes: 63 additions & 0 deletions release/serve_tests/workloads/serve_test_cluster_utils.py
@@ -0,0 +1,63 @@
#!/usr/bin/env python3
import ray
import requests

from ray import serve
from ray.cluster_utils import Cluster
from ray.serve.utils import logger
from ray.serve.config import DeploymentMode

# Cluster setup configs
NUM_CPU_PER_NODE = 10
NUM_CONNECTIONS = 100


def setup_local_single_node_cluster(num_nodes):
"""Setup ray cluster locally via ray.init() and Cluster()
Each actor is simulated in local process on single node,
thus smaller scale by default.
"""
cluster = Cluster()
for i in range(num_nodes):
cluster.add_node(
redis_port=6379 if i == 0 else None,
num_cpus=NUM_CPU_PER_NODE,
num_gpus=0,
resources={str(i): 2},
)
ray.init(address=cluster.address, dashboard_host="0.0.0.0")
serve_client = serve.start(
http_options={"location": DeploymentMode.EveryNode})

return serve_client


def setup_anyscale_cluster():
"""Setup ray cluster at anyscale via ray.client()
Note this is by default large scale and should be kicked off
less frequently.
"""
    # TODO: Ray client doesn't work with the releaser script yet because
    # we cannot connect to the anyscale cluster from its head node.
    # ray.client().env({}).connect()
ray.init(address="auto")
serve_client = serve.start(
http_options={"location": DeploymentMode.EveryNode})

return serve_client


@ray.remote
def warm_up_one_cluster(
num_warmup_iterations: int,
http_host: str,
http_port: str,
endpoint: str,
) -> str:
logger.info(f"Warming up {endpoint} ..")
for _ in range(num_warmup_iterations):
resp = requests.get(f"http://{http_host}:{http_port}/{endpoint}").text
logger.info(resp)
return endpoint