Skip to content

Commit

Permalink
[resource-reporting 3/n] further clean up LocalResourceManager (#21927)
Browse files Browse the repository at this point in the history
* clean up

* address comments
  • Loading branch information
scv119 authored Jan 28, 2022
1 parent 069c499 commit 0ff8bfa
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 68 deletions.
2 changes: 1 addition & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ void NodeManager::FillResourceReport(rpc::ResourcesData &resources_data) {
resources_data.set_node_manager_address(initial_config_.node_manager_address);
// Update local cache from gcs remote cache, this is needed when gcs restart.
// We should always keep the cache view consistent.
cluster_resource_scheduler_->GetLocalResourceManager().UpdateLastResourceUsage(
cluster_resource_scheduler_->GetLocalResourceManager().ResetLastReportResourceUsage(
gcs_client_->NodeResources().GetLastResourceUsage());
cluster_resource_scheduler_->GetLocalResourceManager().FillResourceUsage(
resources_data);
Expand Down
5 changes: 0 additions & 5 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,6 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface {
void DeleteResource(const std::string &node_name,
const std::string &resource_name) override;

/// Return local resources.
NodeResourceInstances GetLocalResources() const {
return local_resource_manager_->GetLocalResources();
};

/// Return local resources in human-readable string form.
std::string GetLocalResourceViewString() const override;

Expand Down
85 changes: 58 additions & 27 deletions src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,9 @@ TEST_F(ClusterResourceSchedulerTest, GetLocalAvailableResourcesWithCpuUnitTest)
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);

TaskResourceInstances available_cluster_resources =
resource_scheduler.GetLocalResources().GetAvailableResourceInstances();
resource_scheduler.GetLocalResourceManager()
.GetLocalResources()
.GetAvailableResourceInstances();

TaskResourceInstances expected_cluster_resources;
addTaskResourceInstances(true, {1., 1., 1.}, 0, &expected_cluster_resources);
Expand Down Expand Up @@ -604,7 +606,9 @@ TEST_F(ClusterResourceSchedulerTest, GetLocalAvailableResourcesTest) {
ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_);

TaskResourceInstances available_cluster_resources =
resource_scheduler.GetLocalResources().GetAvailableResourceInstances();
resource_scheduler.GetLocalResourceManager()
.GetLocalResources()
.GetAvailableResourceInstances();

TaskResourceInstances expected_cluster_resources;
addTaskResourceInstances(true, {3.}, 0, &expected_cluster_resources);
Expand Down Expand Up @@ -675,7 +679,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) {
initResourceRequest(resource_request, pred_demands, EmptyIntVector,
EmptyFixedPointVector);

NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources();
NodeResourceInstances old_local_resources =
resource_scheduler.GetLocalResourceManager().GetLocalResources();

std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
Expand All @@ -688,7 +693,9 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) {
resource_scheduler.GetLocalResourceManager().FreeTaskResourceInstances(
task_allocation);

ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true);
ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() ==
old_local_resources),
true);
}
// Try to allocate resources for a task request that overallocates a hard constrained
// resource.
Expand All @@ -704,15 +711,18 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) {
initResourceRequest(resource_request, pred_demands, EmptyIntVector,
EmptyFixedPointVector);

NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources();
NodeResourceInstances old_local_resources =
resource_scheduler.GetLocalResourceManager().GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
bool success =
resource_scheduler.GetLocalResourceManager().AllocateTaskResourceInstances(
resource_request, task_allocation);

ASSERT_EQ(success, false);
ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true);
ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() ==
old_local_resources),
true);
}
// Allocate resources for a task request specifying both predefined and custom
// resources.
Expand All @@ -729,7 +739,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) {
vector<FixedPoint> cust_demands{3, 2};
initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands);

NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources();
NodeResourceInstances old_local_resources =
resource_scheduler.GetLocalResourceManager().GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
bool success =
Expand All @@ -741,7 +752,9 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) {
resource_scheduler.GetLocalResourceManager().FreeTaskResourceInstances(
task_allocation);

ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true);
ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() ==
old_local_resources),
true);
}
// Allocate resources for a task request specifying both predefined and custom
// resources, but overallocates a hard-constrained custom resource.
Expand All @@ -758,15 +771,18 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) {
vector<FixedPoint> cust_demands{3, 10};
initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands);

NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources();
NodeResourceInstances old_local_resources =
resource_scheduler.GetLocalResourceManager().GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
bool success =
resource_scheduler.GetLocalResourceManager().AllocateTaskResourceInstances(
resource_request, task_allocation);

ASSERT_EQ(success, false);
ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true);
ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() ==
old_local_resources),
true);
}
}

Expand All @@ -785,7 +801,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesAllocationFailureTest)
vector<FixedPoint> cust_demands{3, 3, 4};
initResourceRequest(resource_request, pred_demands, req_cust_ids, cust_demands);

NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources();
NodeResourceInstances old_local_resources =
resource_scheduler.GetLocalResourceManager().GetLocalResources();
std::shared_ptr<TaskResourceInstances> task_allocation =
std::make_shared<TaskResourceInstances>();
bool success =
Expand All @@ -794,7 +811,9 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesAllocationFailureTest)

ASSERT_EQ(success, false);
// resource_scheduler.FreeTaskResourceInstances(task_allocation);
ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true);
ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() ==
old_local_resources),
true);
}

TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) {
Expand All @@ -817,14 +836,17 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) {
resource_scheduler.GetLocalResourceManager().AllocateTaskResourceInstances(
resource_request, task_allocation);

NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources();
NodeResourceInstances old_local_resources =
resource_scheduler.GetLocalResourceManager().GetLocalResources();
ASSERT_EQ(success, true);
std::vector<double> cpu_instances = task_allocation->GetCPUInstancesDouble();
resource_scheduler.GetLocalResourceManager().AddCPUResourceInstances(cpu_instances);
resource_scheduler.GetLocalResourceManager().SubtractCPUResourceInstances(
cpu_instances);

ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true);
ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() ==
old_local_resources),
true);
}
}

Expand Down Expand Up @@ -861,16 +883,19 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) {
std::vector<double> allocate_gpu_instances{0.5, 0.5, 0.5, 0.5};
resource_scheduler.GetLocalResourceManager().SubtractGPUResourceInstances(
allocate_gpu_instances);
std::vector<double> available_gpu_instances = resource_scheduler.GetLocalResources()
.GetAvailableResourceInstances()
.GetGPUInstancesDouble();
std::vector<double> available_gpu_instances =
resource_scheduler.GetLocalResourceManager()
.GetLocalResources()
.GetAvailableResourceInstances()
.GetGPUInstancesDouble();
std::vector<double> expected_available_gpu_instances{0.5, 0.5, 0.5, 0.5};
ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(),
expected_available_gpu_instances.begin()));

resource_scheduler.GetLocalResourceManager().AddGPUResourceInstances(
allocate_gpu_instances);
available_gpu_instances = resource_scheduler.GetLocalResources()
available_gpu_instances = resource_scheduler.GetLocalResourceManager()
.GetLocalResources()
.GetAvailableResourceInstances()
.GetGPUInstancesDouble();
expected_available_gpu_instances = {1., 1., 1., 1.};
Expand All @@ -884,7 +909,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) {
std::vector<double> expected_underflow{.5, .5, 0., .5};
ASSERT_TRUE(
std::equal(underflow.begin(), underflow.end(), expected_underflow.begin()));
available_gpu_instances = resource_scheduler.GetLocalResources()
available_gpu_instances = resource_scheduler.GetLocalResourceManager()
.GetLocalResources()
.GetAvailableResourceInstances()
.GetGPUInstancesDouble();
expected_available_gpu_instances = {0., 0., 0.5, 0.};
Expand All @@ -897,7 +923,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) {
allocate_gpu_instances);
std::vector<double> expected_overflow{.0, .0, .5, 0.};
ASSERT_TRUE(std::equal(overflow.begin(), overflow.end(), expected_overflow.begin()));
available_gpu_instances = resource_scheduler.GetLocalResources()
available_gpu_instances = resource_scheduler.GetLocalResourceManager()
.GetLocalResources()
.GetAvailableResourceInstances()
.GetGPUInstancesDouble();
expected_available_gpu_instances = {1., .5, 1., .5};
Expand All @@ -922,9 +949,11 @@ TEST_F(ClusterResourceSchedulerTest,
// UpdateLocalAvailableResourcesFromResourceInstances() under the hood.
resource_scheduler.GetLocalResourceManager().SubtractGPUResourceInstances(
allocate_gpu_instances);
std::vector<double> available_gpu_instances = resource_scheduler.GetLocalResources()
.GetAvailableResourceInstances()
.GetGPUInstancesDouble();
std::vector<double> available_gpu_instances =
resource_scheduler.GetLocalResourceManager()
.GetLocalResources()
.GetAvailableResourceInstances()
.GetGPUInstancesDouble();
std::vector<double> expected_available_gpu_instances{0.5, 0.5, 0., 0.5};
ASSERT_TRUE(std::equal(available_gpu_instances.begin(),
available_gpu_instances.end(),
Expand All @@ -941,9 +970,11 @@ TEST_F(ClusterResourceSchedulerTest,
// UpdateLocalAvailableResourcesFromResourceInstances() under the hood.
resource_scheduler.GetLocalResourceManager().AddGPUResourceInstances(
allocate_gpu_instances);
std::vector<double> available_gpu_instances = resource_scheduler.GetLocalResources()
.GetAvailableResourceInstances()
.GetGPUInstancesDouble();
std::vector<double> available_gpu_instances =
resource_scheduler.GetLocalResourceManager()
.GetLocalResources()
.GetAvailableResourceInstances()
.GetGPUInstancesDouble();
std::vector<double> expected_available_gpu_instances{1., 1., 1., 0.8};
ASSERT_TRUE(std::equal(available_gpu_instances.begin(),
available_gpu_instances.end(),
Expand Down Expand Up @@ -1098,7 +1129,7 @@ TEST_F(ClusterResourceSchedulerTest, ResourceUsageReportTest) {
resource_scheduler.GetLocalResourceManager().AllocateLocalTaskResources(
allocation_map, allocations);
rpc::ResourcesData data;
resource_scheduler.GetLocalResourceManager().UpdateLastResourceUsage(
resource_scheduler.GetLocalResourceManager().ResetLastReportResourceUsage(
std::make_shared<SchedulingResources>());
resource_scheduler.GetLocalResourceManager().FillResourceUsage(data);

Expand Down
3 changes: 2 additions & 1 deletion src/ray/raylet/scheduling/cluster_task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,8 @@ TEST_F(ClusterTaskManagerTestWithGPUsAtHead, RleaseAndReturnWorkerCpuResources)
// Check failed as the worker has no allocated resource instances.
ASSERT_FALSE(task_manager_.ReleaseCpuResourcesFromUnblockedWorker(worker));

auto node_resource_instances = scheduler_->GetLocalResources();
auto node_resource_instances =
scheduler_->GetLocalResourceManager().GetLocalResources();
auto available_resource_instances =
node_resource_instances.GetAvailableResourceInstances();

Expand Down
12 changes: 5 additions & 7 deletions src/ray/raylet/scheduling/local_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,6 @@ bool LocalResourceManager::AllocateTaskResourceInstances(
return false;
}
}

OnResourceChanged();
return true;
}

Expand All @@ -378,7 +376,6 @@ void LocalResourceManager::FreeTaskResourceInstances(
AddAvailableResourceInstances(task_allocation_custom_resource.second, &it->second);
}
}
OnResourceChanged();
}

std::vector<double> LocalResourceManager::AddCPUResourceInstances(
Expand Down Expand Up @@ -656,11 +653,12 @@ std::string LocalResourceManager::SerializedTaskResourceInstances(
return buffer.str();
}

void LocalResourceManager::UpdateLastResourceUsage(
std::shared_ptr<SchedulingResources> gcs_resources) {
void LocalResourceManager::ResetLastReportResourceUsage(
std::shared_ptr<SchedulingResources> replacement) {
last_report_resources_ = std::make_unique<NodeResources>(ResourceMapToNodeResources(
resource_name_to_id_, gcs_resources->GetTotalResources().GetResourceMap(),
gcs_resources->GetAvailableResources().GetResourceMap()));
resource_name_to_id_, replacement->GetTotalResources().GetResourceMap(),
replacement->GetAvailableResources().GetResourceMap()));
OnResourceChanged();
}

bool LocalResourceManager::ResourcesExist(const std::string &resource_name) {
Expand Down
Loading

0 comments on commit 0ff8bfa

Please sign in to comment.