[data] introduce abstract interface for data autoscaling #45002

Merged: 29 commits, merged May 7, 2024
8 changes: 8 additions & 0 deletions python/ray/data/BUILD
@@ -579,6 +579,14 @@ py_test(
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_autoscaler",
size = "small",
srcs = ["tests/test_autoscaler.py"],
tags = ["team:data", "exclusive"],
deps = ["//:ray_lib", ":conftest"],
)

py_test(
name = "test_lance",
size = "small",
15 changes: 15 additions & 0 deletions python/ray/data/_internal/execution/autoscaler/__init__.py
@@ -0,0 +1,15 @@
from .autoscaler import Autoscaler
from .autoscaling_actor_pool import AutoscalingActorPool
from .default_autoscaler import DefaultAutoscaler


def create_autoscaler(topology, resource_manager, execution_id):
return DefaultAutoscaler(topology, resource_manager, execution_id)


__all__ = [
"Autoscaler",
"DefaultAutoscaler",
"create_autoscaler",
"AutoscalingActorPool",
]
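
The factory keeps callers decoupled from the concrete DefaultAutoscaler class. A rough usage sketch (the loop helpers below are placeholders, not APIs from this PR):

```python
# Hypothetical sketch of how an executor could wire up the autoscaler.
from ray.data._internal.execution.autoscaler import create_autoscaler


def run_scheduling_loop(topology, resource_manager, execution_id):
    autoscaler = create_autoscaler(topology, resource_manager, execution_id)
    try:
        while has_work(topology):  # has_work(): placeholder for the executor's loop condition
            autoscaler.try_trigger_scaling()
            dispatch_next_task(topology)  # placeholder for the scheduling step
    finally:
        autoscaler.on_executor_shutdown()
```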
38 changes: 38 additions & 0 deletions python/ray/data/_internal/execution/autoscaler/autoscaler.py
@@ -0,0 +1,38 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

from ray.util.annotations import DeveloperAPI

if TYPE_CHECKING:
from ray.data._internal.execution.resource_manager import ResourceManager
from ray.data._internal.execution.streaming_executor_state import Topology


@DeveloperAPI
class Autoscaler(ABC):
"""Abstract interface for Ray Data autoscaler."""

def __init__(
self,
topology: "Topology",
resource_manager: "ResourceManager",
execution_id: str,
):
self._topology = topology
self._resource_manager = resource_manager
self._execution_id = execution_id

@abstractmethod
def try_trigger_scaling(self):
"""Try trigger autoscaling.

This method will be called each time when StreamingExecutor makes
a scheduling decision. A subclass should override this method to
handle the autoscaling of both the cluster and `AutoscalingActorPool`s.
"""
...

@abstractmethod
def on_executor_shutdown(self):
"""Callback when the StreamingExecutor is shutting down."""
...
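
To make the contract concrete, a do-nothing subclass that satisfies the interface might look like this (hypothetical example, not part of this PR):

```python
from ray.data._internal.execution.autoscaler import Autoscaler


class NoOpAutoscaler(Autoscaler):
    """Hypothetical autoscaler that never scales anything."""

    def try_trigger_scaling(self):
        # Invoked on every scheduling decision; intentionally does nothing.
        pass

    def on_executor_shutdown(self):
        # No external resource requests were made, so there is nothing to clean up.
        pass
```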
88 changes: 88 additions & 0 deletions python/ray/data/_internal/execution/autoscaler/autoscaling_actor_pool.py
@@ -0,0 +1,88 @@
from abc import ABC, abstractmethod

from ray.util.annotations import DeveloperAPI


@DeveloperAPI
class AutoscalingActorPool(ABC):
"""Abstract interface of an autoscaling actor pool.

A `PhysicalOperator` can manage one or more `AutoscalingActorPool`s.
The `Autoscaler` is responsible for making scaling decisions for these
actor pools.
"""

@abstractmethod
def min_size(self) -> int:
"""Min size of the actor pool."""
...

@abstractmethod
def max_size(self) -> int:
"""Max size of the actor pool."""
...

@abstractmethod
def current_size(self) -> int:
"""Current size of the actor pool."""
...

@abstractmethod
def num_running_actors(self) -> int:
"""Number of running actors."""
...

@abstractmethod
def num_active_actors(self) -> int:
"""Number of actors with at least one active task."""
...

@abstractmethod
def num_pending_actors(self) -> int:
"""Number of actors pending creation."""
...

@abstractmethod
def max_tasks_in_flight_per_actor(self) -> int:
"""Max number of in-flight tasks per actor."""
...

@abstractmethod
def current_in_flight_tasks(self) -> int:
"""Number of current in-flight tasks."""
...

def num_total_task_slots(self) -> int:
"""Total number of task slots."""
return self.max_tasks_in_flight_per_actor() * self.current_size()

def num_free_task_slots(self) -> int:
"""Number of free slots to run tasks."""
return (
self.max_tasks_in_flight_per_actor() * self.current_size()
- self.current_in_flight_tasks()
)

@abstractmethod
def scale_up(self, num_actors: int) -> int:
"""Request the actor pool to scale up by the given number of actors.

The number of actually added actors may be less than the requested
number.

Returns:
The number of actors actually added.
"""
...

@abstractmethod
def scale_down(self, num_actors: int) -> int:
"""Request the actor pool to scale down by the given number of actors.

The number of actually removed actors may be less than the requested
number.

Returns:
The number of actors actually removed.
"""
...
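
For illustration, a minimal in-memory implementation of the interface (along the lines of what a unit test might use; all names and numbers here are hypothetical) could be:

```python
from ray.data._internal.execution.autoscaler import AutoscalingActorPool


class FakeActorPool(AutoscalingActorPool):
    """Hypothetical fixed-bounds pool that only tracks counters."""

    def __init__(self, min_size: int, max_size: int):
        self._min = min_size
        self._max = max_size
        self._size = min_size
        self._in_flight = 0

    def min_size(self) -> int:
        return self._min

    def max_size(self) -> int:
        return self._max

    def current_size(self) -> int:
        return self._size

    def num_running_actors(self) -> int:
        return self._size

    def num_active_actors(self) -> int:
        # Assume each in-flight task keeps at most one actor busy.
        return min(self._in_flight, self._size)

    def num_pending_actors(self) -> int:
        return 0

    def max_tasks_in_flight_per_actor(self) -> int:
        return 2

    def current_in_flight_tasks(self) -> int:
        return self._in_flight

    def scale_up(self, num_actors: int) -> int:
        # Honor max_size, so the number actually added may be less than requested.
        added = min(num_actors, self._max - self._size)
        self._size += added
        return added

    def scale_down(self, num_actors: int) -> int:
        # Honor min_size, so the number actually removed may be less than requested.
        removed = min(num_actors, self._size - self._min)
        self._size -= removed
        return removed
```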
184 changes: 184 additions & 0 deletions python/ray/data/_internal/execution/autoscaler/default_autoscaler.py
@@ -0,0 +1,184 @@
import math
import time
from typing import TYPE_CHECKING, Dict

from .autoscaler import Autoscaler
from .autoscaling_actor_pool import AutoscalingActorPool
from ray.data._internal.execution.autoscaling_requester import (
get_or_create_autoscaling_requester_actor,
)
from ray.data._internal.execution.interfaces.execution_options import ExecutionResources

if TYPE_CHECKING:
from ray.data._internal.execution.interfaces import PhysicalOperator
from ray.data._internal.execution.resource_manager import ResourceManager
from ray.data._internal.execution.streaming_executor_state import OpState, Topology


class DefaultAutoscaler(Autoscaler):

# Default threshold of actor pool utilization to trigger scaling up.
DEFAULT_ACTOR_POOL_SCALING_UP_THRESHOLD: float = 0.8
# Default threshold of actor pool utilization to trigger scaling down.
DEFAULT_ACTOR_POOL_SCALING_DOWN_THRESHOLD: float = 0.5

# Min number of seconds between two autoscaling requests.
MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS = 20

def __init__(
self,
topology: "Topology",
resource_manager: "ResourceManager",
execution_id: str,
actor_pool_scaling_up_threshold: float = DEFAULT_ACTOR_POOL_SCALING_UP_THRESHOLD, # noqa: E501
actor_pool_scaling_down_threshold: float = DEFAULT_ACTOR_POOL_SCALING_DOWN_THRESHOLD, # noqa: E501
):
self._actor_pool_scaling_up_threshold = actor_pool_scaling_up_threshold
self._actor_pool_scaling_down_threshold = actor_pool_scaling_down_threshold
# Last time when a request was sent to Ray's autoscaler.
self._last_request_time = 0
super().__init__(topology, resource_manager, execution_id)

def try_trigger_scaling(self):
self._try_scale_up_cluster()
self._try_scale_up_or_down_actor_pool()

def _calculate_actor_pool_util(self, actor_pool: AutoscalingActorPool):
"""Calculate the utilization of the given actor pool."""
if actor_pool.current_size() == 0:
return 0
else:
return actor_pool.num_active_actors() / actor_pool.current_size()

def _actor_pool_should_scale_up(
self,
actor_pool: AutoscalingActorPool,
op: "PhysicalOperator",
op_state: "OpState",
):
# Do not scale up, if the op is completed or no more inputs are coming.
if op.completed() or (op._inputs_complete and op.internal_queue_size() == 0):
return False
if actor_pool.current_size() < actor_pool.min_size():
# Scale up, if the actor pool is below min size.
return True
elif actor_pool.current_size() >= actor_pool.max_size():
# Do not scale up, if the actor pool is already at max size.
return False
# Do not scale up, if the op still has enough resources to run.
if op_state._scheduling_status.under_resource_limits:
return False
# Do not scale up, if the op has enough free slots for the existing inputs.
if op_state.num_queued() <= actor_pool.num_free_task_slots():
return False
# Determine whether to scale up based on the actor pool utilization.
util = self._calculate_actor_pool_util(actor_pool)
return util > self._actor_pool_scaling_up_threshold

def _actor_pool_should_scale_down(
self,
actor_pool: AutoscalingActorPool,
op: "PhysicalOperator",
):
# Scale down, if the op is completed or no more inputs are coming.
if op.completed() or (op._inputs_complete and op.internal_queue_size() == 0):
return True
if actor_pool.current_size() > actor_pool.max_size():
# Scale down, if the actor pool is above max size.
return True
elif actor_pool.current_size() <= actor_pool.min_size():
# Do not scale down, if the actor pool is already at min size.
return False
# Determine whether to scale down based on the actor pool utilization.
util = self._calculate_actor_pool_util(actor_pool)
return util < self._actor_pool_scaling_down_threshold

def _try_scale_up_or_down_actor_pool(self):
for op, state in self._topology.items():
actor_pools = op.get_autoscaling_actor_pools()
for actor_pool in actor_pools:
while True:
# Try to scale up or down the actor pool.
should_scale_up = self._actor_pool_should_scale_up(
actor_pool,
op,
state,
)
should_scale_down = self._actor_pool_should_scale_down(
actor_pool, op
)
if should_scale_up and not should_scale_down:
if actor_pool.scale_up(1) == 0:
Member (bveeramani): When would this evaluate to true? Looks like scale_up always returns the input value?

Contributor: If I understand correctly, scale_up is intended to return the number of actors actually added, which could differ from the requested scale-up. But in the current default implementation, it looks like we return the input, as @bveeramani said.

Contributor Author (raulchen): Yes, the current implementation always returns the requested value. But I wanted to make the interface more flexible and also make it consistent with scale_down.

break
elif should_scale_down and not should_scale_up:
if actor_pool.scale_down(1) == 0:
break
else:
break

def _try_scale_up_cluster(self):
"""Try to scale up the cluster to accomodate the provided in-progress workload.

This makes a resource request to Ray's autoscaler consisting of the current,
aggregate usage of all operators in the DAG + the incremental usage of all
operators that are ready for dispatch (i.e. that have inputs queued). If the
autoscaler were to grant this resource request, it would allow us to dispatch
one task for every ready operator.

Note that this resource request does not take the global resource limits or the
liveness policy into account; it only tries to make the existing resource usage
+ one more task per ready operator feasible in the cluster.
"""
# Limit the frequency of autoscaling requests.
now = time.time()
if now - self._last_request_time < self.MIN_GAP_BETWEEN_AUTOSCALING_REQUESTS:
return

# Scale up the cluster if no ops are allowed to run but there is still data
# in the input queues.
no_runnable_op = all(
op_state._scheduling_status.runnable is False
for _, op_state in self._topology.items()
)
any_has_input = any(
op_state.num_queued() > 0 for _, op_state in self._topology.items()
)
if not (no_runnable_op and any_has_input):
return

self._last_request_time = now

# Get resource usage for all ops + additional resources needed to launch one
# more task for each ready op.
resource_request = []

def to_bundle(resource: ExecutionResources) -> Dict:
req = {}
if resource.cpu:
req["CPU"] = math.ceil(resource.cpu)
if resource.gpu:
req["GPU"] = math.ceil(resource.gpu)
return req

for op, state in self._topology.items():
per_task_resource = op.incremental_resource_usage()
task_bundle = to_bundle(per_task_resource)
resource_request.extend([task_bundle] * op.num_active_tasks())
# Only include incremental resource usage for ops that are ready for
# dispatch.
if state.num_queued() > 0:
# TODO(Clark): Scale up more aggressively by adding incremental resource
# usage for more than one bundle in the queue for this op?
resource_request.append(task_bundle)

self._send_resource_request(resource_request)

def _send_resource_request(self, resource_request):
# Make autoscaler resource request.
actor = get_or_create_autoscaling_requester_actor()
actor.request_resources.remote(resource_request, self._execution_id)

def on_executor_shutdown(self):
# Make request for zero resources to autoscaler for this execution.
actor = get_or_create_autoscaling_requester_actor()
actor.request_resources.remote({}, self._execution_id)
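
A quick sketch of the default scaling arithmetic above (illustrative numbers only, not code from this PR): with 4 actors, 3 active gives utilization 0.75, which sits between the 0.5 and 0.8 thresholds, so the pool is left alone; 4 active gives 1.0 and qualifies for scale-up (subject to the min/max, queue, and resource-limit checks); 1 active gives 0.25 and qualifies for scale-down.

```python
import math

# Illustrative check of the utilization thresholds and bundle rounding above.
UP, DOWN = 0.8, 0.5


def util(num_active: int, size: int) -> float:
    return 0 if size == 0 else num_active / size


assert DOWN <= util(3, 4) <= UP  # 0.75 -> neither scale up nor scale down
assert util(4, 4) > UP           # 1.0  -> candidate for scale-up
assert util(1, 4) < DOWN         # 0.25 -> candidate for scale-down

# Cluster resource requests round fractional CPU/GPU up per task bundle:
assert math.ceil(0.5) == 1       # a task needing 0.5 CPU requests {"CPU": 1}
```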
python/ray/data/_internal/execution/interfaces/physical_operator.py
@@ -4,6 +4,9 @@
import ray
from .ref_bundle import RefBundle
from ray._raylet import ObjectRefGenerator
from ray.data._internal.execution.autoscaler.autoscaling_actor_pool import (
AutoscalingActorPool,
)
from ray.data._internal.execution.interfaces.execution_options import (
ExecutionOptions,
ExecutionResources,
@@ -400,30 +403,14 @@ def base_resource_usage(self) -> ExecutionResources:
"""
return ExecutionResources()

def incremental_resource_usage(
self, consider_autoscaling=True
) -> ExecutionResources:
def incremental_resource_usage(self) -> ExecutionResources:
"""Returns the incremental resources required for processing another input.

For example, an operator that launches a task per input could return
ExecutionResources(cpu=1) as its incremental usage.

Args:
consider_autoscaling: Whether to consider the possibility of autoscaling.
"""
return ExecutionResources()

def notify_resource_usage(
self, input_queue_size: int, under_resource_limits: bool
) -> None:
"""Called periodically by the executor.

Args:
input_queue_size: The number of inputs queued outside this operator.
under_resource_limits: Whether this operator is under resource limits.
"""
pass

def notify_in_task_submission_backpressure(self, in_backpressure: bool) -> None:
"""Called periodically from the executor to update internal in backpressure
status for stats collection purposes.
@@ -435,3 +422,7 @@ def notify_in_task_submission_backpressure(self, in_backpressure: bool) -> None:
if self._in_task_submission_backpressure != in_backpressure:
self._metrics.on_toggle_task_submission_backpressure(in_backpressure)
self._in_task_submission_backpressure = in_backpressure

def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
"""Return a list of `AutoscalingActorPool`s managed by this operator."""
return []
Comment on lines +426 to +428

Member: Do we expect any operators other than ActorPoolMapOperator to override this method? It feels like it's not ideal to add a method to the PhysicalOperator interface just for one subclass, although I can't think of any alternatives off the top of my head.

Contributor: We could potentially use some top-level PhysicalOperator attribute that describes whether the operator relies on actors or tasks, then call this method only for operators for which that attribute is true.

Contributor Author (raulchen): Streaming aggregation also uses actor pools and may implement this. Also, I want to avoid other components (the autoscaler and resource manager) depending on ActorPoolMapOperator, which would make the dependency graph more complex.

Contributor Author (raulchen), May 6, 2024: Regarding "use some top-level PhysicalOperator attribute that describes whether the operator relies on actors or tasks": I think this is redundant, because we can tell this by whether get_autoscaling_actor_pools returns an empty list.
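
For illustration, an actor-based operator would override the new hook roughly like this (hypothetical sketch; the actual ActorPoolMapOperator changes are not shown on this page):

```python
from typing import List

from ray.data._internal.execution.autoscaler.autoscaling_actor_pool import (
    AutoscalingActorPool,
)


class MyActorBasedOperator:  # hypothetical stand-in for an actor-based operator
    def __init__(self, actor_pool: AutoscalingActorPool):
        self._actor_pool = actor_pool

    def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
        # Task-based operators keep the base class's default empty list;
        # actor-based operators expose their pool(s) so the Autoscaler can size them.
        return [self._actor_pool]
```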
