Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[scheduling][5] Refactor resource syncer. #23270

Merged
merged 79 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
91667ff
initial version
fishbone Mar 17, 2022
0b3265c
add more comments
fishbone Mar 17, 2022
f47727e
add proto comment
fishbone Mar 17, 2022
f386753
update
fishbone Mar 17, 2022
9a5ffbc
update
fishbone Mar 17, 2022
e870bd6
update
fishbone Mar 17, 2022
6d6989f
Merge branch 'refactor-syncer' of github.com:iycheng/ray into refacto…
fishbone Mar 17, 2022
21dd1e4
add test_syncer_service
fishbone Mar 17, 2022
46ca5aa
format
fishbone Mar 17, 2022
700aead
make example compilable
fishbone Mar 17, 2022
b7111f0
format
fishbone Mar 17, 2022
5d17d83
update
fishbone Mar 18, 2022
b4aa776
format
fishbone Mar 18, 2022
958e71a
fix comment
fishbone Mar 20, 2022
6600327
update
fishbone Mar 20, 2022
dc562e6
fix some
fishbone Mar 20, 2022
219eb19
fix some
fishbone Mar 20, 2022
33d77c2
make toy example compiled
fishbone Mar 20, 2022
fa19e39
format
fishbone Mar 20, 2022
ed02ab2
update
fishbone Mar 20, 2022
11aeb12
fix
fishbone Mar 20, 2022
712d9d3
format
fishbone Mar 20, 2022
489d3aa
fix serveral issues
fishbone Mar 20, 2022
e667b5b
remove uncessary logs
fishbone Mar 20, 2022
94fb719
lint
fishbone Mar 20, 2022
9450a29
revert bazelrc
fishbone Mar 21, 2022
d8ecd71
Merge remote-tracking branch 'upstream/master' into refactor-syncer
fishbone Mar 21, 2022
08d866a
gcs back
fishbone Mar 21, 2022
981b1ce
prep for unittest
fishbone Mar 21, 2022
1775659
up
fishbone Mar 21, 2022
5a6fac6
check
fishbone Mar 21, 2022
b2100d6
test for node status
fishbone Mar 21, 2022
d9d9194
add node connection unit test and fixing bugs
fishbone Mar 21, 2022
ca949b0
fix comment
fishbone Mar 21, 2022
26de0c6
fix
fishbone Mar 21, 2022
8ce7c6c
fix comment
fishbone Mar 21, 2022
85a8765
up
fishbone Mar 21, 2022
1ad60b7
build good fundation for ray syncer test
fishbone Mar 21, 2022
a6314e1
format
fishbone Mar 21, 2022
24f55fa
format
fishbone Mar 21, 2022
e778d0e
lint
fishbone Mar 21, 2022
3b7db93
fix error
fishbone Mar 21, 2022
8679b50
fix
fishbone Mar 21, 2022
f9d6515
fix comment
fishbone Mar 21, 2022
75d0f99
add comment
fishbone Mar 21, 2022
50989ce
fix comments
fishbone Mar 21, 2022
0a67584
fix
fishbone Mar 21, 2022
8b2d2d5
fix
fishbone Mar 21, 2022
58b8963
format
fishbone Mar 21, 2022
e2503b7
fix error
fishbone Mar 22, 2022
1ffd94a
add test for 1-1 communication
fishbone Mar 22, 2022
d7896b8
format
fishbone Mar 22, 2022
483278b
up
fishbone Mar 22, 2022
bf3030f
check
fishbone Mar 22, 2022
9e92dea
address comments
fishbone Mar 22, 2022
0a0e680
fix
fishbone Mar 22, 2022
295d060
update
fishbone Mar 22, 2022
f3e90ec
fix a bug when first connection comes extra information is send
fishbone Mar 23, 2022
9ff350f
Merge remote-tracking branch 'upstream/master' into refactor-syncer
fishbone Mar 23, 2022
b3f11f3
update
fishbone Mar 23, 2022
5cea3ee
fix io pool overloaded
fishbone Mar 24, 2022
cb63fa2
finish
fishbone Mar 24, 2022
09fafa6
update
fishbone Mar 24, 2022
33fbf3b
update
fishbone Mar 24, 2022
3f56cbd
format
fishbone Mar 25, 2022
da5b74f
try to fix tsan
fishbone Mar 25, 2022
b3cb08d
Merge remote-tracking branch 'upstream/master' into refactor-syncer
fishbone Mar 25, 2022
c71d847
another try to fix tsan
fishbone Mar 25, 2022
0c20810
try to fix flaky
fishbone Mar 28, 2022
cf8e28f
format
fishbone Mar 28, 2022
7a879b5
a try to fix
fishbone Mar 28, 2022
ef6c28b
fix
fishbone Mar 28, 2022
85fc7fb
fix
fishbone Mar 28, 2022
6b880ea
Merge remote-tracking branch 'upstream/master' into refactor-syncer
fishbone Mar 28, 2022
b654412
fix some
fishbone Mar 28, 2022
498c335
fix one bug!
fishbone Mar 29, 2022
4a6cb59
format
fishbone Mar 29, 2022
e3aa592
fix comments
fishbone Mar 29, 2022
16b5eab
fix comment
fishbone Mar 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,13 @@ cc_library(
visibility = ["//visibility:public"],
)

cc_grpc_library(
name = "ray_syncer_cc_grpc",
srcs = ["//src/ray/protobuf:ray_syncer_proto"],
grpc_only = True,
deps = ["//src/ray/protobuf:ray_syncer_cc_proto"],
)

cc_library(
name = "ray_common",
srcs = glob(
Expand Down Expand Up @@ -432,6 +439,7 @@ cc_library(
visibility = ["//visibility:public"],
deps = [
":node_manager_fbs",
":ray_syncer_cc_grpc",
":ray_util",
":stats_metric",
"//src/ray/protobuf:common_cc_proto",
Expand Down Expand Up @@ -2365,6 +2373,27 @@ cc_library(
],
)

cc_binary(
name = "syncer_service_e2e_test",
srcs = ["src/ray/common/test/syncer_service_e2e_test.cc"],
copts = COPTS,
deps = [
":ray_common",
],
)

cc_test(
name = "ray_syncer_test",
srcs = ["src/ray/common/test/ray_syncer_test.cc"],
copts = COPTS,
tags = ["team:core"],
deps = [
":ray_common",
":ray_mock",
"@com_google_googletest//:gtest",
],
)

cc_test(
name = "callback_reply_test",
size = "small",
Expand Down
50 changes: 50 additions & 0 deletions src/mock/ray/common/ray_syncer/ray_syncer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace ray {
namespace syncer {

class MockReporterInterface : public ReporterInterface {
public:
MOCK_METHOD(std::optional<RaySyncMessage>,
Snapshot,
(int64_t current_version, RayComponentId component_id),
(const, override));
};

} // namespace syncer
} // namespace ray

namespace ray {
namespace syncer {

class MockReceiverInterface : public ReceiverInterface {
public:
MOCK_METHOD(void, Update, (std::shared_ptr<const RaySyncMessage> message), (override));
};

} // namespace syncer
} // namespace ray

namespace ray {
namespace syncer {

class MockNodeSyncConnection : public NodeSyncConnection {
public:
using NodeSyncConnection::NodeSyncConnection;
MOCK_METHOD(void, DoSend, (), (override));
};

} // namespace syncer
} // namespace ray
3 changes: 3 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ RAY_CONFIG(std::string, custom_unit_instance_resources, "")
// Maximum size of the batches when broadcasting resources to raylet.
RAY_CONFIG(uint64_t, resource_broadcast_batch_size, 512);

// Maximum ray sync message batch size in bytes (1MB by default) between nodes.
RAY_CONFIG(uint64_t, max_sync_message_batch_bytes, 1 * 1024 * 1024);

// If enabled and worker stated in container, the container will add
// resource limit.
RAY_CONFIG(bool, worker_resource_limits_enabled, false)
Expand Down
201 changes: 201 additions & 0 deletions src/ray/common/ray_syncer/ray_syncer-inl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace ray {
namespace syncer {

/// NodeState keeps track of the modules in the local nodes.
/// It contains the local components for receiving and reporting.
/// It also keeps the raw messages receivers got.
class NodeState {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel a bit confused about the naming NodeState. What about SyncerState? IIUC, there should be one of this per syncer right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more about the status of the local node: syncer state (cluster_view) + local modules(receiver/reporter).

But, I have no strong opinion about this one, @scv119 how do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SyncerState sounds better? this is also used by gcs, which is technically contains cluster state rather than node state.

public:
/// Constructor of NodeState.
NodeState();

/// Set the local components.
///
/// \param cid The component id.
/// \param reporter The reporter is defined to be the local module which wants to
/// broadcast its internal status to the whole clsuter. When it's null, it means there
/// is no reporter in this node for this component. This is the place there messages are
/// generated.
/// \param receiver The receiver is defined to be the module which eventually
/// will have the view of of the cluster for this component. It's the place where
/// received messages are consumed.
///
/// \return true if set successfully.
bool SetComponents(RayComponentId cid,
fishbone marked this conversation as resolved.
Show resolved Hide resolved
const ReporterInterface *reporter,
ReceiverInterface *receiver);

/// Get the snapshot of a component for a newer version.
///
/// \param cid The component id to take the snapshot.
///
/// \return If a snapshot is taken, return the message, otherwise std::nullopt.
std::optional<RaySyncMessage> GetSnapshot(RayComponentId cid);

/// Consume a message. Receiver will consume this message if it doesn't have
/// this message.
///
/// \param message The message received.
///
/// \return true if this node doesn't have message with newer version.
bool ConsumeMessage(std::shared_ptr<const RaySyncMessage> message);

/// Return the cluster view of this local node.
const absl::flat_hash_map<std::string, Array<std::shared_ptr<const RaySyncMessage>>>
&GetClusterView() const {
return cluster_view_;
}

private:
/// For local nodes
Array<const ReporterInterface *> reporters_ = {nullptr};
Array<ReceiverInterface *> receivers_ = {nullptr};

/// This field records the version of the snapshot that has been taken.
Array<int64_t> snapshots_taken_;
/// Keep track of the latest messages received.
/// Use shared pointer for easier liveness management since these messages might be
/// sending via rpc.
absl::flat_hash_map<std::string, Array<std::shared_ptr<const RaySyncMessage>>>
fishbone marked this conversation as resolved.
Show resolved Hide resolved
cluster_view_;
};

class NodeSyncConnection {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think again about this NodeSyncConnection. If we are using it to simulating streaming, should it have following APIs?

class NodeSyncConnection {
  // Receives stream of responses from remote node. 
  using ResponseObserver = std::function<void(Response)>;
  public:
   NodeSyncConnection(ResponseObserver response_observer);
   // Sends requests to remote node.
   bool SendRequest(Request request);
};

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@scv119 even with streaming, we still need Server and Client concept here. The one who initializes the work is client.

Here the difference is: 1) initialization; 2) how to send; 3) how to receive. All these are communication layer things.

Put them into NodeSyncConnection gives me the feeling that it's not a good abstraction.

Now it's like:

  • NodeSyncConnection -> application layer filtering
  • Server/ClientSyncConneciton-> Handling how to do communication.

Copy link
Contributor

@scv119 scv119 Mar 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm could you hide them under the NodeSyncConnection interface/abstract class?

class NodeSyncConnection {
  // Receives stream of responses from remote node. 
  using ResponseObserver = std::function<void(Response)>;
  public:
   NodeSyncConnection(ResponseObserver response_observer);
   // Sends requests to remote node.
   bool SendRequest(Request request);
};

class ServerSyncConnection : public NodeSyncConnection {
 public:
   ServerSyncConnection(ResponseObserver response_observer);
   bool SendRequest(Request request) override;
};

class ClientSyncConnection : public NodeSyncConnection {
 public:
   ClientSyncConnection(ResponseObserver response_observer);
   bool SendRequest(Request request) override;
};

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not changing anything.

  • PushToSendingQueue is SendRequest
  • ReceiveUpdates is ResponseObserver

Btw, the only different part is about communication, what to do when we receive the updates is the same.

And we still end up having two subclasses of NodeSyncConnection.

Once, thing I think we probably can do is DoSend is a callback. But is this better than the current implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just updated it. I think it looks better than before.

public:
NodeSyncConnection(RaySyncer &instance,
instrumented_io_context &io_context,
std::string node_id);

/// Push a message to the sending queue to be sent later. Some message
/// might be dropped if the module think the target node has already got the
/// information. Usually it'll happen when the message has the same node id
fishbone marked this conversation as resolved.
Show resolved Hide resolved
/// as the target or the message is sent from this node.
///
/// \param message The message to be sent.
///
/// \return true if push to queue successfully.
bool PushToSendingQueue(std::shared_ptr<const RaySyncMessage> message);

/// Send the message queued.
virtual void DoSend() = 0;

virtual ~NodeSyncConnection() {}

/// Return the node id of this sync context.
fishbone marked this conversation as resolved.
Show resolved Hide resolved
const std::string &GetNodeId() const { return node_id_; }

/// Handle the udpates sent from this node.
///
/// \param messages The message received.
void ReceiveUpdate(RaySyncMessages messages);
fishbone marked this conversation as resolved.
Show resolved Hide resolved

protected:
FRIEND_TEST(RaySyncerTest, NodeSyncConnection);

std::array<int64_t, kComponentArraySize> &GetNodeComponentVersions(
const std::string &node_id);

RaySyncer &instance_;
instrumented_io_context &io_context_;
std::string node_id_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is local_node_id or target_node_id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the node id of the connection. do you think the target node makes sense here?

This comment was marked as resolved.


struct _MessageHash {
std::size_t operator()(
const std::shared_ptr<const RaySyncMessage> &m) const noexcept {
std::size_t seed = 0;
boost::hash_combine(seed, m->node_id());
boost::hash_combine(seed, m->component_id());
return seed;
}
};

struct _MessageEq {
bool operator()(const std::shared_ptr<const RaySyncMessage> &lhs,
const std::shared_ptr<const RaySyncMessage> &rhs) const noexcept {
if (lhs == rhs) {
return true;
}
if (lhs == nullptr || rhs == nullptr) {
return false;
}
// We don't check the version here since we want the old version to be deleted.
return lhs->node_id() == rhs->node_id() &&
lhs->component_id() == rhs->component_id();
}
};

absl::flat_hash_set<std::shared_ptr<const RaySyncMessage>, _MessageHash, _MessageEq>
sending_queue_;
// Keep track of the versions of components in this node.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not (just) about components in this node right? Also components in other nodes?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still a bit confused: is this about the latest version the local node has received or sent?

This comment was marked as resolved.

absl::flat_hash_map<std::string, std::array<int64_t, kComponentArraySize>>
node_versions_;
};

class ServerSyncConnection : public NodeSyncConnection {
fishbone marked this conversation as resolved.
Show resolved Hide resolved
public:
ServerSyncConnection(RaySyncer &instance,
instrumented_io_context &io_context,
const std::string &node_id);

~ServerSyncConnection() override;

void HandleLongPollingRequest(grpc::ServerUnaryReactor *reactor,
RaySyncMessages *response);

protected:
/// Send the message from the pending queue to the target node.
/// It'll send nothing unless there is a request from the remote node
/// for the sending request.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this sentence mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is about long-polling. It means unless the remote node request, it won't send the data.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Maybe make the comment a bit clearer: like "send nothing unless there is a long-polling request ..."?

/// TODO (iycheng): Unify the sending algorithm when we migrate to gRPC streaming
void DoSend() override;

/// These two fields are RPC related. When the server got long-polling requests,
/// these two fields will be set so that it can be used to send message.
/// After the message being sent, these two fields will be set to be empty again.
/// When the periodical timer wake up, it'll check whether these two fields are set
/// and it'll only send data when these are set.
RaySyncMessages *response_ = nullptr;
grpc::ServerUnaryReactor *unary_reactor_ = nullptr;
};

class ClientSyncConnection : public NodeSyncConnection {
public:
ClientSyncConnection(RaySyncer &instance,
instrumented_io_context &io_context,
const std::string &node_id,
std::shared_ptr<grpc::Channel> channel);

protected:
/// Send the message from the pending queue to the target node.
/// It'll use gRPC to send the message directly.
void DoSend() override;

/// Start to send long-polling request to remote nodes.
void StartLongPolling();

/// Stub for this connection.
std::unique_ptr<ray::rpc::syncer::RaySyncer::Stub> stub_;
fishbone marked this conversation as resolved.
Show resolved Hide resolved

/// Where the received message is stored.
ray::rpc::syncer::RaySyncMessages in_message_;

/// Dummy request for long-polling.
DummyRequest dummy_;
};

} // namespace syncer
} // namespace ray
Loading