[Core][aDag] Support multi node multi reader #47480

Merged
merged 15 commits into from
Sep 10, 2024

Conversation

Contributor

@rkooo567 rkooo567 commented Sep 4, 2024

Why are these changes needed?

This PR adds support for multiple readers across multiple nodes. It also adds tests verifying that the feature works with large gRPC payloads and buffer resizing.

Multiple readers on multiple nodes didn't work because the code allowed only one remote reader reference to be registered, on one specific node. This PR fixes the issue by allowing remote reader references to be registered on multiple nodes.
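
The change described above can be pictured with a minimal sketch. It is not Ray's implementation: `group_readers_by_node`, the string node IDs, and the reader names are all hypothetical, chosen only to show the idea of keeping one remote reader registration per node instead of a single one.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_readers_by_node(
    readers: List[Tuple[str, str]], writer_node_id: str
) -> Dict[str, List[str]]:
    """Group (reader_id, node_id) pairs by node, skipping the writer's node.

    Hypothetical helper: each key in the result stands for one remote node
    that would need its own reader reference registered.
    """
    remote: Dict[str, List[str]] = defaultdict(list)
    for reader_id, node_id in readers:
        if node_id != writer_node_id:  # local readers need no remote reference
            remote[node_id].append(reader_id)
    return dict(remote)


readers = [("r1", "node-a"), ("r2", "node-b"), ("r3", "node-b"), ("r4", "node-c")]
remote_readers = group_readers_by_node(readers, writer_node_id="node-a")
# Two remote nodes here, so two registrations are needed rather than one.
print(remote_readers)
```

With a single registration slot, only one of the two remote nodes in this example could ever receive data, which matches the failure the description reports.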

Related issue number

Closes #46269

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@@ -1448,59 +1447,6 @@ def test_driver_and_actor_as_readers(ray_start_cluster):
dag.experimental_compile()


def test_payload_large(ray_start_cluster):
Contributor Author

Moved to the multi-node test suite.

@@ -16,7 +17,7 @@
# entry/init points.
logger = logging.getLogger(__name__)

DEFAULT_MAX_BUFFER_SIZE = int(100 * 1e6) # 100 mB
Contributor Author

this was a bug

Contributor Author

I think there's a fundamental fix. I will fix it in a separate PR

Contributor

Did you mean the buffer size should be 1MB? If that's the case, can you update comment as well?

Contributor Author

IIUC, we have this 1 MB buffer size:

buffer_size_bytes: int = DEFAULT_BUFFER_SIZE_BYTES

But it was not passed correctly (meaning our default buffer size has effectively been 100 MB).
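
To make the sizes concrete, and to show one way a default like this can fail to reach its destination, here is a small sketch. Only the two constant names and the `buffer_size_bytes` parameter come from the thread. The `make_channel_*` helpers, the exact form of the bug, and the capping behavior in the fixed version are assumptions for illustration and are not Ray's code.

```python
DEFAULT_BUFFER_SIZE_BYTES = int(1e6)      # 1 MB, as described in the thread
DEFAULT_MAX_BUFFER_SIZE = int(100 * 1e6)  # 100 MB, as in the diff above


def make_channel_buggy(buffer_size_bytes: int = DEFAULT_BUFFER_SIZE_BYTES) -> int:
    # Hypothetical bug: the argument is accepted but never forwarded,
    # so every channel ends up with the maximum size.
    return DEFAULT_MAX_BUFFER_SIZE


def make_channel_fixed(buffer_size_bytes: int = DEFAULT_BUFFER_SIZE_BYTES) -> int:
    # The requested size is forwarded, capped at the maximum (an assumption).
    return min(buffer_size_bytes, DEFAULT_MAX_BUFFER_SIZE)


buggy_size = make_channel_buggy()
fixed_size = make_channel_fixed()
print(buggy_size, fixed_size)
```

The signature looks identical in both versions, which is why this kind of bug is easy to miss in review: only the body shows whether the default is honored.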

timeout_ms,
)
# TODO(sang): Clean the previous ref that won't be used.
Contributor Author

This currently leaks a thread whenever resizing happens. We should fix it.

@rkooo567 rkooo567 changed the title from "[wip][aDag] Support multi node multi reader" to "[Core][aDag] Support multi node multi reader" on Sep 6, 2024
@rkooo567 rkooo567 added the "go" label (add ONLY when ready to merge, run all tests) on Sep 6, 2024
Comment on lines 163 to 165
_reader_node_ids: Optional[Set["ray.NodeID"]] = None,
_writer_ref: Optional["ray.ObjectRef"] = None,
_reader_ref: Optional["ray.ObjectRef"] = None,
_reader_refs: Optional[Dict[str, "ray.ObjectRef"]] = None,
Contributor

Can you please update docstring?

Contributor Author

There's a comment underneath, and given that the other private args don't have docstrings, I will keep it as is. Lmk if you think we should update the docstring for all private attributes.

Contributor

Yeah I think we should move it to the docstring so that the caller knows what to pass in. There are also args that seem to contain overlapping information, which need clean up or clarification.

self._reader_ref = reader_ref
def __init__(
self,
_node_id_to_reader_info: Dict[str, ReaderInfo] = None,
Contributor

Optional?

Contributor Author

my bad. it should not be None
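
For context on the annotation question, here is a short sketch of the two consistent options in Python typing: either mark the parameter `Optional` when `None` is a valid default, or drop the default so callers must pass a value. `ReaderInfo` here is a stand-in dataclass with a made-up field, not the class from the PR, and both function names are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ReaderInfo:  # stand-in with a hypothetical field
    num_readers: int


def with_optional(info: Optional[Dict[str, ReaderInfo]] = None) -> int:
    # None is allowed, so the annotation says so and the body handles it.
    return 0 if info is None else len(info)


def required(info: Dict[str, ReaderInfo]) -> int:
    # No default: omitting the argument is a TypeError at call time.
    return len(info)


count_none = with_optional()
count_one = required({"node-a": ReaderInfo(num_readers=2)})
```

The second form matches the author's reply that the value should not be `None`.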

self._reader_node_ids = _reader_node_ids
self._node_id_to_reader_info = _node_id_to_reader_info

assert self._num_local_readers == 0
Contributor

This is a bit weird, it is set to 0 at L233, and asserted here. Why not just set it here?

@rkooo567
Contributor Author

rkooo567 commented Sep 9, 2024

All comments are addressed. Premerge is passing.

Comment on lines 365 to 368
remote_reader_refs,
remote_reader_node_ids,
remote_reader_ids,
remote_num_readers_per_node,
Contributor

Can we define a struct for each "node reader"? It is less error prone and we don't need the assert on L357

Contributor Author

done
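
The suggestion in this thread can be sketched as follows. The field names mirror the four parallel arguments in the diff, but the `NodeReaderInfo` class, its field types, and the sample values are assumptions for illustration. The structure actually added in the PR may differ.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class NodeReaderInfo:
    """Everything needed to reach the readers on one remote node (hypothetical)."""

    reader_ref: str  # placeholder for an object reference
    node_id: str
    reader_ids: List[str]

    @property
    def num_readers(self) -> int:
        # Derived from reader_ids, so it cannot drift out of sync.
        return len(self.reader_ids)


# With parallel lists, callers must assert that all lengths match.
# With one record per node, that invariant holds by construction.
node_readers = [
    NodeReaderInfo("ref-1", "node-b", ["r2", "r3"]),
    NodeReaderInfo("ref-2", "node-c", ["r4"]),
]
total_readers = sum(info.num_readers for info in node_readers)
```

Bundling the per-node fields removes the need for a length-equality assertion, which is the benefit the reviewer points to.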

@rkooo567
Contributor Author

rkooo567 commented Sep 9, 2024

cc @ruisearch42 Do you think we can merge this today?

Contributor

@ruisearch42 ruisearch42 left a comment

Looks good

std::shared_ptr<std::vector<std::shared_ptr<MutableObjectReaderInterface>>>
remote_readers =
std::make_shared<std::vector<std::shared_ptr<MutableObjectReaderInterface>>>();
// TODO(sang): Currently, these attributes are not cleaned up.
Contributor

Which attributes?

Comment on lines +248 to +251
RAY_LOG(ERROR)
<< "Failed to transfer object to a remote node for an object id "
<< writer_object_id << ". It can cause hang.";
}
Contributor

We should have a hard failure here?

Comment on lines +360 to +362
self._worker.core_worker.experimental_channel_register_reader(
reader_ref_info.reader_ref,
)
Contributor

We need to assert this is called exactly once?
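
One way to enforce the "exactly once" property raised here is a guard on the Python side. This is a sketch only: `ReaderRegistry` and its `register` method are hypothetical and stand in for the real call to `experimental_channel_register_reader`, which is not invoked in this example.

```python
class ReaderRegistry:
    """Tracks which reader references were registered (hypothetical helper)."""

    def __init__(self) -> None:
        self._registered = set()

    def register(self, reader_ref: str) -> None:
        # Fail loudly on a repeat instead of silently registering twice.
        if reader_ref in self._registered:
            raise RuntimeError(f"reader {reader_ref!r} is already registered")
        self._registered.add(reader_ref)
        # The real registration call would go here.


registry = ReaderRegistry()
registry.register("ref-1")
try:
    registry.register("ref-1")
    second_call_failed = False
except RuntimeError:
    second_call_failed = True
```

Raising an exception rather than using a bare `assert` keeps the check active even when Python runs with assertions disabled.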

@rkooo567 rkooo567 enabled auto-merge (squash) September 10, 2024 16:56
@rkooo567 rkooo567 merged commit 57136b5 into ray-project:master Sep 10, 2024
6 checks passed
rkooo567 pushed a commit to rkooo567/ray that referenced this pull request Sep 11, 2024
ujjawal-khare pushed a commit to ujjawal-khare-27/ray that referenced this pull request Oct 15, 2024
This PR supports multi readers in multi nodes. It also adds tests that the feature works with large gRPC payloads and buffer resizing.

multi readers in multi node didn't work because the code allows to only register 1 remote reader reference on 1 specific node. This fixes the issues by allowing to register remote reader references in multi nodes.

Signed-off-by: ujjawal-khare <[email protected]>
Labels
go add ONLY when ready to merge, run all tests
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[in-progress] Readers across different nodes - ADAG Developer Preview - Test Coverage
3 participants