Skip to content

Commit

Permalink
[core][state][no_early_kickoff] Add "humanify" feature to StateSchema (
Browse files Browse the repository at this point in the history
…#35059)

This PR introduces a framework to address this issue: #31876.

Essentially, we add a humanify() method to the base StateSchema class. Subclasses provide a relevant format_fn as a metadata argument on any of their fields, and humanify() applies each field's format_fn to that field's value.

This PR is meant to introduce the general framework, and any additions (new format_fn) can be added by request.
  • Loading branch information
ProjectsByJackHe authored May 15, 2023
1 parent ecd9996 commit 3d977b8
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 16 deletions.
123 changes: 107 additions & 16 deletions python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import json
import logging
import sys
Expand Down Expand Up @@ -75,6 +76,43 @@ class SummaryResource(Enum):
PredicateType = str # Literal["=", "!="]


class Humanify:
    """Default helpers that convert raw values into human readable strings.

    Every helper is a static method, so it can be called either on the class
    (``Humanify.memory(1024)``) or on an instance. None of the helpers modify
    their argument; container helpers return new containers.
    """

    @staticmethod
    def timestamp(x: float) -> str:
        """Convert a timestamp in milliseconds to a date-time string.

        The value is interpreted by ``datetime.datetime.fromtimestamp``, so the
        result is rendered in the local timezone of the running process.
        """
        return str(datetime.datetime.fromtimestamp(x / 1000))

    @staticmethod
    def memory(x: int) -> str:
        """Convert a raw byte count to a human readable memory size.

        Uses binary units (KiB, MiB, GiB) and always prints three decimals.
        """
        # Largest unit first so the first threshold that fits wins.
        for threshold, unit in ((2**30, "GiB"), (2**20, "MiB"), (2**10, "KiB")):
            if x >= threshold:
                return f"{x / threshold:.3f} {unit}"
        return f"{x:.3f} B"

    @staticmethod
    def duration(x: int) -> str:
        """Convert a duration in milliseconds to a human readable string."""
        return str(datetime.timedelta(milliseconds=x))

    @staticmethod
    def events(events: List[dict]) -> List[dict]:
        """Convert a list of task events into a human readable format.

        Returns a new list of new dicts in which every ``"created_ms"`` value
        is replaced by its ``Humanify.timestamp`` string. The input list and
        its dicts are left untouched, so formatting the same data twice is safe.
        """
        readable = []
        for event in events:
            # Shallow copy so the caller's event dict is not rewritten.
            event = dict(event)
            if "created_ms" in event:
                event["created_ms"] = Humanify.timestamp(event["created_ms"])
            readable.append(event)
        return readable

    @staticmethod
    def node_resources(resources: dict) -> dict:
        """Convert a node's resources into a human readable format.

        Returns a new dict in which the value of every resource whose name
        contains ``"memory"`` is replaced by its ``Humanify.memory`` string.
        Other entries are copied unchanged and the input dict is not modified.
        """
        return {
            name: Humanify.memory(amount) if "memory" in name else amount
            for name, amount in resources.items()
        }


@dataclass(init=True)
class ListApiOptions:
# Maximum number of entries to return
Expand Down Expand Up @@ -144,7 +182,7 @@ class SummaryApiOptions:
summary_by: Optional[str] = None


def state_column(*, filterable: bool, detail: bool = False, **kwargs):
def state_column(*, filterable: bool, detail: bool = False, format_fn=None, **kwargs):
"""A wrapper around dataclass.field to add additional metadata.
The metadata is used to define detail / filterable option of
Expand All @@ -155,15 +193,16 @@ def state_column(*, filterable: bool, detail: bool = False, **kwargs):
filterable: If True, the column can be used for filtering.
kwargs: The same kwargs for the `dataclasses.field` function.
"""
m = {"detail": detail, "filterable": filterable}

m = {"detail": detail, "filterable": filterable, "format_fn": format_fn}
# Default for detail field is None since it could be missing.
if detail and "default" not in kwargs:
kwargs["default"] = None

if "metadata" in kwargs:
# Metadata explicitly specified, so add detail and filterable if missing.
kwargs["metadata"].update(m)
else:
# Metadata not explicitly specified, so add it.
kwargs["metadata"] = m
return field(**kwargs)

Expand Down Expand Up @@ -193,8 +232,32 @@ class State(StateSchema):
# Returns {"column_a", "column_b"}
s.columns()
```
In addition, the schema provides a humanify class method that converts a
state dict into something human readable, ready for printing.
Each field that should be humanified includes a 'format_fn' key in its
metadata dictionary (for example via state_column(format_fn=...)), and
humanify applies that function to the field's value.
Fields without a 'format_fn' are left unchanged.
"""

@classmethod
def humanify(cls, state: dict) -> dict:
    """Rewrite ``state`` in place with human readable values and return it.

    For every dataclass field of ``cls`` whose metadata carries a non-None
    ``format_fn``, the matching entry of ``state`` (when present and not None)
    is replaced by ``format_fn(value)``. If a formatter raises, the failure is
    logged and the original value is kept.
    """
    for column in fields(cls):
        name = column.name
        formatter = column.metadata.get("format_fn")
        # Columns without a formatter are shown as they are.
        if formatter is None:
            continue
        # Nothing to format when the column is missing or has no value.
        if name not in state or state[name] is None:
            continue
        try:
            state[name] = formatter(state[name])
        except Exception as e:
            logger.error(f"Failed to format {name}:{state[name]} with {e}")
    return state

@classmethod
def list_columns(cls, detail: bool = True) -> List[str]:
"""Return a list of columns."""
Expand Down Expand Up @@ -426,13 +489,19 @@ class NodeState(StateSchema):
#: The name of the node if it is given by the name argument.
node_name: str = state_column(filterable=True)
#: The total resources of the node.
resources_total: dict = state_column(filterable=False)
resources_total: dict = state_column(
filterable=False, format_fn=Humanify.node_resources
)
#: The time when the node (raylet) starts.
start_time_ms: Optional[int] = state_column(filterable=False, detail=True)
start_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)
#: The time when the node exits. The timestamp could be delayed
#: if the node is dead unexpectedly (could be delayed
# up to 30 seconds).
end_time_ms: Optional[int] = state_column(filterable=False, detail=True)
end_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)


@dataclass(init=True)
Expand Down Expand Up @@ -491,17 +560,25 @@ class WorkerState(StateSchema):
#: -> worker_launched_time_ms (process started).
#: -> start_time_ms (worker is ready to be used).
#: -> end_time_ms (worker is destroyed).
worker_launch_time_ms: Optional[int] = state_column(filterable=False, detail=True)
worker_launch_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)
#: The time the worker is successfully launched.
#: -1 if the value doesn't exist.
worker_launched_time_ms: Optional[int] = state_column(filterable=False, detail=True)
worker_launched_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)
#: The time when the worker is started and initialized.
#: 0 if the value doesn't exist.
start_time_ms: Optional[int] = state_column(filterable=False, detail=True)
start_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)
#: The time when the worker exits. The timestamp could be delayed
#: if the worker is dead unexpectedly.
#: 0 if the value doesn't exist.
end_time_ms: Optional[int] = state_column(filterable=False, detail=True)
end_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)


@dataclass(init=True)
Expand Down Expand Up @@ -569,15 +646,27 @@ class TaskState(StateSchema):
#: The list of events of the given task.
#: Refer to src/ray/protobuf/common.proto for a detailed explanation of the state
#: breakdowns and typical state transition flow.
events: Optional[List[dict]] = state_column(detail=True, filterable=False)
events: Optional[List[dict]] = state_column(
detail=True, filterable=False, format_fn=Humanify.events
)
#: The list of profile events of the given task.
profiling_data: Optional[dict] = state_column(detail=True, filterable=False)
#: The time when the task is created. A Unix timestamp in ms.
creation_time_ms: Optional[int] = state_column(detail=True, filterable=False)
creation_time_ms: Optional[int] = state_column(
detail=True,
filterable=False,
format_fn=Humanify.timestamp,
)
#: The time when the task starts to run. A Unix timestamp in ms.
start_time_ms: Optional[int] = state_column(detail=True, filterable=False)
start_time_ms: Optional[int] = state_column(
detail=True,
filterable=False,
format_fn=Humanify.timestamp,
)
#: The time when the task is finished or failed. A Unix timestamp in ms.
end_time_ms: Optional[int] = state_column(detail=True, filterable=False)
end_time_ms: Optional[int] = state_column(
detail=True, filterable=False, format_fn=Humanify.timestamp
)
#: The task logs info, e.g. offset into the worker log file when the task
#: starts/finishes.
task_log_info: Optional[dict] = state_column(detail=True, filterable=False)
Expand All @@ -592,7 +681,7 @@ class ObjectState(StateSchema):
#: The id of the object.
object_id: str = state_column(filterable=True)
#: The size of the object in mb.
object_size: int = state_column(filterable=True)
object_size: int = state_column(filterable=True, format_fn=Humanify.memory)
#: The status of the task that creates the object.
#:
#: - NIL: We don't have a status for this task because we are not the owner or the
Expand Down Expand Up @@ -651,7 +740,9 @@ class RuntimeEnvState(StateSchema):
success: bool = state_column(filterable=True)
#: The latency of creating the runtime environment.
#: Available if the runtime env is successfully created.
creation_time_ms: Optional[float] = state_column(filterable=False)
creation_time_ms: Optional[float] = state_column(
filterable=False, format_fn=Humanify.timestamp
)
#: The node id of this runtime environment.
node_id: str = state_column(filterable=True)
#: The number of actors and tasks that use this runtime environment.
Expand Down
5 changes: 5 additions & 0 deletions python/ray/experimental/state/state_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ def output_with_format(
format: AvailableFormat = AvailableFormat.DEFAULT,
detail: bool = False,
) -> str:
# humanify all input state data
if schema:
state_data = [schema.humanify(state) for state in state_data]
if format == AvailableFormat.DEFAULT:
return get_table_output(state_data, schema, detail)
if format == AvailableFormat.YAML:
Expand Down Expand Up @@ -292,8 +295,10 @@ def format_get_api_output(
) -> str:
if not state_data or isinstance(state_data, list) and len(state_data) == 0:
return f"Resource with id={id} not found in the cluster."

if not isinstance(state_data, list):
state_data = [state_data]

state_data = [dataclasses.asdict(state) for state in state_data]
return output_with_format(state_data, schema=schema, format=format, detail=True)

Expand Down
13 changes: 13 additions & 0 deletions python/ray/tests/test_state_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from unittest.mock import MagicMock

import pytest
from ray.experimental.state.common import Humanify
from ray._private.gcs_utils import GcsAioClient
import yaml
from click.testing import CliRunner
Expand Down Expand Up @@ -1696,6 +1697,18 @@ def ready(self):
assert result.total == 6


def test_humanify():
    """Check the memory, timestamp and duration formatters of Humanify."""
    # Exactly one KiB / MiB / GiB must render as "1.000" of that unit.
    for exponent, unit in enumerate(("KiB", "MiB", "GiB"), start=1):
        assert Humanify.memory(1024**exponent) == f"1.000 {unit}"

    millis = 1610000000
    # 1,610,000,000 ms is under 19 days, so as a timestamp it is in Jan 1970.
    assert "1970-01" in Humanify.timestamp(millis)
    assert Humanify.duration(millis) == "18 days, 15:13:20"


@pytest.mark.asyncio
async def test_state_data_source_client_limit_distributed_sources(ray_start_cluster):
cluster = ray_start_cluster
Expand Down

0 comments on commit 3d977b8

Please sign in to comment.