
[Data] Allow tasks to control concurrency in map-like APIs #42637

Merged
merged 6 commits into from
Jan 30, 2024

Conversation

@c21 (Contributor) commented Jan 24, 2024

Why are these changes needed?

This PR allows tasks to control concurrency in map-like APIs when the user calls map_batches(fn, concurrency=...). Each TaskPoolMapOperator gets a concurrency cap that bounds its number of concurrently running tasks.
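The per-operator cap described above can be sketched in plain Python. This is an illustrative sketch only, not Ray Data's actual implementation; the class and method names here (`TaskPoolMapOperator`, `can_submit_new_task`) are stand-ins for whatever the real operator uses internally.

```python
# Illustrative sketch (not Ray Data's actual class) of an operator that
# holds a per-operator concurrency cap taken from map_batches(fn, concurrency=...).
class TaskPoolMapOperator:
    def __init__(self, fn, concurrency=None):
        if concurrency is not None and concurrency < 1:
            raise ValueError(f"`concurrency` must be >= 1, got {concurrency}")
        self._fn = fn
        self._concurrency = concurrency  # cap on concurrently running tasks
        self._num_running_tasks = 0

    def can_submit_new_task(self):
        # With no cap configured, task submission is unrestricted.
        if self._concurrency is None:
            return True
        return self._num_running_tasks < self._concurrency
```

With `concurrency=2`, the operator would stop admitting new tasks once two are in flight, and resume as running tasks finish.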

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@c21 c21 changed the title [WIP][Data] Allow tasks to control concurrency in map-like APIs [Data] Allow tasks to control concurrency in map-like APIs Jan 26, 2024
@bveeramani (Member) left a comment

LGTM

ray_remote_args: Customize the ray remote args for this op's tasks.
"""
self._concurrency = concurrency
Member
Nit: Don't think it really matters here, but IIRC calling super().__init__ first is a best practice

Contributor Author

Updated.

raise ValueError(
"``concurrency`` is set as a tuple of integers, but ``fn`` "
f"is not a callable class: {fn}. Use ``concurrency=n`` to "
"control maximal number of workers to use."
Member
Suggested change:
- "control maximal number of workers to use."
+ "control maximum number of workers to use."

Contributor Author
Updated.
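The validation discussed in this thread could look roughly like the following. This is a hedged sketch under the assumption that a tuple `concurrency` implies an autoscaling actor pool; `validate_concurrency` is a hypothetical helper name, not Ray Data's actual API.

```python
# Hypothetical helper sketching the tuple-vs-callable-class check quoted above.
def validate_concurrency(fn, concurrency):
    if isinstance(concurrency, tuple):
        # A (min, max) tuple implies an autoscaling actor pool, which
        # requires `fn` to be a callable class rather than a plain function.
        if not isinstance(fn, type):
            raise ValueError(
                "``concurrency`` is set as a tuple of integers, but ``fn`` "
                f"is not a callable class: {fn}. Use ``concurrency=n`` to "
                "control maximum number of workers to use."
            )
    elif concurrency is not None and concurrency < 1:
        raise ValueError(f"``concurrency`` must be >= 1, got {concurrency}")
```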

# The multiplier to multiply the concurrency cap by.
CAP_MULTIPLIER = 2.0
CAP_MULTIPLIER_CONFIG_KEY = "backpressure_policies.concurrency_cap.cap_multiplier"
NOTE: Only support setting concurrency cap for `TaskPoolMapOperator` for now.
Member
What's our motivation for not adding it to ActorPoolMapOperator? Is it because there's a separate code path for controlling actor concurrency?

Contributor Author

> Is it because there's a separate code path for controlling actor concurrency?

Yes, the concurrency cap backpressure policy here is not useful, given that we already have internal scaling up and down in ActorPoolMapOperator.

Contributor

Also add this to the comment? And maybe also leave a TODO to consolidate this policy with the actor pool concurrency.

Contributor Author

Added.


if size is not None and size < 1:
raise ValueError("`size` must be >= 1", size)
self.size = size
Contributor

Nit: maybe rename this to max_size to make the meaning clearer.

Contributor Author

I think it's okay; let's stay consistent with the naming of ActorPoolStrategy(size=...).


The concrete strategy is as follows:
- Each PhysicalOperator is assigned a concurrency cap.
- A PhysicalOperator can run new tasks if the number of running tasks is less
than the cap.
Contributor

The comment looks a bit too verbose since we've removed the exponential ramp-up. We can simplify it to something like "The policy will limit the number of concurrently running tasks based on its concurrency parameter."

Contributor Author

Updated.
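The simplified rule discussed in this thread (run a new task only while running tasks are below the cap) can be sketched as a small policy object. This is a rough illustration; the class name mirrors the policy under review, but the constructor and `can_add_input` signature here are assumptions, not Ray Data's internals.

```python
# Rough sketch of a concurrency-cap backpressure policy: each operator may
# start a new task only while its running-task count is below its cap.
class ConcurrencyCapBackpressurePolicy:
    def __init__(self, concurrency_caps):
        # Map of operator name -> cap; a missing entry or None means unlimited.
        self._caps = concurrency_caps

    def can_add_input(self, op_name, num_running_tasks):
        cap = self._caps.get(op_name)
        return cap is None or num_running_tasks < cap
```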

@@ -185,24 +138,6 @@ def test_e2e_normal(self):
start2, end2 = ray.get(actor.get_start_and_end_time_for_op.remote(2))
assert start1 < start2 < end1 < end2, (start1, start2, end1, end2)

def test_e2e_no_ramping_up(self):
Contributor

We also need to update the above test_e2e_normal to use the concurrency parameter.

Contributor Author

Thanks, updated.

@c21 c21 merged commit 46a9efe into ray-project:master Jan 30, 2024
9 checks passed
@c21 c21 deleted the cap branch January 30, 2024 02:04