Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[autoscaler] Optimize resource constraints proto to dedup requests with the same shape #37863

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2332,12 +2332,13 @@ cdef class GcsClient:
def request_cluster_resource_constraint(
self,
bundles: c_vector[unordered_map[c_string, double]],
count_array: c_vector[int64_t],
timeout_s=None):
cdef:
int64_t timeout_ms = round(1000 * timeout_s) if timeout_s else -1
with nogil:
check_status(self.inner.get().RequestClusterResourceConstraint(
timeout_ms, bundles))
timeout_ms, bundles, count_array))

@_auto_reconnect
def get_cluster_status(
Expand Down
18 changes: 17 additions & 1 deletion python/ray/autoscaler/v2/sdk.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
from collections import defaultdict
from typing import List, Optional

import ray
Expand Down Expand Up @@ -43,7 +44,22 @@ def request_cluster_resources(
timeout: Timeout in seconds for the request to be timeout

"""
get_gcs_client().request_cluster_resource_constraint(to_request, timeout_s=timeout)

# Aggregate bundle by shape.
resource_requests_by_count = defaultdict(int)
for request in to_request:
bundle = frozenset(request.items())
resource_requests_by_count[bundle] += 1

bundles = []
counts = []
for bundle, count in resource_requests_by_count.items():
bundles.append(dict(bundle))
counts.append(count)

get_gcs_client().request_cluster_resource_constraint(
bundles, counts, timeout_s=timeout
)


def get_cluster_status(
Expand Down
36 changes: 28 additions & 8 deletions python/ray/autoscaler/v2/tests/test_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def get_node_ids() -> Tuple[str, List[str]]:


def assert_cluster_resource_constraints(
state: ClusterResourceState, expected: List[dict]
state: ClusterResourceState, expected_bundles: List[dict], expected_count: List[int]
):
"""
Assert a GetClusterResourceStateReply has cluster_resource_constraints that
Expand All @@ -62,16 +62,26 @@ def assert_cluster_resource_constraints(
assert len(state.cluster_resource_constraints) == 1

min_bundles = state.cluster_resource_constraints[0].min_bundles
assert len(min_bundles) == len(expected)
assert len(min_bundles) == len(expected_bundles) == len(expected_count)

# Sort all the bundles by bundle's resource names
min_bundles = sorted(
min_bundles, key=lambda bundle: "".join(bundle.resources_bundle.keys())
min_bundles,
key=lambda bundle_by_count: "".join(
bundle_by_count.request.resources_bundle.keys()
),
)
expected = zip(expected_bundles, expected_count)
expected = sorted(
expected, key=lambda bundle_count: "".join(bundle_count[0].keys())
)
expected = sorted(expected, key=lambda bundle: "".join(bundle.keys()))

for actual_bundle, expected_bundle in zip(min_bundles, expected):
assert dict(actual_bundle.resources_bundle) == expected_bundle
for actual_bundle_count, expected_bundle_count in zip(min_bundles, expected):
assert (
dict(actual_bundle_count.request.resources_bundle)
== expected_bundle_count[0]
)
assert actual_bundle_count.count == expected_bundle_count[1]


@dataclass
Expand Down Expand Up @@ -240,7 +250,7 @@ def test_request_cluster_resources_basic(shutdown_only):

def verify():
state = get_cluster_resource_state(stub)
assert_cluster_resource_constraints(state, [{"CPU": 1}])
assert_cluster_resource_constraints(state, [{"CPU": 1}], [1])
return True

wait_for_condition(verify)
Expand All @@ -250,7 +260,17 @@ def verify():

def verify():
state = get_cluster_resource_state(stub)
assert_cluster_resource_constraints(state, [{"CPU": 2, "GPU": 1}, {"CPU": 1}])
assert_cluster_resource_constraints(
state, [{"CPU": 2, "GPU": 1}, {"CPU": 1}], [1, 1]
)
return True

# Request multiple is aggregated by shape.
request_cluster_resources([{"CPU": 1}] * 100)

def verify():
state = get_cluster_resource_state(stub)
assert_cluster_resource_constraints(state, [{"CPU": 1}], [100])
return True

wait_for_condition(verify)
Expand Down
7 changes: 5 additions & 2 deletions python/ray/autoscaler/v2/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,11 @@ def test_cluster_status_parser_cluster_resource_state():
{
"min_bundles": [
{
"resources_bundle": {"GPU": 2, "CPU": 100},
"placement_constraints": [],
"request": {
"resources_bundle": {"GPU": 2, "CPU": 100},
"placement_constraints": [],
},
"count": 1,
},
]
}
Expand Down
9 changes: 6 additions & 3 deletions python/ray/autoscaler/v2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,12 @@ def _parse_resource_demands(

for constraint_request in state.cluster_resource_constraints:
demand = ClusterConstraintDemand(
bundles_by_count=cls._aggregate_resource_requests_by_shape(
constraint_request.min_bundles
),
bundles_by_count=[
ResourceRequestByCount(
bundle=dict(r.request.resources_bundle.items()), count=r.count
)
for r in constraint_request.min_bundles
]
)
constraint_demand.append(demand)

Expand Down
3 changes: 2 additions & 1 deletion python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
int64_t timeout_ms, c_vector[CJobTableData]& result)
CRayStatus RequestClusterResourceConstraint(
int64_t timeout_ms,
const c_vector[unordered_map[c_string, double]] &bundles)
const c_vector[unordered_map[c_string, double]] &bundles,
const c_vector[int64_t] &count_array)
CRayStatus GetClusterStatus(
int64_t timeout_ms,
c_string &serialized_reply)
Expand Down
18 changes: 12 additions & 6 deletions src/ray/gcs/gcs_client/gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,18 +399,24 @@ Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms,

Status PythonGcsClient::RequestClusterResourceConstraint(
int64_t timeout_ms,
const std::vector<std::unordered_map<std::string, double>> &bundles) {
const std::vector<std::unordered_map<std::string, double>> &bundles,
const std::vector<int64_t> &count_array) {
grpc::ClientContext context;
GrpcClientContextWithTimeoutMs(context, timeout_ms);

rpc::autoscaler::RequestClusterResourceConstraintRequest request;
rpc::autoscaler::RequestClusterResourceConstraintReply reply;
RAY_CHECK(bundles.size() == count_array.size());
for (size_t i = 0; i < bundles.size(); ++i) {
const auto &bundle = bundles[i];
auto count = count_array[i];

for (auto bundle : bundles) {
request.mutable_cluster_resource_constraint()
->add_min_bundles()
->mutable_resources_bundle()
->insert(bundle.begin(), bundle.end());
auto new_resource_requests_by_count =
request.mutable_cluster_resource_constraint()->add_min_bundles();

new_resource_requests_by_count->mutable_request()->mutable_resources_bundle()->insert(
bundle.begin(), bundle.end());
new_resource_requests_by_count->set_count(count);
}

grpc::Status status =
Expand Down
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_client/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ class RAY_EXPORT PythonGcsClient {
// For rpc::autoscaler::AutoscalerStateService
Status RequestClusterResourceConstraint(
int64_t timeout_ms,
const std::vector<std::unordered_map<std::string, double>> &bundles);
const std::vector<std::unordered_map<std::string, double>> &bundles,
const std::vector<int64_t> &count_array);
Status GetClusterStatus(int64_t timeout_ms, std::string &serialized_reply);

private:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,22 +593,22 @@ TEST_F(GcsAutoscalerStateManagerTest, TestClusterResourcesConstraint) {
// Generate one constraint.
{
RequestClusterResourceConstraint(
Mocker::GenClusterResourcesConstraint({{{"CPU", 2}, {"GPU", 1}}}));
Mocker::GenClusterResourcesConstraint({{{"CPU", 2}, {"GPU", 1}}}, {1}));
const auto &state = GetClusterResourceStateSync();
ASSERT_EQ(state.cluster_resource_constraints_size(), 1);
ASSERT_EQ(state.cluster_resource_constraints(0).min_bundles_size(), 1);
CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0),
CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0).request(),
{{"CPU", 2}, {"GPU", 1}});
}

// Override it
{
RequestClusterResourceConstraint(
Mocker::GenClusterResourcesConstraint({{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}}}));
RequestClusterResourceConstraint(Mocker::GenClusterResourcesConstraint(
{{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}}}, {1}));
const auto &state = GetClusterResourceStateSync();
ASSERT_EQ(state.cluster_resource_constraints_size(), 1);
ASSERT_EQ(state.cluster_resource_constraints(0).min_bundles_size(), 1);
CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0),
CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0).request(),
{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}});
}
}
Expand Down
12 changes: 9 additions & 3 deletions src/ray/gcs/test/gcs_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,17 @@ struct Mocker {
return placement_group_table_data;
}
static rpc::autoscaler::ClusterResourceConstraint GenClusterResourcesConstraint(
const std::vector<std::unordered_map<std::string, double>> &request_resources) {
const std::vector<std::unordered_map<std::string, double>> &request_resources,
const std::vector<int64_t> &count_array) {
rpc::autoscaler::ClusterResourceConstraint constraint;
for (const auto &resource : request_resources) {
RAY_CHECK(request_resources.size() == count_array.size());
for (size_t i = 0; i < request_resources.size(); i++) {
auto &resource = request_resources[i];
auto count = count_array[i];
auto bundle = constraint.add_min_bundles();
bundle->mutable_resources_bundle()->insert(resource.begin(), resource.end());
bundle->set_count(count);
bundle->mutable_request()->mutable_resources_bundle()->insert(resource.begin(),
resource.end());
}
return constraint;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/protobuf/experimental/autoscaler.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ message GangResourceRequest {
message ClusterResourceConstraint {
// If not emtpy, the cluster should have the capacity (total resource) to fit
// the min_bundles.
repeated ResourceRequest min_bundles = 1;
repeated ResourceRequestByCount min_bundles = 1;
}

// Node status for a ray node.
Expand Down
Loading