
Remove metrics-based progress-bar endpoints (ray-project#31702)
Signed-off-by: Alan Guo <[email protected]>

These metrics-based progress endpoints are no longer necessary after ray-project#31577; a sketch of the replacement call path follows the change summary below.

Signed-off-by: Edward Oakes <[email protected]>
alanwguo authored and edoakes committed Mar 22, 2023
1 parent 90bbfd4 commit 8d90f70
Showing 3 changed files with 1 addition and 359 deletions.
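For context, the dashboard now derives task progress from the state API's task-summary endpoint (the surviving getStateApiJobProgressByTaskName call in job.ts below) rather than from the Prometheus-backed /api/progress routes removed here. The following is a minimal sketch of that replacement call path, not part of this commit: it assumes the requests library, a dashboard reachable at the default http://localhost:8265, and a hypothetical helper name fetch_task_summary.

import requests


def fetch_task_summary(job_id: str, dashboard_url: str = "http://localhost:8265") -> dict:
    """Summarize per-task-name progress for a job via the state API."""
    resp = requests.get(
        f"{dashboard_url}/api/v0/tasks/summarize",
        params={
            "filter_keys": "job_id",
            "filter_predicates": "=",  # sent as %3D, matching the dashboard client URL
            "filter_values": job_id,
        },
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()


if __name__ == "__main__":
    # Example: print the summary for a hypothetical job id.
    print(fetch_task_summary("02000000"))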
13 changes: 0 additions & 13 deletions dashboard/client/src/service/job.ts
@@ -1,7 +1,5 @@
 import {
   JobListRsp,
-  JobProgressByTaskNameRsp,
-  JobProgressRsp,
   StateApiJobProgressByTaskNameRsp,
   StateApiNestedJobProgressRsp,
   UnifiedJob,
@@ -16,17 +14,6 @@ export const getJobDetail = (id: string) => {
   return get<UnifiedJob>(`api/jobs/${id}`);
 };
 
-export const getJobProgress = (jobId?: string) => {
-  const jobIdQuery = jobId ? `?job_id=${jobId}` : "";
-  return get<JobProgressRsp>(`api/progress${jobIdQuery}`);
-};
-
-export const getJobProgressByTaskName = (jobId: string) => {
-  return get<JobProgressByTaskNameRsp>(
-    `api/progress_by_task_name?job_id=${jobId}`,
-  );
-};
-
 export const getStateApiJobProgressByTaskName = (jobId: string) => {
   return get<StateApiJobProgressByTaskNameRsp>(
     `api/v0/tasks/summarize?filter_keys=job_id&filter_predicates=%3D&filter_values=${jobId}`,
183 changes: 1 addition & 182 deletions dashboard/modules/metrics/metrics_head.py
@@ -4,11 +4,10 @@
 import os
 import shutil
 
-from typing import Any, Dict, Optional, List
+from typing import Optional
 
 import psutil
 
-from pydantic import BaseModel
 from urllib.parse import quote
 from ray.dashboard.modules.metrics.grafana_dashboard_factory import (
     generate_grafana_dashboard,
@@ -50,26 +49,6 @@
 GRAFANA_CONFIG_INPUT_PATH = os.path.join(METRICS_INPUT_ROOT, "grafana")
 GRAFANA_HEALTHCHECK_PATH = "api/health"
 
-
-class TaskProgress(BaseModel):
-    num_finished: int = 0
-    num_pending_args_avail: int = 0
-    num_submitted_to_worker: int = 0
-    num_running: int = 0
-    num_pending_node_assignment: int = 0
-    num_failed: int = 0
-    num_unknown: int = 0
-
-
-class TaskProgressWithTaskName(BaseModel):
-    name: str
-    progress: TaskProgress
-
-
-class TaskProgressByTaskNameResponse(BaseModel):
-    tasks: List[TaskProgressWithTaskName]
-
-
 PROMETHEUS_METRIC_MAP = {
     "FINISHED": "num_finished",
     "PENDING_ARGS_AVAIL": "num_pending_args_avail",
@@ -210,75 +189,6 @@ async def prometheus_health(self, req):
                 success=False, message="prometheus healthcheck failed.", reason=str(e)
             )
 
-    # TODO(aguo): DEPRECATED: Delete this endpoint
-    @routes.get("/api/progress")
-    async def get_progress(self, req):
-        """
-        Fetches the progress of tasks by job id. If job_id is not provided,
-        then we will fetch the progress across all jobs.
-        """
-        job_id = req.query.get("job_id")
-
-        job_id_filter = f'JobId="{job_id}"' if job_id else None
-
-        query = self._create_prometheus_query_for_progress(
-            [job_id_filter] if job_id_filter else [], ["State"]
-        )
-
-        try:
-            prom_data = await self._query_prometheus(query)
-            progress = _format_prometheus_output(prom_data) or TaskProgress()
-            return dashboard_optional_utils.rest_response(
-                success=True, message="success", detail=progress.dict()
-            )
-
-        except PrometheusQueryError as e:
-            return dashboard_optional_utils.rest_response(
-                success=False,
-                message=e.message,
-            )
-        except aiohttp.client_exceptions.ClientConnectorError as e:
-            return dashboard_optional_utils.rest_response(
-                success=False,
-                message=str(e),
-            )
-
-    # TODO(aguo): DEPRECATED: Delete this endpoint
-    @routes.get("/api/progress_by_task_name")
-    async def get_progress_by_task_name(self, req):
-        """
-        Fetches the progress of tasks by job id. If job_id is not provided,
-        then we will fetch the progress across all jobs.
-        """
-        if "job_id" not in req.query:
-            return dashboard_optional_utils.rest_response(
-                success=False,
-                message="job_id query is required!",
-            )
-
-        job_id = req.query["job_id"]
-        job_id_filter = f'JobId="{job_id}"'
-        query = self._create_prometheus_query_for_progress(
-            [job_id_filter], ["State", "Name"]
-        )
-
-        try:
-            prom_data = await self._query_prometheus(query)
-            progress = _format_prometheus_output_by_task_names(prom_data)
-            return dashboard_optional_utils.rest_response(
-                success=True, message="success", detail=progress.dict()
-            )
-        except PrometheusQueryError as e:
-            return dashboard_optional_utils.rest_response(
-                success=False,
-                message=e.message,
-            )
-        except aiohttp.client_exceptions.ClientConnectorError as e:
-            return dashboard_optional_utils.rest_response(
-                success=False,
-                message=str(e),
-            )
-
     @staticmethod
     def is_minimal_module():
         return False
@@ -387,43 +297,6 @@ async def run(self, server):
             f"Generated prometheus and grafana configurations in: {self._metrics_root}"
         )
 
-    def _create_prometheus_query_for_progress(
-        self, filters: List[str], sum_by: List[str]
-    ) -> str:
-        filter_for_terminal_states = [
-            'State=~"FINISHED|FAILED"',
-            f'SessionName="{self._session_name}"',
-        ] + filters
-        filter_for_non_terminal_states = [
-            'State!~"FINISHED|FAILED"',
-            f'SessionName="{self._session_name}"',
-        ] + filters
-
-        filter_for_terminal_states_str = ",".join(filter_for_terminal_states)
-        filter_for_non_terminal_states_str = ",".join(filter_for_non_terminal_states)
-        sum_by_str = ",".join(sum_by)
-
-        # Ray does not currently permanently track worker task metrics.
-        # The metric is cleared after a worker exits. We need to work around
-        # these restrictions when we query metrics.
-
-        # For terminal states (Finished, Failed), we know that the count can
-        # never decrease. Therefore, we get the latest count of tasks by
-        # fetching the max value over the past 14 days.
-        query_for_terminal_states = (
-            "sum(max_over_time("
-            f"ray_tasks{{{filter_for_terminal_states_str}}}[14d])) by ({sum_by_str})"
-        )
-
-        # For non-terminal states, we assume that if a worker has at least
-        # one task in one of these states, the worker has not exited. Therefore,
-        # we fetch the current count.
-        query_for_non_terminal_states = (
-            f"clamp_min(sum(ray_tasks{{{filter_for_non_terminal_states_str}}}) "
-            f"by ({sum_by_str}), 0)"
-        )
-        return f"{query_for_terminal_states} or {query_for_non_terminal_states}"
-
     async def _query_prometheus(self, query):
         async with self.http_session.get(
             f"{self.prometheus_host}/api/v1/query?query={quote(query)}"
@@ -434,57 +307,3 @@ async def _query_prometheus(self, query):
 
             message = await resp.text()
             raise PrometheusQueryError(resp.status, message)
-
-
-def _format_prometheus_output(prom_data: Dict[str, Any]) -> Optional[TaskProgress]:
-    if prom_data["status"] == "success" and prom_data["data"]["resultType"] == "vector":
-        metrics = prom_data["data"]["result"]
-        kwargs = {}
-        for metric in metrics:
-            metric_name = metric["metric"]["State"]
-            kwarg_name = (
-                PROMETHEUS_METRIC_MAP[metric_name]
-                if metric_name in PROMETHEUS_METRIC_MAP
-                else "num_unknown"
-            )
-            # metric["value"] is a tuple where first item is a timestamp
-            # and second item is the value.
-            metric_value = int(metric["value"][1])
-            kwargs[kwarg_name] = kwargs.get(kwarg_name, 0) + metric_value
-
-        return TaskProgress(**kwargs)
-
-    return None
-
-
-def _format_prometheus_output_by_task_names(
-    prom_data: Dict[str, Any]
-) -> TaskProgressByTaskNameResponse:
-    """
-    Returns a list of task names with number of tasks for
-    each state with that task name.
-    """
-    task_map = {}
-
-    if prom_data["status"] == "success" and prom_data["data"]["resultType"] == "vector":
-        metrics = prom_data["data"]["result"]
-        for metric in metrics:
-            task_name = metric["metric"]["Name"]
-            metric_name = metric["metric"]["State"]
-            kwargs = task_map.setdefault(task_name, {})
-            kwarg_name = (
-                PROMETHEUS_METRIC_MAP[metric_name]
-                if metric_name in PROMETHEUS_METRIC_MAP
-                else "num_unknown"
-            )
-            # metric["value"] is a tuple where first item is a timestamp
-            # and second item is the value.
-            metric_value = int(metric["value"][1])
-            kwargs[kwarg_name] = kwargs.get(kwarg_name, 0) + metric_value
-
-    tasks = [
-        TaskProgressWithTaskName(name=task_name, progress=TaskProgress(**kwargs))
-        for task_name, kwargs in task_map.items()
-    ]
-
-    return TaskProgressByTaskNameResponse(tasks=tasks)
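For reference, the comments in the removed _create_prometheus_query_for_progress describe a two-part query strategy: terminal-state counts never decrease, so the helper took the maximum value observed over a 14-day window, while non-terminal counts are only reported by live workers and were therefore read live and clamped at zero. The following minimal sketch, not part of this commit, reconstructs the PromQL string that strategy produced for a single job; the job id and session name are hypothetical example values.

# Hypothetical example values; the removed helper filled these in from the
# request's job_id and from self._session_name.
job_filter = 'JobId="02000000"'
session_filter = 'SessionName="session_2023-03-22"'
sum_by = "State"

# Terminal-state counts (FINISHED, FAILED) never decrease, so take the maximum
# observed value over a 14-day window.
query_for_terminal_states = (
    "sum(max_over_time("
    f'ray_tasks{{State=~"FINISHED|FAILED",{session_filter},{job_filter}}}[14d]'
    f")) by ({sum_by})"
)

# Non-terminal counts are only exported while the owning worker is alive, so
# read the current value and clamp it at zero.
query_for_non_terminal_states = (
    f'clamp_min(sum(ray_tasks{{State!~"FINISHED|FAILED",{session_filter},{job_filter}}}) '
    f"by ({sum_by}), 0)"
)

print(f"{query_for_terminal_states} or {query_for_non_terminal_states}")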
