[Data] Support using concurrent actors for ActorPool #34253

Merged: 4 commits, Apr 11, 2023

@amogkam amogkam commented Apr 11, 2023

Support using concurrent actors for ActorPool. We do this by gating the user UDF in a separate threadpool of max size 1.

Signed-off-by: amogkam <[email protected]>

@ericl ericl left a comment

A couple questions:

  1. Should we always use this thread-pool code path regardless of the setting?
  2. Do we have any segfault issues right now?
  3. How does this relate to the thread-based pipelining code you have for accelerated prefetch? It seems like we could also use that pipeline verbatim, with each actor thread feeding into the pipeline and awaiting the output at the end.

amogkam commented Apr 11, 2023

  1. We can. Are you suggesting this for code simplicity?
  2. This approach doesn't run into segfaults.
  3. Hmm, I don't think that can be plugged in as is very easily. I think the pattern is different, right? For prefetching, there is a single producer with multiple threads being used for the computation. Here, there are multiple producers with a single thread being used for computation.
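
To make the difference in point 3 concrete, here is a rough sketch of the two patterns using only the Python standard library. It is not the Ray Data code; `compute`, `prefetch_style`, and `actor_pool_style` are hypothetical names chosen for illustration, and the worker counts are arbitrary.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def compute(x):
    # Stand-in for the user's UDF; also reports which thread ran it.
    return x * 2, threading.get_ident()


def prefetch_style(items, num_workers=4):
    # Single producer (the caller) fans work out to several compute threads.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(compute, items))


def actor_pool_style(items, num_producers=4):
    # Several producer threads funnel work into one compute thread.
    results = []
    lock = threading.Lock()
    with ThreadPoolExecutor(max_workers=1) as gate:

        def producer(chunk):
            for x in chunk:
                out = gate.submit(compute, x).result()
                with lock:
                    results.append(out)

        threads = [
            threading.Thread(target=producer, args=(items[i::num_producers],))
            for i in range(num_producers)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    return results


items = list(range(16))
prefetch_results = prefetch_style(items)
actor_results = actor_pool_style(items)

# Distinct threads that actually executed compute() in each pattern.
prefetch_threads = {tid for _, tid in prefetch_results}
actor_threads = {tid for _, tid in actor_results}
```

In the second pattern every call to `compute` lands on the same worker thread regardless of how many producers submit work, which is the property the PR description relies on.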

amogkam commented Apr 11, 2023

Made the change for point 1


class _Wrapper(callable_cls):
    def __init__(self, *args, **kwargs):
        self.thread_pool_executor = ThreadPoolExecutor(max_workers=1)

@ericl we have max_workers=1 here so the user's UDF will always be run in a single thread.
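
For readers without the diff open, the snippet above could be fleshed out roughly as follows. Only the class header and the `ThreadPoolExecutor(max_workers=1)` line come from the PR; the `__call__` forwarding, the `make_single_threaded` helper, and the `AddOne` UDF are assumptions made for this sketch and may differ from the merged code.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class AddOne:
    # Hypothetical user UDF class, not taken from the PR.
    def __init__(self, offset=1):
        self.offset = offset

    def __call__(self, batch):
        # Return the result plus the name of the thread that computed it.
        return [x + self.offset for x in batch], threading.current_thread().name


def make_single_threaded(callable_cls):
    # Hypothetical helper: builds a subclass whose __call__ always executes
    # on one dedicated worker thread.
    class _Wrapper(callable_cls):
        def __init__(self, *args, **kwargs):
            self.thread_pool_executor = ThreadPoolExecutor(max_workers=1)
            super().__init__(*args, **kwargs)

        def __call__(self, *args, **kwargs):
            # Assumed forwarding logic: submit to the pool and block on the result.
            future = self.thread_pool_executor.submit(
                super().__call__, *args, **kwargs
            )
            return future.result()

    return _Wrapper


udf = make_single_threaded(AddOne)(offset=10)
output, worker_name = udf([1, 2, 3])
```

Because the pool never grows past one worker, concurrent callers queue up behind each other instead of running the UDF in parallel.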

Comment on lines +162 to +163
# Make sure user's UDF is not running concurrently.
assert len(set(thread_ids)) == 1

@ericl the test is here.
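
A check in the spirit of the assertion above can be sketched without Ray. This is an illustrative stand-in, not the test from the PR: `run_callers`, `collect_thread_ids`, and `NUM_CALLERS` are hypothetical, and plain threads play the role of the concurrent actor calls.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

NUM_CALLERS = 4


def run_callers(call_udf):
    # Start NUM_CALLERS threads that invoke call_udf at about the same time.
    barrier = threading.Barrier(NUM_CALLERS)

    def caller():
        barrier.wait()  # every caller is alive here, so caller ids are distinct
        call_udf()

    threads = [threading.Thread(target=caller) for _ in range(NUM_CALLERS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


def collect_thread_ids(gated):
    thread_ids = []
    lock = threading.Lock()

    def udf():
        # Record the id of whichever thread is executing the UDF.
        with lock:
            thread_ids.append(threading.get_ident())

    with ThreadPoolExecutor(max_workers=1) as gate:
        if gated:
            # Route every call through the single-worker pool.
            run_callers(lambda: gate.submit(udf).result())
        else:
            # Control case: callers run the UDF on their own threads.
            run_callers(udf)
    return thread_ids


gated_ids = collect_thread_ids(gated=True)
ungated_ids = collect_thread_ids(gated=False)

# Make sure the gated UDF is not running concurrently.
assert len(set(gated_ids)) == 1
```

The ungated run serves as a control: without the single-worker pool, each caller executes the UDF on its own thread, so several distinct thread ids are recorded.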

@c21 c21 left a comment

Should we update the documentation as well, so users are aware that they can use concurrent actors by setting max_concurrency in map_batches?

@amogkam amogkam merged commit c8a4b98 into ray-project:master Apr 11, 2023
@amogkam amogkam deleted the data-actor-pool-concurrent-actor branch April 11, 2023 21:49
elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023
ProjectsByJackHe pushed a commit to ProjectsByJackHe/ray that referenced this pull request May 4, 2023