Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[scheduler][6] Integrate ray with syncer. #23660

Merged
merged 85 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from 80 commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
4f598da
up
fishbone Apr 1, 2022
e908415
up
fishbone Apr 1, 2022
a0388c2
format
fishbone Apr 1, 2022
d7de78a
integrate with raylet
fishbone Apr 2, 2022
f9187ce
format
fishbone Apr 4, 2022
34be999
finish add and remove
fishbone Apr 4, 2022
3e88774
integrate with gcs
fishbone Apr 4, 2022
c45efd4
fix it
fishbone Apr 4, 2022
dad1885
make raylet compile
fishbone Apr 4, 2022
1e812bd
enable for testing
fishbone Apr 5, 2022
c2e715b
fix one issue
fishbone Apr 5, 2022
5bee526
Merge remote-tracking branch 'upstream/master' into syncer-integration
fishbone Apr 5, 2022
e61e63f
fix
fishbone Apr 6, 2022
abc4579
format
fishbone Apr 6, 2022
c124f2f
fix test_basic_3.py
fishbone Apr 7, 2022
f2034b9
format
fishbone Apr 7, 2022
df968a7
up
fishbone Apr 7, 2022
4219b1d
up
fishbone Apr 7, 2022
42bd6d0
fix pg mini test
fishbone Apr 7, 2022
dd25eb6
format
fishbone Apr 7, 2022
3be9361
add pg updates in raylet
fishbone Apr 7, 2022
478987a
format and fix some cpp test
fishbone Apr 7, 2022
8db538a
fix raylet
fishbone Apr 8, 2022
6ec8542
Merge remote-tracking branch 'upstream/master' into syncer-integration
fishbone Apr 8, 2022
fd4b89a
obj memory and warning issue
fishbone Apr 9, 2022
5903e2a
format
fishbone Apr 9, 2022
617f296
fix gcs reconnect
fishbone Apr 9, 2022
fa612f1
format
fishbone Apr 9, 2022
172fa7e
fix cpp issues
fishbone Apr 9, 2022
57c3ecc
move gc to command channel
fishbone Apr 9, 2022
6f3c6bd
fix gc
fishbone Apr 9, 2022
94a4fad
format
fishbone Apr 9, 2022
fd5370d
Merge remote-tracking branch 'upstream/master' into syncer-integration
fishbone Apr 9, 2022
2b813f4
revert one line
fishbone Apr 9, 2022
2b5b1ff
update
fishbone Apr 9, 2022
7a14e80
fix
fishbone Apr 9, 2022
e8daa87
add test case for ray syncer
fishbone Apr 9, 2022
fb49bd8
format
fishbone Apr 9, 2022
cbffd67
up
fishbone Apr 9, 2022
e73688e
add test
fishbone Apr 11, 2022
25aae70
format
fishbone Apr 11, 2022
8fe8e3e
up
fishbone Apr 11, 2022
1503e10
update
fishbone Apr 11, 2022
a0e2d68
comments
fishbone Apr 11, 2022
8f421c0
fix some
fishbone Apr 14, 2022
63da674
format
fishbone Apr 14, 2022
7d561fa
Merge remote-tracking branch 'upstream/master' into syncer-integration
fishbone Apr 20, 2022
08c91b8
fix compiling and some comments
fishbone Apr 21, 2022
f07cf33
fix compiling
fishbone Apr 21, 2022
91f2f93
format
fishbone Apr 21, 2022
1984ea3
update
fishbone Apr 21, 2022
575009a
update
fishbone Apr 21, 2022
78e2432
update
fishbone Apr 21, 2022
001c413
format
fishbone Apr 21, 2022
ff520ab
format
fishbone Apr 21, 2022
d0352fe
format
fishbone Apr 21, 2022
4ea66a7
fix large test
fishbone Apr 21, 2022
2e03a48
update
fishbone Apr 22, 2022
fb79b6c
format
fishbone Apr 22, 2022
351c4bd
format
fishbone Apr 22, 2022
16ccbba
fix comment
fishbone Apr 22, 2022
7107aa1
format
fishbone Apr 22, 2022
257219f
Merge remote-tracking branch 'upstream/master' into syncer-integration
fishbone Apr 27, 2022
63991be
up
fishbone Apr 28, 2022
86fb73d
fix ray syncer test
fishbone Apr 28, 2022
0471f12
update
fishbone Apr 28, 2022
234a7d6
update
fishbone Apr 28, 2022
9110b3b
Merge remote-tracking branch 'upstream/master' into syncer-integration
fishbone Apr 28, 2022
5701f83
up
fishbone Apr 28, 2022
d332986
fix
fishbone Apr 29, 2022
f836124
format
fishbone Apr 29, 2022
189ae15
Merge remote-tracking branch 'upstream/master' into syncer-integration
fishbone Apr 29, 2022
c13a22c
revert serve
fishbone Apr 29, 2022
b66b907
revert state
fishbone Apr 29, 2022
d4bf660
Merge remote-tracking branch 'upstream/master' into syncer-integration
fishbone May 9, 2022
a11c98e
up
fishbone May 9, 2022
4e6e81e
merge and fix
fishbone May 9, 2022
bf42239
fix comment
fishbone May 9, 2022
e185a34
fix comment
fishbone May 9, 2022
263fcec
fix comment
fishbone May 9, 2022
d18834a
fix comments
fishbone May 9, 2022
3825141
fix compiling failure
fishbone May 10, 2022
77710ef
fix mis-merge
fishbone May 10, 2022
eba9fc0
fix mis-update
fishbone May 10, 2022
ff56f8b
fix a bad merge
fishbone May 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@
--build_tests_only
--test_tag_filters=release_unit
release/...

- label: ":python: (Small & Client)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
Expand Down Expand Up @@ -354,6 +353,38 @@
--test_env=CONDA_DEFAULT_ENV
python/ray/tests/...

- label: ":construction: :python: (syncer) (Small & Client)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=client_tests,small_size_python_tests
--test_env=RAY_use_ray_syncer=true
-- python/ray/tests/...
- label: ":construction: :python: (syncer) (Large)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
parallelism: 3
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- RAY_use_ray_syncer=true . ./ci/ci.sh test_large
- label: ":construction: :python: (syncer) (Medium A-J)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=-kubernetes,medium_size_python_tests_a_to_j
--test_env=RAY_use_ray_syncer=true
python/ray/tests/...
- label: ":construction: :python: (syncer) (Medium K-Z)"
conditions: ["RAY_CI_PYTHON_AFFECTED"]
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/travis/upload_build_info.sh; fi }; trap cleanup EXIT
- bazel test --config=ci $(./scripts/bazel_export_options)
--test_tag_filters=-kubernetes,medium_size_python_tests_k_to_z
--test_env=RAY_use_ray_syncer=true
python/ray/tests/...


# https://github.com/ray-project/ray/issues/22460
#- label: ":python: (Privileged test)"
#conditions: ["RAY_CI_PYTHON_AFFECTED"]
Expand Down
2 changes: 2 additions & 0 deletions python/ray/includes/ray_config.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,5 @@ cdef extern from "ray/common/ray_config.h" nogil:
c_bool gcs_grpc_based_pubsub() const

c_bool start_python_importer_thread() const

c_bool use_ray_syncer() const
4 changes: 4 additions & 0 deletions python/ray/includes/ray_config.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,7 @@ cdef class Config:
@staticmethod
def start_python_importer_thread():
return RayConfig.instance().start_python_importer_thread()

@staticmethod
def use_ray_syncer():
return RayConfig.instance().use_ray_syncer()
4 changes: 3 additions & 1 deletion python/ray/tests/test_metrics_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
"ray_internal_num_spilled_tasks",
# "ray_unintentional_worker_failures_total",
# "ray_node_failure_total",
"ray_outbound_heartbeat_size_kb_sum",
"ray_operation_count",
"ray_operation_run_time_ms",
"ray_operation_queue_time_ms",
Expand Down Expand Up @@ -75,6 +74,9 @@
"ray_gcs_actors_count",
]

if not ray._raylet.Config.use_ray_syncer():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just supporting this from the syncer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably prefer another name instead of ray_outbound_heartbeat_size_kb_sum

_METRICS.append("ray_outbound_heartbeat_size_kb_sum")

# This list of metrics should be kept in sync with
# ray/python/ray/autoscaler/_private/prom_metrics.py
_AUTOSCALER_METRICS = [
Expand Down
2 changes: 1 addition & 1 deletion src/mock/ray/common/ray_syncer/ray_syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class MockReporterInterface : public ReporterInterface {
public:
MOCK_METHOD(std::optional<RaySyncMessage>,
CreateSyncMessage,
(int64_t current_version, RayComponentId component_id),
(int64_t current_version, MessageType message_type),
(const, override));
};

Expand Down
1 change: 1 addition & 0 deletions src/mock/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class MockRayletClientInterface : public RayletClientInterface {
WaitForDirectActorCallArgs,
(const std::vector<rpc::ObjectReference> &references, int64_t tag),
(override));
MOCK_METHOD(std::shared_ptr<grpc::Channel>, GetChannel, (), (const));
MOCK_METHOD(void,
ReportWorkerBacklog,
(const WorkerID &worker_id,
Expand Down
7 changes: 7 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ RAY_CONFIG(uint64_t, num_heartbeats_warning, 5)

/// The duration between reporting resources sent by the raylets.
RAY_CONFIG(uint64_t, raylet_report_resources_period_milliseconds, 100)

/// The duration between raylet check memory pressure and send gc request
RAY_CONFIG(uint64_t, raylet_check_gc_period_milliseconds, 100)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this hardcoded before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the same as raylet_report_resources_period_milliseconds before.


/// For a raylet, if the last resource report was sent more than this many
/// report periods ago, then a warning will be logged that the report
/// handler is drifting.
Expand Down Expand Up @@ -335,6 +339,9 @@ RAY_CONFIG(int32_t, gcs_rpc_server_reconnect_timeout_s, 60)
/// Minimum interval between reconnecting gcs rpc server when gcs server restarts.
RAY_CONFIG(int32_t, minimum_gcs_reconnect_interval_milliseconds, 5000)

/// Feature flag to use the ray syncer for resource synchronization
RAY_CONFIG(bool, use_ray_syncer, false)

/// The interval at which the gcs client will check if the address of gcs service has
/// changed. When the address changed, we will resubscribe again.
RAY_CONFIG(uint64_t, gcs_service_address_check_interval_milliseconds, 1000)
Expand Down
10 changes: 5 additions & 5 deletions src/ray/common/ray_syncer/ray_syncer-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class NodeState {

/// Set the local component.
///
/// \param cid The component id.
/// \param message_type The type of the message for this component.
/// \param reporter The reporter is defined to be the local module which wants to
/// broadcast its internal status to the whole clsuter. When it's null, it means there
/// is no reporter in the local node for this component. This is the place there
Expand All @@ -36,16 +36,16 @@ class NodeState {
/// received messages are consumed.
///
/// \return true if set successfully.
bool SetComponent(RayComponentId cid,
bool SetComponent(MessageType message_type,
const ReporterInterface *reporter,
ReceiverInterface *receiver);

/// Get the snapshot of a component for a newer version.
///
/// \param cid The component id to take the snapshot.
/// \param message_type The component to take the snapshot.
///
/// \return If a snapshot is taken, return the message, otherwise std::nullopt.
std::optional<RaySyncMessage> CreateSyncMessage(RayComponentId cid);
std::optional<RaySyncMessage> CreateSyncMessage(MessageType message_type);

/// Consume a message. Receiver will consume this message if it doesn't have
/// this message.
Expand Down Expand Up @@ -127,7 +127,7 @@ class NodeSyncConnection {
std::function<void(std::shared_ptr<RaySyncMessage>)> message_processor_;

/// Buffering all the updates. Sending will be done in an async way.
absl::flat_hash_map<std::pair<std::string, RayComponentId>,
absl::flat_hash_map<std::pair<std::string, MessageType>,
std::shared_ptr<const RaySyncMessage>>
sending_buffer_;

Expand Down
54 changes: 27 additions & 27 deletions src/ray/common/ray_syncer/ray_syncer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,38 +22,38 @@ namespace syncer {

NodeState::NodeState() { sync_message_versions_taken_.fill(-1); }

bool NodeState::SetComponent(RayComponentId cid,
bool NodeState::SetComponent(MessageType message_type,
const ReporterInterface *reporter,
ReceiverInterface *receiver) {
if (cid < static_cast<RayComponentId>(kComponentArraySize) &&
reporters_[cid] == nullptr && receivers_[cid] == nullptr) {
reporters_[cid] = reporter;
receivers_[cid] = receiver;
if (message_type < static_cast<MessageType>(kComponentArraySize) &&
reporters_[message_type] == nullptr && receivers_[message_type] == nullptr) {
reporters_[message_type] = reporter;
receivers_[message_type] = receiver;
return true;
} else {
RAY_LOG(FATAL) << "Fail to set components, component_id:" << cid
RAY_LOG(FATAL) << "Fail to set components, message_type:" << message_type
<< ", reporter:" << reporter << ", receiver:" << receiver;
return false;
}
}

std::optional<RaySyncMessage> NodeState::CreateSyncMessage(RayComponentId cid) {
if (reporters_[cid] == nullptr) {
std::optional<RaySyncMessage> NodeState::CreateSyncMessage(MessageType message_type) {
if (reporters_[message_type] == nullptr) {
return std::nullopt;
}
auto message =
reporters_[cid]->CreateSyncMessage(sync_message_versions_taken_[cid], cid);
auto message = reporters_[message_type]->CreateSyncMessage(
sync_message_versions_taken_[message_type], message_type);
if (message != std::nullopt) {
sync_message_versions_taken_[cid] = message->version();
RAY_LOG(DEBUG) << "Sync message taken: cid:" << cid
sync_message_versions_taken_[message_type] = message->version();
RAY_LOG(DEBUG) << "Sync message taken: message_type:" << message_type
<< ", version:" << message->version()
<< ", node:" << NodeID::FromBinary(message->node_id());
}
return message;
}

bool NodeState::ConsumeSyncMessage(std::shared_ptr<const RaySyncMessage> message) {
auto &current = cluster_view_[message->node_id()][message->component_id()];
auto &current = cluster_view_[message->node_id()][message->message_type()];

RAY_LOG(DEBUG) << "ConsumeSyncMessage: " << (current ? current->version() : -1)
<< " message_version: " << message->version()
Expand All @@ -64,7 +64,7 @@ bool NodeState::ConsumeSyncMessage(std::shared_ptr<const RaySyncMessage> message
}

current = message;
auto receiver = receivers_[message->component_id()];
auto receiver = receivers_[message->message_type()];
if (receiver != nullptr) {
receiver->ConsumeSyncMessage(message);
}
Expand All @@ -83,11 +83,11 @@ void NodeSyncConnection::ReceiveUpdate(RaySyncMessages messages) {
for (auto &message : *messages.mutable_sync_messages()) {
auto &node_versions = GetNodeComponentVersions(message.node_id());
RAY_LOG(DEBUG) << "Receive update: "
<< " component_id=" << message.component_id()
<< " message_type=" << message.message_type()
<< ", message_version=" << message.version()
<< ", local_message_version=" << node_versions[message.component_id()];
if (node_versions[message.component_id()] < message.version()) {
node_versions[message.component_id()] = message.version();
<< ", local_message_version=" << node_versions[message.message_type()];
if (node_versions[message.message_type()] < message.version()) {
node_versions[message.message_type()] = message.version();
message_processor_(std::make_shared<RaySyncMessage>(std::move(message)));
}
}
Expand All @@ -104,9 +104,9 @@ bool NodeSyncConnection::PushToSendingQueue(
}

auto &node_versions = GetNodeComponentVersions(message->node_id());
if (node_versions[message->component_id()] < message->version()) {
node_versions[message->component_id()] = message->version();
sending_buffer_[std::make_pair(message->node_id(), message->component_id())] =
if (node_versions[message->message_type()] < message->version()) {
node_versions[message->message_type()] = message->version();
sending_buffer_[std::make_pair(message->node_id(), message->message_type())] =
message;
return true;
}
Expand Down Expand Up @@ -323,30 +323,30 @@ void RaySyncer::Disconnect(const std::string &node_id) {
"RaySyncerDisconnect");
}

bool RaySyncer::Register(RayComponentId component_id,
bool RaySyncer::Register(MessageType message_type,
const ReporterInterface *reporter,
ReceiverInterface *receiver,
int64_t pull_from_reporter_interval_ms) {
if (!node_state_->SetComponent(component_id, reporter, receiver)) {
if (!node_state_->SetComponent(message_type, reporter, receiver)) {
return false;
}

// Set job to pull from reporter periodically
if (reporter != nullptr && pull_from_reporter_interval_ms > 0) {
timer_.RunFnPeriodically(
[this, component_id]() { OnDemandBroadcasting(component_id); },
[this, message_type]() { OnDemandBroadcasting(message_type); },
pull_from_reporter_interval_ms);
}

RAY_LOG(DEBUG) << "Registered components: "
<< "component_id:" << component_id << ", reporter:" << reporter
<< "message_type:" << message_type << ", reporter:" << reporter
<< ", receiver:" << receiver
<< ", pull_from_reporter_interval_ms:" << pull_from_reporter_interval_ms;
return true;
}

bool RaySyncer::OnDemandBroadcasting(RayComponentId component_id) {
auto msg = node_state_->CreateSyncMessage(component_id);
bool RaySyncer::OnDemandBroadcasting(MessageType message_type) {
auto msg = node_state_->CreateSyncMessage(message_type);
if (msg) {
RAY_CHECK(msg->node_id() == GetLocalNodeID());
BroadcastMessage(std::make_shared<RaySyncMessage>(std::move(*msg)));
Expand Down
16 changes: 8 additions & 8 deletions src/ray/common/ray_syncer/ray_syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ namespace syncer {

using ray::rpc::syncer::DummyRequest;
using ray::rpc::syncer::DummyResponse;
using ray::rpc::syncer::RayComponentId;
using ray::rpc::syncer::MessageType;
using ray::rpc::syncer::RaySyncMessage;
using ray::rpc::syncer::RaySyncMessages;
using ray::rpc::syncer::StartSyncRequest;
using ray::rpc::syncer::StartSyncResponse;

static constexpr size_t kComponentArraySize =
static_cast<size_t>(ray::rpc::syncer::RayComponentId_ARRAYSIZE);
static_cast<size_t>(ray::rpc::syncer::MessageType_ARRAYSIZE);

/// The interface for a reporter. Reporter is defined to be a local module which would
/// like to let the other nodes know its state. For example, local cluster resource
Expand All @@ -49,13 +49,13 @@ struct ReporterInterface {
///
/// \param version_after Request message with version after `version_after`. If the
/// reporter doesn't have the qualified one, just return std::nullopt
/// \param component_id The component id asked for.
/// \param message_type The message type asked for.
///
/// \return std::nullopt if the reporter doesn't have such component or the current
/// snapshot of the component is not newer the asked one. Otherwise, return the
/// actual message.
virtual std::optional<RaySyncMessage> CreateSyncMessage(
int64_t version_after, RayComponentId component_id) const = 0;
int64_t version_after, MessageType message_type) const = 0;
virtual ~ReporterInterface() {}
};

Expand Down Expand Up @@ -114,14 +114,14 @@ class RaySyncer {
/// it'll have a global view of the cluster.
///
///
/// \param component_id The component to sync.
/// \param message_type The component to sync.
fishbone marked this conversation as resolved.
Show resolved Hide resolved
/// \param reporter The local component to be broadcasted.
/// \param receiver The consumer of the sync message sent by the other nodes in the
/// cluster.
/// \param pull_from_reporter_interval_ms The frequence to pull a message. 0 means
/// never pull a message in syncer.
/// from reporter and push it to sending queue.
bool Register(RayComponentId component_id,
bool Register(MessageType message_type,
const ReporterInterface *reporter,
ReceiverInterface *receiver,
int64_t pull_from_reporter_interval_ms = 100);
Expand All @@ -132,10 +132,10 @@ class RaySyncer {
/// Request trigger a broadcasting for a specific component immediately instead of
/// waiting for ray syncer to poll the message.
///
/// \param component_id The component to check.
/// \param message_type The component to check.
/// \return true if a message is generated. If the component doesn't have a new
/// version of message, false will be returned.
bool OnDemandBroadcasting(RayComponentId component_id);
bool OnDemandBroadcasting(MessageType message_type);

private:
/// Get the io_context used by RaySyncer.
Expand Down
Loading