diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index 4f2623e19446..ddf612984fd4 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -6,7 +6,7 @@ import time import traceback from collections import namedtuple -from typing import List, Tuple, Any, Dict +from typing import List, Tuple, Any, Dict, Set from prometheus_client.core import ( CounterMetricFamily, @@ -527,6 +527,9 @@ def __init__( component_timeout_s=int(os.getenv(RAY_WORKER_TIMEOUT_S, 120)), ) + # Registered view names. + self._registered_views: Set[str] = set() + def record_and_export(self, records: List[Record], global_tags=None): """Directly record and export stats from the same process.""" global_tags = global_tags or {} @@ -541,18 +544,16 @@ def record_and_export(self, records: List[Record], global_tags=None): self._record_gauge(gauge, value, {**tags, **global_tags}) def _record_gauge(self, gauge: Gauge, value: float, tags: dict): - view_data = self.view_manager.get_view(gauge.name) - if not view_data: + if gauge.name not in self._registered_views: self.view_manager.register_view(gauge.view) - # Reobtain the view. - view = self.view_manager.get_view(gauge.name).view + self._registered_views.add(gauge.name) measurement_map = self.stats_recorder.new_measurement_map() tag_map = tag_map_module.TagMap() for key, tag_val in tags.items(): tag_key = tag_key_module.TagKey(key) tag_value = tag_value_module.TagValue(tag_val) tag_map.insert(tag_key, tag_value) - measurement_map.measure_float_put(view.measure, value) + measurement_map.measure_float_put(gauge.measure, value) # NOTE: When we record this metric, timestamp will be renewed. measurement_map.record(tag_map)