[Serve] Autoscaling for deployment graph (#25424)
sihanwang41 authored Jun 10, 2022
1 parent 1483c45 commit 2546fbf
Showing 6 changed files with 379 additions and 79 deletions.
8 changes: 8 additions & 0 deletions python/ray/serve/BUILD
@@ -363,6 +363,14 @@ py_test(
     deps = [":serve_lib"],
 )
 
+py_test(
+    name = "test_deployment_graph_autoscaling",
+    size = "medium",
+    srcs = serve_tests_srcs,
+    tags = ["exclusive", "team:serve"],
+    deps = [":serve_lib"],
+)
+
 py_test(
     name = "test_deployment_graph_classmethod",
     size = "medium",
69 changes: 3 additions & 66 deletions python/ray/serve/controller.py
@@ -1,6 +1,5 @@
 import asyncio
 from collections import defaultdict
-from copy import copy
 import json
 import logging
 import traceback
@@ -14,7 +13,6 @@
 from ray._private.utils import import_attr
 from ray.exceptions import RayTaskError
 
-from ray.serve.autoscaling_metrics import InMemoryMetricsStore
 from ray.serve.autoscaling_policy import BasicAutoscalingPolicy
 from ray.serve.common import (
     DeploymentInfo,
@@ -130,10 +128,6 @@ async def __init__(
         # Unix timestamp of latest config deployment request. Defaults to 0.
         self.deployment_timestamp = 0
 
-        # TODO(simon): move autoscaling related stuff into a manager.
-        self.autoscaling_metrics_store = InMemoryMetricsStore()
-        self.handle_metrics_store = InMemoryMetricsStore()
-
         asyncio.get_event_loop().create_task(self.run_control_loop())
 
     def check_alive(self) -> None:
@@ -144,13 +138,13 @@ def get_pid(self) -> int:
         return os.getpid()
 
     def record_autoscaling_metrics(self, data: Dict[str, float], send_timestamp: float):
-        self.autoscaling_metrics_store.add_metrics_point(data, send_timestamp)
+        self.deployment_state_manager.record_autoscaling_metrics(data, send_timestamp)
 
     def record_handle_metrics(self, data: Dict[str, float], send_timestamp: float):
-        self.handle_metrics_store.add_metrics_point(data, send_timestamp)
+        self.deployment_state_manager.record_handle_metrics(data, send_timestamp)
 
     def _dump_autoscaling_metrics_for_testing(self):
-        return self.autoscaling_metrics_store.data
+        return self.deployment_state_manager.get_autoscaling_metrics()
 
     def _dump_replica_states_for_testing(self, deployment_name):
         return self.deployment_state_manager._deployment_states[
@@ -192,68 +186,11 @@ def get_http_proxy_names(self) -> bytes:
         )
         return actor_name_list.SerializeToString()
 
-    def autoscale(self) -> None:
-        """Updates autoscaling deployments with calculated num_replicas."""
-        for deployment_name, (
-            deployment_info,
-            route_prefix,
-        ) in self.list_deployments_internal().items():
-            deployment_config = deployment_info.deployment_config
-            autoscaling_policy = deployment_info.autoscaling_policy
-
-            if autoscaling_policy is None:
-                continue
-
-            replicas = self.deployment_state_manager._deployment_states[
-                deployment_name
-            ]._replicas
-            running_replicas = replicas.get([ReplicaState.RUNNING])
-
-            current_num_ongoing_requests = []
-            for replica in running_replicas:
-                replica_tag = replica.replica_tag
-                num_ongoing_requests = self.autoscaling_metrics_store.window_average(
-                    replica_tag,
-                    time.time() - autoscaling_policy.config.look_back_period_s,
-                )
-                if num_ongoing_requests is not None:
-                    current_num_ongoing_requests.append(num_ongoing_requests)
-
-            current_handle_queued_queries = self.handle_metrics_store.max(
-                deployment_name,
-                time.time() - autoscaling_policy.config.look_back_period_s,
-            )
-
-            if current_handle_queued_queries is None:
-                current_handle_queued_queries = 0
-
-            new_deployment_config = deployment_config.copy()
-
-            decision_num_replicas = autoscaling_policy.get_decision_num_replicas(
-                curr_target_num_replicas=deployment_config.num_replicas,
-                current_num_ongoing_requests=current_num_ongoing_requests,
-                current_handle_queued_queries=current_handle_queued_queries,
-            )
-
-            if decision_num_replicas == deployment_config.num_replicas:
-                continue
-
-            new_deployment_config.num_replicas = decision_num_replicas
-
-            new_deployment_info = copy(deployment_info)
-            new_deployment_info.deployment_config = new_deployment_config
-
-            self.deployment_state_manager.deploy(deployment_name, new_deployment_info)
-
     async def run_control_loop(self) -> None:
         # NOTE(edoakes): we catch all exceptions here and simply log them,
         # because an unhandled exception would cause the main control loop to
        # halt, which should *never* happen.
         while True:
-            try:
-                self.autoscale()
-            except Exception:
-                logger.exception("Exception in autoscaling.")
 
             async with self.write_lock:
                 try:
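With this change the controller only forwards metrics; the scaling decision itself moves into the deployment state (next files). The policy it delegates to, BasicAutoscalingPolicy.get_decision_num_replicas, is untouched by this commit and not shown above. For orientation, a minimal sketch of the kind of ratio-based decision such a policy makes, assuming a target-load knob like the public target_num_ongoing_requests_per_replica; the function name, parameters, and exact formula here are illustrative, not code from this commit:

```python
import math
from typing import List

def decide_num_replicas(
    curr_target_num_replicas: int,
    current_num_ongoing_requests: List[float],  # per-replica window averages
    current_handle_queued_queries: float,       # max queue length across handles
    target_num_ongoing_requests_per_replica: float,
    min_replicas: int,
    max_replicas: int,
) -> int:
    # Without any running replicas there is no load signal to act on;
    # keep the current target (a real policy also handles scale-from-zero).
    if not current_num_ongoing_requests:
        return curr_target_num_replicas
    # Outstanding work = requests running on replicas plus requests still
    # queued at the handles (the signal this commit threads through).
    total = sum(current_num_ongoing_requests) + current_handle_queued_queries
    per_replica = total / len(current_num_ongoing_requests)
    # Scale proportionally to how far the observed per-replica load is
    # from the configured target, then clamp to the configured bounds.
    desired = math.ceil(
        curr_target_num_replicas
        * per_replica
        / target_num_ongoing_requests_per_replica
    )
    return max(min_replicas, min(max_replicas, desired))
```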
20 changes: 10 additions & 10 deletions python/ray/serve/deployment.py
@@ -1,5 +1,6 @@
 from copy import copy
 import inspect
+import logging
 from typing import (
     Any,
     Callable,
@@ -16,6 +17,7 @@
     AutoscalingConfig,
     DeploymentConfig,
 )
+from ray.serve.constants import SERVE_LOGGER_NAME
 from ray.serve.handle import RayServeHandle, RayServeSyncHandle
 from ray.serve.utils import DEFAULT, get_deployment_import_path
 from ray.util.annotations import PublicAPI
@@ -25,6 +27,9 @@
 )
 
 
+logger = logging.getLogger(SERVE_LOGGER_NAME)
+
+
 @PublicAPI
 class Deployment:
     def __init__(
@@ -83,15 +88,6 @@ def __init__(
         if init_kwargs is None:
             init_kwargs = {}
 
-        # TODO(architkulkarni): Enforce that autoscaling_config and
-        # user-provided num_replicas should be mutually exclusive.
-        if version is None and config.autoscaling_config is not None:
-            # TODO(architkulkarni): Remove this restriction.
-            raise ValueError(
-                "Currently autoscaling is only supported for "
-                "versioned deployments. Try @serve.deployment(version=...)."
-            )
-
         self._func_or_class = func_or_class
         self._name = name
         self._version = version
@@ -466,7 +462,11 @@ def deployment_to_schema(d: Deployment) -> DeploymentSchema:
         ),
         init_args=(),
         init_kwargs={},
-        num_replicas=d.num_replicas,
+        # TODO(Sihan) DeploymentConfig num_replicas and auto_config can be set together
+        # because internally we use these two field for autoscale and deploy.
+        # We can improve the code after we separate the user faced deployment config and
+        # internal deployment config.
+        num_replicas=None if d._config.autoscaling_config else d.num_replicas,
         route_prefix=d.route_prefix,
         max_concurrent_queries=d.max_concurrent_queries,
         user_config=d.user_config,
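Two user-visible effects of this file's changes: autoscaling no longer requires a versioned deployment (the ValueError guard is gone), and deployment_to_schema now emits num_replicas=None whenever an autoscaling_config is present, since the schema treats the two as mutually exclusive. A hedged usage sketch of what is now accepted; the field names follow the public AutoscalingConfig, but the values are purely illustrative:

```python
from ray import serve

# No version= needed anymore for an autoscaling deployment, and
# num_replicas is deliberately left unset: with autoscaling_config
# present it would be dropped from the schema anyway.
@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 4,
        "target_num_ongoing_requests_per_replica": 2,
    },
)
class Model:
    def __call__(self, request) -> str:
        return "ok"
```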
130 changes: 129 additions & 1 deletion python/ray/serve/deployment_state.py
@@ -1,3 +1,4 @@
+from copy import copy
 from collections import defaultdict, OrderedDict
 from enum import Enum
 import itertools
@@ -17,6 +18,7 @@
 from ray.util.placement_group import PlacementGroup
 from ray import cloudpickle
 
+from ray.serve.autoscaling_metrics import InMemoryMetricsStore
 from ray.serve.common import (
     DeploymentInfo,
     DeploymentStatus,
@@ -946,6 +948,18 @@ def __init__(
         )
         self._deleting = False
 
+    def should_autoscale(self) -> bool:
+        """
+        Check if the deployment is under autoscaling
+        """
+        return self._target_info.autoscaling_policy is not None
+
+    def get_autoscale_metric_lookback_period(self) -> float:
+        """
+        Return the autoscaling metrics look back period
+        """
+        return self._target_info.autoscaling_policy.config.look_back_period_s
+
     def get_target_state_checkpoint_data(self):
         """
         Return deployment's target state submitted by user's deployment call.
@@ -1041,10 +1055,10 @@ def _set_deployment_goal(self, deployment_info: Optional[DeploymentInfo]) -> None:
             replica config, if passed in as None, we're marking
             target deployment as shutting down.
         """
-
         if deployment_info is not None:
             self._target_info = deployment_info
             self._target_replicas = deployment_info.deployment_config.num_replicas
+
             self._target_version = DeploymentVersion(
                 deployment_info.version,
                 user_config=deployment_info.deployment_config.user_config,
@@ -1097,6 +1111,43 @@ def deploy(self, deployment_info: DeploymentInfo) -> bool:
 
         return True
 
+    def autoscale(
+        self,
+        current_num_ongoing_requests: List[float],
+        current_handle_queued_queries: int,
+    ):
+        """
+        Autoscale the deployment based on metrics
+
+        Args:
+            current_num_ongoing_requests: a list of number of running requests of all
+                replicas in the deployment
+            current_handle_queued_queries: The number of handle queued queries,
+                if there are multiple handles, the max number of queries at
+                a single handle should be passed in
+        """
+        if self._deleting:
+            return
+
+        autoscaling_policy = self._target_info.autoscaling_policy
+        # decide num replicas
+        decision_num_replicas = autoscaling_policy.get_decision_num_replicas(
+            curr_target_num_replicas=self._target_info.deployment_config.num_replicas,
+            current_num_ongoing_requests=current_num_ongoing_requests,
+            current_handle_queued_queries=current_handle_queued_queries,
+        )
+
+        if decision_num_replicas == self._target_info.deployment_config.num_replicas:
+            return
+
+        new_config = copy(self._target_info)
+        new_config.deployment_config.num_replicas = decision_num_replicas
+        # Reset constructor retry counter.
+        self._replica_constructor_retry_counter = 0
+        if new_config.version is None:
+            new_config.version = self._target_version.code_version
+        self._set_deployment_goal(new_config)
+        self._save_checkpoint_func()
+
     def delete(self) -> None:
         self._set_deployment_goal(None)
         self._save_checkpoint_func()
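A worked example of the inputs DeploymentState.autoscale feeds into the policy, using the illustrative decide_num_replicas sketch from the controller section above (the numbers are made up):

```python
# Two running replicas averaging 3.0 and 5.0 ongoing requests over the
# look-back window, plus 2 queries queued at a handle, with a target of
# 2 ongoing requests per replica:
#   total load   = 3.0 + 5.0 + 2 = 10.0
#   per-replica  = 10.0 / 2 = 5.0
#   desired      = ceil(2 * 5.0 / 2) = 5
assert decide_num_replicas(
    curr_target_num_replicas=2,
    current_num_ongoing_requests=[3.0, 5.0],
    current_handle_queued_queries=2,
    target_num_ongoing_requests_per_replica=2,
    min_replicas=1,
    max_replicas=8,
) == 5
```

Note also that autoscale pins new_config.version to the current code version when none is set, presumably so a scaling decision only changes the replica count and is not mistaken for a new code version that would trigger replica replacement.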
@@ -1570,6 +1621,22 @@ def __init__(
 
         self._recover_from_checkpoint(all_current_actor_names)
 
+        # TODO(simon): move autoscaling related stuff into a manager.
+        self.autoscaling_metrics_store = InMemoryMetricsStore()
+        self.handle_metrics_store = InMemoryMetricsStore()
+
+    def record_autoscaling_metrics(self, data: Dict[str, float], send_timestamp: float):
+        self.autoscaling_metrics_store.add_metrics_point(data, send_timestamp)
+
+    def record_handle_metrics(self, data: Dict[str, float], send_timestamp: float):
+        self.handle_metrics_store.add_metrics_point(data, send_timestamp)
+
+    def get_autoscaling_metrics(self):
+        """
+        Return autoscaling metrics (used for dumping from controller)
+        """
+        return self.autoscaling_metrics_store.data
+
     def _map_actor_names_to_deployment(
         self, all_current_actor_names: List[str]
     ) -> Dict[str, List[str]]:
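The two stores that moved here are instances of InMemoryMetricsStore from ray/serve/autoscaling_metrics.py, which is not part of this diff. A minimal sketch of the interface the manager relies on above, simplified and assumed (the real store also compacts stale points):

```python
import time
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

class SketchMetricsStore:
    def __init__(self) -> None:
        # key -> list of (timestamp, value) points, appended in time order.
        self.data: Dict[str, List[Tuple[float, float]]] = defaultdict(list)

    def add_metrics_point(self, data: Dict[str, float], timestamp: float) -> None:
        for key, value in data.items():
            self.data[key].append((timestamp, value))

    def window_average(self, key: str, window_start: float) -> Optional[float]:
        # Average of all points newer than window_start, or None if empty.
        points = [v for t, v in self.data.get(key, []) if t >= window_start]
        return sum(points) / len(points) if points else None

    def max(self, key: str, window_start: float) -> Optional[float]:
        # Max of all points newer than window_start, or None if empty.
        points = [v for t, v in self.data.get(key, []) if t >= window_start]
        return max(points) if points else None
```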
@@ -1735,10 +1802,71 @@ def delete_deployment(self, deployment_name: str):
         if deployment_name in self._deployment_states:
             self._deployment_states[deployment_name].delete()
 
+    def get_replica_ongoing_request_metrics(
+        self, deployment_name: str, look_back_period_s
+    ) -> List[float]:
+        """
+        Return replica average ongoing requests
+        Args:
+            deployment_name: deployment name
+            look_back_period_s: the look back time period to collect the requests
+                metrics
+        Returns:
+            List of ongoing requests, the length of list indicate the number
+                of replicas
+        """
+
+        replicas = self._deployment_states[deployment_name]._replicas
+        running_replicas = replicas.get([ReplicaState.RUNNING])
+
+        current_num_ongoing_requests = []
+        for replica in running_replicas:
+            replica_tag = replica.replica_tag
+            num_ongoing_requests = self.autoscaling_metrics_store.window_average(
+                replica_tag,
+                time.time() - look_back_period_s,
+            )
+            if num_ongoing_requests is not None:
+                current_num_ongoing_requests.append(num_ongoing_requests)
+        return current_num_ongoing_requests
+
+    def get_handle_queueing_metrics(
+        self, deployment_name: str, look_back_period_s
+    ) -> int:
+        """
+        Return handle queue length metrics
+        Args:
+            deployment_name: deployment name
+            look_back_period_s: the look back time period to collect the requests
+                metrics
+        Returns:
+            if multiple handles queue length, return the max number of queue length.
+        """
+        current_handle_queued_queries = self.handle_metrics_store.max(
+            deployment_name,
+            time.time() - look_back_period_s,
+        )
+
+        if current_handle_queued_queries is None:
+            current_handle_queued_queries = 0
+        return current_handle_queued_queries
+
     def update(self):
         """Updates the state of all deployments to match their goal state."""
         deleted_tags = []
         for deployment_name, deployment_state in self._deployment_states.items():
+            if deployment_state.should_autoscale():
+                current_num_ongoing_requests = self.get_replica_ongoing_request_metrics(
+                    deployment_name,
+                    deployment_state.get_autoscale_metric_lookback_period(),
+                )
+                current_handle_queued_queries = self.get_handle_queueing_metrics(
+                    deployment_name,
+                    deployment_state.get_autoscale_metric_lookback_period(),
+                )
+                deployment_state.autoscale(
+                    current_num_ongoing_requests, current_handle_queued_queries
+                )
             deleted = deployment_state.update()
             if deleted:
                 deleted_tags.append(deployment_name)
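The net effect of this file's changes: the autoscaling decision now runs inside DeploymentStateManager.update(), immediately before each deployment's reconciliation step, rather than in a separate controller-level pass over all deployments. One control-loop tick therefore looks roughly like the sketch below, a simplified restatement of update() above with deletion bookkeeping omitted:

```python
def tick(manager) -> None:
    for name, state in manager._deployment_states.items():
        if state.should_autoscale():
            window_s = state.get_autoscale_metric_lookback_period()
            ongoing = manager.get_replica_ongoing_request_metrics(name, window_s)
            queued = manager.get_handle_queueing_metrics(name, window_s)
            # May rewrite the deployment's target num_replicas.
            state.autoscale(ongoing, queued)
        # Reconcile running replicas toward the (possibly new) target.
        state.update()
```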
4 changes: 2 additions & 2 deletions python/ray/serve/tests/test_autoscaling_policy.py
@@ -260,12 +260,12 @@ def __call__(self):
     assert get_deployment_start_time(controller, A) == start_time
 
 
-@mock.patch.object(ServeController, "autoscale")
+@mock.patch.object(ServeController, "run_control_loop")
 def test_initial_num_replicas(mock, serve_instance):
     """assert that the inital amount of replicas a deployment is launched with
     respects the bounds set by autoscaling_config.
-    For this test we mock out the autoscaling loop, make sure the number of
+    For this test we mock out the run event loop, make sure the number of
     replicas is set correctly before we hit the autoscaling procedure.
     """
 
[The sixth changed file, presumably the new python/ray/serve/tests/test_deployment_graph_autoscaling.py referenced in the BUILD change (the remaining 227 additions), did not render on this page.]
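Since that file is missing here, a hedged sketch of the scenario it presumably exercises: a deployment bound into a Serve deployment graph, receiving traffic through handles (exactly what feeds record_handle_metrics) and scaling within its autoscaling_config bounds. Import paths for the DAG API moved around across Ray releases of this era, so treat them as assumptions:

```python
import ray
from ray import serve
from ray.serve.drivers import DAGDriver
from ray.dag import InputNode  # or ray.experimental.dag in older releases

@serve.deployment(
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 4,
        "target_num_ongoing_requests_per_replica": 2,
    },
)
class Adder:
    def add(self, inp: int) -> int:
        return inp + 1

with InputNode() as user_input:
    dag = Adder.bind().add.bind(user_input)

# DAGDriver fans requests into the graph through deployment handles,
# so intermediate deployments are scaled from handle metrics, not HTTP.
handle = serve.run(DAGDriver.bind(dag))
print(ray.get(handle.predict.remote(1)))  # -> 2
```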
