Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aDAG] Support multi-read of the same shm channel #47311

Merged
merged 1 commit into from
Aug 30, 2024

Conversation

ruisearch42
Copy link
Contributor

@ruisearch42 ruisearch42 commented Aug 24, 2024

Why are these changes needed?

If the same method of the same actor is bound to the same node (i.e., reads from the same shared memory channel), aDAG execution hangs. This PR adds support to this case by caching results read from the channel.

Related issue number

Closes #47041

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@ruisearch42 ruisearch42 changed the title [Not ready for review] [aDAG] Support multi-read of the same shm channel [aDAG] Support multi-read of the same shm channel Aug 26, 2024
@ruisearch42 ruisearch42 added the go add ONLY when ready to merge, run all tests label Aug 26, 2024
Copy link
Contributor

@rkooo567 rkooo567 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: is this approach working for this kind of case? (I believe it will not work?)

with InputNode as inp:
    out = s1.fwd(inp)
    dag = s2.fwd(inp, out)

Also have some concerns that deepcopy on the first result can affect perf negatively.

I actually wonder if it is viable approach to just not allow the additional write until all downstream tasks finish reading (I think with buffering input PR, it may work well?)

@anyscalesam anyscalesam added P0 Issues that should be fixed in short order core Issues that should be addressed in Ray Core compiled-graph labels Aug 26, 2024
@ruisearch42 ruisearch42 marked this pull request as ready for review August 27, 2024 00:33
Copy link
Contributor

@rkooo567 rkooo567 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline. deep/shallow copy overhead is pretty big, so we will make some assumptions that input won't be changed (which is not very correct). we can allow copy inputs with some sort of flags

@stephanie-wang stephanie-wang self-assigned this Aug 27, 2024
@stephanie-wang
Copy link
Contributor

Could you add a description to the PR?

@kevin85421 kevin85421 self-assigned this Aug 27, 2024
@rkooo567 rkooo567 self-assigned this Aug 27, 2024
Copy link
Contributor

@rkooo567 rkooo567 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QQ: is intraprocess channel still needed after this change?

python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/tests/test_channel.py Show resolved Hide resolved
python/ray/dag/tests/experimental/test_accelerated_dag.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
Copy link
Contributor

@stephanie-wang stephanie-wang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a good start but let's try not to introduce the new use_cached flag if possible (it seems to make the code more complicated) and ideally clean up the cache once we know no more tasks will need it.

One suggestion might be to have an args cache per task, instead of a cache per channel. Then we can also do a list lookup instead of dict. Like:

cache: List[List[Any]]

The outer list index is the task idx on that actor, and the inner list is the resolved args for that task. The first time we deserialize an arg, we put it into the inner lists for all reader tasks.

python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
@ruisearch42
Copy link
Contributor Author

Looks like a good start but let's try not to introduce the new use_cached flag if possible (it seems to make the code more complicated) and ideally clean up the cache once we know no more tasks will need it.

Sounds good.

One suggestion might be to have an args cache per task, instead of a cache per channel. Then we can also do a list lookup instead of dict.

Why is a list lookup preferred than a dict? It is performance concerns (hash function overhead)?

The first time we deserialize an arg, we put it into the inner lists for all reader tasks.

How do we know where to put into the inner lists? We still need a dict to maintain that info? Otherwise we need to go through all items of all inner lists and replace?

@ruisearch42 ruisearch42 removed the go add ONLY when ready to merge, run all tests label Aug 28, 2024
python/ray/experimental/channel/cached_channel.py Outdated Show resolved Hide resolved
python/ray/experimental/channel/cached_channel.py Outdated Show resolved Hide resolved
python/ray/experimental/channel/cached_channel.py Outdated Show resolved Hide resolved
python/ray/experimental/channel/cached_channel.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/tests/experimental/test_accelerated_dag.py Outdated Show resolved Hide resolved
python/ray/dag/tests/experimental/test_accelerated_dag.py Outdated Show resolved Hide resolved
@stephanie-wang
Copy link
Contributor

One suggestion might be to have an args cache per task, instead of a cache per channel. Then we can also do a list lookup instead of dict.

Why is a list lookup preferred than a dict? It is performance concerns (hash function overhead)?

The first time we deserialize an arg, we put it into the inner lists for all reader tasks.

How do we know where to put into the inner lists? We still need a dict to maintain that info? Otherwise we need to go through all items of all inner lists and replace?

Yes, I thought list lookup would be better for performance and I think it also makes the garbage collection simpler. But the new approach here also looks OK.

@ruisearch42
Copy link
Contributor Author

ruisearch42 commented Aug 28, 2024

Still WIP after using a new approach, will clean up more!

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Aug 28, 2024
@rkooo567
Copy link
Contributor

@ruisearch42 plz remove author-action-required when it is ready!

@ruisearch42 ruisearch42 force-pushed the bind_multi branch 3 times, most recently from d007769 to a1dcb32 Compare August 29, 2024 17:21
@ruisearch42 ruisearch42 added go add ONLY when ready to merge, run all tests and removed @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. labels Aug 29, 2024
@ruisearch42
Copy link
Contributor Author

Looking into some GPU test CI failures, but PR is ready for another look.

python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/dag/compiled_dag_node.py Outdated Show resolved Hide resolved
python/ray/experimental/channel/cached_channel.py Outdated Show resolved Hide resolved
Copy link
Contributor

@rkooo567 rkooo567 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks very clean! Can you run a microbenchmark and address last test comments? let's merge it after that

@ruisearch42
Copy link
Contributor Author

@rkooo567
Copy link
Contributor

@ruisearch42 plz comment if there's any change here! if not, let's just merge it

@ruisearch42
Copy link
Contributor Author

@rkooo567 microbenchmark aligns with past runs:
https://buildkite.com/ray-project/release/builds/21950#0191a40e-897c-44b1-b1f9-388ea3761ff6

The latest code just added a TODO comment compared to the last version which passed full CI.

@rkooo567 rkooo567 enabled auto-merge (squash) August 30, 2024 17:59
@rkooo567
Copy link
Contributor

@ruisearch42 auto merged enabled. so we should follow up with removing intra process channel right?

@rkooo567
Copy link
Contributor

premerge failure. maybe consider to merge latest master?

@rkooo567 rkooo567 merged commit c9c150a into ray-project:master Aug 30, 2024
6 checks passed
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 12, 2024
If the same method of the same actor is bound to the same node (i.e., reads from the same shared memory channel), aDAG execution hangs. This PR adds support to this case by caching results read from the channel.

Signed-off-by: ujjawal-khare <[email protected]>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
If the same method of the same actor is bound to the same node (i.e., reads from the same shared memory channel), aDAG execution hangs. This PR adds support to this case by caching results read from the channel.

Signed-off-by: ujjawal-khare <[email protected]>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
If the same method of the same actor is bound to the same node (i.e., reads from the same shared memory channel), aDAG execution hangs. This PR adds support to this case by caching results read from the channel.

Signed-off-by: ujjawal-khare <[email protected]>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
If the same method of the same actor is bound to the same node (i.e., reads from the same shared memory channel), aDAG execution hangs. This PR adds support to this case by caching results read from the channel.

Signed-off-by: ujjawal-khare <[email protected]>
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
If the same method of the same actor is bound to the same node (i.e., reads from the same shared memory channel), aDAG execution hangs. This PR adds support to this case by caching results read from the channel.

Signed-off-by: ujjawal-khare <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
compiled-graph core Issues that should be addressed in Ray Core go add ONLY when ready to merge, run all tests P0 Issues that should be fixed in short order
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[core][aDAG] Hangs if same shared memory channel is read twice by one actor
5 participants