Add hardware stats to train_head (#46719)
CPU utilization and GPU utilization are useful to see next to TrainWorkers.
Fixes actor GPU utilization not working due to a bug introduced in #41399.
Also refactors to use DataOrganizer to re-use more code with the rest of the dashboard.

Signed-off-by: Alan Guo <[email protected]>
alanwguo authored Jul 26, 2024
1 parent b4b4c79 commit e1e7558
Showing 7 changed files with 136 additions and 39 deletions.
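
Most of the diff below is the `processes` → `processesPids` rename in the GPU stats payload, which, per the description, is what restores actor GPU utilization broken by #41399. A minimal, self-contained sketch of the lookup the dashboard code now performs (plain dicts stand in for the node physical stats payload; the sample values are illustrative):

# Illustrative GPU entry from the node physical stats payload.
gpu_stats = {
    "index": 0,
    "utilizationGpu": 50,
    "memoryUsed": 0,
    "memoryTotal": 20,
    # The key the payload carries after #41399; the dashboard previously
    # looked up "processes" and so never found the worker's process entry.
    "processesPids": [{"pid": 25321, "gpuMemoryUsage": 0}],
}

worker_pid = 25321

# The key can be missing or None, so fall back to an empty list,
# mirroring datacenter.py and the GPU/GRAM column components.
matches = [
    p for p in (gpu_stats.get("processesPids") or []) if p["pid"] == worker_pid
]
print(matches)  # [{'pid': 25321, 'gpuMemoryUsage': 0}]
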
@@ -258,7 +258,7 @@ describe("ActorTable", () => {
utilizationGpu: 50,
memoryUsed: 0,
memoryTotal: 20,
processes: [{ pid: 25321, gpuMemoryUsage: 0 }],
processesPids: [{ pid: 25321, gpuMemoryUsage: 0 }],
},
],
},
@@ -293,7 +293,7 @@ describe("ActorTable", () => {
utilizationGpu: 0,
memoryUsed: 10,
memoryTotal: 20,
processes: [{ pid: 25322, gpuMemoryUsage: 10 }],
processesPids: [{ pid: 25322, gpuMemoryUsage: 10 }],
},
],
},
4 changes: 2 additions & 2 deletions python/ray/dashboard/client/src/pages/node/GPUColumn.tsx
@@ -54,7 +54,7 @@ export const WorkerGpuRow = ({
}) => {
const workerGPUEntries = (gpus ?? [])
.map((gpu, i) => {
const process = gpu.processes?.find(
const process = gpu.processesPids?.find(
(process) => process.pid === workerPID,
);
if (!process) {
@@ -81,7 +81,7 @@ export const getSumGpuUtilization = (
// aggregate of the WorkerGpuRow and follows the same logic.
const workerGPUUtilizationEntries = (gpus ?? [])
.map((gpu, i) => {
const process = gpu.processes?.find(
const process = gpu.processesPids?.find(
(process) => process.pid === workerPID,
);
if (!process) {
4 changes: 2 additions & 2 deletions python/ray/dashboard/client/src/pages/node/GRAMColumn.tsx
@@ -39,7 +39,7 @@ export const WorkerGRAM = ({
}) => {
const workerGRAMEntries = (gpus ?? [])
.map((gpu, i) => {
const process = gpu.processes?.find(
const process = gpu.processesPids?.find(
(process) => workerPID && process.pid === workerPID,
);
if (!process) {
@@ -73,7 +73,7 @@ export const getSumGRAMUsage = (
// aggregate of WorkerGRAM and follows the same logic.
const workerGRAMEntries = (gpus ?? [])
.map((gpu, i) => {
const process = gpu.processes?.find(
const process = gpu.processesPids?.find(
(process) => workerPID && process.pid === workerPID,
);
if (!process) {
2 changes: 1 addition & 1 deletion python/ray/dashboard/client/src/type/node.d.ts
@@ -67,7 +67,7 @@ export type GPUStats = {
utilizationGpu?: number;
memoryUsed: number;
memoryTotal: number;
processes?: ProcessGPUUsage[];
processesPids?: ProcessGPUUsage[];
};

export type NodeDetailExtend = {
2 changes: 1 addition & 1 deletion python/ray/dashboard/datacenter.py
@@ -227,7 +227,7 @@ async def _get_actor(actor):
for gpu_stats in node_physical_stats.get("gpus", []):
# gpu_stats.get("processes") can be None, an empty list or a
# list of dictionaries.
for process in gpu_stats.get("processes") or []:
for process in gpu_stats.get("processesPids") or []:
if process["pid"] == pid:
actor_process_gpu_stats.append(gpu_stats)
break
107 changes: 76 additions & 31 deletions python/ray/dashboard/modules/train/train_head.py
@@ -1,12 +1,13 @@
import logging
from typing import List

from aiohttp.web import Request, Response

import ray
import ray.dashboard.optional_utils as dashboard_optional_utils
import ray.dashboard.utils as dashboard_utils
from ray.core.generated import gcs_service_pb2, gcs_service_pb2_grpc
from ray.dashboard.modules.actor.actor_head import actor_table_data_to_dict
from ray.core.generated import gcs_service_pb2_grpc
from ray.dashboard.datacenter import DataOrganizer
from ray.dashboard.modules.job.common import JobInfoStorageClient
from ray.dashboard.modules.job.utils import find_jobs_by_job_ids
from ray.util.annotations import DeveloperAPI
@@ -29,10 +30,7 @@ def __init__(self, dashboard_head):
@DeveloperAPI
async def get_train_runs(self, req: Request) -> Response:
try:
from ray.train._internal.state.schema import (
TrainRunInfoWithDetails,
TrainRunsResponse,
)
from ray.train._internal.state.schema import TrainRunsResponse
except ImportError:
logger.exception(
"Train is not installed. Please run `pip install ray[train]` "
@@ -58,24 +56,22 @@ async def get_train_runs(self, req: Request) -> Response:
else:
try:
train_runs = await stats_actor.get_all_train_runs.remote()
await self._add_actor_status_and_update_run_status(train_runs)
train_runs_with_details = (
await self._add_actor_status_and_update_run_status(train_runs)
)
# Sort train runs in reverse chronological order
train_runs = sorted(
train_runs.values(),
train_runs_with_details = sorted(
train_runs_with_details,
key=lambda run: run.start_time_ms,
reverse=True,
)
job_details = await find_jobs_by_job_ids(
self._dashboard_head.gcs_aio_client,
self._job_info_client,
[run.job_id for run in train_runs],
[run.job_id for run in train_runs_with_details],
)
train_runs_with_details = [
TrainRunInfoWithDetails(
**run.dict(), job_details=job_details.get(run.job_id)
)
for run in train_runs
]
for run in train_runs_with_details:
run.job_details = job_details.get(run.job_id)
details = TrainRunsResponse(train_runs=train_runs_with_details)
except ray.exceptions.RayTaskError as e:
# Task failures are sometimes due to GCS
@@ -95,43 +91,92 @@ async def get_train_runs(self, req: Request) -> Response:
)

async def _add_actor_status_and_update_run_status(self, train_runs):
from ray.train._internal.state.schema import ActorStatusEnum, RunStatusEnum
from ray.train._internal.state.schema import (
ActorStatusEnum,
RunStatusEnum,
TrainRunInfoWithDetails,
TrainWorkerInfoWithDetails,
)

actor_status_table = {}
try:
logger.info("Getting all actor info from GCS.")
request = gcs_service_pb2.GetAllActorInfoRequest()
reply = await self._gcs_actor_info_stub.GetAllActorInfo(request, timeout=5)
if reply.status.code == 0:
for message in reply.actor_table_data:
actor_table_data = actor_table_data_to_dict(message)
actor_status_table[actor_table_data["actorId"]] = actor_table_data[
"state"
]
actors = await DataOrganizer.get_all_actors()

except Exception:
logger.exception("Error Getting all actor info from GCS.")

train_runs_with_details: List[TrainRunInfoWithDetails] = []

for train_run in train_runs.values():
worker_infos_with_details: List[TrainWorkerInfoWithDetails] = []

for worker_info in train_run.workers:
worker_info.status = actor_status_table.get(worker_info.actor_id, None)
actor = actors.get(worker_info.actor_id, None)
# Add hardware metrics to API response
if actor:
gpus = [
gpu
for gpu in actor["gpus"]
if worker_info.pid
in [process["pid"] for process in gpu["processesPids"]]
]
# Need to convert processesPids into a proper list.
# It's some weird ImmutableList structure.
# We also convert the list of processes into a single item since
# an actor is only a single process and cannot match multiple
# processes.
formatted_gpus = [
{
**gpu,
"processInfo": [
process
for process in gpu["processesPids"]
if process["pid"] == worker_info.pid
][0],
}
for gpu in gpus
]

worker_info_with_details = TrainWorkerInfoWithDetails.parse_obj(
{
**worker_info.dict(),
"status": actor["state"],
"processStats": actor["processStats"],
"gpus": formatted_gpus,
}
)
else:
worker_info_with_details = TrainWorkerInfoWithDetails.parse_obj(
worker_info.dict()
)

worker_infos_with_details.append(worker_info_with_details)

train_run_with_details = TrainRunInfoWithDetails.parse_obj(
{**train_run.dict(), "workers": worker_infos_with_details}
)

# The train run can be unexpectedly terminated before the final run
# status was updated. This could be due to errors outside of the training
# function (e.g., system failure or user interruption) that crashed the
# train controller.
# We need to detect this case and mark the train run as ABORTED.
controller_actor_status = actor_status_table.get(
controller_actor_status = actors.get(
train_run.controller_actor_id, None
)
).get("state")
if (
controller_actor_status == ActorStatusEnum.DEAD
and train_run.run_status == RunStatusEnum.STARTED
):
train_run.run_status = RunStatusEnum.ABORTED
train_run.status_detail = (
train_run_with_details.run_status = RunStatusEnum.ABORTED
train_run_with_details.status_detail = (
"Unexpectedly terminated due to system errors."
)

train_runs_with_details.append(train_run_with_details)

return train_runs_with_details

@staticmethod
def is_minimal_module():
return False
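
The hardware-stats attachment above boils down to two steps: keep only the GPUs whose `processesPids` list contains the worker's pid, then collapse that list to the single matching entry exposed as `processInfo`. A standalone sketch of that shaping, using plain dicts with made-up values:

def attach_worker_gpus(worker_pid, actor_gpus):
    """Keep GPUs used by worker_pid and flatten the matching process entry
    into a single "processInfo" field, as train_head.py does for the
    formatted_gpus passed to TrainWorkerInfoWithDetails."""
    attached = []
    for gpu in actor_gpus:
        procs = [
            p for p in (gpu.get("processesPids") or []) if p["pid"] == worker_pid
        ]
        if not procs:
            continue  # this GPU is not attached to the worker process
        attached.append({**gpu, "processInfo": procs[0]})
    return attached


# Made-up data: GPU 0 is used by the worker, GPU 1 is idle.
gpus = [
    {"index": 0, "memoryUsed": 10, "memoryTotal": 20,
     "processesPids": [{"pid": 25322, "gpuMemoryUsage": 10}]},
    {"index": 1, "memoryUsed": 0, "memoryTotal": 20, "processesPids": []},
]
print(attach_worker_gpus(25322, gpus))  # only GPU 0, with "processInfo" added
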
52 changes: 52 additions & 0 deletions python/ray/train/_internal/state/schema.py
@@ -47,6 +47,55 @@ class TrainWorkerInfo(BaseModel):
)


@DeveloperAPI
class MemoryInfo(BaseModel):
rss: int
vms: int
pfaults: Optional[int]
pageins: Optional[int]


@DeveloperAPI
class ProcessStats(BaseModel):
cpuPercent: float
# total memory, free memory, memory used ratio
mem: Optional[List[int]]
memoryInfo: MemoryInfo


class ProcessGPUUsage(BaseModel):
# GPU usage stats from a process
pid: int
gpuMemoryUsage: int


@DeveloperAPI
class GPUStats(BaseModel):
uuid: str
index: int
name: str
utilizationGpu: Optional[float]
memoryUsed: float
memoryTotal: float
processInfo: ProcessGPUUsage


@DeveloperAPI
class TrainWorkerInfoWithDetails(TrainWorkerInfo):
"""Metadata of a Ray Train worker."""

processStats: Optional[ProcessStats] = Field(
None, description="Process stats of the worker."
)
gpus: List[GPUStats] = Field(
default_factory=list,
description=(
"GPU stats of the worker. "
"Only returns GPUs that are attached to the worker process."
),
)


@DeveloperAPI
class TrainDatasetInfo(BaseModel):
name: str = Field(
@@ -91,6 +140,9 @@ class TrainRunInfo(BaseModel):
class TrainRunInfoWithDetails(TrainRunInfo):
"""Metadata for a Ray Train run and information about its workers."""

workers: List[TrainWorkerInfoWithDetails] = Field(
description="A List of Train workers sorted by global ranks."
)
job_details: Optional[JobDetails] = Field(
None, description="Details of the job that started this Train run."
)
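
To see the shape these new models validate, here is a small example. It assumes the models above are importable from `ray.train._internal.state.schema` once this change is in place, and the values are made up:

from ray.train._internal.state.schema import GPUStats, ProcessGPUUsage

# Made-up values in the shape produced by train_head.py's formatted_gpus.
gpu = GPUStats.parse_obj(
    {
        "uuid": "GPU-00000000-0000-0000-0000-000000000000",
        "index": 0,
        "name": "NVIDIA A10G",
        "utilizationGpu": 50.0,
        "memoryUsed": 10.0,
        "memoryTotal": 20.0,
        "processInfo": {"pid": 25322, "gpuMemoryUsage": 10},
    }
)
assert isinstance(gpu.processInfo, ProcessGPUUsage)
print(gpu.json())
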
