From 8d90f70994f2945e7b780d8139aa0e9ada3c5cec Mon Sep 17 00:00:00 2001
From: Alan Guo
Date: Tue, 7 Feb 2023 01:01:28 -0800
Subject: [PATCH] Remove metrics-based progress-bar endpoints (#31702)

Signed-off-by: Alan Guo

This is no longer necessary after #31577

Signed-off-by: Edward Oakes
---
 dashboard/client/src/service/job.ts       |  13 --
 dashboard/modules/metrics/metrics_head.py | 183 +---------------------
 python/ray/tests/test_metrics_head.py     | 164 -------------------
 3 files changed, 1 insertion(+), 359 deletions(-)

diff --git a/dashboard/client/src/service/job.ts b/dashboard/client/src/service/job.ts
index dad2959cc1eb..41e80cbfc28b 100644
--- a/dashboard/client/src/service/job.ts
+++ b/dashboard/client/src/service/job.ts
@@ -1,7 +1,5 @@
 import {
   JobListRsp,
-  JobProgressByTaskNameRsp,
-  JobProgressRsp,
   StateApiJobProgressByTaskNameRsp,
   StateApiNestedJobProgressRsp,
   UnifiedJob,
@@ -16,17 +14,6 @@ export const getJobDetail = (id: string) => {
   return get<UnifiedJob>(`api/jobs/${id}`);
 };
 
-export const getJobProgress = (jobId?: string) => {
-  const jobIdQuery = jobId ? `?job_id=${jobId}` : "";
-  return get<JobProgressRsp>(`api/progress${jobIdQuery}`);
-};
-
-export const getJobProgressByTaskName = (jobId: string) => {
-  return get<JobProgressByTaskNameRsp>(
-    `api/progress_by_task_name?job_id=${jobId}`,
-  );
-};
-
 export const getStateApiJobProgressByTaskName = (jobId: string) => {
   return get<StateApiJobProgressByTaskNameRsp>(
     `api/v0/tasks/summarize?filter_keys=job_id&filter_predicates=%3D&filter_values=${jobId}`,
diff --git a/dashboard/modules/metrics/metrics_head.py b/dashboard/modules/metrics/metrics_head.py
index 864e36704ea7..3f45eab6aea6 100644
--- a/dashboard/modules/metrics/metrics_head.py
+++ b/dashboard/modules/metrics/metrics_head.py
@@ -4,11 +4,10 @@
 import os
 import shutil
 
-from typing import Any, Dict, Optional, List
+from typing import Optional
 
 import psutil
 
-from pydantic import BaseModel
 from urllib.parse import quote
 from ray.dashboard.modules.metrics.grafana_dashboard_factory import (
     generate_grafana_dashboard,
@@ -50,26 +49,6 @@
 GRAFANA_CONFIG_INPUT_PATH = os.path.join(METRICS_INPUT_ROOT, "grafana")
 GRAFANA_HEALTHCHECK_PATH = "api/health"
 
-
-class TaskProgress(BaseModel):
-    num_finished: int = 0
-    num_pending_args_avail: int = 0
-    num_submitted_to_worker: int = 0
-    num_running: int = 0
-    num_pending_node_assignment: int = 0
-    num_failed: int = 0
-    num_unknown: int = 0
-
-
-class TaskProgressWithTaskName(BaseModel):
-    name: str
-    progress: TaskProgress
-
-
-class TaskProgressByTaskNameResponse(BaseModel):
-    tasks: List[TaskProgressWithTaskName]
-
-
 PROMETHEUS_METRIC_MAP = {
     "FINISHED": "num_finished",
     "PENDING_ARGS_AVAIL": "num_pending_args_avail",
@@ -210,75 +189,6 @@ async def prometheus_health(self, req):
                 success=False, message="prometheus healthcheck failed.", reason=str(e)
             )
 
-    # TODO(aguo): DEPRECATED: Delete this endpoint
-    @routes.get("/api/progress")
-    async def get_progress(self, req):
-        """
-        Fetches the progress of tasks by job id. If job_id is not provided,
-        then we will fetch the progress across all jobs.
-        """
-        job_id = req.query.get("job_id")
-
-        job_id_filter = f'JobId="{job_id}"' if job_id else None
-
-        query = self._create_prometheus_query_for_progress(
-            [job_id_filter] if job_id_filter else [], ["State"]
-        )
-
-        try:
-            prom_data = await self._query_prometheus(query)
-            progress = _format_prometheus_output(prom_data) or TaskProgress()
-            return dashboard_optional_utils.rest_response(
-                success=True, message="success", detail=progress.dict()
-            )
-
-        except PrometheusQueryError as e:
-            return dashboard_optional_utils.rest_response(
-                success=False,
-                message=e.message,
-            )
-        except aiohttp.client_exceptions.ClientConnectorError as e:
-            return dashboard_optional_utils.rest_response(
-                success=False,
-                message=str(e),
-            )
-
-    # TODO(aguo): DEPRECATED: Delete this endpoint
-    @routes.get("/api/progress_by_task_name")
-    async def get_progress_by_task_name(self, req):
-        """
-        Fetches the progress of tasks by job id. If job_id is not provided,
-        then we will fetch the progress across all jobs.
-        """
-        if "job_id" not in req.query:
-            return dashboard_optional_utils.rest_response(
-                success=False,
-                message="job_id query is required!",
-            )
-
-        job_id = req.query["job_id"]
-        job_id_filter = f'JobId="{job_id}"'
-        query = self._create_prometheus_query_for_progress(
-            [job_id_filter], ["State", "Name"]
-        )
-
-        try:
-            prom_data = await self._query_prometheus(query)
-            progress = _format_prometheus_output_by_task_names(prom_data)
-            return dashboard_optional_utils.rest_response(
-                success=True, message="success", detail=progress.dict()
-            )
-        except PrometheusQueryError as e:
-            return dashboard_optional_utils.rest_response(
-                success=False,
-                message=e.message,
-            )
-        except aiohttp.client_exceptions.ClientConnectorError as e:
-            return dashboard_optional_utils.rest_response(
-                success=False,
-                message=str(e),
-            )
-
     @staticmethod
     def is_minimal_module():
         return False
@@ -387,43 +297,6 @@ async def run(self, server):
             f"Generated prometheus and grafana configurations in: {self._metrics_root}"
         )
 
-    def _create_prometheus_query_for_progress(
-        self, filters: List[str], sum_by: List[str]
-    ) -> str:
-        filter_for_terminal_states = [
-            'State=~"FINISHED|FAILED"',
-            f'SessionName="{self._session_name}"',
-        ] + filters
-        filter_for_non_terminal_states = [
-            'State!~"FINISHED|FAILED"',
-            f'SessionName="{self._session_name}"',
-        ] + filters
-
-        filter_for_terminal_states_str = ",".join(filter_for_terminal_states)
-        filter_for_non_terminal_states_str = ",".join(filter_for_non_terminal_states)
-        sum_by_str = ",".join(sum_by)
-
-        # Ray does not currently permanently track worker task metrics.
-        # The metric is cleared after a worker exits. We need to work around
-        # these restrictions when we query metrics.
-
-        # For terminal states (Finished, Failed), we know that the count can
-        # never decrease. Therefore, we get the latest count of tasks by
-        # fetching the max value over the past 14 days.
-        query_for_terminal_states = (
-            "sum(max_over_time("
-            f"ray_tasks{{{filter_for_terminal_states_str}}}[14d])) by ({sum_by_str})"
-        )
-
-        # For non-terminal states, we assume that if a worker has at least
-        # one task in one of these states, the worker has not exited. Therefore,
-        # we fetch the current count.
-        query_for_non_terminal_states = (
-            f"clamp_min(sum(ray_tasks{{{filter_for_non_terminal_states_str}}}) "
-            f"by ({sum_by_str}), 0)"
-        )
-        return f"{query_for_terminal_states} or {query_for_non_terminal_states}"
-
@@ -434,57 +307,3 @@ async def _query_prometheus(self, query):
         async with self.http_session.get(
             f"{self.prometheus_host}/api/v1/query?query={quote(query)}"
         ) as resp:
             if resp.status == 200:
                 return await resp.json()
 
             message = await resp.text()
             raise PrometheusQueryError(resp.status, message)
-
-
-def _format_prometheus_output(prom_data: Dict[str, Any]) -> Optional[TaskProgress]:
-    if prom_data["status"] == "success" and prom_data["data"]["resultType"] == "vector":
-        metrics = prom_data["data"]["result"]
-        kwargs = {}
-        for metric in metrics:
-            metric_name = metric["metric"]["State"]
-            kwarg_name = (
-                PROMETHEUS_METRIC_MAP[metric_name]
-                if metric_name in PROMETHEUS_METRIC_MAP
-                else "num_unknown"
-            )
-            # metric["value"] is a tuple where first item is a timestamp
-            # and second item is the value.
-            metric_value = int(metric["value"][1])
-            kwargs[kwarg_name] = kwargs.get(kwarg_name, 0) + metric_value
-
-        return TaskProgress(**kwargs)
-
-    return None
-
-
-def _format_prometheus_output_by_task_names(
-    prom_data: Dict[str, Any]
-) -> TaskProgressByTaskNameResponse:
-    """
-    Returns a list of task names with number of tasks for
-    each state with that task name.
-    """
-    task_map = {}
-
-    if prom_data["status"] == "success" and prom_data["data"]["resultType"] == "vector":
-        metrics = prom_data["data"]["result"]
-        for metric in metrics:
-            task_name = metric["metric"]["Name"]
-            metric_name = metric["metric"]["State"]
-            kwargs = task_map.setdefault(task_name, {})
-            kwarg_name = (
-                PROMETHEUS_METRIC_MAP[metric_name]
-                if metric_name in PROMETHEUS_METRIC_MAP
-                else "num_unknown"
-            )
-            # metric["value"] is a tuple where first item is a timestamp
-            # and second item is the value.
-            metric_value = int(metric["value"][1])
-            kwargs[kwarg_name] = kwargs.get(kwarg_name, 0) + metric_value
-
-    tasks = [
-        TaskProgressWithTaskName(name=task_name, progress=TaskProgress(**kwargs))
-        for task_name, kwargs in task_map.items()
-    ]
-
-    return TaskProgressByTaskNameResponse(tasks=tasks)
diff --git a/python/ray/tests/test_metrics_head.py b/python/ray/tests/test_metrics_head.py
index 3a43c79bbcef..5084ed3a4968 100644
--- a/python/ray/tests/test_metrics_head.py
+++ b/python/ray/tests/test_metrics_head.py
@@ -6,13 +6,6 @@
 import sys
 import tempfile
 
-from ray.dashboard.modules.metrics.metrics_head import (
-    _format_prometheus_output,
-    _format_prometheus_output_by_task_names,
-    TaskProgressByTaskNameResponse,
-    TaskProgressWithTaskName,
-    TaskProgress,
-)
 from ray.dashboard.modules.metrics.grafana_dashboard_factory import GRAFANA_PANELS
 from ray.tests.conftest import _ray_start
 
@@ -97,163 +90,6 @@ def test_metrics_folder_when_dashboard_disabled():
     assert not os.path.exists(f"{session_dir}/metrics/prometheus/prometheus.yml")
 
 
-def test_format_prometheus_output():
-    prom_output = {
-        "status": "success",
-        "data": {
-            "resultType": "vector",
-            "result": [
-                {"metric": {"State": "RUNNING"}, "value": [1664330796.832, "2"]},
-                {
-                    "metric": {"State": "RUNNING_IN_RAY_GET"},
-                    "value": [1664330796.832, "4"],
-                },
-                {
-                    "metric": {"State": "RUNNING_IN_RAY_WAIT"},
-                    "value": [1664330796.832, "3"],
-                },
-                {
-                    "metric": {"State": "SUBMITTED_TO_WORKER"},
-                    "value": [1664330796.832, "5"],
-                },
-                {"metric": {"State": "FINISHED"}, "value": [1664330796.832, "3"]},
-                {
-                    "metric": {"State": "PENDING_ARGS_AVAIL"},
-                    "value": [1664330796.832, "5"],
-                },
-                {
-                    "metric": {"State": "PENDING_NODE_ASSIGNMENT"},
-                    "value": [1664330796.832, "2"],
-                },
-                {
-                    "metric": {"State": "PENDING_ARGS_FETCH"},
-                    "value": [1664330796.832, "7"],
-                },
-                {
-                    "metric": {"State": "PENDING_OBJ_STORE_MEM_AVAIL"},
-                    "value": [1664330796.832, "8"],
-                },
-                {
-                    "metric": {"State": "FAILED"},
-                    "value": [1664330796.832, "6"],
-                },
-            ],
-        },
-    }
-    assert _format_prometheus_output(prom_output) == TaskProgress(
-        num_finished=3,
-        num_pending_args_avail=5,
-        num_pending_node_assignment=17,
-        num_running=9,
-        num_submitted_to_worker=5,
-        num_unknown=0,
-        num_failed=6,
-    )
-
-    # With unknown states from prometheus
-    prom_output_with_unknown = {
-        "status": "success",
-        "data": {
-            "resultType": "vector",
-            "result": [
-                {"metric": {"State": "RUNNING"}, "value": [1664330796.832, "10"]},
-                {
-                    "metric": {"State": "RUNNING_IN_RAY_GET"},
-                    "value": [1664330796.832, "4"],
-                },
-                {"metric": {"State": "FINISHED"}, "value": [1664330796.832, "20"]},
-                {
-                    "metric": {"State": "PENDING_ARGS_AVAIL"},
-                    "value": [1664330796.832, "5"],
-                },
-                {
-                    "metric": {"State": "SOME_NEW_VARIABLE"},
-                    "value": [1664330796.832, "3"],
-                },
-                {
-                    "metric": {"State": "FAILED"},
-                    "value": [1664330796.832, "3"],
-                },
-            ],
-        },
-    }
-    assert _format_prometheus_output(prom_output_with_unknown) == TaskProgress(
-        num_finished=20,
-        num_pending_args_avail=5,
-        num_pending_node_assignment=0,
-        num_running=14,
-        num_submitted_to_worker=0,
-        num_unknown=3,
-        num_failed=3,
-    )
-
-
-def test_format_prometheus_output_by_task_names():
-    prom_output = {
-        "status": "success",
-        "data": {
-            "resultType": "vector",
-            "result": [
-                {
-                    "metric": {"Name": "step1", "State": "RUNNING"},
-                    "value": [1666390500.167, "3"],
-                },
-                {
-                    "metric": {"Name": "step1", "State": "SUBMITTED_TO_WORKER"},
-                    "value": [1666390500.167, "3"],
-                },
-                {
-                    "metric": {"Name": "step1", "State": "PENDING_ARGS_AVAIL"},
-                    "value": [1666390500.167, "0"],
-                },
-                {
-                    "metric": {"Name": "step1", "State": "PENDING_NODE_ASSIGNMENT"},
-                    "value": [1666390500.167, "0"],
-                },
-                {
-                    "metric": {"Name": "step2", "State": "RUNNING"},
-                    "value": [1666390500.167, "2"],
-                },
-                {
-                    "metric": {"Name": "step2", "State": "SUBMITTED_TO_WORKER"},
-                    "value": [1666390500.167, "0"],
-                },
-                {
-                    "metric": {"Name": "step2", "State": "PENDING_ARGS_AVAIL"},
-                    "value": [1666390500.167, "3"],
-                },
-                {
-                    "metric": {"Name": "step3", "State": "PENDING_ARGS_AVAIL"},
-                    "value": [1666390500.167, "1"],
-                },
-            ],
-        },
-    }
-    assert _format_prometheus_output_by_task_names(
-        prom_output
-    ) == TaskProgressByTaskNameResponse(
-        tasks=[
-            TaskProgressWithTaskName(
-                name="step1",
-                progress=TaskProgress(
-                    num_running=3,
-                    num_submitted_to_worker=3,
-                ),
-            ),
-            TaskProgressWithTaskName(
-                name="step2",
-                progress=TaskProgress(
-                    num_running=2,
-                    num_pending_args_avail=3,
-                ),
-            ),
-            TaskProgressWithTaskName(
-                name="step3", progress=TaskProgress(num_pending_args_avail=1)
-            ),
-        ]
-    )
-
-
 def test_default_dashboard_utilizes_global_filters():
     for panel in GRAFANA_PANELS:
         for target in panel.targets: