Revert "Revert "[autoscaler][observability] Autoscaler emit cluster/pending logical resource metrics"" #30552

Merged
48 changes: 47 additions & 1 deletion python/ray/_private/test_utils.py
@@ -16,7 +16,7 @@
import traceback
from collections import defaultdict
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from typing import Any, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional
import uuid
from ray._raylet import Config

@@ -542,6 +542,52 @@ async def async_wait_for_condition_async_predicate(
raise RuntimeError(message)


def get_metric_check_condition(
metrics_to_check: Dict[str, Optional[float]], export_addr: Optional[str] = None
) -> Callable[[], bool]:
"""A condition to check if a prometheus metrics reach a certain value.
This is a blocking check that can be passed into a `wait_for_condition`
style function.

Args:
metrics_to_check: A map of metric lable to values to check, to ensure
that certain conditions have been reached. If a value is None, just check
that the metric was emitted with any value.

Returns:
A function that returns True if all the metrics are emitted.
"""
node_info = ray.nodes()[0]
metrics_export_port = node_info["MetricsExportPort"]
addr = node_info["NodeManagerAddress"]
prom_addr = export_addr or f"{addr}:{metrics_export_port}"

def f():
for metric_name, metric_value in metrics_to_check.items():
_, metric_names, metric_samples = fetch_prometheus([prom_addr])
found_metric = False
if metric_name in metric_names:
for sample in metric_samples:
if sample.name != metric_name:
continue

if metric_value is None:
found_metric = True
elif metric_value == sample.value:
found_metric = True
if not found_metric:
print(
"Didn't find metric, all metric names: ",
metric_names,
"all values",
metric_samples,
)
return False
return True

return f


def wait_for_stdout(strings_to_match: List[str], timeout_s: int):
"""Returns a decorator which waits until the stdout emitted
by a function contains the provided list of strings.
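For reference, a minimal sketch of how this new helper is meant to be used, following the pattern of the new test added below; it assumes a running Ray cluster with an autoscaler, and the expected value of 2 CPUs is illustrative:

import ray
from ray._private.test_utils import get_metric_check_condition, wait_for_condition
from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT

info = ray.init()
autoscaler_export_addr = "{}:{}".format(
    info.address_info["node_ip_address"], AUTOSCALER_METRIC_PORT
)

# Block until the autoscaler reports 2 logical CPUs in the cluster.
# Passing None instead of 2 would only require that the metric was emitted.
condition = get_metric_check_condition(
    {"autoscaler_cluster_resources": 2},
    export_addr=autoscaler_export_addr,
)
wait_for_condition(condition, timeout=60)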
20 changes: 18 additions & 2 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -107,6 +107,7 @@ class AutoscalerSummary:
node_availability_summary: NodeAvailabilitySummary = field(
default_factory=lambda: NodeAvailabilitySummary({})
)
pending_resources: Dict[str, int] = field(default_factory=lambda: {})


class NonTerminatedNodes:
@@ -1433,7 +1434,7 @@ def summary(self) -> Optional[AutoscalerSummary]:
completed_states = [STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED]
is_pending = status not in completed_states
if is_pending:
pending_nodes.append((ip, node_type, status))
pending_nodes.append((node_id, ip, node_type, status))
non_failed.add(node_id)

failed_nodes = self.node_tracker.get_all_failed_node_info(non_failed)
@@ -1445,13 +1446,28 @@
if count:
pending_launches[node_type] = count

pending_resources = {}
for node_resources in self.resource_demand_scheduler.calculate_node_resources(
nodes=[node_id for node_id, _, _, _ in pending_nodes],
pending_nodes=pending_launches,
# We don't fill this field out because we're intentionally only
# passing pending nodes (which aren't tracked by load metrics
# anyway).
unused_resources_by_ip={},
)[0]:
for key, value in node_resources.items():
pending_resources[key] = value + pending_resources.get(key, 0)

return AutoscalerSummary(
# Convert active_nodes from counter to dict for later serialization
active_nodes=dict(active_nodes),
pending_nodes=pending_nodes,
pending_nodes=[
(ip, node_type, status) for _, ip, node_type, status in pending_nodes
],
pending_launches=pending_launches,
failed_nodes=failed_nodes,
node_availability_summary=self.node_provider_availability_tracker.summary(),
pending_resources=pending_resources,
)

def info_string(self):
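The `pending_resources` computation above is just an element-wise sum of the per-node resource dicts that `calculate_node_resources` returns for pending nodes and pending launches. A standalone sketch of the same reduction, with made-up node resource shapes for illustration:

from collections import Counter
from typing import Dict, List


def sum_pending_resources(per_node: List[Dict[str, float]]) -> Dict[str, float]:
    """Aggregate per-node logical resources into one pending-resources dict."""
    totals: Counter = Counter()
    for node_resources in per_node:
        totals.update(node_resources)
    return dict(totals)


# Two pending workers: one CPU-only node and one GPU node.
assert sum_pending_resources([{"CPU": 4.0}, {"CPU": 8.0, "GPU": 1.0}]) == {
    "CPU": 12.0,
    "GPU": 1.0,
}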
17 changes: 16 additions & 1 deletion python/ray/autoscaler/_private/monitor.py
@@ -347,9 +347,10 @@ def _run(self):
gcs_request_time = time.time() - gcs_request_start_time
self.update_resource_requests()
self.update_event_summary()
load_metrics_summary = self.load_metrics.summary()
status = {
"gcs_request_time": gcs_request_time,
"load_metrics_report": asdict(self.load_metrics.summary()),
"load_metrics_report": asdict(load_metrics_summary),
"time": time.time(),
"monitor_pid": os.getpid(),
}
@@ -375,6 +376,20 @@
self.autoscaler.non_terminated_nodes.non_terminated_nodes_time # noqa: E501
)

for resource_name in ["CPU", "GPU"]:
_, total = load_metrics_summary.usage.get(
resource_name, (0, 0)
)
pending = autoscaler_summary.pending_resources.get(
resource_name, 0
)
self.prom_metrics.cluster_resources.labels(
resource=resource_name
).set(total)
self.prom_metrics.pending_resources.labels(
resource=resource_name
).set(pending)

for msg in self.event_summarizer.summary():
# Need to prefix each line of the message for the lines to
# get pushed to the driver logs.
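To see what these gauges look like once the monitor sets them, one can scrape the autoscaler's Prometheus endpoint directly. This is a rough sketch: the use of `requests`, the `/metrics` path, and scraping localhost are assumptions, and `prometheus_client` must be installed.

import requests
from prometheus_client.parser import text_string_to_metric_families

from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT

# Assumes the monitor/autoscaler is running on this host.
text = requests.get(f"http://127.0.0.1:{AUTOSCALER_METRIC_PORT}/metrics").text

for family in text_string_to_metric_families(text):
    for sample in family.samples:
        if sample.name in (
            "autoscaler_cluster_resources",
            "autoscaler_pending_resources",
        ):
            # e.g. autoscaler_cluster_resources{resource="CPU"} 2.0
            print(sample.name, sample.labels, sample.value)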
24 changes: 24 additions & 0 deletions python/ray/autoscaler/_private/prom_metrics.py
@@ -13,6 +13,9 @@ def observe(self, *args, **kwargs):
def inc(self, *args, **kwargs):
pass

def labels(self, *args, **kwargs):
return self

PR author's inline comment on `labels`: "This is the only difference."


try:

@@ -202,6 +205,27 @@ def __init__(self, registry: Optional[CollectorRegistry] = None):
namespace="autoscaler",
registry=self.registry,
)
# This represents the autoscaler's view of essentially
# `ray.cluster_resources()`; it may differ slightly from the core
# metric because the two are only eventually consistent.
self.cluster_resources: Gauge = Gauge(
"cluster_resources",
"Total logical resources in the cluster.",
unit="resources",
namespace="autoscaler",
registry=self.registry,
labelnames=["resource"],
)
# This represents pending launches plus nodes that are still being
# set up, as seen by the autoscaler.
self.pending_resources: Gauge = Gauge(
"pending_resources",
"Pending logical resources in the cluster.",
unit="resources",
namespace="autoscaler",
registry=self.registry,
labelnames=["resource"],
)

except ImportError:

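The reason `NullMetric.labels` returns `self` is that the monitor now calls the gauges with chained `.labels(...).set(...)`; returning `self` keeps that chain a no-op when `prometheus_client` is unavailable. A simplified stand-in (not the full class) showing the pattern:

class NullMetric:
    """Minimal stand-in used when prometheus_client is not installed."""

    def set(self, *args, **kwargs):
        pass

    def inc(self, *args, **kwargs):
        pass

    def labels(self, *args, **kwargs):
        # Returning self lets chained calls like
        # metric.labels(resource="CPU").set(2) silently do nothing.
        return self


metric = NullMetric()
metric.labels(resource="CPU").set(2)  # no-op instead of AttributeError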
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
@@ -205,6 +205,7 @@ py_test_module_list(
"test_batch_node_provider_integration.py",
"test_autoscaler.py",
"test_autoscaler_drain_node_api.py",
"test_autoscaler_e2e.py",
"test_autoscaler_gcp.py",
"test_cli_logger.py",
"test_client_metadata.py",
122 changes: 122 additions & 0 deletions python/ray/tests/test_autoscaler_e2e.py
@@ -0,0 +1,122 @@
import subprocess
from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT

import ray
from ray._private.test_utils import (
wait_for_condition,
get_metric_check_condition,
)
from ray.cluster_utils import AutoscalingCluster


def test_ray_status_e2e(shutdown_only):
cluster = AutoscalingCluster(
head_resources={"CPU": 0},
worker_node_types={
"type-i": {
"resources": {"CPU": 1, "fun": 1},
"node_config": {},
"min_workers": 1,
"max_workers": 1,
},
"type-ii": {
"resources": {"CPU": 1, "fun": 100},
"node_config": {},
"min_workers": 1,
"max_workers": 1,
},
},
)

try:
cluster.start()
ray.init(address="auto")

@ray.remote(num_cpus=0, resources={"fun": 2})
class Actor:
def ping(self):
return None

actor = Actor.remote()
ray.get(actor.ping.remote())

assert "Demands" in subprocess.check_output("ray status", shell=True).decode()
assert (
"Total Demands"
not in subprocess.check_output("ray status", shell=True).decode()
)
assert (
"Total Demands"
in subprocess.check_output("ray status -v", shell=True).decode()
)
assert (
"Total Demands"
in subprocess.check_output("ray status --verbose", shell=True).decode()
)
finally:
cluster.shutdown()


def test_metrics(shutdown_only):
cluster = AutoscalingCluster(
head_resources={"CPU": 0},
worker_node_types={
"type-i": {
"resources": {"CPU": 1},
"node_config": {},
"min_workers": 1,
"max_workers": 1,
},
"type-ii": {
"resources": {"CPU": 1},
"node_config": {},
"min_workers": 1,
"max_workers": 1,
},
},
)

try:
cluster.start()
info = ray.init(address="auto")
autoscaler_export_addr = "{}:{}".format(
info.address_info["node_ip_address"], AUTOSCALER_METRIC_PORT
)

@ray.remote(num_cpus=1)
class Foo:
def ping(self):
return True

zero_reported_condition = get_metric_check_condition(
{"autoscaler_cluster_resources": 0, "autoscaler_pending_resources": 0},
export_addr=autoscaler_export_addr,
)
wait_for_condition(zero_reported_condition)

actors = [Foo.remote() for _ in range(2)]
ray.get([actor.ping.remote() for actor in actors])

two_cpu_no_pending_condition = get_metric_check_condition(
{"autoscaler_cluster_resources": 2, "autoscaler_pending_resources": 0},
export_addr=autoscaler_export_addr,
)
wait_for_condition(two_cpu_no_pending_condition)
# TODO (Alex): Ideally we'd also assert that pending_resources
# eventually became 1 or 2, but it's difficult to do that in a
non-racy way. (Perhaps we would need to artificially delay the fake
# autoscaler node launch?).

finally:
cluster.shutdown()


if __name__ == "__main__":
import sys
import os
import pytest

if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
else:
sys.exit(pytest.main(["-sv", __file__]))
2 changes: 2 additions & 0 deletions python/ray/tests/test_metrics_agent.py
@@ -106,6 +106,8 @@
"autoscaler_failed_recoveries",
"autoscaler_drain_node_exceptions",
"autoscaler_update_time",
"autoscaler_cluster_resources",
"autoscaler_pending_resources",
]

