From 3d977b8b6e65058e509d3bf25a9105b5a09a053c Mon Sep 17 00:00:00 2001
From: "Jack He (Github)"
Date: Sun, 14 May 2023 22:18:32 -0700
Subject: [PATCH] [core][state][no_early_kickoff] Add "humanify" feature to StateSchema (#35059)

This PR introduces a framework to address issue #31876. Essentially, we add a
humanify() method to the base StateSchema class; a subclass provides a relevant
format_fn as a metadata argument on any of its fields, and humanify() aggregates
the output of those format functions. This PR only introduces the general
framework; additional format_fns can be added on request.
---
 python/ray/experimental/state/common.py    | 123 ++++++++++++++++++---
 python/ray/experimental/state/state_cli.py |   5 +
 python/ray/tests/test_state_api.py         |  13 +++
 3 files changed, 125 insertions(+), 16 deletions(-)

diff --git a/python/ray/experimental/state/common.py b/python/ray/experimental/state/common.py
index 3471ad03a35b..21ae132c44c2 100644
--- a/python/ray/experimental/state/common.py
+++ b/python/ray/experimental/state/common.py
@@ -1,3 +1,4 @@
+import datetime
 import json
 import logging
 import sys
@@ -75,6 +76,43 @@ class SummaryResource(Enum):
 PredicateType = str  # Literal["=", "!="]
 
 
+class Humanify:
+    """A class containing default methods to
+    convert units into a human readable string."""
+
+    def timestamp(x: float):
+        """Converts milliseconds since the epoch to a datetime string."""
+        return str(datetime.datetime.fromtimestamp(x / 1000))
+
+    def memory(x: int):
+        """Converts raw bytes to a human readable memory size."""
+        if x >= 2**30:
+            return str(format(x / (2**30), ".3f")) + " GiB"
+        elif x >= 2**20:
+            return str(format(x / (2**20), ".3f")) + " MiB"
+        elif x >= 2**10:
+            return str(format(x / (2**10), ".3f")) + " KiB"
+        return str(format(x, ".3f")) + " B"
+
+    def duration(x: int):
+        """Converts milliseconds to a human readable duration."""
+        return str(datetime.timedelta(milliseconds=x))
+
+    def events(events: List[dict]):
+        """Converts a list of task events into a human readable format."""
+        for event in events:
+            if "created_ms" in event:
+                event["created_ms"] = Humanify.timestamp(event["created_ms"])
+        return events
+
+    def node_resources(resources: dict):
+        """Converts a node's resources into a human readable format."""
+        for resource in resources:
+            if "memory" in resource:
+                resources[resource] = Humanify.memory(resources[resource])
+        return resources
+
+
 @dataclass(init=True)
 class ListApiOptions:
     # Maximum number of entries to return
@@ -144,7 +182,7 @@ class SummaryApiOptions:
     summary_by: Optional[str] = None
 
 
-def state_column(*, filterable: bool, detail: bool = False, **kwargs):
+def state_column(*, filterable: bool, detail: bool = False, format_fn=None, **kwargs):
     """A wrapper around dataclass.field to add additional metadata.
 
     The metadata is used to define detail / filterable option of
@@ -155,15 +193,16 @@ def state_column(*, filterable: bool, detail: bool = False, **kwargs):
         filterable: If True, the column can be used for filtering.
         kwargs: The same kwargs for the `dataclasses.field` function.
     """
-    m = {"detail": detail, "filterable": filterable}
-
+    m = {"detail": detail, "filterable": filterable, "format_fn": format_fn}
     # Default for detail field is None since it could be missing.
     if detail and "default" not in kwargs:
         kwargs["default"] = None
 
     if "metadata" in kwargs:
+        # Metadata explicitly specified, so add detail and filterable if missing.
         kwargs["metadata"].update(m)
     else:
+        # Metadata not explicitly specified, so add it.
kwargs["metadata"] = m return field(**kwargs) @@ -193,8 +232,32 @@ class State(StateSchema): # Returns {"column_a", "column_b"} s.columns() ``` + + In addition, the schema also provides a humanify abstract method to + convert the state object into something human readable, ready for printing. + + Subclasses should override this method, providing logic to convert its own fields + to something human readable, packaged and returned in a dict. + + Each field that wants to be humanified should include a 'format_fn' key in its + metadata dictionary. """ + @classmethod + def humanify(cls, state: dict) -> dict: + """Convert the given state object into something human readable.""" + for f in fields(cls): + if ( + f.metadata.get("format_fn") is not None + and f.name in state + and state[f.name] is not None + ): + try: + state[f.name] = f.metadata["format_fn"](state[f.name]) + except Exception as e: + logger.error(f"Failed to format {f.name}:{state[f.name]} with {e}") + return state + @classmethod def list_columns(cls, detail: bool = True) -> List[str]: """Return a list of columns.""" @@ -426,13 +489,19 @@ class NodeState(StateSchema): #: The name of the node if it is given by the name argument. node_name: str = state_column(filterable=True) #: The total resources of the node. - resources_total: dict = state_column(filterable=False) + resources_total: dict = state_column( + filterable=False, format_fn=Humanify.node_resources + ) #: The time when the node (raylet) starts. - start_time_ms: Optional[int] = state_column(filterable=False, detail=True) + start_time_ms: Optional[int] = state_column( + filterable=False, detail=True, format_fn=Humanify.timestamp + ) #: The time when the node exits. The timestamp could be delayed #: if the node is dead unexpectedly (could be delayed # up to 30 seconds). - end_time_ms: Optional[int] = state_column(filterable=False, detail=True) + end_time_ms: Optional[int] = state_column( + filterable=False, detail=True, format_fn=Humanify.timestamp + ) @dataclass(init=True) @@ -491,17 +560,25 @@ class WorkerState(StateSchema): #: -> worker_launched_time_ms (process started). #: -> start_time_ms (worker is ready to be used). #: -> end_time_ms (worker is destroyed). - worker_launch_time_ms: Optional[int] = state_column(filterable=False, detail=True) + worker_launch_time_ms: Optional[int] = state_column( + filterable=False, detail=True, format_fn=Humanify.timestamp + ) #: The time worker is succesfully launched #: -1 if the value doesn't exist. - worker_launched_time_ms: Optional[int] = state_column(filterable=False, detail=True) + worker_launched_time_ms: Optional[int] = state_column( + filterable=False, detail=True, format_fn=Humanify.timestamp + ) #: The time when the worker is started and initialized. #: 0 if the value doesn't exist. - start_time_ms: Optional[int] = state_column(filterable=False, detail=True) + start_time_ms: Optional[int] = state_column( + filterable=False, detail=True, format_fn=Humanify.timestamp + ) #: The time when the worker exits. The timestamp could be delayed #: if the worker is dead unexpectedly. #: 0 if the value doesn't exist. - end_time_ms: Optional[int] = state_column(filterable=False, detail=True) + end_time_ms: Optional[int] = state_column( + filterable=False, detail=True, format_fn=Humanify.timestamp + ) @dataclass(init=True) @@ -569,15 +646,27 @@ class TaskState(StateSchema): #: The list of events of the given task. 
#: Refer to src/ray/protobuf/common.proto for a detailed explanation of the state #: breakdowns and typical state transition flow. - events: Optional[List[dict]] = state_column(detail=True, filterable=False) + events: Optional[List[dict]] = state_column( + detail=True, filterable=False, format_fn=Humanify.events + ) #: The list of profile events of the given task. profiling_data: Optional[dict] = state_column(detail=True, filterable=False) #: The time when the task is created. A Unix timestamp in ms. - creation_time_ms: Optional[int] = state_column(detail=True, filterable=False) + creation_time_ms: Optional[int] = state_column( + detail=True, + filterable=False, + format_fn=Humanify.timestamp, + ) #: The time when the task starts to run. A Unix timestamp in ms. - start_time_ms: Optional[int] = state_column(detail=True, filterable=False) + start_time_ms: Optional[int] = state_column( + detail=True, + filterable=False, + format_fn=Humanify.timestamp, + ) #: The time when the task is finished or failed. A Unix timestamp in ms. - end_time_ms: Optional[int] = state_column(detail=True, filterable=False) + end_time_ms: Optional[int] = state_column( + detail=True, filterable=False, format_fn=Humanify.timestamp + ) #: The task logs info, e.g. offset into the worker log file when the task #: starts/finishes. task_log_info: Optional[dict] = state_column(detail=True, filterable=False) @@ -592,7 +681,7 @@ class ObjectState(StateSchema): #: The id of the object. object_id: str = state_column(filterable=True) #: The size of the object in mb. - object_size: int = state_column(filterable=True) + object_size: int = state_column(filterable=True, format_fn=Humanify.memory) #: The status of the task that creates the object. #: #: - NIL: We don't have a status for this task because we are not the owner or the @@ -651,7 +740,9 @@ class RuntimeEnvState(StateSchema): success: bool = state_column(filterable=True) #: The latency of creating the runtime environment. #: Available if the runtime env is successfully created. - creation_time_ms: Optional[float] = state_column(filterable=False) + creation_time_ms: Optional[float] = state_column( + filterable=False, format_fn=Humanify.timestamp + ) #: The node id of this runtime environment. node_id: str = state_column(filterable=True) #: The number of actors and tasks that use this runtime environment. diff --git a/python/ray/experimental/state/state_cli.py b/python/ray/experimental/state/state_cli.py index 1b19db874701..ee7449f902ba 100644 --- a/python/ray/experimental/state/state_cli.py +++ b/python/ray/experimental/state/state_cli.py @@ -172,6 +172,9 @@ def output_with_format( format: AvailableFormat = AvailableFormat.DEFAULT, detail: bool = False, ) -> str: + # humanify all input state data + if schema: + state_data = [schema.humanify(state) for state in state_data] if format == AvailableFormat.DEFAULT: return get_table_output(state_data, schema, detail) if format == AvailableFormat.YAML: @@ -292,8 +295,10 @@ def format_get_api_output( ) -> str: if not state_data or isinstance(state_data, list) and len(state_data) == 0: return f"Resource with id={id} not found in the cluster." 
+ if not isinstance(state_data, list): state_data = [state_data] + state_data = [dataclasses.asdict(state) for state in state_data] return output_with_format(state_data, schema=schema, format=format, detail=True) diff --git a/python/ray/tests/test_state_api.py b/python/ray/tests/test_state_api.py index c9b70b1d9ce0..aa68bf05fd09 100644 --- a/python/ray/tests/test_state_api.py +++ b/python/ray/tests/test_state_api.py @@ -8,6 +8,7 @@ from unittest.mock import MagicMock import pytest +from ray.experimental.state.common import Humanify from ray._private.gcs_utils import GcsAioClient import yaml from click.testing import CliRunner @@ -1696,6 +1697,18 @@ def ready(self): assert result.total == 6 +def test_humanify(): + raw_bytes = 1024 + assert Humanify.memory(raw_bytes) == "1.000 KiB" + raw_bytes *= 1024 + assert Humanify.memory(raw_bytes) == "1.000 MiB" + raw_bytes *= 1024 + assert Humanify.memory(raw_bytes) == "1.000 GiB" + timestamp = 1610000000 + assert "1970-01" in Humanify.timestamp(timestamp) + assert Humanify.duration(timestamp) == "18 days, 15:13:20" + + @pytest.mark.asyncio async def test_state_data_source_client_limit_distributed_sources(ray_start_cluster): cluster = ray_start_cluster
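For readers who want the pattern in isolation, here is a minimal, self-contained sketch of the framework this patch introduces. It is not Ray code; ExampleState and fmt_timestamp are illustrative names. A field opts into human readable output by carrying a "format_fn" entry in its dataclass field metadata, and humanify() walks the fields and applies each formatter to the matching key of a state dict.

    import datetime
    from dataclasses import dataclass, field, fields


    def fmt_timestamp(ms: float) -> str:
        # Mirrors Humanify.timestamp: milliseconds since the epoch -> datetime string.
        return str(datetime.datetime.fromtimestamp(ms / 1000))


    @dataclass
    class ExampleState:
        # No format_fn, so humanify() leaves this field untouched.
        node_id: str = field(default="")
        # Opts in by carrying a format_fn in its field metadata.
        start_time_ms: float = field(default=0.0, metadata={"format_fn": fmt_timestamp})

        @classmethod
        def humanify(cls, state: dict) -> dict:
            # Apply each field's format_fn, if any, to the matching dict entry.
            for f in fields(cls):
                fn = f.metadata.get("format_fn")
                if fn is not None and state.get(f.name) is not None:
                    state[f.name] = fn(state[f.name])
            return state


    print(ExampleState.humanify({"node_id": "abc", "start_time_ms": 1_600_000_000_000}))
    # -> {'node_id': 'abc', 'start_time_ms': '2020-09-13 ...'} (exact value depends on the local time zone)

In the actual change, the CLI path in state_cli.py calls schema.humanify() on every row inside output_with_format(), so timestamps, durations, and memory sizes are rendered human readable regardless of the chosen output format.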