Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
  • Loading branch information
raulchen committed Apr 29, 2024
1 parent a853c52 commit 11f96e0
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@

@DeveloperAPI
class AutoscalingActorPool(metaclass=ABCMeta):
"""Abstract interface of an autoscaling actor pool."""
"""Abstract interface of an autoscaling actor pool.
A `PhysicalOperator` can manage one or more `AutoscalingActorPool`s.
`Autoscaler` is responsible for deciding autoscaling of these actor
pools.
"""

@abstractmethod
def min_size(self) -> int:
Expand Down Expand Up @@ -49,10 +54,18 @@ def current_in_flight_tasks(self) -> int:

@abstractmethod
def scale_up(self, num_actors: int) -> int:
"""Scale up the actor pool by the given number of actors."""
"""Scale up the actor pool by the given number of actors.
Returns:
The number of actors actually added.
"""
...

@abstractmethod
def scale_down(self, num_actors: int) -> int:
"""Scale down the actor pool by the given number of actors."""
"""Scale down the actor pool by the given number of actors.
Returns:
The number of actors actually removed.
"""
...
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def _actor_pool_util(self, actor_pool: AutoscalingActorPool):
if actor_pool.current_size() == 0:
return 0
else:
return actor_pool.num_running_actors() / actor_pool.current_size()
return actor_pool.num_busy_actors() / actor_pool.current_size()

def _actor_pool_should_scale_up(
self,
Expand All @@ -60,8 +60,11 @@ def _actor_pool_should_scale_up(
# Do not scale up, if the op is completed or no more inputs are coming.
if op.completed() or (op._inputs_complete and op.internal_queue_size() == 0):
return False
# Do not scale up, if the actor pool is already at max size.
if actor_pool.current_size() >= actor_pool.max_size():
if actor_pool.current_size() < actor_pool.min_size():
# Scale up, if the actor pool is below min size.
return True
elif actor_pool.current_size() >= actor_pool.max_size():
# Do not scale up, if the actor pool is already at max size.
return False
# Do not scale up, if the op still has enough resources to run.
if op_scheduling_status.under_resource_limits:
Expand All @@ -85,8 +88,11 @@ def _actor_pool_should_scale_down(
# Scale down, if the op is completed or no more inputs are coming.
if op.completed() or (op._inputs_complete and op.internal_queue_size()) == 0:
return True
# Do not scale down, if the actor pool is already at min size.
if actor_pool.current_size() <= actor_pool.min_size():
if actor_pool.current_size() > actor_pool.max_size():
# Scale down, if the actor pool is above max size.
return True
elif actor_pool.current_size() <= actor_pool.min_size():
# Do not scale down, if the actor pool is already at min size.
return False
# Determine whether to scale down based on the actor pool utilization.
util = self._actor_pool_util(actor_pool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,5 @@ def notify_in_task_submission_backpressure(self, in_backpressure: bool) -> None:
self._in_task_submission_backpressure = in_backpressure

def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
    """Return the `AutoscalingActorPool`s managed by this operator.

    This implementation manages no actor pools, so it returns a fresh
    empty list on every call.
    """
    return list()
Original file line number Diff line number Diff line change
Expand Up @@ -382,46 +382,36 @@ def __init__(
# === Overriding methods of AutoscalingActorPool ===

def min_size(self) -> int:
    """Return the minimum size of the actor pool (`self._min_size`)."""
    lower_bound = self._min_size
    return lower_bound

def max_size(self) -> int:
    """Return the maximum size of the actor pool (`self._max_size`)."""
    upper_bound = self._max_size
    return upper_bound

def current_size(self) -> int:
    """Return the current size of the actor pool.

    The size is the sum of the pending actors and the running actors.
    """
    pending = self.num_pending_actors()
    running = self.num_running_actors()
    return pending + running

def num_running_actors(self) -> int:
    """Return the number of running actors.

    Computed as the number of entries in `self._num_tasks_in_flight`.
    """
    tasks_per_actor = self._num_tasks_in_flight
    return len(tasks_per_actor)

def num_busy_actors(self) -> int:
    """Return the number of actors with in-flight tasks.

    Computed as the current pool size minus the number of idle actors.
    """
    # NOTE(review): `current_size()` includes pending actors, so unless
    # `num_idle_actors()` also counts them, pending actors are reported as
    # busy here -- confirm this is intended.
    total = self.current_size()
    idle = self.num_idle_actors()
    return total - idle

def num_pending_actors(self) -> int:
    """Return the number of actors pending creation.

    Computed as the number of entries in `self._pending_actors`.
    """
    pending = self._pending_actors
    return len(pending)

def max_tasks_in_flight_per_actor(self) -> int:
    """Return the max number of in-flight tasks per actor.

    The value is read from `self._max_tasks_in_flight`.
    """
    per_actor_cap = self._max_tasks_in_flight
    return per_actor_cap

def current_in_flight_tasks(self) -> int:
    """Return the number of current in-flight tasks.

    Returns:
        The sum of the per-entry task counts stored in
        `self._num_tasks_in_flight`; 0 when the mapping is empty.
    """
    # Only the counts are needed, so sum the values view directly rather
    # than unpacking and discarding the keys from `.items()`.
    return sum(self._num_tasks_in_flight.values())

def scale_up(self, num_actors: int) -> int:
    """Scale up the actor pool by the given number of actors.

    Each new actor is created with `self._create_actor_fn()` and registered
    through `self.add_pending_actor()`.

    Args:
        num_actors: The number of actors to add. A value less than or
            equal to zero adds nothing.

    Returns:
        The number of actors actually added.
    """
    # Count the actors as they are created instead of echoing the request,
    # so a non-positive `num_actors` reports 0 rather than a negative count.
    num_added = 0
    for _ in range(num_actors):
        actor, ready_ref = self._create_actor_fn()
        self.add_pending_actor(actor, ready_ref)
        num_added += 1
    return num_added

def scale_down(self, num_actors: int) -> int:
"""Scale down the actor pool by the given number of actors."""
num_killed = 0
for _ in range(num_actors):
if self.kill_inactive_actor():
Expand Down
4 changes: 3 additions & 1 deletion python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ def execute(
self._resource_manager = ResourceManager(self._topology, self._options)
self._backpressure_policies = get_backpressure_policies(self._topology)
self._autoscaler = create_autoscaler(
self._topology, self._resource_manager, self._execution_id
self._topology,
self._resource_manager,
self._execution_id,
)

self._has_op_completed = {op: False for op in self._topology}
Expand Down

0 comments on commit 11f96e0

Please sign in to comment.