From 0ff8bfacec3f49f6a7304704cc1c9df46a296bd7 Mon Sep 17 00:00:00 2001 From: Chen Shen Date: Fri, 28 Jan 2022 01:50:54 -0800 Subject: [PATCH] [resource-reporting 3/n] further clean up LocalResourceManager (#21927) * clean up * address comments --- src/ray/raylet/node_manager.cc | 2 +- .../scheduling/cluster_resource_scheduler.h | 5 -- .../cluster_resource_scheduler_test.cc | 85 +++++++++++++------ .../scheduling/cluster_task_manager_test.cc | 3 +- .../scheduling/local_resource_manager.cc | 12 ++- .../scheduling/local_resource_manager.h | 60 +++++++------ 6 files changed, 99 insertions(+), 68 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 6199eb213914..1cd79811040e 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -587,7 +587,7 @@ void NodeManager::FillResourceReport(rpc::ResourcesData &resources_data) { resources_data.set_node_manager_address(initial_config_.node_manager_address); // Update local cache from gcs remote cache, this is needed when gcs restart. // We should always keep the cache view consistent. - cluster_resource_scheduler_->GetLocalResourceManager().UpdateLastResourceUsage( + cluster_resource_scheduler_->GetLocalResourceManager().ResetLastReportResourceUsage( gcs_client_->NodeResources().GetLastResourceUsage()); cluster_resource_scheduler_->GetLocalResourceManager().FillResourceUsage( resources_data); diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index 20f9d361081e..2f8a5cd3c714 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -136,11 +136,6 @@ class ClusterResourceScheduler : public ClusterResourceSchedulerInterface { void DeleteResource(const std::string &node_name, const std::string &resource_name) override; - /// Return local resources. 
- NodeResourceInstances GetLocalResources() const { - return local_resource_manager_->GetLocalResources(); - }; - /// Return local resources in human-readable string form. std::string GetLocalResourceViewString() const override; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index 7876216ec78d..ddf7a3f3573d 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -574,7 +574,9 @@ TEST_F(ClusterResourceSchedulerTest, GetLocalAvailableResourcesWithCpuUnitTest) ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); TaskResourceInstances available_cluster_resources = - resource_scheduler.GetLocalResources().GetAvailableResourceInstances(); + resource_scheduler.GetLocalResourceManager() + .GetLocalResources() + .GetAvailableResourceInstances(); TaskResourceInstances expected_cluster_resources; addTaskResourceInstances(true, {1., 1., 1.}, 0, &expected_cluster_resources); @@ -604,7 +606,9 @@ TEST_F(ClusterResourceSchedulerTest, GetLocalAvailableResourcesTest) { ClusterResourceScheduler resource_scheduler(0, node_resources, *gcs_client_); TaskResourceInstances available_cluster_resources = - resource_scheduler.GetLocalResources().GetAvailableResourceInstances(); + resource_scheduler.GetLocalResourceManager() + .GetLocalResources() + .GetAvailableResourceInstances(); TaskResourceInstances expected_cluster_resources; addTaskResourceInstances(true, {3.}, 0, &expected_cluster_resources); @@ -675,7 +679,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { initResourceRequest(resource_request, pred_demands, EmptyIntVector, EmptyFixedPointVector); - NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); + NodeResourceInstances old_local_resources = + resource_scheduler.GetLocalResourceManager().GetLocalResources(); std::shared_ptr 
task_allocation = std::make_shared(); @@ -688,7 +693,9 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { resource_scheduler.GetLocalResourceManager().FreeTaskResourceInstances( task_allocation); - ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true); + ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() == + old_local_resources), + true); } // Try to allocate resources for a task request that overallocates a hard constrained // resource. @@ -704,7 +711,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { initResourceRequest(resource_request, pred_demands, EmptyIntVector, EmptyFixedPointVector); - NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); + NodeResourceInstances old_local_resources = + resource_scheduler.GetLocalResourceManager().GetLocalResources(); std::shared_ptr task_allocation = std::make_shared(); bool success = @@ -712,7 +720,9 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { resource_request, task_allocation); ASSERT_EQ(success, false); - ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true); + ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() == + old_local_resources), + true); } // Allocate resources for a task request specifying both predefined and custom // resources. 
@@ -729,7 +739,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector cust_demands{3, 2}; initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands); - NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); + NodeResourceInstances old_local_resources = + resource_scheduler.GetLocalResourceManager().GetLocalResources(); std::shared_ptr task_allocation = std::make_shared(); bool success = @@ -741,7 +752,9 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { resource_scheduler.GetLocalResourceManager().FreeTaskResourceInstances( task_allocation); - ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true); + ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() == + old_local_resources), + true); } // Allocate resources for a task request specifying both predefined and custom // resources, but overallocates a hard-constrained custom resource. @@ -758,7 +771,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { vector cust_demands{3, 10}; initResourceRequest(resource_request, pred_demands, cust_ids, cust_demands); - NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); + NodeResourceInstances old_local_resources = + resource_scheduler.GetLocalResourceManager().GetLocalResources(); std::shared_ptr task_allocation = std::make_shared(); bool success = @@ -766,7 +780,9 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest) { resource_request, task_allocation); ASSERT_EQ(success, false); - ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true); + ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() == + old_local_resources), + true); } } @@ -785,7 +801,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesAllocationFailureTest) vector cust_demands{3, 3, 4}; initResourceRequest(resource_request, pred_demands, req_cust_ids, 
cust_demands); - NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); + NodeResourceInstances old_local_resources = + resource_scheduler.GetLocalResourceManager().GetLocalResources(); std::shared_ptr task_allocation = std::make_shared(); bool success = @@ -794,7 +811,9 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesAllocationFailureTest) ASSERT_EQ(success, false); // resource_scheduler.FreeTaskResourceInstances(task_allocation); - ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true); + ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() == + old_local_resources), + true); } TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) { @@ -817,14 +836,17 @@ TEST_F(ClusterResourceSchedulerTest, TaskResourceInstancesTest2) { resource_scheduler.GetLocalResourceManager().AllocateTaskResourceInstances( resource_request, task_allocation); - NodeResourceInstances old_local_resources = resource_scheduler.GetLocalResources(); + NodeResourceInstances old_local_resources = + resource_scheduler.GetLocalResourceManager().GetLocalResources(); ASSERT_EQ(success, true); std::vector cpu_instances = task_allocation->GetCPUInstancesDouble(); resource_scheduler.GetLocalResourceManager().AddCPUResourceInstances(cpu_instances); resource_scheduler.GetLocalResourceManager().SubtractCPUResourceInstances( cpu_instances); - ASSERT_EQ((resource_scheduler.GetLocalResources() == old_local_resources), true); + ASSERT_EQ((resource_scheduler.GetLocalResourceManager().GetLocalResources() == + old_local_resources), + true); } } @@ -861,16 +883,19 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) { std::vector allocate_gpu_instances{0.5, 0.5, 0.5, 0.5}; resource_scheduler.GetLocalResourceManager().SubtractGPUResourceInstances( allocate_gpu_instances); - std::vector available_gpu_instances = resource_scheduler.GetLocalResources() - .GetAvailableResourceInstances() - 
.GetGPUInstancesDouble(); + std::vector available_gpu_instances = + resource_scheduler.GetLocalResourceManager() + .GetLocalResources() + .GetAvailableResourceInstances() + .GetGPUInstancesDouble(); std::vector expected_available_gpu_instances{0.5, 0.5, 0.5, 0.5}; ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(), expected_available_gpu_instances.begin())); resource_scheduler.GetLocalResourceManager().AddGPUResourceInstances( allocate_gpu_instances); - available_gpu_instances = resource_scheduler.GetLocalResources() + available_gpu_instances = resource_scheduler.GetLocalResourceManager() + .GetLocalResources() .GetAvailableResourceInstances() .GetGPUInstancesDouble(); expected_available_gpu_instances = {1., 1., 1., 1.}; @@ -884,7 +909,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) { std::vector expected_underflow{.5, .5, 0., .5}; ASSERT_TRUE( std::equal(underflow.begin(), underflow.end(), expected_underflow.begin())); - available_gpu_instances = resource_scheduler.GetLocalResources() + available_gpu_instances = resource_scheduler.GetLocalResourceManager() + .GetLocalResources() .GetAvailableResourceInstances() .GetGPUInstancesDouble(); expected_available_gpu_instances = {0., 0., 0.5, 0.}; @@ -897,7 +923,8 @@ TEST_F(ClusterResourceSchedulerTest, TaskGPUResourceInstancesTest) { allocate_gpu_instances); std::vector expected_overflow{.0, .0, .5, 0.}; ASSERT_TRUE(std::equal(overflow.begin(), overflow.end(), expected_overflow.begin())); - available_gpu_instances = resource_scheduler.GetLocalResources() + available_gpu_instances = resource_scheduler.GetLocalResourceManager() + .GetLocalResources() .GetAvailableResourceInstances() .GetGPUInstancesDouble(); expected_available_gpu_instances = {1., .5, 1., .5}; @@ -922,9 +949,11 @@ TEST_F(ClusterResourceSchedulerTest, // UpdateLocalAvailableResourcesFromResourceInstances() under the hood. 
resource_scheduler.GetLocalResourceManager().SubtractGPUResourceInstances( allocate_gpu_instances); - std::vector available_gpu_instances = resource_scheduler.GetLocalResources() - .GetAvailableResourceInstances() - .GetGPUInstancesDouble(); + std::vector available_gpu_instances = + resource_scheduler.GetLocalResourceManager() + .GetLocalResources() + .GetAvailableResourceInstances() + .GetGPUInstancesDouble(); std::vector expected_available_gpu_instances{0.5, 0.5, 0., 0.5}; ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(), @@ -941,9 +970,11 @@ TEST_F(ClusterResourceSchedulerTest, // UpdateLocalAvailableResourcesFromResourceInstances() under the hood. resource_scheduler.GetLocalResourceManager().AddGPUResourceInstances( allocate_gpu_instances); - std::vector available_gpu_instances = resource_scheduler.GetLocalResources() - .GetAvailableResourceInstances() - .GetGPUInstancesDouble(); + std::vector available_gpu_instances = + resource_scheduler.GetLocalResourceManager() + .GetLocalResources() + .GetAvailableResourceInstances() + .GetGPUInstancesDouble(); std::vector expected_available_gpu_instances{1., 1., 1., 0.8}; ASSERT_TRUE(std::equal(available_gpu_instances.begin(), available_gpu_instances.end(), @@ -1098,7 +1129,7 @@ TEST_F(ClusterResourceSchedulerTest, ResourceUsageReportTest) { resource_scheduler.GetLocalResourceManager().AllocateLocalTaskResources( allocation_map, allocations); rpc::ResourcesData data; - resource_scheduler.GetLocalResourceManager().UpdateLastResourceUsage( + resource_scheduler.GetLocalResourceManager().ResetLastReportResourceUsage( std::make_shared()); resource_scheduler.GetLocalResourceManager().FillResourceUsage(data); diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 38cfccd17fd8..c934f317973a 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc 
@@ -1442,7 +1442,8 @@ TEST_F(ClusterTaskManagerTestWithGPUsAtHead, RleaseAndReturnWorkerCpuResources) // Check failed as the worker has no allocated resource instances. ASSERT_FALSE(task_manager_.ReleaseCpuResourcesFromUnblockedWorker(worker)); - auto node_resource_instances = scheduler_->GetLocalResources(); + auto node_resource_instances = + scheduler_->GetLocalResourceManager().GetLocalResources(); auto available_resource_instances = node_resource_instances.GetAvailableResourceInstances(); diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc index e04be391339f..5ab05ce08c06 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.cc +++ b/src/ray/raylet/scheduling/local_resource_manager.cc @@ -358,8 +358,6 @@ bool LocalResourceManager::AllocateTaskResourceInstances( return false; } } - - OnResourceChanged(); return true; } @@ -378,7 +376,6 @@ void LocalResourceManager::FreeTaskResourceInstances( AddAvailableResourceInstances(task_allocation_custom_resource.second, &it->second); } } - OnResourceChanged(); } std::vector LocalResourceManager::AddCPUResourceInstances( @@ -656,11 +653,12 @@ std::string LocalResourceManager::SerializedTaskResourceInstances( return buffer.str(); } -void LocalResourceManager::UpdateLastResourceUsage( - std::shared_ptr gcs_resources) { +void LocalResourceManager::ResetLastReportResourceUsage( + std::shared_ptr replacement) { last_report_resources_ = std::make_unique(ResourceMapToNodeResources( - resource_name_to_id_, gcs_resources->GetTotalResources().GetResourceMap(), - gcs_resources->GetAvailableResources().GetResourceMap())); + resource_name_to_id_, replacement->GetTotalResources().GetResourceMap(), + replacement->GetAvailableResources().GetResourceMap())); + OnResourceChanged(); } bool LocalResourceManager::ResourcesExist(const std::string &resource_name) { diff --git a/src/ray/raylet/scheduling/local_resource_manager.h 
b/src/ray/raylet/scheduling/local_resource_manager.h index 6e9229a11837..9bbc8e97d9db 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.h +++ b/src/ray/raylet/scheduling/local_resource_manager.h @@ -26,18 +26,17 @@ #include "ray/gcs/gcs_client/accessor.h" #include "ray/gcs/gcs_client/gcs_client.h" #include "ray/raylet/scheduling/cluster_resource_data.h" -#include "ray/raylet/scheduling/cluster_resource_scheduler_interface.h" #include "ray/raylet/scheduling/fixed_point.h" -#include "ray/raylet/scheduling/scheduling_ids.h" -#include "ray/raylet/scheduling/scheduling_policy.h" #include "ray/util/logging.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { -using rpc::HeartbeatTableData; - /// Class manages the resources of the local node. +/// It is responsible for allocating/deallocating resources for (task) resource requests; +/// it also supports creating a new resource or deleting an existing resource. +/// Whenever the resource changes, it notifies the subscriber of the change. +/// This class is not thread safe. class LocalResourceManager { public: LocalResourceManager( @@ -69,24 +68,6 @@ class LocalResourceManager { /// Return local resources. NodeResourceInstances GetLocalResources() const { return local_resources_; } - /// Allocate local resources to satisfy a given request (resource_request). - /// - /// \param resource_request: Resources requested by a task. - /// \param task_allocation: Local resources allocated to satsify resource_request - /// demand. - /// - /// \return true, if allocation successful. If false, the caller needs to free the - /// allocated resources, i.e., task_allocation. - bool AllocateTaskResourceInstances( - const ResourceRequest &resource_request, - std::shared_ptr task_allocation); - - /// Free resources which were allocated with a task. The freed resources are - /// added back to the node's local available resources. - /// - /// \param task_allocation: Task's resources to be freed.
- void FreeTaskResourceInstances(std::shared_ptr task_allocation); - /// Increase the available CPU instances of this node. /// /// \param cpu_instances CPU instances to be added to available cpus. @@ -173,11 +154,11 @@ class LocalResourceManager { std::string SerializedTaskResourceInstances( std::shared_ptr task_allocation) const; - /// Update last report resources local cache from gcs cache, - /// this is needed when gcs fo. + /// Replace the last reported resource usage with the provided value. /// - /// \param gcs_resources: The remote cache from gcs. - void UpdateLastResourceUsage(const std::shared_ptr gcs_resources); + /// \param replacement: the new value. + void ResetLastReportResourceUsage( + const std::shared_ptr replacement); /// Check whether the specific resource exists or not in local node. /// @@ -208,6 +189,7 @@ class LocalResourceManager { /// Init the information about which resources are unit_instance. void InitResourceUnitInstanceInfo(); + /// Notify the subscriber that the local resources have changed. void OnResourceChanged(); /// Increase the available capacities of the instances of a given resource. @@ -268,6 +250,24 @@ class LocalResourceManager { bool AllocateResourceInstances(FixedPoint demand, std::vector &available, std::vector *allocation) const; + /// Allocate local resources to satisfy a given request (resource_request). + /// + /// \param resource_request: Resources requested by a task. + /// \param task_allocation: Local resources allocated to satisfy resource_request + /// demand. + /// + /// \return true, if allocation successful. If false, the caller needs to free the + /// allocated resources, i.e., task_allocation. + bool AllocateTaskResourceInstances( + const ResourceRequest &resource_request, + std::shared_ptr task_allocation); + + /// Free resources which were allocated with a task. The freed resources are + /// added back to the node's local available resources. + /// + /// \param task_allocation: Task's resources to be freed.
+ void FreeTaskResourceInstances(std::shared_ptr task_allocation); + /// Identifier of local node. int64_t local_node_id_; /// Keep the mapping between node and resource IDs in string representation @@ -292,6 +292,12 @@ class LocalResourceManager { FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest); FRIEND_TEST(ClusterResourceSchedulerTest, AvailableResourceInstancesOpsTest); + FRIEND_TEST(ClusterResourceSchedulerTest, TaskResourceInstancesTest); + FRIEND_TEST(ClusterResourceSchedulerTest, TaskResourceInstancesAllocationFailureTest); + FRIEND_TEST(ClusterResourceSchedulerTest, TaskResourceInstancesTest2); + FRIEND_TEST(ClusterResourceSchedulerTest, TaskResourceInstanceWithHardRequestTest); + FRIEND_TEST(ClusterResourceSchedulerTest, TaskResourceInstanceWithoutCpuUnitTest); + FRIEND_TEST(ClusterResourceSchedulerTest, CustomResourceInstanceTest); }; } // end namespace ray