[autoscaler] Pick autoscaler v2 change 2/x (#37876)
---------

Signed-off-by: rickyyx <[email protected]>
rickyyx authored Jul 28, 2023
1 parent b03a76b commit d05116e
Showing 14 changed files with 121 additions and 60 deletions.
3 changes: 2 additions & 1 deletion python/ray/_raylet.pyx
@@ -2319,12 +2319,13 @@ cdef class GcsClient:
     def request_cluster_resource_constraint(
             self,
             bundles: c_vector[unordered_map[c_string, double]],
+            count_array: c_vector[int64_t],
             timeout_s=None):
         cdef:
             int64_t timeout_ms = round(1000 * timeout_s) if timeout_s else -1
         with nogil:
             check_status(self.inner.get().RequestClusterResourceConstraint(
-                timeout_ms, bundles))
+                timeout_ms, bundles, count_array))
 
     @_auto_reconnect
     def get_cluster_status(
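The Cython binding now forwards a parallel count_array alongside bundles, so each distinct bundle shape travels with its multiplicity instead of being repeated. A minimal sketch of the resulting call shape from Python, assuming a connected GcsClient handle named gcs_client (the handle name and the values are illustrative, not from this diff):

    # Hypothetical caller: request 100 x {"CPU": 1} and 1 x {"CPU": 2, "GPU": 1}.
    bundles = [{"CPU": 1.0}, {"CPU": 2.0, "GPU": 1.0}]  # one entry per distinct shape
    counts = [100, 1]                                   # parallel multiplicities
    # The two vectors must have equal length; the C++ client below RAY_CHECKs this.
    gcs_client.request_cluster_resource_constraint(bundles, counts, timeout_s=5)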
2 changes: 1 addition & 1 deletion python/ray/autoscaler/_private/util.py
@@ -795,7 +795,7 @@ def format_info_string(
 
     failure_lines = []
     for ip, node_type in autoscaler_summary.failed_nodes:
-        line = f" {node_type}: RayletUnexpectedlyDied (ip: {ip})"
+        line = f" {node_type}: NodeTerminated (ip: {ip})"
         failure_lines.append(line)
     if autoscaler_summary.node_availability_summary:
         records = sorted(
6 changes: 5 additions & 1 deletion python/ray/autoscaler/v2/schema.py
@@ -3,7 +3,11 @@
 from enum import Enum
 from typing import Dict, List, Optional
 
-NODE_DEATH_CAUSE_RAYLET_DIED = "RayletUnexpectedlyDied"
+# TODO(rickyx): once we have graceful shutdown, we could populate
+# the failure detail with the actual termination message. As of now,
+# we will use a more generic message to include cases such as:
+# (idle termination, node death, crash, preemption, etc)
+NODE_DEATH_CAUSE_RAYLET_DIED = "NodeTerminated"
 
 
 @dataclass
18 changes: 17 additions & 1 deletion python/ray/autoscaler/v2/sdk.py
@@ -1,4 +1,5 @@
 import time
+from collections import defaultdict
 from typing import List, Optional
 
 import ray
@@ -43,7 +44,22 @@ def request_cluster_resources(
         timeout: Timeout in seconds for the request to be timeout
     """
-    get_gcs_client().request_cluster_resource_constraint(to_request, timeout_s=timeout)
+
+    # Aggregate bundle by shape.
+    resource_requests_by_count = defaultdict(int)
+    for request in to_request:
+        bundle = frozenset(request.items())
+        resource_requests_by_count[bundle] += 1
+
+    bundles = []
+    counts = []
+    for bundle, count in resource_requests_by_count.items():
+        bundles.append(dict(bundle))
+        counts.append(count)
+
+    get_gcs_client().request_cluster_resource_constraint(
+        bundles, counts, timeout_s=timeout
+    )
 
 
 def get_cluster_status(
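The new body of request_cluster_resources groups duplicate bundle shapes into (bundle, count) pairs before calling the GCS. A self-contained sketch of that grouping step, runnable without Ray (aggregate_by_shape and demo_requests are illustrative names, not part of this diff):

    from collections import defaultdict
    from typing import Dict, List, Tuple


    def aggregate_by_shape(
        to_request: List[Dict[str, float]]
    ) -> Tuple[List[dict], List[int]]:
        # A frozenset of (resource, amount) pairs is hashable, so identical
        # bundle shapes land on the same key and their counts accumulate.
        by_shape = defaultdict(int)
        for request in to_request:
            by_shape[frozenset(request.items())] += 1
        bundles = [dict(shape) for shape in by_shape]
        counts = list(by_shape.values())
        return bundles, counts


    # 100 identical CPU bundles plus one GPU bundle collapse to two pairs.
    demo_requests = [{"CPU": 1}] * 100 + [{"CPU": 2, "GPU": 1}]
    pairs = list(zip(*aggregate_by_shape(demo_requests)))
    assert ({"CPU": 1}, 100) in pairs
    assert ({"CPU": 2, "GPU": 1}, 1) in pairs

This is why the test below can send 100 identical requests and expect a single constraint entry with count 100.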
36 changes: 28 additions & 8 deletions python/ray/autoscaler/v2/tests/test_sdk.py
@@ -52,7 +52,7 @@ def get_node_ids() -> Tuple[str, List[str]]:
 
 
 def assert_cluster_resource_constraints(
-    state: ClusterResourceState, expected: List[dict]
+    state: ClusterResourceState, expected_bundles: List[dict], expected_count: List[int]
 ):
     """
     Assert a GetClusterResourceStateReply has cluster_resource_constraints that
@@ -62,16 +62,26 @@
     assert len(state.cluster_resource_constraints) == 1
 
     min_bundles = state.cluster_resource_constraints[0].min_bundles
-    assert len(min_bundles) == len(expected)
+    assert len(min_bundles) == len(expected_bundles) == len(expected_count)
 
     # Sort all the bundles by bundle's resource names
     min_bundles = sorted(
-        min_bundles, key=lambda bundle: "".join(bundle.resources_bundle.keys())
+        min_bundles,
+        key=lambda bundle_by_count: "".join(
+            bundle_by_count.request.resources_bundle.keys()
+        ),
     )
-    expected = sorted(expected, key=lambda bundle: "".join(bundle.keys()))
+    expected = zip(expected_bundles, expected_count)
+    expected = sorted(
+        expected, key=lambda bundle_count: "".join(bundle_count[0].keys())
+    )
 
-    for actual_bundle, expected_bundle in zip(min_bundles, expected):
-        assert dict(actual_bundle.resources_bundle) == expected_bundle
+    for actual_bundle_count, expected_bundle_count in zip(min_bundles, expected):
+        assert (
+            dict(actual_bundle_count.request.resources_bundle)
+            == expected_bundle_count[0]
+        )
+        assert actual_bundle_count.count == expected_bundle_count[1]
 
 
 @dataclass
@@ -240,7 +250,7 @@ def test_request_cluster_resources_basic(shutdown_only):
 
     def verify():
         state = get_cluster_resource_state(stub)
-        assert_cluster_resource_constraints(state, [{"CPU": 1}])
+        assert_cluster_resource_constraints(state, [{"CPU": 1}], [1])
         return True
 
     wait_for_condition(verify)
@@ -250,7 +260,7 @@ def verify():
 
     def verify():
         state = get_cluster_resource_state(stub)
-        assert_cluster_resource_constraints(state, [{"CPU": 2, "GPU": 1}, {"CPU": 1}])
+        assert_cluster_resource_constraints(
+            state, [{"CPU": 2, "GPU": 1}, {"CPU": 1}], [1, 1]
+        )
         return True
 
+    # Request multiple is aggregated by shape.
+    request_cluster_resources([{"CPU": 1}] * 100)
+
+    def verify():
+        state = get_cluster_resource_state(stub)
+        assert_cluster_resource_constraints(state, [{"CPU": 1}], [100])
+        return True
+
     wait_for_condition(verify)
9 changes: 6 additions & 3 deletions python/ray/autoscaler/v2/tests/test_utils.py
@@ -148,8 +148,11 @@ def test_cluster_status_parser_cluster_resource_state():
             {
                 "min_bundles": [
                     {
-                        "resources_bundle": {"GPU": 2, "CPU": 100},
-                        "placement_constraints": [],
+                        "request": {
+                            "resources_bundle": {"GPU": 2, "CPU": 100},
+                            "placement_constraints": [],
+                        },
+                        "count": 1,
                     },
                 ]
             }
@@ -510,7 +513,7 @@ def test_cluster_status_formatter():
  127.0.0.3: worker_node, starting ray
 Recent failures:
  worker_node: LaunchFailed (latest_attempt: 02:46:40) - Insufficient capacity
- worker_node: RayletUnexpectedlyDied (ip: 127.0.0.5)
+ worker_node: NodeTerminated (ip: 127.0.0.5)
 Resources
 --------------------------------------------------------
9 changes: 6 additions & 3 deletions python/ray/autoscaler/v2/utils.py
@@ -291,9 +291,12 @@ def _parse_resource_demands(
 
         for constraint_request in state.cluster_resource_constraints:
             demand = ClusterConstraintDemand(
-                bundles_by_count=cls._aggregate_resource_requests_by_shape(
-                    constraint_request.min_bundles
-                ),
+                bundles_by_count=[
+                    ResourceRequestByCount(
+                        bundle=dict(r.request.resources_bundle.items()), count=r.count
+                    )
+                    for r in constraint_request.min_bundles
+                ]
             )
             constraint_demand.append(demand)
 
3 changes: 2 additions & 1 deletion python/ray/includes/common.pxd
@@ -363,7 +363,8 @@ cdef extern from "ray/gcs/gcs_client/gcs_client.h" nogil:
             int64_t timeout_ms, c_vector[CJobTableData]& result)
         CRayStatus RequestClusterResourceConstraint(
             int64_t timeout_ms,
-            const c_vector[unordered_map[c_string, double]] &bundles)
+            const c_vector[unordered_map[c_string, double]] &bundles,
+            const c_vector[int64_t] &count_array)
         CRayStatus GetClusterStatus(
             int64_t timeout_ms,
             c_string &serialized_reply)
50 changes: 25 additions & 25 deletions python/ray/tests/test_resource_demand_scheduler.py
@@ -3138,7 +3138,7 @@ def test_info_string():
  1.2.3.4: m4.4xlarge, waiting-for-ssh
  1.2.3.5: m4.4xlarge, waiting-for-ssh
 Recent failures:
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.6)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.6)
 Resources
 --------------------------------------------------------
@@ -3218,7 +3218,7 @@ def test_info_string_verbose():
  1.2.3.4: m4.4xlarge, waiting-for-ssh
  1.2.3.5: m4.4xlarge, waiting-for-ssh
 Recent failures:
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.6)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.6)
 Resources
 --------------------------------------------------------
@@ -3322,7 +3322,7 @@ def test_info_string_verbose_node_types():
  1.2.3.4: m4.4xlarge, waiting-for-ssh
  1.2.3.5: m4.4xlarge, waiting-for-ssh
 Recent failures:
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.6)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.6)
 Resources
 --------------------------------------------------------
@@ -3410,7 +3410,7 @@ def test_info_string_verbose_no_breakdown():
  1.2.3.4: m4.4xlarge, waiting-for-ssh
  1.2.3.5: m4.4xlarge, waiting-for-ssh
 Recent failures:
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.6)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.6)
 Resources
 --------------------------------------------------------
@@ -3501,7 +3501,7 @@ def test_info_string_with_launch_failures():
 Recent failures:
  A100: InstanceLimitExceeded (latest_attempt: 13:03:02)
  Inferentia-Spot: InsufficientInstanceCapacity (latest_attempt: 13:03:01)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.6)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.6)
 Resources
 --------------------------------------------------------
@@ -3590,7 +3590,7 @@ def test_info_string_with_launch_failures_verbose():
 Recent failures:
  A100: InstanceLimitExceeded (latest_attempt: 13:03:02) - you should fix it
  Inferentia-Spot: InsufficientInstanceCapacity (latest_attempt: 13:03:01) - desc
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.6)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.6)
 Resources
 --------------------------------------------------------
@@ -3657,25 +3657,25 @@ def test_info_string_failed_node_cap():
  1.2.3.4: m4.4xlarge, waiting-for-ssh
  1.2.3.5: m4.4xlarge, waiting-for-ssh
 Recent failures:
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.99)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.98)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.97)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.96)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.95)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.94)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.93)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.92)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.91)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.90)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.89)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.88)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.87)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.86)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.85)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.84)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.83)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.82)
- p3.2xlarge: RayletUnexpectedlyDied (ip: 1.2.3.81)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.99)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.98)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.97)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.96)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.95)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.94)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.93)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.92)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.91)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.90)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.89)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.88)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.87)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.86)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.85)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.84)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.83)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.82)
+ p3.2xlarge: NodeTerminated (ip: 1.2.3.81)
 Resources
 --------------------------------------------------------
18 changes: 12 additions & 6 deletions src/ray/gcs/gcs_client/gcs_client.cc
@@ -399,18 +399,24 @@ Status PythonGcsClient::GetAllJobInfo(int64_t timeout_ms,
 
 Status PythonGcsClient::RequestClusterResourceConstraint(
     int64_t timeout_ms,
-    const std::vector<std::unordered_map<std::string, double>> &bundles) {
+    const std::vector<std::unordered_map<std::string, double>> &bundles,
+    const std::vector<int64_t> &count_array) {
   grpc::ClientContext context;
   GrpcClientContextWithTimeoutMs(context, timeout_ms);
 
   rpc::autoscaler::RequestClusterResourceConstraintRequest request;
   rpc::autoscaler::RequestClusterResourceConstraintReply reply;
+  RAY_CHECK(bundles.size() == count_array.size());
+  for (size_t i = 0; i < bundles.size(); ++i) {
+    const auto &bundle = bundles[i];
+    auto count = count_array[i];
 
-  for (auto bundle : bundles) {
-    request.mutable_cluster_resource_constraint()
-        ->add_min_bundles()
-        ->mutable_resources_bundle()
-        ->insert(bundle.begin(), bundle.end());
+    auto new_resource_requests_by_count =
+        request.mutable_cluster_resource_constraint()->add_min_bundles();
+
+    new_resource_requests_by_count->mutable_request()->mutable_resources_bundle()->insert(
+        bundle.begin(), bundle.end());
+    new_resource_requests_by_count->set_count(count);
   }
 
   grpc::Status status =
3 changes: 2 additions & 1 deletion src/ray/gcs/gcs_client/gcs_client.h
@@ -228,7 +228,8 @@ class RAY_EXPORT PythonGcsClient {
   // For rpc::autoscaler::AutoscalerStateService
   Status RequestClusterResourceConstraint(
       int64_t timeout_ms,
-      const std::vector<std::unordered_map<std::string, double>> &bundles);
+      const std::vector<std::unordered_map<std::string, double>> &bundles,
+      const std::vector<int64_t> &count_array);
   Status GetClusterStatus(int64_t timeout_ms, std::string &serialized_reply);
 
  private:
10 changes: 5 additions & 5 deletions src/ray/gcs/gcs_server/test/gcs_autoscaler_state_manager_test.cc
@@ -593,22 +593,22 @@ TEST_F(GcsAutoscalerStateManagerTest, TestClusterResourcesConstraint) {
   // Generate one constraint.
   {
     RequestClusterResourceConstraint(
-        Mocker::GenClusterResourcesConstraint({{{"CPU", 2}, {"GPU", 1}}}));
+        Mocker::GenClusterResourcesConstraint({{{"CPU", 2}, {"GPU", 1}}}, {1}));
     const auto &state = GetClusterResourceStateSync();
     ASSERT_EQ(state.cluster_resource_constraints_size(), 1);
     ASSERT_EQ(state.cluster_resource_constraints(0).min_bundles_size(), 1);
-    CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0),
+    CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0).request(),
                          {{"CPU", 2}, {"GPU", 1}});
   }
 
   // Override it
   {
-    RequestClusterResourceConstraint(
-        Mocker::GenClusterResourcesConstraint({{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}}}));
+    RequestClusterResourceConstraint(Mocker::GenClusterResourcesConstraint(
+        {{{"CPU", 4}, {"GPU", 5}, {"TPU", 1}}}, {1}));
    const auto &state = GetClusterResourceStateSync();
     ASSERT_EQ(state.cluster_resource_constraints_size(), 1);
     ASSERT_EQ(state.cluster_resource_constraints(0).min_bundles_size(), 1);
-    CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0),
+    CheckResourceRequest(state.cluster_resource_constraints(0).min_bundles(0).request(),
                          {{"CPU", 4}, {"GPU", 5}, {"TPU", 1}});
   }
 }
12 changes: 9 additions & 3 deletions src/ray/gcs/test/gcs_test_util.h
@@ -378,11 +378,17 @@ struct Mocker {
     return placement_group_table_data;
   }
   static rpc::autoscaler::ClusterResourceConstraint GenClusterResourcesConstraint(
-      const std::vector<std::unordered_map<std::string, double>> &request_resources) {
+      const std::vector<std::unordered_map<std::string, double>> &request_resources,
+      const std::vector<int64_t> &count_array) {
     rpc::autoscaler::ClusterResourceConstraint constraint;
-    for (const auto &resource : request_resources) {
+    RAY_CHECK(request_resources.size() == count_array.size());
+    for (size_t i = 0; i < request_resources.size(); i++) {
+      auto &resource = request_resources[i];
+      auto count = count_array[i];
       auto bundle = constraint.add_min_bundles();
-      bundle->mutable_resources_bundle()->insert(resource.begin(), resource.end());
+      bundle->set_count(count);
+      bundle->mutable_request()->mutable_resources_bundle()->insert(resource.begin(),
+                                                                    resource.end());
     }
     return constraint;
   }
2 changes: 1 addition & 1 deletion src/ray/protobuf/experimental/autoscaler.proto
@@ -80,7 +80,7 @@ message GangResourceRequest {
 message ClusterResourceConstraint {
   // If not emtpy, the cluster should have the capacity (total resource) to fit
   // the min_bundles.
-  repeated ResourceRequest min_bundles = 1;
+  repeated ResourceRequestByCount min_bundles = 1;
 }
 
 // Node status for a ray node.
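This proto change is the root of everything above: each min_bundles entry is now a ResourceRequestByCount that wraps the original ResourceRequest plus a count. A sketch of one parsed constraint in the dict form the test_utils.py expectation above uses (the values are illustrative; field names are taken from this diff):

    # One aggregated constraint as it round-trips through the new schema:
    # 100 x {"CPU": 1} becomes a single entry with count=100.
    constraint = {
        "min_bundles": [
            {
                "request": {
                    "resources_bundle": {"CPU": 1},
                    "placement_constraints": [],
                },
                "count": 100,
            },
        ]
    }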
