-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[data] introduce abstract interface for data autoscaling #45002
Conversation
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
43b1e07
to
11f96e0
Compare
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM
|
||
|
||
@DeveloperAPI | ||
class Autoscaler(metaclass=ABCMeta): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
New class ABC has ABCMeta as its meta class. Using ABC as a base class has essentially the same effect as specifying metaclass=abc.ABCMeta, but is simpler to type and easier to read.
class Autoscaler(metaclass=ABCMeta): | |
class Autoscaler(ABC): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks. I forgot this again 😅
self._execution_id = execution_id | ||
|
||
@abstractmethod | ||
def try_trigger_scaling(self, scheduling_decision: "SchedulingDecision"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is supposed to trigger autoscaling of both the cluster and actor pools, right? Could we add a note in the docstring? I don't think it's obvious what's getting autoscaled from the name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
|
||
|
||
@DeveloperAPI | ||
class AutoscalingActorPool(metaclass=ABCMeta): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
class AutoscalingActorPool(metaclass=ABCMeta): | |
class AutoscalingActorPool(ABC): |
self._try_scale_up_cluster(scheduling_decision) | ||
self._try_scale_up_or_down_actor_pool(scheduling_decision) | ||
|
||
def _actor_pool_util(self, actor_pool: AutoscalingActorPool): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Could we rename this method to be more descriptive? I wasn't able to infer what the method does from the name _actor_pool_util
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will update it to _calculate_actor_pool_util
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, does "util" refer to "utilization" in this context? Thought it meant "util" as in "utility"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I didn't realize "util" is also an common abbreviation for "utility"
def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]: | ||
"""Return a list of `AutoscalingActorPool`s managed by this operator.""" | ||
return [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we expect any operators other than ActorPoolMapOperator
to override this method? Feel like it's not ideal to add a method to the PhysicalOperator
interface just for one subclass, although I can't think of any alternatives off the top of my head.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could potentially use some top PhysicalOperator level attribute which describes if the operator relies on actors or tasks, then call this method only for operators for which that attribute is true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
streaming aggregation also use actor pools and may implement this. also, I want to avoid other components (autoscaler and resource manager) depending on the ActorPoolMapOperator. This makes the dependency graph more complex.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use some top PhysicalOperator level attribute which describes if the operator relies on actors or tasks
I think this is redundant, because we can tell this by if get_autoscaling_actor_pools returns an empty list.
actor_pool, op | ||
) | ||
if should_scale_up and not should_scale_down: | ||
if actor_pool.scale_up(1) == 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When would this evaluate to true? Looks like scale_up
always returns input value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if i understand correctly, looks like scale_up is intended to return the number of actors actually added, which could differ from the requested scaleup. but in the current default implementation, looks like we return the input as @bveeramani said
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, the current implementation always returns true. But I wanted to the make the interface more flexible and also make it consistent with scale_down
def try_trigger_scaling(self, scheduling_decision: "SchedulingDecision"): | ||
"""Try trigger autoscaling. | ||
|
||
This method will be called each time when StreamExecutor makes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: here and docstring of on_executor_shutdown
, to make sure readers don't get confused by a different name
This method will be called each time when StreamExecutor makes | |
This method will be called each time when StreamingExecutor makes |
actor_pool, op | ||
) | ||
if should_scale_up and not should_scale_down: | ||
if actor_pool.scale_up(1) == 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if i understand correctly, looks like scale_up is intended to return the number of actors actually added, which could differ from the requested scaleup. but in the current default implementation, looks like we return the input as @bveeramani said
def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]: | ||
"""Return a list of `AutoscalingActorPool`s managed by this operator.""" | ||
return [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could potentially use some top PhysicalOperator level attribute which describes if the operator relies on actors or tasks, then call this method only for operators for which that attribute is true
class OpSchedulingStatus: | ||
"""The scheduling status of an operator.""" | ||
|
||
# Whether the operator is runnable. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what exactly does "runnable" mean here? can we expand on this definition?
also, wondering if it would make sense to connect this with OpState
, or combine the two classes together
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
select_op_to_run
checks which ops are runnable, and choose the best op to run. "runnable" means this. Will add a comment.
also good suggestion to incorporate this class with OpState
.
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
@bveeramani @scottjlee thanks for your comments. all addressed. |
@raulchen this broke https://github.com/orgs/anyscale/projects/76/views/1?pane=issue&itemId=54699584, can you help investigate, thankks |
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Fix following bugs introduced by #45002: * `autoscaler.try_trigger_scaling` not called when `select_op_to_run` returns None. * scaling up condition on `under_resource_limits`. `python/ray/data/tests/test_streaming_integration.py::test_e2e_autoscaling_up` should pass after this fix. ## Related issue number Closes #43481 ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: Hao Chen <[email protected]>
…#45002) * Introduce an abstract interface for data autoscaling, making autoscaling behavior easier to customize and extend. Main components: * `Autoscaler`: the abstract interface responsible for all autoscaling decisions, including cluster and actor pool autoscaling. * `AutoscalingActorPool`: abstract interface that represents an actor pool that can autoscale. * `DefaultAutoscaler`: default implementation. * No major code logic changes in this PR, except * fixing a small bug of calculating actor pool util (should be `num_active_actors/current_size` instead of `num_running_actors/current_size`). * `ActorPoolMapOperator.incremental_resource_usage` now doesn't consider autoscaling, as we are abstracting autoscaling out of the op. Previously the info wasn't useful either. * Removed actor pool autoscaling logic for bulk executor. --------- Signed-off-by: Hao Chen <[email protected]>
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? Fix following bugs introduced by ray-project#45002: * `autoscaler.try_trigger_scaling` not called when `select_op_to_run` returns None. * scaling up condition on `under_resource_limits`. `python/ray/data/tests/test_streaming_integration.py::test_e2e_autoscaling_up` should pass after this fix. ## Related issue number Closes ray-project#43481 ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( Signed-off-by: Hao Chen <[email protected]>
Why are these changes needed?
Autoscaler
: the abstract interface responsible for all autoscaling decisions, including cluster and actor pool autoscaling.AutoscalingActorPool
: abstract interface that represents an actor pool that can autoscale.DefaultAutoscaler
: default implementation.num_active_actors/current_size
instead ofnum_running_actors/current_size
).ActorPoolMapOperator.incremental_resource_usage
now doesn't consider autoscaling, as we are abstracting autoscaling out of the op. Previously the info wasn't useful either.Related issue number
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.