-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[scheduling][5] Refactor resource syncer. #23270
Changes from all commits
91667ff
0b3265c
f47727e
f386753
9a5ffbc
e870bd6
6d6989f
21dd1e4
46ca5aa
700aead
b7111f0
5d17d83
b4aa776
958e71a
6600327
dc562e6
219eb19
33d77c2
fa19e39
ed02ab2
11aeb12
712d9d3
489d3aa
e667b5b
94fb719
9450a29
d8ecd71
08d866a
981b1ce
1775659
5a6fac6
b2100d6
d9d9194
ca949b0
26de0c6
8ce7c6c
85a8765
1ad60b7
a6314e1
24f55fa
e778d0e
3b7db93
8679b50
f9d6515
75d0f99
50989ce
0a67584
8b2d2d5
58b8963
e2503b7
1ffd94a
d7896b8
483278b
bf3030f
9e92dea
0a0e680
295d060
f3e90ec
9ff350f
b3f11f3
5cea3ee
cb63fa2
09fafa6
33fbf3b
3f56cbd
da5b74f
b3cb08d
c71d847
0c20810
cf8e28f
7a879b5
ef6c28b
85fc7fb
6b880ea
b654412
498c335
4a6cb59
e3aa592
16b5eab
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
// Copyright The Ray Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
namespace ray { | ||
namespace syncer { | ||
|
||
/// Mock implementation of ReporterInterface declared with gMock's MOCK_METHOD. | ||
class MockReporterInterface : public ReporterInterface { | ||
public: | ||
/// Mocks the const override Snapshot(current_version, component_id), whose | ||
/// return type is std::optional<RaySyncMessage>. | ||
MOCK_METHOD(std::optional<RaySyncMessage>, | ||
Snapshot, | ||
(int64_t current_version, RayComponentId component_id), | ||
(const, override)); | ||
}; | ||
|
||
} // namespace syncer | ||
} // namespace ray | ||
|
||
namespace ray { | ||
namespace syncer { | ||
|
||
/// Mock implementation of ReceiverInterface declared with gMock's MOCK_METHOD. | ||
class MockReceiverInterface : public ReceiverInterface { | ||
public: | ||
/// Mocks the override Update(message); the message is passed as a | ||
/// std::shared_ptr<const RaySyncMessage> and nothing is returned. | ||
MOCK_METHOD(void, Update, (std::shared_ptr<const RaySyncMessage> message), (override)); | ||
}; | ||
|
||
} // namespace syncer | ||
} // namespace ray | ||
|
||
namespace ray { | ||
namespace syncer { | ||
|
||
/// Mock subclass of NodeSyncConnection that replaces DoSend() with a gMock method. | ||
class MockNodeSyncConnection : public NodeSyncConnection { | ||
public: | ||
/// Inherit the base-class constructors so the mock is constructed like the base. | ||
using NodeSyncConnection::NodeSyncConnection; | ||
/// Mocks the override DoSend(), which takes no arguments and returns void. | ||
MOCK_METHOD(void, DoSend, (), (override)); | ||
}; | ||
|
||
} // namespace syncer | ||
} // namespace ray |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,197 @@ | ||
// Copyright 2022 The Ray Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
namespace ray { | ||
namespace syncer { | ||
|
||
/// NodeState keeps track of the modules in the local nodes. | ||
/// It contains the local components for receiving and reporting. | ||
/// It also keeps the raw messages receivers got. | ||
class NodeState { | ||
public: | ||
/// Constructor of NodeState. | ||
NodeState(); | ||
|
||
/// Set the local component. | ||
/// | ||
/// \param cid The component id. | ||
/// \param reporter The reporter is defined to be the local module which wants to | ||
/// broadcast its internal status to the whole cluster. When it's null, it means there | ||
/// is no reporter in the local node for this component. This is the place where | ||
/// messages are | ||
/// generated. | ||
/// \param receiver The receiver is defined to be the module which eventually | ||
/// will have the view of the cluster for this component. It's the place where | ||
/// received messages are consumed. | ||
/// | ||
/// \return true if set successfully. | ||
bool SetComponent(RayComponentId cid, | ||
const ReporterInterface *reporter, | ||
ReceiverInterface *receiver); | ||
|
||
/// Get the snapshot of a component for a newer version. | ||
/// | ||
/// \param cid The component id to take the snapshot. | ||
/// | ||
/// \return If a snapshot is taken, return the message, otherwise std::nullopt. | ||
std::optional<RaySyncMessage> GetSnapshot(RayComponentId cid); | ||
|
||
/// Consume a message. Receiver will consume this message if it doesn't have | ||
/// this message. | ||
/// | ||
/// \param message The message received. | ||
/// | ||
/// \return true if the local node doesn't have a message with a newer version. | ||
bool ConsumeMessage(std::shared_ptr<const RaySyncMessage> message); | ||
|
||
/// Return the cluster view of this local node. | ||
/// | ||
/// The view is returned by const reference to the internal map: a std::string key | ||
/// (presumably the node id -- confirm) mapped to an array with one message slot | ||
/// per component (kComponentArraySize entries). | ||
const absl::flat_hash_map< | ||
std::string, | ||
std::array<std::shared_ptr<const RaySyncMessage>, kComponentArraySize>> | ||
&GetClusterView() const { | ||
return cluster_view_; | ||
} | ||
|
||
private: | ||
/// Reporters and receivers of the local node, kComponentArraySize slots each. | ||
/// They are stored as raw pointers and default to nullptr. | ||
/// NOTE(review): ownership presumably stays with whoever calls SetComponent -- confirm. | ||
std::array<const ReporterInterface *, kComponentArraySize> reporters_ = {nullptr}; | ||
std::array<ReceiverInterface *, kComponentArraySize> receivers_ = {nullptr}; | ||
|
||
/// This field records the version of the snapshot that has been taken. | ||
/// NOTE(review): there is no in-class initializer here; presumably the constructor | ||
/// fills this array -- confirm in the implementation file. | ||
std::array<int64_t, kComponentArraySize> snapshots_versions_taken_; | ||
/// Keep track of the latest messages received. | ||
/// Use shared pointers for easier liveness management since these messages might be | ||
/// in the middle of being sent via rpc. | ||
absl::flat_hash_map< | ||
std::string, | ||
std::array<std::shared_ptr<const RaySyncMessage>, kComponentArraySize>> | ||
cluster_view_; | ||
}; | ||
|
||
/// Abstract base class for a sync connection to one remote node. | ||
/// It buffers outgoing messages, tracks per-node component versions and hands | ||
/// received messages to a message processor; subclasses implement DoSend(). | ||
class NodeSyncConnection { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. think again about this NodeSyncConnection. If we are using it to simulating streaming, should it have following APIs?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @scv119 even with streaming, we still need Server and Client concept here. The one who initializes the work is client. Here the difference is: 1) initialization; 2) how to send; 3) how to receive. All these are communication layer things. Put them into NodeSyncConnection gives me the feeling that it's not a good abstraction. Now it's like:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm could you hide them under the NodeSyncConnection interface/abstract class?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's not changing anything.
Btw, the only different part is about communication, what to do when we receive the updates is the same. And we still end up having two subclasses of NodeSyncConnection. Once, thing I think we probably can do is DoSend is a callback. But is this better than the current implementation? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just updated it. I think it looks better than before. |
||
public: | ||
/// Construct a connection. | ||
/// | ||
/// \param io_context The io context, kept by reference in io_context_. | ||
/// \param remote_node_id The id of the remote node of this connection. | ||
/// \param message_processor Handler of a message update. | ||
NodeSyncConnection( | ||
instrumented_io_context &io_context, | ||
std::string remote_node_id, | ||
std::function<void(std::shared_ptr<RaySyncMessage>)> message_processor); | ||
|
||
/// Push a message to the sending queue to be sent later. Some messages | ||
/// might be dropped if the module thinks the target node already has the | ||
/// information. Usually this happens when the message has the source node id | ||
/// as the target or the message was sent from the remote node. | ||
/// | ||
/// \param message The message to be sent. | ||
/// | ||
/// \return true if the message was pushed to the queue successfully. | ||
bool PushToSendingQueue(std::shared_ptr<const RaySyncMessage> message); | ||
|
||
/// Send the queued messages. Pure virtual: each subclass provides its own sending logic. | ||
virtual void DoSend() = 0; | ||
|
||
virtual ~NodeSyncConnection() {} | ||
|
||
/// Return the remote node id of this connection. | ||
const std::string &GetRemoteNodeID() const { return remote_node_id_; } | ||
|
||
/// Handle the updates sent from the remote node. | ||
/// | ||
/// \param messages The messages received. | ||
void ReceiveUpdate(RaySyncMessages messages); | ||
fishbone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
protected: | ||
// For testing | ||
FRIEND_TEST(RaySyncerTest, NodeSyncConnection); | ||
friend struct SyncerServerTest; | ||
|
||
/// Return a mutable reference to the per-component version array of a node. | ||
/// NOTE(review): presumably backed by node_versions_; whether a missing node id | ||
/// gets a new entry is not visible here -- confirm in the implementation. | ||
/// | ||
/// \param node_id The id of the node to look up. | ||
std::array<int64_t, kComponentArraySize> &GetNodeComponentVersions( | ||
const std::string &node_id); | ||
|
||
/// The io context | ||
instrumented_io_context &io_context_; | ||
|
||
/// The remote node id. | ||
std::string remote_node_id_; | ||
|
||
/// Handler of a message update. | ||
std::function<void(std::shared_ptr<RaySyncMessage>)> message_processor_; | ||
|
||
/// Buffering all the updates. Sending will be done in an async way. | ||
/// Keyed by a (std::string, RayComponentId) pair -- presumably (node id, | ||
/// component id); confirm against PushToSendingQueue. | ||
absl::flat_hash_map<std::pair<std::string, RayComponentId>, | ||
std::shared_ptr<const RaySyncMessage>> | ||
sending_buffer_; | ||
|
||
/// Keep track of the versions of components in the remote node. | ||
/// This field will be updated when messages are received or sent. | ||
/// We'll filter the received or sent messages when the message is stale. | ||
absl::flat_hash_map<std::string, std::array<int64_t, kComponentArraySize>> | ||
node_versions_; | ||
}; | ||
|
||
/// SyncConnection for gRPC server side. It has customized logic for sending. | ||
class ServerSyncConnection : public NodeSyncConnection { | ||
fishbone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
public: | ||
/// Construct a server-side connection. | ||
/// | ||
/// \param io_context The io context. | ||
/// \param remote_node_id The id of the remote node of this connection. | ||
/// \param message_processor Handler of a message update. | ||
ServerSyncConnection( | ||
instrumented_io_context &io_context, | ||
const std::string &remote_node_id, | ||
std::function<void(std::shared_ptr<RaySyncMessage>)> message_processor); | ||
|
||
~ServerSyncConnection() override; | ||
|
||
/// Handle a long-polling request from the remote node. | ||
/// NOTE(review): presumably stores the arguments in unary_reactor_ and response_ | ||
/// for a later DoSend() -- confirm in the implementation. | ||
/// | ||
/// \param reactor The gRPC unary reactor of the request. | ||
/// \param response The response message of the request. | ||
void HandleLongPollingRequest(grpc::ServerUnaryReactor *reactor, | ||
RaySyncMessages *response); | ||
|
||
protected: | ||
/// Send the message from the pending queue to the target node. | ||
/// It'll send nothing unless there is a long-polling request. | ||
/// TODO (iycheng): Unify the sending algorithm when we migrate to gRPC streaming | ||
void DoSend() override; | ||
|
||
/// These two fields are RPC related. When the server gets a long-polling request, | ||
/// these two fields will be set so that they can be used to send messages. | ||
/// After the message has been sent, these two fields will be set to be empty again. | ||
/// When the periodic timer wakes up, it'll check whether these two fields are set | ||
/// and it'll only send data when they are set. | ||
RaySyncMessages *response_ = nullptr; | ||
grpc::ServerUnaryReactor *unary_reactor_ = nullptr; | ||
}; | ||
|
||
/// SyncConnection for gRPC client side. It has customized logic for sending. | ||
class ClientSyncConnection : public NodeSyncConnection { | ||
public: | ||
/// Construct a client-side connection. | ||
/// | ||
/// \param io_context The io context. | ||
/// \param node_id A node id. NOTE(review): the base class and ServerSyncConnection | ||
/// name this parameter remote_node_id; presumably this is the same thing -- confirm. | ||
/// \param message_processor Handler of a message update. | ||
/// \param channel The gRPC channel; presumably used to create stub_ -- confirm. | ||
ClientSyncConnection( | ||
instrumented_io_context &io_context, | ||
const std::string &node_id, | ||
std::function<void(std::shared_ptr<RaySyncMessage>)> message_processor, | ||
std::shared_ptr<grpc::Channel> channel); | ||
|
||
protected: | ||
/// Send the message from the pending queue to the target node. | ||
/// It'll use gRPC to send the message directly. | ||
void DoSend() override; | ||
|
||
/// Start sending long-polling requests to the remote node. | ||
void StartLongPolling(); | ||
|
||
/// Stub for this connection. | ||
std::unique_ptr<ray::rpc::syncer::RaySyncer::Stub> stub_; | ||
fishbone marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
/// Where the received message is stored. | ||
ray::rpc::syncer::RaySyncMessages in_message_; | ||
|
||
/// Dummy request for long-polling. | ||
DummyRequest dummy_; | ||
}; | ||
|
||
} // namespace syncer | ||
} // namespace ray |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel a bit confused about the name NodeState. What about SyncerState? IIUC, there should be one of these per syncer, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more about the status of the local node: syncer state (cluster_view) + local modules (receiver/reporter).
But I have no strong opinion about this one. @scv119, what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SyncerState sounds better? This is also used by GCS, which technically contains cluster state rather than node state.