Skip to content

Commit

Permalink
[Data] Ramp up max_tasks_in_flight (ray-project#33379)
Browse files Browse the repository at this point in the history
n this PR, we start the max_tasks_in_flight at 1 to make sure we fully utilize all the min_workers to start with. Then once min_workers have been created, we ramp up to the default max_tasks_in_flight of 4.

This allows us to maximize parallelism for datasets with a small number of blocks.

---------

Signed-off-by: amogkam <[email protected]>
Signed-off-by: Edward Oakes <[email protected]>
  • Loading branch information
amogkam authored and edoakes committed Mar 22, 2023
1 parent 4c12525 commit f32c47e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ def __init__(
ObjectRef[ObjectRefGenerator], Tuple[_TaskState, ray.actor.ActorHandle]
] = {}
# A pool of running actors on which we can execute mapper tasks.
self._actor_pool = _ActorPool(autoscaling_policy._config.max_tasks_in_flight)
self._actor_pool = _ActorPool(
max_tasks_in_flight=self._autoscaling_policy._config.max_tasks_in_flight,
max_tasks_rampup_threshold=self._autoscaling_policy.min_workers,
)
# A queue of bundles awaiting dispatch to actors.
self._bundle_queue = collections.deque()
# Cached actor class.
Expand Down Expand Up @@ -430,10 +433,30 @@ class _ActorPool:
This class is in charge of tracking the number of in-flight tasks per actor,
providing the least heavily loaded actor to the operator, and killing idle
actors when the operator is done submitting work to the pool.
Args:
max_tasks_in_flight: The maximum number of tasks to queue on a single actor at
a time. The actual max_tasks_in_flight starts at 1, and then ramps up
to this value after `ramp_up_threshold` actors have been created, to allow
for even scheduling for datasets with a small number of partitions.
max_tasks_rampup_threshold: The number of actors to wait for before ramping up
`max_tasks_in_flight`.
"""

def __init__(self, max_tasks_in_flight: int = float("inf")):
self._max_tasks_in_flight = max_tasks_in_flight
def __init__(
self,
max_tasks_in_flight: int = float("inf"),
max_tasks_rampup_threshold: int = 1,
):
# We start with 1 task in flight, then ramp up to `max_tasks_in_flight`
# after the `max_tasks_rampup_threshold` actors have been created.
# This is to ensure we don't overschedule tasks to certain actors
# when the dataset has a small number of partitions. We first prioritize
# even sharding across min_workers actors.
self._max_tasks_in_flight_final = max_tasks_in_flight
self._max_tasks_in_flight = 1
self._max_tasks_rampup_threshold = max_tasks_rampup_threshold

# Number of tasks in flight per actor.
self._num_tasks_in_flight: Dict[ray.actor.ActorHandle, int] = {}
# Node id of each ready actor.
Expand Down Expand Up @@ -481,6 +504,11 @@ def pending_to_running(self, ready_ref: ray.ObjectRef) -> bool:
actor = self._pending_actors.pop(ready_ref)
self._num_tasks_in_flight[actor] = 0
self._actor_locations[actor] = ray.get(ready_ref)

# Ramp up to the final `max_tasks_in_flight` value after the threshold is
# reached.
if len(self._num_tasks_in_flight) >= self._max_tasks_rampup_threshold:
self._max_tasks_in_flight = self._max_tasks_in_flight_final
return True

def pick_actor(
Expand Down
12 changes: 12 additions & 0 deletions python/ray/data/tests/test_actor_pool_map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ def test_pick_max_tasks_in_flight(self, ray_start_regular_shared):
# Check that the 3rd pick doesn't return the actor.
assert pool.pick_actor() is None

def test_pick_max_tasks_in_flight_with_rampup(self, ray_start_regular_shared):
# Test that max_tasks_in_flight starts at 1, and ramps up after meeting
# the threshold.
pool = _ActorPool(max_tasks_in_flight=2, max_tasks_rampup_threshold=2)
actor = self._add_ready_worker(pool)
assert pool.pick_actor() == actor
assert pool.pick_actor() is None
actor2 = self._add_ready_worker(pool)
# Actors can be selected again after ramping up.
assert pool.pick_actor() == actor2
assert pool.pick_actor() in [actor, actor2]

def test_pick_ordering_lone_idle(self, ray_start_regular_shared):
# Test that a lone idle actor is the one that's picked.
pool = _ActorPool()
Expand Down

0 comments on commit f32c47e

Please sign in to comment.