Revert "Revert "[autoscaler][observability] Autoscaler emit cluster/p…
Browse files Browse the repository at this point in the history
…ending logical resource metrics"" (ray-project#30552)

See ray-project#29820 for most of the details of the PR.

The only difference is that the NullMetric stub is improved so it no longer fails when tagging labels for metrics.

This reverts commit f9092dc.
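
For context, monitor.py (see its diff below) updates the new gauges through chained label calls such as prom_metrics.cluster_resources.labels(resource="CPU").set(total). The earlier NullMetric stub returned nothing from labels(), so that chain broke when prometheus_client was unavailable. A minimal sketch of the fixed behavior, assuming the stub's other methods are no-ops like the real class:

class NullMetric:
    """Illustrative stand-in used when prometheus_client is not installed."""

    def set(self, *args, **kwargs):
        # Assumed no-op here; the real stub defines its own no-op methods.
        pass

    def labels(self, *args, **kwargs):
        # Returning self lets chained calls such as
        # metric.labels(resource="CPU").set(0) degrade to a no-op
        # instead of raising AttributeError.
        return self


NullMetric().labels(resource="CPU").set(0)  # no longer fails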

Co-authored-by: Alex <[email protected]>
Signed-off-by: Weichen Xu <[email protected]>
2 people authored and WeichenXu123 committed Dec 19, 2022
1 parent c1b4c81 · commit d50b901
Showing 9 changed files with 239 additions and 76 deletions.
48 changes: 47 additions & 1 deletion python/ray/_private/test_utils.py
@@ -16,7 +16,7 @@
import traceback
from collections import defaultdict
from contextlib import contextmanager, redirect_stderr, redirect_stdout
from typing import Any, Dict, List, Optional
from typing import Any, Callable, Dict, List, Optional
import uuid
from ray._raylet import Config

@@ -542,6 +542,52 @@ async def async_wait_for_condition_async_predicate(
    raise RuntimeError(message)


def get_metric_check_condition(
    metrics_to_check: Dict[str, Optional[float]], export_addr: Optional[str] = None
) -> Callable[[], bool]:
"""A condition to check if a prometheus metrics reach a certain value.
This is a blocking check that can be passed into a `wait_for_condition`
style function.
Args:
metrics_to_check: A map of metric lable to values to check, to ensure
that certain conditions have been reached. If a value is None, just check
that the metric was emitted with any value.
Returns:
A function that returns True if all the metrics are emitted.
"""
    node_info = ray.nodes()[0]
    metrics_export_port = node_info["MetricsExportPort"]
    addr = node_info["NodeManagerAddress"]
    prom_addr = export_addr or f"{addr}:{metrics_export_port}"

    def f():
        for metric_name, metric_value in metrics_to_check.items():
            _, metric_names, metric_samples = fetch_prometheus([prom_addr])
            found_metric = False
            if metric_name in metric_names:
                for sample in metric_samples:
                    if sample.name != metric_name:
                        continue

                    if metric_value is None:
                        found_metric = True
                    elif metric_value == sample.value:
                        found_metric = True
            if not found_metric:
                print(
                    "Didn't find metric, all metric names: ",
                    metric_names,
                    "all values",
                    metric_samples,
                )
                return False
        return True

    return f
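
A typical way to use this helper, mirroring the new e2e test further down: build the condition and poll it with `wait_for_condition` until it returns True or times out. The address and timeout below are illustrative only.

# Wait until the autoscaler reports 2 logical CPUs and no pending resources.
condition = get_metric_check_condition(
    {"autoscaler_cluster_resources": 2, "autoscaler_pending_resources": 0},
    export_addr="127.0.0.1:44217",  # hypothetical <node ip>:<autoscaler metrics port>
)
wait_for_condition(condition, timeout=60)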


def wait_for_stdout(strings_to_match: List[str], timeout_s: int):
"""Returns a decorator which waits until the stdout emitted
by a function contains the provided list of strings.
20 changes: 18 additions & 2 deletions python/ray/autoscaler/_private/autoscaler.py
@@ -107,6 +107,7 @@ class AutoscalerSummary:
    node_availability_summary: NodeAvailabilitySummary = field(
        default_factory=lambda: NodeAvailabilitySummary({})
    )
    pending_resources: Dict[str, int] = field(default_factory=lambda: {})


class NonTerminatedNodes:
@@ -1433,7 +1434,7 @@ def summary(self) -> Optional[AutoscalerSummary]:
            completed_states = [STATUS_UP_TO_DATE, STATUS_UPDATE_FAILED]
            is_pending = status not in completed_states
            if is_pending:
                pending_nodes.append((ip, node_type, status))
                pending_nodes.append((node_id, ip, node_type, status))
            non_failed.add(node_id)

        failed_nodes = self.node_tracker.get_all_failed_node_info(non_failed)
@@ -1445,13 +1446,28 @@ def summary(self) -> Optional[AutoscalerSummary]:
            if count:
                pending_launches[node_type] = count

        pending_resources = {}
        for node_resources in self.resource_demand_scheduler.calculate_node_resources(
            nodes=[node_id for node_id, _, _, _ in pending_nodes],
            pending_nodes=pending_launches,
            # We don't fill this field out because we're intentionally only
            # passing pending nodes (which aren't tracked by load metrics
            # anyways).
            unused_resources_by_ip={},
        )[0]:
            for key, value in node_resources.items():
                pending_resources[key] = value + pending_resources.get(key, 0)

        return AutoscalerSummary(
            # Convert active_nodes from counter to dict for later serialization
            active_nodes=dict(active_nodes),
            pending_nodes=pending_nodes,
            pending_nodes=[
                (ip, node_type, status) for _, ip, node_type, status in pending_nodes
            ],
            pending_launches=pending_launches,
            failed_nodes=failed_nodes,
            node_availability_summary=self.node_provider_availability_tracker.summary(),
            pending_resources=pending_resources,
        )
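
As a worked example of the pending_resources aggregation above (made-up inputs; in the real code the per-node dicts come from resource_demand_scheduler.calculate_node_resources):

# Two pending 1-CPU nodes plus one pending launch of a 4-CPU node type.
node_resource_dicts = [{"CPU": 1, "fun": 1}, {"CPU": 1, "fun": 1}, {"CPU": 4}]

pending_resources = {}
for node_resources in node_resource_dicts:
    for key, value in node_resources.items():
        pending_resources[key] = value + pending_resources.get(key, 0)

assert pending_resources == {"CPU": 6, "fun": 2}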

def info_string(self):
17 changes: 16 additions & 1 deletion python/ray/autoscaler/_private/monitor.py
@@ -347,9 +347,10 @@ def _run(self):
            gcs_request_time = time.time() - gcs_request_start_time
            self.update_resource_requests()
            self.update_event_summary()
            load_metrics_summary = self.load_metrics.summary()
            status = {
                "gcs_request_time": gcs_request_time,
                "load_metrics_report": asdict(self.load_metrics.summary()),
                "load_metrics_report": asdict(load_metrics_summary),
                "time": time.time(),
                "monitor_pid": os.getpid(),
            }
@@ -375,6 +376,20 @@ def _run(self):
                        self.autoscaler.non_terminated_nodes.non_terminated_nodes_time  # noqa: E501
                    )

for resource_name in ["CPU", "GPU"]:
_, total = load_metrics_summary.usage.get(
resource_name, (0, 0)
)
pending = autoscaler_summary.pending_resources.get(
resource_name, 0
)
self.prom_metrics.cluster_resources.labels(
resource=resource_name
).set(total)
self.prom_metrics.pending_resources.labels(
resource=resource_name
).set(pending)

            for msg in self.event_summarizer.summary():
                # Need to prefix each line of the message for the lines to
                # get pushed to the driver logs.
24 changes: 24 additions & 0 deletions python/ray/autoscaler/_private/prom_metrics.py
@@ -13,6 +13,9 @@ def observe(self, *args, **kwargs):
    def inc(self, *args, **kwargs):
        pass

    def labels(self, *args, **kwargs):
        return self


try:

@@ -202,6 +205,27 @@ def __init__(self, registry: Optional[CollectorRegistry] = None):
namespace="autoscaler",
registry=self.registry,
)
# This represents the autoscaler's view of essentially
# `ray.cluster_resources()`, it may be slightly different from the
# core metric from an eventual consistency perspective.
self.cluster_resources: Gauge = Gauge(
"cluster_resources",
"Total logical resources in the cluster.",
unit="resources",
namespace="autoscaler",
registry=self.registry,
labelnames=["resource"],
)
# This represents the pending launches + nodes being set up for the
# autoscaler.
self.pending_resources: Gauge = Gauge(
"pending_resources",
"Pending logical resources in the cluster.",
unit="resources",
namespace="autoscaler",
registry=self.registry,
labelnames=["resource"],
)

except ImportError:

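
Once the monitor sets these gauges, the autoscaler's Prometheus endpoint should expose series roughly like the following (illustrative values; the label name comes from labelnames=["resource"] above):

autoscaler_cluster_resources{resource="CPU"} 2.0
autoscaler_cluster_resources{resource="GPU"} 0.0
autoscaler_pending_resources{resource="CPU"} 0.0
autoscaler_pending_resources{resource="GPU"} 0.0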
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
@@ -206,6 +206,7 @@ py_test_module_list(
"test_batch_node_provider_integration.py",
"test_autoscaler.py",
"test_autoscaler_drain_node_api.py",
"test_autoscaler_e2e.py",
"test_autoscaler_gcp.py",
"test_cli_logger.py",
"test_client_metadata.py",
122 changes: 122 additions & 0 deletions python/ray/tests/test_autoscaler_e2e.py
@@ -0,0 +1,122 @@
import subprocess
from ray.autoscaler._private.constants import AUTOSCALER_METRIC_PORT

import ray
from ray._private.test_utils import (
    wait_for_condition,
    get_metric_check_condition,
)
from ray.cluster_utils import AutoscalingCluster


def test_ray_status_e2e(shutdown_only):
    cluster = AutoscalingCluster(
        head_resources={"CPU": 0},
        worker_node_types={
            "type-i": {
                "resources": {"CPU": 1, "fun": 1},
                "node_config": {},
                "min_workers": 1,
                "max_workers": 1,
            },
            "type-ii": {
                "resources": {"CPU": 1, "fun": 100},
                "node_config": {},
                "min_workers": 1,
                "max_workers": 1,
            },
        },
    )

    try:
        cluster.start()
        ray.init(address="auto")

        @ray.remote(num_cpus=0, resources={"fun": 2})
        class Actor:
            def ping(self):
                return None

        actor = Actor.remote()
        ray.get(actor.ping.remote())

        assert "Demands" in subprocess.check_output("ray status", shell=True).decode()
        assert (
            "Total Demands"
            not in subprocess.check_output("ray status", shell=True).decode()
        )
        assert (
            "Total Demands"
            in subprocess.check_output("ray status -v", shell=True).decode()
        )
        assert (
            "Total Demands"
            in subprocess.check_output("ray status --verbose", shell=True).decode()
        )
    finally:
        cluster.shutdown()


def test_metrics(shutdown_only):
    cluster = AutoscalingCluster(
        head_resources={"CPU": 0},
        worker_node_types={
            "type-i": {
                "resources": {"CPU": 1},
                "node_config": {},
                "min_workers": 1,
                "max_workers": 1,
            },
            "type-ii": {
                "resources": {"CPU": 1},
                "node_config": {},
                "min_workers": 1,
                "max_workers": 1,
            },
        },
    )

    try:
        cluster.start()
        info = ray.init(address="auto")
        autoscaler_export_addr = "{}:{}".format(
            info.address_info["node_ip_address"], AUTOSCALER_METRIC_PORT
        )

        @ray.remote(num_cpus=1)
        class Foo:
            def ping(self):
                return True

        zero_reported_condition = get_metric_check_condition(
            {"autoscaler_cluster_resources": 0, "autoscaler_pending_resources": 0},
            export_addr=autoscaler_export_addr,
        )
        wait_for_condition(zero_reported_condition)

        actors = [Foo.remote() for _ in range(2)]
        ray.get([actor.ping.remote() for actor in actors])

        two_cpu_no_pending_condition = get_metric_check_condition(
            {"autoscaler_cluster_resources": 2, "autoscaler_pending_resources": 0},
            export_addr=autoscaler_export_addr,
        )
        wait_for_condition(two_cpu_no_pending_condition)
        # TODO (Alex): Ideally we'd also assert that pending_resources
        # eventually became 1 or 2, but it's difficult to do that in a
        # non-racey way. (Perhaps we would need to artificially delay the fake
        # autoscaler node launch?).

    finally:
        cluster.shutdown()


if __name__ == "__main__":
import sys
import os
import pytest

if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
else:
sys.exit(pytest.main(["-sv", __file__]))
2 changes: 2 additions & 0 deletions python/ray/tests/test_metrics_agent.py
@@ -106,6 +106,8 @@
"autoscaler_failed_recoveries",
"autoscaler_drain_node_exceptions",
"autoscaler_update_time",
"autoscaler_cluster_resources",
"autoscaler_pending_resources",
]

