
[Core][Enable gcs scheduler 4/n] Fix PG scheduling by gcs scheduler #27084

Merged: 10 commits (Aug 29, 2022). Changes from all commits.
140 changes: 121 additions & 19 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.cc
@@ -106,8 +106,8 @@ void GcsPlacementGroupScheduler::ScheduleUnplacedBundles(
.second);

// Acquire resources from gcs resources manager to reserve bundle resources.
const auto &prepared_bundle_locations = lease_status_tracker->GetBundleLocations();
AcquireBundleResources(prepared_bundle_locations);
const auto &bundle_locations = lease_status_tracker->GetBundleLocations();
AcquireBundleResources(bundle_locations);

// Convert to a set of bundle specifications grouped by the node.
std::unordered_map<NodeID, std::vector<std::shared_ptr<const BundleSpecification>>>
@@ -612,6 +612,22 @@ absl::flat_hash_map<scheduling::NodeID, ResourceRequest> ToNodeBundleResourcesMap(
return node_bundle_resources_map;
}

/// Helper function to check whether resource_name looks like
/// {original_resource_name}_group_{placement_group_id}, i.e. a placement group
/// wildcard resource.
bool IsPlacementGroupWildcardResource(const std::string &resource_name) {
std::string_view resource_name_view(resource_name);
std::string_view pattern("_group_");

// The length of {placement_group_id} is fixed (its hex form is
// 2 * PlacementGroupID::Size() characters), so we only need to check that the
// length and the position of `_group_` match.
if (resource_name_view.size() < pattern.size() + 2 * PlacementGroupID::Size()) {
return false;
}

auto idx =
resource_name_view.size() - (pattern.size() + 2 * PlacementGroupID::Size());
return resource_name_view.substr(idx, pattern.size()) == pattern;
}
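
For reference, a minimal standalone sketch of the same check (the names and the 36-character hex-id length are illustrative assumptions; the real helper derives the length from PlacementGroupID::Size()). Wildcard resources are named {resource}_group_{pg_id_hex}, while per-bundle resources are named {resource}_group_{bundle_index}_{pg_id_hex}:

#include <cassert>
#include <cstddef>
#include <iostream>
#include <string>
#include <string_view>

// Assumed hex length of a PlacementGroupID (2 * 18 bytes); the real helper
// uses 2 * PlacementGroupID::Size() instead of a hard-coded constant.
constexpr std::size_t kPgIdHexLen = 36;

// Returns true for wildcard names {resource}_group_{pg_id_hex} and false for
// per-bundle names {resource}_group_{bundle_index}_{pg_id_hex}.
bool IsWildcard(std::string_view name) {
  constexpr std::string_view pattern = "_group_";
  if (name.size() < pattern.size() + kPgIdHexLen) {
    return false;
  }
  const std::size_t idx = name.size() - (pattern.size() + kPgIdHexLen);
  return name.substr(idx, pattern.size()) == pattern;
}

int main() {
  const std::string pg_id(kPgIdHexLen, 'f');    // fake 36-char hex id
  assert(IsWildcard("CPU_group_" + pg_id));     // wildcard resource
  assert(!IsWildcard("CPU_group_0_" + pg_id));  // bundle-indexed resource
  assert(!IsWildcard("CPU"));                   // plain resource name
  std::cout << "all checks passed" << std::endl;
  return 0;
}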

void GcsPlacementGroupScheduler::CommitBundleResources(
const std::shared_ptr<BundleLocations> &bundle_locations) {
// Acquire bundle resources from gcs resources manager.
@@ -620,35 +636,121 @@ void GcsPlacementGroupScheduler::CommitBundleResources(
auto node_bundle_resources_map = ToNodeBundleResourcesMap(bundle_locations);
for (const auto &[node_id, node_bundle_resources] : node_bundle_resources_map) {
for (const auto &[resource_id, capacity] : node_bundle_resources.ToMap()) {
cluster_resource_manager.UpdateResourceCapacity(
node_id, resource_id, capacity.Double());
// A placement group's wildcard resource has to be the sum of all its related bundles.
// Even though `ToNodeBundleResourcesMap` already accounts for this, it misses the
// scenario in which a single bundle (or a subset of bundles) is rescheduled.
// When committing such a bundle, its wildcard resource would wrongly overwrite the
// existing value unless the following additive update is used.
if (IsPlacementGroupWildcardResource(resource_id.Binary())) {
auto new_capacity =
capacity +
cluster_resource_manager.GetNodeResources(node_id).total.Get(resource_id);
cluster_resource_manager.UpdateResourceCapacity(
node_id, resource_id, new_capacity.Double());
} else {
cluster_resource_manager.UpdateResourceCapacity(
node_id, resource_id, capacity.Double());
}
}
}

for (const auto &listener : resources_changed_listeners_) {
listener();
}
}
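
To illustrate why the wildcard capacity is added rather than overwritten, consider a hypothetical group with two bundles of 2 CPU each: the wildcard resource should end up at 4 on the node even if the bundles are committed at different times (e.g. when one bundle is rescheduled alone). A toy sketch of the additive rule, with a plain map standing in for the cluster resource manager and "pgid" as a made-up placement group id:

#include <cassert>
#include <iostream>
#include <map>
#include <string>

// Toy stand-in for a node's total resources in the cluster resource manager.
using ResourceMap = std::map<std::string, double>;

// Commit one bundle's formatted resources to a node. Wildcard entries (the
// ones ending in "_group_{pg_id}") are accumulated instead of overwritten, so
// committing bundles one at a time yields the same wildcard total as
// committing them all together.
void Commit(ResourceMap &node_total,
            const ResourceMap &bundle_resources,
            const std::string &wildcard_suffix) {
  for (const auto &[name, capacity] : bundle_resources) {
    const bool is_wildcard =
        name.size() >= wildcard_suffix.size() &&
        name.compare(name.size() - wildcard_suffix.size(),
                     wildcard_suffix.size(),
                     wildcard_suffix) == 0;
    if (is_wildcard) {
      node_total[name] += capacity;  // additive, like the wildcard branch
    } else {
      node_total[name] = capacity;   // per-bundle resource, safe to overwrite
    }
  }
}

int main() {
  // Hypothetical placement group "pgid" with two bundles of 2 CPU each.
  const std::string wildcard_suffix = "_group_pgid";
  ResourceMap node;
  Commit(node, {{"CPU_group_0_pgid", 2.0}, {"CPU_group_pgid", 2.0}}, wildcard_suffix);
  Commit(node, {{"CPU_group_1_pgid", 2.0}, {"CPU_group_pgid", 2.0}}, wildcard_suffix);
  assert(node["CPU_group_pgid"] == 4.0);  // sum of both bundles, not overwritten
  std::cout << "wildcard capacity = " << node["CPU_group_pgid"] << std::endl;
  return 0;
}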

void GcsPlacementGroupScheduler::ReturnBundleResources(
const std::shared_ptr<BundleLocations> &bundle_locations) {
// Returning bundle resources to the gcs resources manager consists of the following
// steps:
// 1. Remove the related bundle resources from the nodes.
// 2. Add the resources allocated for the bundles back to the nodes.
// TODO(Shanly): It's necessary to check whether the bundle could be returned in gcs
// scheduling. Only bundles whose total and available resources are equal could be
// returned.
for (auto &bundle : *bundle_locations) {
if (!TryReleasingBundleResources(bundle.second)) {
waiting_removed_bundles_.push_back(bundle.second);
}
}

for (const auto &listener : resources_changed_listeners_) {
listener();
}
}

void GcsPlacementGroupScheduler::AddResourcesChangedListener(
std::function<void()> listener) {
RAY_CHECK(listener != nullptr);
resources_changed_listeners_.emplace_back(std::move(listener));
}

bool GcsPlacementGroupScheduler::TryReleasingBundleResources(
const std::pair<NodeID, std::shared_ptr<const BundleSpecification>> &bundle) {
auto &cluster_resource_manager =
cluster_resource_scheduler_.GetClusterResourceManager();
for (auto &bundle : *bundle_locations) {
auto node_id = scheduling::NodeID(bundle.second.first.Binary());
const auto &bundle_spec = *bundle.second.second;
// Remove bundle resource names (the label with `_group_`).
std::vector<scheduling::ResourceID> bundle_resource_ids;
for (const auto &entry : bundle_spec.GetFormattedResources()) {
bundle_resource_ids.emplace_back(scheduling::ResourceID(entry.first));
auto node_id = scheduling::NodeID(bundle.first.Binary());
const auto &bundle_spec = bundle.second;
std::vector<scheduling::ResourceID> bundle_resource_ids;
absl::flat_hash_map<std::string, FixedPoint> wildcard_resources;
// Subtract wildcard resources and delete bundle resources.
for (const auto &entry : bundle_spec->GetFormattedResources()) {
auto resource_id = scheduling::ResourceID(entry.first);
auto capacity =
cluster_resource_manager.GetNodeResources(node_id).total.Get(resource_id);
if (IsPlacementGroupWildcardResource(entry.first)) {
wildcard_resources[entry.first] = capacity - entry.second;
} else {
if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
auto available_amount =
cluster_resource_manager.GetNodeResources(node_id).available.Get(resource_id);
if (available_amount != capacity) {
RAY_LOG(WARNING)
<< "The resource " << entry.first
<< " now is still in use when removing bundle " << bundle_spec->Index()
<< " from placement group: " << bundle_spec->PlacementGroupId()
<< ", maybe some workers depending on this bundle have not released the "
"resource yet."
<< " We will try it later.";
bundle_resource_ids.clear();
break;
}
}
bundle_resource_ids.emplace_back(resource_id);
}
}

// This bundle is not ready for returning.
if (bundle_resource_ids.empty()) {
return false;
}

for (const auto &[resource_name, capacity] : wildcard_resources) {
if (capacity == 0) {
bundle_resource_ids.emplace_back(scheduling::ResourceID(resource_name));
} else {
cluster_resource_manager.UpdateResourceCapacity(
node_id, scheduling::ResourceID(resource_name), capacity.Double());
}
}

// It will affect nothing if the resource_id to be deleted does not exist in the
// cluster_resource_manager_.
cluster_resource_manager.DeleteResources(node_id, bundle_resource_ids);
// Add reserved bundle resources back to the node.
cluster_resource_manager.AddNodeAvailableResources(node_id,
bundle_spec->GetRequiredResources());
return true;
}

void GcsPlacementGroupScheduler::HandleWaitingRemovedBundles() {
for (auto iter = waiting_removed_bundles_.begin();
iter != waiting_removed_bundles_.end();) {
auto current = iter++;
auto bundle = *current;
if (TryReleasingBundleResources(bundle)) {
// Release bundle successfully.
waiting_removed_bundles_.erase(current);
}
// It will affect nothing if the resource_id to be deleted does not exist in the
// cluster_resource_manager_.
cluster_resource_manager.DeleteResources(node_id, bundle_resource_ids);
// Add reserved bundle resources back to the node.
cluster_resource_manager.AddNodeAvailableResources(
node_id, bundle_spec.GetRequiredResources());
}
for (const auto &listener : resources_changed_listeners_) {
listener();
}
}
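
The release path above is essentially a retry queue: a bundle whose resources are still held by workers is parked in waiting_removed_bundles_ and retried later (e.g. when a worker dies). A toy sketch of that pattern, with an in-use set standing in for the available-versus-total capacity check:

#include <cstddef>
#include <iostream>
#include <list>
#include <set>
#include <string>

// Toy release queue: bundles whose resources are still held by workers are
// parked and retried later, mirroring waiting_removed_bundles_.
class BundleReleaser {
 public:
  void MarkInUse(const std::string &bundle) { in_use_.insert(bundle); }
  void MarkFree(const std::string &bundle) { in_use_.erase(bundle); }

  // Try to return a bundle's resources; park it if a worker still holds them.
  void Return(const std::string &bundle) {
    if (!TryRelease(bundle)) {
      waiting_.push_back(bundle);
    }
  }

  // Retry every parked bundle; called on events such as a worker dying.
  void HandleWaiting() {
    for (auto it = waiting_.begin(); it != waiting_.end();) {
      if (TryRelease(*it)) {
        it = waiting_.erase(it);
      } else {
        ++it;
      }
    }
  }

  std::size_t WaitingSize() const { return waiting_.size(); }

 private:
  // Stand-in for the "available == total for every bundle resource" check.
  bool TryRelease(const std::string &bundle) {
    if (in_use_.count(bundle) > 0) {
      return false;
    }
    std::cout << "released " << bundle << std::endl;
    return true;
  }

  std::set<std::string> in_use_;
  std::list<std::string> waiting_;
};

int main() {
  BundleReleaser releaser;
  releaser.MarkInUse("bundle_0");  // an actor still occupies bundle_0
  releaser.Return("bundle_0");     // parked
  releaser.Return("bundle_1");     // released immediately
  std::cout << "waiting: " << releaser.WaitingSize() << std::endl;  // 1

  releaser.MarkFree("bundle_0");   // the actor exits / its worker dies
  releaser.HandleWaiting();        // retried and released
  std::cout << "waiting: " << releaser.WaitingSize() << std::endl;  // 0
  return 0;
}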

20 changes: 20 additions & 0 deletions src/ray/gcs/gcs_server/gcs_placement_group_scheduler.h
@@ -332,6 +332,11 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
std::vector<std::shared_ptr<BundleSpecification>>>
&group_to_bundles) override;

/// Add resources changed listener.
void AddResourcesChangedListener(std::function<void()> listener);

void HandleWaitingRemovedBundles();

protected:
/// Send bundles PREPARE requests to a node. The PREPARE requests will lock resources
/// on a node until COMMIT or CANCEL requests are sent to a node.
@@ -429,6 +434,14 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
rpc::PlacementStrategy strategy,
double max_cpu_fraction_per_node);

/// Try to release bundle resource to cluster resource manager.
///
/// \param bundle The node to which the bundle is scheduled and the bundle's
/// specification.
/// \return True if the bundle is successfully released. False otherwise.
bool TryReleasingBundleResources(
const std::pair<NodeID, std::shared_ptr<const BundleSpecification>> &bundle);

/// A timer that ticks every cancel resource failure milliseconds.
boost::asio::deadline_timer return_timer_;

@@ -457,6 +470,13 @@ class GcsPlacementGroupScheduler : public GcsPlacementGroupSchedulerInterface {
/// The syncer of resource. This is used to report placement group updates.
/// TODO (iycheng): Remove this one from pg once we finish the refactor
gcs_syncer::RaySyncer *ray_syncer_;

/// The resources changed listeners.
std::vector<std::function<void()>> resources_changed_listeners_;

/// The bundles that are waiting to be destroyed and to have their resources released.
std::list<std::pair<NodeID, std::shared_ptr<const BundleSpecification>>>
waiting_removed_bundles_;
};

} // namespace gcs
19 changes: 16 additions & 3 deletions src/ray/gcs/gcs_server/gcs_server.cc
@@ -413,7 +413,7 @@ void GcsServer::InitGcsActorManager(const GcsInitData &gcs_init_data) {

void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {
RAY_CHECK(gcs_table_storage_ && gcs_node_manager_);
auto scheduler =
gcs_placement_group_scheduler_ =
std::make_shared<GcsPlacementGroupScheduler>(main_service_,
gcs_table_storage_,
*gcs_node_manager_,
@@ -423,7 +423,7 @@ void GcsServer::InitGcsPlacementGroupManager(const GcsInitData &gcs_init_data) {

gcs_placement_group_manager_ = std::make_shared<GcsPlacementGroupManager>(
main_service_,
scheduler,
gcs_placement_group_scheduler_,
gcs_table_storage_,
*gcs_resource_manager_,
[this](const JobID &job_id) {
@@ -647,6 +647,7 @@ void GcsServer::InstallEventListeners() {
worker_failure_data->exit_type(),
worker_failure_data->exit_detail(),
creation_task_exception);
gcs_placement_group_scheduler_->HandleWaitingRemovedBundles();
});

// Install job event listeners.
@@ -661,11 +662,23 @@ main_service_.post(
main_service_.post(
[this] {
// Because resources have been changed, we need to try to schedule the
// pending actors.
// pending placement groups and actors.
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
cluster_task_manager_->ScheduleAndDispatchTasks();
},
"GcsServer.SchedulePendingActors");
});

gcs_placement_group_scheduler_->AddResourcesChangedListener([this] {
main_service_.post(
[this] {
// Because some placement group resources have been committed or deleted, we
// need to try to schedule the pending placement groups and actors.
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
cluster_task_manager_->ScheduleAndDispatchTasks();
},
"GcsServer.SchedulePendingPGActors");
});
}
}

3 changes: 3 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.h
@@ -59,6 +59,7 @@ class GcsNodeManager;
class GcsActorManager;
class GcsJobManager;
class GcsWorkerManager;
class GcsPlacementGroupScheduler;
class GcsPlacementGroupManager;

/// The GcsServer will take over all requests from GcsClient and transparent
@@ -202,6 +203,8 @@ class GcsServer {
std::shared_ptr<GcsRedisFailureDetector> gcs_redis_failure_detector_;
/// The gcs actor manager.
std::shared_ptr<GcsActorManager> gcs_actor_manager_;
/// The gcs placement group scheduler.
std::shared_ptr<GcsPlacementGroupScheduler> gcs_placement_group_scheduler_;
/// The gcs placement group manager.
std::shared_ptr<GcsPlacementGroupManager> gcs_placement_group_manager_;
/// Job info handler and service.
@@ -1306,6 +1306,79 @@ TEST_F(GcsPlacementGroupSchedulerTest, TestCommitToDeadNodes) {
ASSERT_TRUE(EnsureClusterResourcesAreNotInUse());
}

TEST_F(GcsPlacementGroupSchedulerTest, TestWaitingRemovedBundles) {
// This feature is only required by gcs actor scheduler.
RayConfig::instance().initialize(R"({"gcs_actor_scheduling_enabled": true})");

auto node = Mocker::GenNodeInfo();
AddNode(node);
ASSERT_EQ(1, gcs_node_manager_->GetAllAliveNodes().size());

auto create_placement_group_request = Mocker::GenCreatePlacementGroupRequest();
auto placement_group =
std::make_shared<gcs::GcsPlacementGroup>(create_placement_group_request, "");

// Schedule the placement_group with 1 available node, and the lease request should be
// sent to the node.
scheduler_->ScheduleUnplacedBundles(
placement_group,
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group,
bool is_infeasible) {
absl::MutexLock lock(&placement_group_requests_mutex_);
failure_placement_groups_.emplace_back(std::move(placement_group));
},
[this](std::shared_ptr<gcs::GcsPlacementGroup> placement_group) {
absl::MutexLock lock(&placement_group_requests_mutex_);
success_placement_groups_.emplace_back(std::move(placement_group));
});
ASSERT_TRUE(raylet_clients_[0]->GrantPrepareBundleResources());
WaitPendingDone(raylet_clients_[0]->commit_callbacks, 1);
ASSERT_TRUE(raylet_clients_[0]->GrantCommitBundleResources());
WaitPlacementGroupPendingDone(0, GcsPlacementGroupStatus::FAILURE);
WaitPlacementGroupPendingDone(1, GcsPlacementGroupStatus::SUCCESS);

// Assume bundle (and wildcard) resources are acquired by actors.
for (const auto &bundle : placement_group->GetBundles()) {
for (const auto &resource_entry : bundle->GetFormattedResources()) {
cluster_resource_scheduler_->GetClusterResourceManager()
.SubtractNodeAvailableResources(
scheduling::NodeID(node->node_id()),
ResourceRequest({{scheduling::ResourceID(resource_entry.first),
FixedPoint(resource_entry.second)}}));
}
}

// Remove the placement group.
const auto &placement_group_id = placement_group->GetPlacementGroupID();
scheduler_->DestroyPlacementGroupBundleResourcesIfExists(placement_group_id);
ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());
ASSERT_TRUE(raylet_clients_[0]->GrantCancelResourceReserve());

// Because actors have not released the bundle resources, bundles have to keep waiting.
ASSERT_EQ(scheduler_->GetWaitingRemovedBundlesSize(), 2);
const auto &node_resources =
cluster_resource_scheduler_->GetClusterResourceManager().GetNodeResources(
scheduling::NodeID(node->node_id()));
ASSERT_NE(node_resources.available.Get(scheduling::ResourceID::CPU()),
node_resources.total.Get(scheduling::ResourceID::CPU()));

// Assume actors are releasing the bundle resources.
for (const auto &bundle : placement_group->GetBundles()) {
for (const auto &resource_entry : bundle->GetFormattedResources()) {
cluster_resource_scheduler_->GetClusterResourceManager().AddNodeAvailableResources(
scheduling::NodeID(node->node_id()),
ResourceRequest({{scheduling::ResourceID(resource_entry.first),
FixedPoint(resource_entry.second)}}));
}
}

scheduler_->HandleWaitingRemovedBundles();
// The waiting bundles are removed, and resources are successfully returned to node.
ASSERT_EQ(scheduler_->GetWaitingRemovedBundlesSize(), 0);
ASSERT_EQ(node_resources.available.Get(scheduling::ResourceID::CPU()),
node_resources.total.Get(scheduling::ResourceID::CPU()));
}

} // namespace ray

int main(int argc, char **argv) {
2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_server/test/gcs_server_test_util.h
@@ -343,6 +343,8 @@ struct GcsServerMocker {
class MockedGcsPlacementGroupScheduler : public gcs::GcsPlacementGroupScheduler {
public:
using gcs::GcsPlacementGroupScheduler::GcsPlacementGroupScheduler;

size_t GetWaitingRemovedBundlesSize() { return waiting_removed_bundles_.size(); }
};
class MockedGcsActorTable : public gcs::GcsActorTable {
public: