From d50b90155e8e1561a634047c7d39428950c3d0bc Mon Sep 17 00:00:00 2001
From: Alex Wu
Date: Tue, 22 Nov 2022 10:21:36 -0800
Subject: [PATCH] Revert "Revert "[autoscaler][observability] Autoscaler emit cluster/pending logical resource metrics"" (#30552)

See #29820 for most of the details of the PR. The only difference is
improving the NullMetric stub to not fail when tagging labels for metrics.

This reverts commit f9092dc55e662db00309c4345f744d399a00b9b9.

Co-authored-by: Alex
Signed-off-by: Weichen Xu
---
 python/ray/_private/test_utils.py             |  48 ++++++-
 python/ray/autoscaler/_private/autoscaler.py  |  20 ++-
 python/ray/autoscaler/_private/monitor.py     |  17 ++-
 .../ray/autoscaler/_private/prom_metrics.py   |  24 ++++
 python/ray/tests/BUILD                        |   1 +
 python/ray/tests/test_autoscaler_e2e.py       | 122 ++++++++++++++++++
 python/ray/tests/test_metrics_agent.py        |   2 +
 .../tests/test_resource_demand_scheduler.py   |  55 +-------
 python/ray/tests/test_scheduling.py           |  26 +---
 9 files changed, 239 insertions(+), 76 deletions(-)
 create mode 100644 python/ray/tests/test_autoscaler_e2e.py

diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py
index ad4ea9630b11..a089c07a1280 100644
--- a/python/ray/_private/test_utils.py
+++ b/python/ray/_private/test_utils.py
@@ -16,7 +16,7 @@
 import traceback
 from collections import defaultdict
 from contextlib import contextmanager, redirect_stderr, redirect_stdout
-from typing import Any, Dict, List, Optional
+from typing import Any, Callable, Dict, List, Optional
 import uuid

 from ray._raylet import Config
@@ -542,6 +542,52 @@ async def async_wait_for_condition_async_predicate(
     raise RuntimeError(message)


+def get_metric_check_condition(
+    metrics_to_check: Dict[str, Optional[float]], export_addr: Optional[str] = None
+) -> Callable[[], bool]:
+    """A condition to check if prometheus metrics reach certain values.
+    This is a blocking check that can be passed into a `wait_for_condition`
+    style function.
+
+    Args:
+        metrics_to_check: A map of metric label to values to check, to ensure
+            that certain conditions have been reached. If a value is None, just check
+            that the metric was emitted with any value.
+
+    Returns:
+        A function that returns True if all the metrics are emitted.
+    """
+    node_info = ray.nodes()[0]
+    metrics_export_port = node_info["MetricsExportPort"]
+    addr = node_info["NodeManagerAddress"]
+    prom_addr = export_addr or f"{addr}:{metrics_export_port}"
+
+    def f():
+        for metric_name, metric_value in metrics_to_check.items():
+            _, metric_names, metric_samples = fetch_prometheus([prom_addr])
+            found_metric = False
+            if metric_name in metric_names:
+                for sample in metric_samples:
+                    if sample.name != metric_name:
+                        continue
+
+                    if metric_value is None:
+                        found_metric = True
+                    elif metric_value == sample.value:
+                        found_metric = True
+            if not found_metric:
+                print(
+                    "Didn't find metric, all metric names: ",
+                    metric_names,
+                    "all values",
+                    metric_samples,
+                )
+                return False
+        return True
+
+    return f
+
+
 def wait_for_stdout(strings_to_match: List[str], timeout_s: int):
     """Returns a decorator which waits until the stdout emitted by a function
     contains the provided list of strings.
diff --git a/python/ray/autoscaler/_private/autoscaler.py b/python/ray/autoscaler/_private/autoscaler.py
index cbec78f82255..0458cbabae1e 100644
--- a/python/ray/autoscaler/_private/autoscaler.py
+++ b/python/ray/autoscaler/_private/autoscaler.py
@@ -107,6 +107,7 @@ class AutoscalerSummary:
     node_availability_summary: NodeAvailabilitySummary = field(
         default_factory=lambda: NodeAvailabilitySummary({})
     )
+    pending_resources: Dict[str, int] = field(default_factory=lambda: {})


 class NonTerminatedNodes:
@@ -1433,7 +1434,7 @@ def summary(self) -> Optional[AutoscalerSummary]:
             completed_states = [STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED]
             is_pending = status not in completed_states
             if is_pending:
-                pending_nodes.append((ip, node_type, status))
+                pending_nodes.append((node_id, ip, node_type, status))
                 non_failed.add(node_id)

         failed_nodes = self.node_tracker.get_all_failed_node_info(non_failed)
@@ -1445,13 +1446,28 @@ def summary(self) -> Optional[AutoscalerSummary]:
             if count:
                 pending_launches[node_type] = count

+        pending_resources = {}
+        for node_resources in self.resource_demand_scheduler.calculate_node_resources(
+            nodes=[node_id for node_id, _, _, _ in pending_nodes],
+            pending_nodes=pending_launches,
+            # We don't fill this field out because we're intentionally only
+            # passing pending nodes (which aren't tracked by load metrics
+            # anyways).
+            unused_resources_by_ip={},
+        )[0]:
+            for key, value in node_resources.items():
+                pending_resources[key] = value + pending_resources.get(key, 0)
+
         return AutoscalerSummary(
             # Convert active_nodes from counter to dict for later serialization
             active_nodes=dict(active_nodes),
-            pending_nodes=pending_nodes,
+            pending_nodes=[
+                (ip, node_type, status) for _, ip, node_type, status in pending_nodes
+            ],
             pending_launches=pending_launches,
             failed_nodes=failed_nodes,
             node_availability_summary=self.node_provider_availability_tracker.summary(),
+            pending_resources=pending_resources,
         )

     def info_string(self):
diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py
index d36a4a845484..41fe84fd48be 100644
--- a/python/ray/autoscaler/_private/monitor.py
+++ b/python/ray/autoscaler/_private/monitor.py
@@ -347,9 +347,10 @@ def _run(self):
             gcs_request_time = time.time() - gcs_request_start_time
             self.update_resource_requests()
             self.update_event_summary()
+            load_metrics_summary = self.load_metrics.summary()
             status = {
                 "gcs_request_time": gcs_request_time,
-                "load_metrics_report": asdict(self.load_metrics.summary()),
+                "load_metrics_report": asdict(load_metrics_summary),
                 "time": time.time(),
                 "monitor_pid": os.getpid(),
             }
@@ -375,6 +376,20 @@ def _run(self):
                     self.autoscaler.non_terminated_nodes.non_terminated_nodes_time  # noqa: E501
                 )

+                for resource_name in ["CPU", "GPU"]:
+                    _, total = load_metrics_summary.usage.get(
+                        resource_name, (0, 0)
+                    )
+                    pending = autoscaler_summary.pending_resources.get(
+                        resource_name, 0
+                    )
+                    self.prom_metrics.cluster_resources.labels(
+                        resource=resource_name
+                    ).set(total)
+                    self.prom_metrics.pending_resources.labels(
+                        resource=resource_name
+                    ).set(pending)
+
             for msg in self.event_summarizer.summary():
                 # Need to prefix each line of the message for the lines to
                 # get pushed to the driver logs.
diff --git a/python/ray/autoscaler/_private/prom_metrics.py b/python/ray/autoscaler/_private/prom_metrics.py
index 6ec8020da5c7..d10f7db4b772 100644
--- a/python/ray/autoscaler/_private/prom_metrics.py
+++ b/python/ray/autoscaler/_private/prom_metrics.py
@@ -13,6 +13,9 @@ def observe(self, *args, **kwargs):
     def inc(self, *args, **kwargs):
         pass

+    def labels(self, *args, **kwargs):
+        return self
+

 try:

@@ -202,6 +205,27 @@ def __init__(self, registry: Optional[CollectorRegistry] = None):
                 namespace="autoscaler",
                 registry=self.registry,
             )
+            # This represents the autoscaler's view of essentially
+            # `ray.cluster_resources()`; it may be slightly different from the
+            # core metric from an eventual consistency perspective.
+            self.cluster_resources: Gauge = Gauge(
+                "cluster_resources",
+                "Total logical resources in the cluster.",
+                unit="resources",
+                namespace="autoscaler",
+                registry=self.registry,
+                labelnames=["resource"],
+            )
+            # This represents the pending launches + nodes being set up for the
+            # autoscaler.
+            self.pending_resources: Gauge = Gauge(
+                "pending_resources",
+                "Pending logical resources in the cluster.",
+                unit="resources",
+                namespace="autoscaler",
+                registry=self.registry,
+                labelnames=["resource"],
+            )


 except ImportError:
diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD
index 1f34abd0d6af..f2f943562083 100644
--- a/python/ray/tests/BUILD
+++ b/python/ray/tests/BUILD
@@ -206,6 +206,7 @@ py_test_module_list(
         "test_batch_node_provider_integration.py",
         "test_autoscaler.py",
         "test_autoscaler_drain_node_api.py",
+        "test_autoscaler_e2e.py",
         "test_autoscaler_gcp.py",
         "test_cli_logger.py",
         "test_client_metadata.py",
diff --git a/python/ray/tests/test_autoscaler_e2e.py b/python/ray/tests/test_autoscaler_e2e.py
new file mode 100644
index 000000000000..a1195ad403d0
--- /dev/null
+++ b/python/ray/tests/test_autoscaler_e2e.py
@@ -0,0 +1,122 @@
+import subprocess
+from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT
+
+import ray
+from ray._private.test_utils import (
+    wait_for_condition,
+    get_metric_check_condition,
+)
+from ray.cluster_utils import AutoscalingCluster
+
+
+def test_ray_status_e2e(shutdown_only):
+    cluster = AutoscalingCluster(
+        head_resources={"CPU": 0},
+        worker_node_types={
+            "type-i": {
+                "resources": {"CPU": 1, "fun": 1},
+                "node_config": {},
+                "min_workers": 1,
+                "max_workers": 1,
+            },
+            "type-ii": {
+                "resources": {"CPU": 1, "fun": 100},
+                "node_config": {},
+                "min_workers": 1,
+                "max_workers": 1,
+            },
+        },
+    )
+
+    try:
+        cluster.start()
+        ray.init(address="auto")
+
+        @ray.remote(num_cpus=0, resources={"fun": 2})
+        class Actor:
+            def ping(self):
+                return None
+
+        actor = Actor.remote()
+        ray.get(actor.ping.remote())
+
+        assert "Demands" in subprocess.check_output("ray status", shell=True).decode()
+        assert (
+            "Total Demands"
+            not in subprocess.check_output("ray status", shell=True).decode()
+        )
+        assert (
+            "Total Demands"
+            in subprocess.check_output("ray status -v", shell=True).decode()
+        )
+        assert (
+            "Total Demands"
+            in subprocess.check_output("ray status --verbose", shell=True).decode()
+        )
+    finally:
+        cluster.shutdown()
+
+
+def test_metrics(shutdown_only):
+    cluster = AutoscalingCluster(
+        head_resources={"CPU": 0},
+        worker_node_types={
+            "type-i": {
+                "resources": {"CPU": 1},
+                "node_config": {},
+                "min_workers": 1,
+                "max_workers": 1,
+            },
+            "type-ii": {
+                "resources": {"CPU": 1},
+                "node_config": {},
+                "min_workers": 1,
+                "max_workers": 1,
+            },
+        },
+    )
+
+    try:
+        cluster.start()
+        info = ray.init(address="auto")
+        autoscaler_export_addr = "{}:{}".format(
+            info.address_info["node_ip_address"], AUTOSCALER_METRIC_PORT
+        )
+
+        @ray.remote(num_cpus=1)
+        class Foo:
+            def ping(self):
+                return True
+
+        zero_reported_condition = get_metric_check_condition(
+            {"autoscaler_cluster_resources": 0, "autoscaler_pending_resources": 0},
+            export_addr=autoscaler_export_addr,
+        )
+        wait_for_condition(zero_reported_condition)
+
+        actors = [Foo.remote() for _ in range(2)]
+        ray.get([actor.ping.remote() for actor in actors])
+
+        two_cpu_no_pending_condition = get_metric_check_condition(
+            {"autoscaler_cluster_resources": 2, "autoscaler_pending_resources": 0},
+            export_addr=autoscaler_export_addr,
+        )
+        wait_for_condition(two_cpu_no_pending_condition)
+        # TODO (Alex): Ideally we'd also assert that pending_resources
+        # eventually became 1 or 2, but it's difficult to do that in a
+        # non-racy way. (Perhaps we would need to artificially delay the fake
+        # autoscaler node launch?).
+
+    finally:
+        cluster.shutdown()
+
+
+if __name__ == "__main__":
+    import sys
+    import os
+    import pytest
+
+    if os.environ.get("PARALLEL_CI"):
+        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
+    else:
+        sys.exit(pytest.main(["-sv", __file__]))
diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py
index aff3c36780ec..44a36f104508 100644
--- a/python/ray/tests/test_metrics_agent.py
+++ b/python/ray/tests/test_metrics_agent.py
@@ -106,6 +106,8 @@
     "autoscaler_failed_recoveries",
     "autoscaler_drain_node_exceptions",
     "autoscaler_update_time",
+    "autoscaler_cluster_resources",
+    "autoscaler_pending_resources",
 ]


diff --git a/python/ray/tests/test_resource_demand_scheduler.py b/python/ray/tests/test_resource_demand_scheduler.py
index d5e78b756aae..61ec7e56cb68 100644
--- a/python/ray/tests/test_resource_demand_scheduler.py
+++ b/python/ray/tests/test_resource_demand_scheduler.py
@@ -9,7 +9,6 @@
 from datetime import datetime
 from time import sleep
 from unittest import mock
-import subprocess

 import pytest
 import yaml
@@ -65,7 +64,6 @@
     fill_in_raylet_ids,
     mock_raylet_id,
 )
-from ray.cluster_utils import AutoscalingCluster
 from functools import partial

 GET_DEFAULT_METHOD = "ray.autoscaler._private.util._get_default_config"
@@ -1928,6 +1926,11 @@ def testSummary(self):

         assert summary.failed_nodes == [("172.0.0.4", "m4.4xlarge")]

+        assert summary.pending_resources == {
+            "GPU": 1,
+            "CPU": 144,
+        }, summary.pending_resources
+
         # Check dict conversion
         summary_dict = asdict(summary)
         assert summary_dict["active_nodes"]["m4.large"] == 2
@@ -3399,54 +3402,6 @@ def test_info_string_failed_node_cap():
     assert expected.strip() == actual


-def test_ray_status_e2e(shutdown_only):
-    cluster = AutoscalingCluster(
-        head_resources={"CPU": 0},
-        worker_node_types={
-            "type-i": {
-                "resources": {"CPU": 1, "fun": 1},
-                "node_config": {},
-                "min_workers": 1,
-                "max_workers": 1,
-            },
-            "type-ii": {
-                "resources": {"CPU": 1, "fun": 100},
-                "node_config": {},
-                "min_workers": 1,
-                "max_workers": 1,
-            },
-        },
-    )
-
-    try:
-        cluster.start()
-        ray.init(address="auto")
-
-        @ray.remote(num_cpus=0, resources={"fun": 2})
-        class Actor:
-            def ping(self):
-                return None
-
-        actor = Actor.remote()
-        ray.get(actor.ping.remote())
-
-        assert "Demands" in subprocess.check_output("ray status", shell=True).decode()
-        assert (
-            "Total Demands"
-            not in subprocess.check_output("ray status", shell=True).decode()
-        )
-        assert (
-            "Total Demands"
-            in subprocess.check_output("ray status -v", shell=True).decode()
-        )
-        assert (
-            "Total Demands"
Demands" - in subprocess.check_output("ray status --verbose", shell=True).decode() - ) - finally: - cluster.shutdown() - - def test_placement_group_match_string(): assert ( is_placement_group_resource("bundle_group_ffe7d420752c6e8658638d19ecf2b68c") diff --git a/python/ray/tests/test_scheduling.py b/python/ray/tests/test_scheduling.py index da11d1de2b35..70cf0f8d5b43 100644 --- a/python/ray/tests/test_scheduling.py +++ b/python/ray/tests/test_scheduling.py @@ -18,8 +18,8 @@ from ray._private.test_utils import ( Semaphore, SignalActor, - fetch_prometheus, object_memory_usage, + get_metric_check_condition, wait_for_condition, ) @@ -702,12 +702,6 @@ def train(self): indirect=True, ) def test_scheduling_class_depth(ray_start_regular): - - node_info = ray.nodes()[0] - metrics_export_port = node_info["MetricsExportPort"] - addr = node_info["NodeManagerAddress"] - prom_addr = f"{addr}:{metrics_export_port}" - @ray.remote(num_cpus=1000) def infeasible(): pass @@ -723,26 +717,14 @@ def start_infeasible(n): # We expect the 2 calls to `infeasible` to be separate scheduling classes # because one has depth=1, and the other has depth=2. - metric_name = "ray_internal_num_infeasible_scheduling_classes" - def make_condition(n): - def condition(): - _, metric_names, metric_samples = fetch_prometheus([prom_addr]) - if metric_name in metric_names: - for sample in metric_samples: - if sample.name == metric_name and sample.value == n: - return True - return False - - return condition - # timeout=60 necessary to pass on windows debug/asan builds. - wait_for_condition(make_condition(2), timeout=60) + wait_for_condition(get_metric_check_condition({metric_name: 2}), timeout=60) start_infeasible.remote(2) - wait_for_condition(make_condition(3), timeout=60) + wait_for_condition(get_metric_check_condition({metric_name: 3}), timeout=60) start_infeasible.remote(4) - wait_for_condition(make_condition(4), timeout=60) + wait_for_condition(get_metric_check_condition({metric_name: 4}), timeout=60) if __name__ == "__main__":