[scheduling][5] Refactor resource syncer. #23270
Conversation
Still working on writing the test cases and a toy example. Given that this PR would otherwise be too big, the actual integration will happen in the next PR.
Synced offline with @scv119; the following concerns should be addressed in the updates:
- We should put the local node status into a separate class and provide an update API for the local status (cluster_view_/reporters_/receivers_).
- Consider using an upstream/downstream model.
Made several improvements to address the comments.
The next step:
```proto
// The component this message is for.
RayComponentId component_id = 2;
// The actual payload.
bytes sync_message = 3;
```
Will this incur one more string copy? We can probably benchmark it. If it's a concern, we can see if a union makes it better.
We have one extra copy, in order to avoid one extra serialization when sending data.
Furthermore, I think that in the future the component can actually use flatbuffers to optimize for performance if needed.
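To make the copy concrete, here is a rough sketch (it assumes the generated classes from the proto snippet above; the include path and the WrapPayload helper are hypothetical, and namespaces are omitted):

```cpp
#include <string>

#include "ray_syncer.pb.h"  // hypothetical path to the generated proto header

// Sketch: the component serializes its payload once, and that payload is then
// copied into the generic `bytes sync_message` field. The syncer can forward
// the wrapped message without knowing the payload type, at the cost of that
// one extra copy.
RaySyncMessage WrapPayload(RayComponentId component_id,
                           const std::string &serialized_payload) {
  RaySyncMessage msg;
  msg.set_component_id(component_id);
  msg.set_sync_message(serialized_payload);  // the extra copy being discussed
  return msg;
}
```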
Didn't finish, will continue.
```cpp
RaySyncer &instance_;
instrumented_io_context &io_context_;
std::string node_id_;
```
Is this local_node_id or target_node_id?
This is the node id of the connection. Do you think the target node makes sense here?
```cpp
/// It'll send nothing unless there is a request from the remote node
/// for the sending request.
```
Got it. Maybe make the comment a bit clearer: like "send nothing unless there is a long-polling request ..."?
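Spelled out, the suggested wording would read something like this (comment text only, following the suggestion above; not new behavior):

```cpp
/// It sends nothing unless there is a long-polling request from the remote
/// node asking for updates.
```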
```cpp
absl::flat_hash_set<std::shared_ptr<const RaySyncMessage>, _MessageHash, _MessageEq>
    sending_queue_;
// Keep track of the versions of components in this node.
```
I'm still a bit confused: is this about the latest version the local node has received or sent?
Here, this node means the node associated with the connection.
@jjyao I think you're mixing up the connection and the local node here. The connection is about the node connected to the local node, and every node_id in the connection refers to that connected node. We went through the code in the morning. Do you mind revisiting your comments to see which ones still make sense?
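One way to make that unambiguous in the code would be a comment on the member (a suggestion only; the declarations are the ones quoted above):

```cpp
RaySyncer &instance_;
instrumented_io_context &io_context_;
// The id of the remote node this connection talks to (i.e. the node
// associated with the connection), not the local node's id.
std::string node_id_;
```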
```cpp
if (node_versions[message.component_id()] < message.version()) {
  node_versions[message.component_id()] = message.version();
}
instance_.BroadcastMessage(std::make_shared<RaySyncMessage>(std::move(message)));
```
Do we need to broadcast in the else case? Doesn't that mean we received an old message that should be discarded?
I agree with you.
Thanks @jjyao for the comments. I'll update the PR.
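The agreed change would look roughly like this (a sketch against the snippet quoted above, not the final code):

```cpp
// Only record and broadcast a message if it is newer than what this
// connection has already seen for the component; drop stale messages instead
// of re-broadcasting them.
if (node_versions[message.component_id()] < message.version()) {
  node_versions[message.component_id()] = message.version();
  instance_.BroadcastMessage(std::make_shared<RaySyncMessage>(std::move(message)));
}
```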
LGTM
```cpp
if (!message) {
  continue;
}
if (upward_only_[message->component_id()] && !is_upward_conn) {
```
What's an example of an upward_only component?
I put it in the Register comments.
```cpp
}

bool NodeState::ConsumeMessage(std::shared_ptr<const RaySyncMessage> message) {
  auto &current = cluster_view_[message->node_id()][message->component_id()];
```
Do we need to check whether message->node_id() == node_id and skip if so?
I do it on purpose: 1) if there is a new connection, sending the snapshot will be easy to do; 2) the receiver might want to handle local updates, and this gives the local reporter a way to communicate with the local receiver.
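As a rough illustration of point 1): with the local node's messages kept in cluster_view_, a snapshot for a freshly connected node is just a walk over the view. A member-function-style sketch (the SendSnapshot helper and the exact container layout are assumptions; PushToSendingQueue comes from the discussion further down):

```cpp
// Sketch: replay everything currently in the cluster view, including the
// local node's own messages, to a newly connected peer.
void SendSnapshot(NodeSyncConnection &conn) {
  for (const auto &[node_id, components] : cluster_view_) {
    for (const auto &[component_id, message] : components) {
      if (message != nullptr) {
        conn.PushToSendingQueue(message);
      }
    }
  }
}
```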
```cpp
      cluster_view_;
};

class NodeSyncConnection {
```
Think again about this NodeSyncConnection. If we are using it to simulate streaming, should it have the following APIs?
```cpp
class NodeSyncConnection {
  // Receives stream of responses from remote node.
  using ResponseObserver = std::function<void(Response)>;

 public:
  NodeSyncConnection(ResponseObserver response_observer);
  // Sends requests to remote node.
  bool SendRequest(Request request);
};
```
@scv119 Even with streaming, we still need the Server and Client concepts here. The one who initiates the work is the client.
Here the differences are: 1) initialization; 2) how to send; 3) how to receive. All of these are communication-layer things.
Putting them into NodeSyncConnection gives me the feeling that it's not a good abstraction.
Now it's like:
- NodeSyncConnection -> application-layer filtering
- Server/ClientSyncConnection -> handling how the communication is done
Hmm, could you hide them under the NodeSyncConnection interface/abstract class?
```cpp
class NodeSyncConnection {
  // Receives stream of responses from remote node.
  using ResponseObserver = std::function<void(Response)>;

 public:
  NodeSyncConnection(ResponseObserver response_observer);
  // Sends requests to remote node.
  bool SendRequest(Request request);
};

class ServerSyncConnection : public NodeSyncConnection {
 public:
  ServerSyncConnection(ResponseObserver response_observer);
  bool SendRequest(Request request) override;
};

class ClientSyncConnection : public NodeSyncConnection {
 public:
  ClientSyncConnection(ResponseObserver response_observer);
  bool SendRequest(Request request) override;
};
```
I think it's not changing anything:
- PushToSendingQueue is SendRequest
- ReceiveUpdates is ResponseObserver
Btw, the only part that differs is the communication; what to do when we receive the updates is the same.
And we still end up having two subclasses of NodeSyncConnection.
One thing I think we can probably do is make DoSend a callback. But is this better than the current implementation?
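For comparison, the "DoSend as a callback" variant would look roughly like this (a sketch only; everything except NodeSyncConnection, PushToSendingQueue, and RaySyncMessage is made up):

```cpp
#include <functional>
#include <memory>
#include <utility>

// Stand-in for the proto message from the snippet above.
class RaySyncMessage;

// Sketch: a single NodeSyncConnection that takes the transport-specific send
// as a callback instead of overriding it in Server/Client subclasses.
class NodeSyncConnection {
 public:
  using DoSendCallback =
      std::function<void(std::shared_ptr<const RaySyncMessage>)>;

  explicit NodeSyncConnection(DoSendCallback do_send)
      : do_send_(std::move(do_send)) {}

  // Application-layer filtering would live here; only the actual send differs
  // between the client and server sides.
  void PushToSendingQueue(std::shared_ptr<const RaySyncMessage> message) {
    do_send_(std::move(message));
  }

 private:
  DoSendCallback do_send_;
};
```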
I just updated it. I think it looks better than before.
Discussed with @scv119 offline; I'll move the ray syncer out of NodeSyncConnection.
The Mac CI is hanging forever ... and the failure is not related.
Why are these changes needed?
This PR refactors the resource syncer to decouple it from GCS and the raylet. GCS and the raylet will use the same module to sync data. The integration will happen in the next PR.
There are several newly introduced components:
The core protocol is that each node sends {what it has} - {what the target has} to the target.
For example, consider nodes A <-> B: A will send everything A has, excluding what B already has, to B.
Whenever there is new information (from NodeState or NodeSyncConnection), it will be passed to RaySyncer's BroadcastMessage to broadcast.
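Under the assumption that "what a node has" is tracked as per-component versions (as in the snippets in the review above), the per-target diff boils down to a version comparison. A minimal sketch with made-up container aliases and a made-up ComputeDiff helper:

```cpp
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include "absl/container/flat_hash_map.h"

// Assumes the generated RaySyncMessage (with node_id()/component_id()/version())
// from the proto is available.
class RaySyncMessage;

// Made-up aliases for the sketch: node id -> component id -> latest message,
// and node id -> component id -> version the target is known to have.
using ClusterView = absl::flat_hash_map<
    std::string, absl::flat_hash_map<int, std::shared_ptr<const RaySyncMessage>>>;
using TargetVersions =
    absl::flat_hash_map<std::string, absl::flat_hash_map<int, int64_t>>;

// Sketch: {what we have} - {what the target has} is every message whose
// version is newer than what the target is known to have for that
// (node, component) pair.
std::vector<std::shared_ptr<const RaySyncMessage>> ComputeDiff(
    const ClusterView &what_we_have, const TargetVersions &what_target_has) {
  std::vector<std::shared_ptr<const RaySyncMessage>> to_send;
  for (const auto &[node_id, components] : what_we_have) {
    for (const auto &[component_id, message] : components) {
      if (message == nullptr) continue;
      int64_t target_version = -1;
      if (auto node_it = what_target_has.find(node_id);
          node_it != what_target_has.end()) {
        if (auto comp_it = node_it->second.find(component_id);
            comp_it != node_it->second.end()) {
          target_version = comp_it->second;
        }
      }
      if (message->version() > target_version) {
        to_send.push_back(message);
      }
    }
  }
  return to_send;
}
```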
NodeSyncConnection is the communication layer. It has two implementations: Client and Server.
Here is one example:
It means A initiates the connection to B, and B initiates the connections to C and D.
Now C generates a message M:
Related issue number
Checks
- I've run scripts/format.sh to lint the changes in this PR.