Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][state][no_early_kickoff] Add "humanify" feature to StateSchema #35059

Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
24158f2
add new module
ProjectsByJackHe Feb 27, 2023
b323ff8
create human readable class
ProjectsByJackHe Feb 27, 2023
6dd01e2
lint
ProjectsByJackHe Feb 27, 2023
135d804
add humanify()
ProjectsByJackHe Apr 3, 2023
310e0ab
remove human readable subclass
ProjectsByJackHe Apr 3, 2023
676d4e9
remove human readable subclass in state_cli
ProjectsByJackHe Apr 3, 2023
a2284e5
move humanify logic into base class
ProjectsByJackHe Apr 17, 2023
86f67ab
merge upstream
ProjectsByJackHe Apr 30, 2023
f8f82a5
resolve merge conflicts
ProjectsByJackHe Apr 30, 2023
b3c5488
lint state_cli
ProjectsByJackHe Apr 30, 2023
0b8c88e
add more default format functions and refactor state_column()
ProjectsByJackHe Apr 30, 2023
f7b25fb
run unit tests, add event humanifier
ProjectsByJackHe May 1, 2023
d21cba6
merge common.py
ProjectsByJackHe May 1, 2023
a05f225
revert buggy changes
ProjectsByJackHe May 1, 2023
e93dd82
add unit testing
ProjectsByJackHe May 3, 2023
226bde0
add memory function
ProjectsByJackHe May 4, 2023
049ea07
remove complex regex unit test
ProjectsByJackHe May 4, 2023
f810bbb
revert rename of local variable
ProjectsByJackHe May 4, 2023
21c3290
remove requiring all details
ProjectsByJackHe May 6, 2023
31246b9
add more default converter functions for node resource
ProjectsByJackHe May 6, 2023
75a06f6
add more default format_fn
ProjectsByJackHe May 6, 2023
1b678b4
remove is_valid_state
ProjectsByJackHe May 6, 2023
54cd9a5
Merge remote-tracking branch 'upstream/master' into jackhe/readable-s…
ProjectsByJackHe May 6, 2023
0c4b566
avoid unspecified behavior
ProjectsByJackHe May 6, 2023
3c5a2ee
remove dead code
ProjectsByJackHe May 6, 2023
8ae3ff9
ensure inputs are not none
ProjectsByJackHe May 7, 2023
92269b9
use format_fn=
ProjectsByJackHe May 8, 2023
7a185fc
lint
ProjectsByJackHe May 8, 2023
8b8eb35
remove unused code
ProjectsByJackHe May 8, 2023
b629102
remove unused code
ProjectsByJackHe May 8, 2023
f58d757
Merge branch 'jackhe/readable-state-api_schema-try3' of https://githu…
ProjectsByJackHe May 8, 2023
ff592dc
add try except, truncate memory
ProjectsByJackHe May 9, 2023
3ba5046
update unit tests, add non-null checks
ProjectsByJackHe May 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 104 additions & 16 deletions python/ray/experimental/state/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import json
import logging
import sys
Expand Down Expand Up @@ -75,6 +76,43 @@ class SummaryResource(Enum):
PredicateType = str # Literal["=", "!="]


class Humanify:
"""A class containing default methods to
convert units into a human readable string."""

def timestamp(x: float):
"""Converts miliseconds to a datetime object."""
return str(datetime.datetime.fromtimestamp(x / 1000))

def memory(x: int):
"""Converts raw bytes to a human readable memory size."""
if x >= 2**30:
ProjectsByJackHe marked this conversation as resolved.
Show resolved Hide resolved
return str(x / (2**30)) + " GiB"
elif x >= 2**20:
return str(x / (2**20)) + " MiB"
elif x >= 2**10:
return str(x / (2**10)) + " KiB"
return str(x) + " B"

def duration(x: int):
"""Converts miliseconds to a human readable duration."""
return str(datetime.timedelta(milliseconds=x))

def events(events: List[dict]):
"""Converts a list of task events into a human readable format."""
for event in events:
if "created_ms" in event:
event["created_ms"] = Humanify.timestamp(event["created_ms"])
return events

def node_resources(resources: dict):
"""Converts a node's resources into a human readable format."""
for resource in resources:
if "memory" in resource:
resources[resource] = Humanify.memory(resources[resource])
return resources


@dataclass(init=True)
class ListApiOptions:
# Maximum number of entries to return
Expand Down Expand Up @@ -144,7 +182,7 @@ class SummaryApiOptions:
summary_by: Optional[str] = None


def state_column(*, filterable: bool, detail: bool = False, **kwargs):
def state_column(*, filterable: bool, detail: bool = False, format_fn=None, **kwargs):
"""A wrapper around dataclass.field to add additional metadata.

The metadata is used to define detail / filterable option of
Expand All @@ -155,15 +193,16 @@ def state_column(*, filterable: bool, detail: bool = False, **kwargs):
filterable: If True, the column can be used for filtering.
kwargs: The same kwargs for the `dataclasses.field` function.
"""
m = {"detail": detail, "filterable": filterable}

m = {"detail": detail, "filterable": filterable, "format_fn": format_fn}
# Default for detail field is None since it could be missing.
if detail and "default" not in kwargs:
kwargs["default"] = None

if "metadata" in kwargs:
# Metadata explicitly specified, so add detail and filterable if missing.
kwargs["metadata"].update(m)
else:
# Metadata not explicitly specified, so add it.
kwargs["metadata"] = m
return field(**kwargs)

Expand Down Expand Up @@ -193,8 +232,29 @@ class State(StateSchema):
# Returns {"column_a", "column_b"}
s.columns()
```

In addition, the schema also provides a humanify abstract method to
convert the state object into something human readable, ready for printing.

Subclasses should override this method, providing logic to convert its own fields
to something human readable, packaged and returned in a dict.

Each field that wants to be humanified should include a 'format_fn' key in its
metadata dictionary.
"""

@classmethod
def humanify(cls, state: dict) -> dict:
"""Convert the given state object into something human readable."""
for f in fields(cls):
if (
f.metadata.get("format_fn") is not None
and f.name in state
and state[f.name] is not None
):
state[f.name] = f.metadata["format_fn"](state[f.name])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let' also add a try...except around this and logger errors instead of failing:

                try:
                    state[f.name] = f.metadata["format_fn"](state[f.name])
                except Exception as e:
                    logger.error(f"Failed to format {f.name}:{state[f.name]} with {e}")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i found out an issue which raised error due to some of the data not correct, I think let's do best effort here instead of disallowing users to see the outputs.

#35130

Copy link
Contributor Author

@ProjectsByJackHe ProjectsByJackHe May 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the try-except.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just confirming, the issue doesn't have anything to do with this PR? I tried running without the Humanify logic, but still get the incorrect data (year 584556019 for worker_launch_time)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i found out an issue which raised error due to some of the data not correct, I think let's do best effort here instead of disallowing users to see the outputs.

#35130

I am new to this;
By "best effort," what did you have in mind? If there is an error in the format_fn, we just leave the data as is. Where do we disallow users to see the outputs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let' also add a try...except around this and logger errors instead of failing:

                try:
                    state[f.name] = f.metadata["format_fn"](state[f.name])
                except Exception as e:
                    logger.error(f"Failed to format {f.name}:{state[f.name]} with {e}")

Also, added a null check to pass state_log unit test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just confirming, the issue doesn't have anything to do with this PR? I tried running without the Humanify logic, but still get the incorrect data (year 584556019 for worker_launch_time)

Yes, it's not caused by this PR.

return state

@classmethod
def list_columns(cls, detail: bool = True) -> List[str]:
"""Return a list of columns."""
Expand Down Expand Up @@ -428,13 +488,19 @@ class NodeState(StateSchema):
#: The name of the node if it is given by the name argument.
node_name: str = state_column(filterable=True)
#: The total resources of the node.
resources_total: dict = state_column(filterable=False)
resources_total: dict = state_column(
filterable=False, format_fn=Humanify.node_resources
)
#: The time when the node (raylet) starts.
start_time_ms: Optional[int] = state_column(filterable=False, detail=True)
start_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)
#: The time when the node exits. The timestamp could be delayed
#: if the node is dead unexpectedly (could be delayed
# up to 30 seconds).
end_time_ms: Optional[int] = state_column(filterable=False, detail=True)
end_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)


@dataclass(init=True)
Expand Down Expand Up @@ -493,17 +559,25 @@ class WorkerState(StateSchema):
#: -> worker_launched_time_ms (process started).
#: -> start_time_ms (worker is ready to be used).
#: -> end_time_ms (worker is destroyed).
worker_launch_time_ms: Optional[int] = state_column(filterable=False, detail=True)
worker_launch_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)
#: The time worker is succesfully launched
#: -1 if the value doesn't exist.
worker_launched_time_ms: Optional[int] = state_column(filterable=False, detail=True)
worker_launched_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)
#: The time when the worker is started and initialized.
#: 0 if the value doesn't exist.
start_time_ms: Optional[int] = state_column(filterable=False, detail=True)
start_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)
#: The time when the worker exits. The timestamp could be delayed
#: if the worker is dead unexpectedly.
#: 0 if the value doesn't exist.
end_time_ms: Optional[int] = state_column(filterable=False, detail=True)
end_time_ms: Optional[int] = state_column(
filterable=False, detail=True, format_fn=Humanify.timestamp
)


@dataclass(init=True)
Expand Down Expand Up @@ -571,15 +645,27 @@ class TaskState(StateSchema):
#: The list of events of the given task.
#: Refer to src/ray/protobuf/common.proto for a detailed explanation of the state
#: breakdowns and typical state transition flow.
events: Optional[List[dict]] = state_column(detail=True, filterable=False)
events: Optional[List[dict]] = state_column(
detail=True, filterable=False, format_fn=Humanify.events
)
#: The list of profile events of the given task.
profiling_data: Optional[dict] = state_column(detail=True, filterable=False)
#: The time when the task is created. A Unix timestamp in ms.
creation_time_ms: Optional[int] = state_column(detail=True, filterable=False)
creation_time_ms: Optional[int] = state_column(
detail=True,
filterable=False,
format_fn=Humanify.timestamp,
)
#: The time when the task starts to run. A Unix timestamp in ms.
start_time_ms: Optional[int] = state_column(detail=True, filterable=False)
start_time_ms: Optional[int] = state_column(
detail=True,
filterable=False,
format_fn=Humanify.timestamp,
)
#: The time when the task is finished or failed. A Unix timestamp in ms.
end_time_ms: Optional[int] = state_column(detail=True, filterable=False)
end_time_ms: Optional[int] = state_column(
detail=True, filterable=False, format_fn=Humanify.timestamp
)
#: The task logs info, e.g. offset into the worker log file when the task
#: starts/finishes.
task_log_info: Optional[dict] = state_column(detail=True, filterable=False)
Expand All @@ -594,7 +680,7 @@ class ObjectState(StateSchema):
#: The id of the object.
object_id: str = state_column(filterable=True)
#: The size of the object in mb.
object_size: int = state_column(filterable=True)
object_size: int = state_column(filterable=True, format_fn=Humanify.memory)
#: The status of the task that creates the object.
#:
#: - NIL: We don't have a status for this task because we are not the owner or the
Expand Down Expand Up @@ -653,7 +739,9 @@ class RuntimeEnvState(StateSchema):
success: bool = state_column(filterable=True)
#: The latency of creating the runtime environment.
#: Available if the runtime env is successfully created.
creation_time_ms: Optional[float] = state_column(filterable=False)
creation_time_ms: Optional[float] = state_column(
filterable=False, format_fn=Humanify.timestamp
)
#: The node id of this runtime environment.
node_id: str = state_column(filterable=True)
#: The number of actors and tasks that use this runtime environment.
Expand Down
5 changes: 5 additions & 0 deletions python/ray/experimental/state/state_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ def output_with_format(
format: AvailableFormat = AvailableFormat.DEFAULT,
detail: bool = False,
) -> str:
# humanify all input state data
state_data = [schema.humanify(state) for state in state_data]

if format == AvailableFormat.DEFAULT:
return get_table_output(state_data, schema, detail)
if format == AvailableFormat.YAML:
Expand Down Expand Up @@ -292,8 +295,10 @@ def format_get_api_output(
) -> str:
if not state_data or isinstance(state_data, list) and len(state_data) == 0:
return f"Resource with id={id} not found in the cluster."

if not isinstance(state_data, list):
state_data = [state_data]

state_data = [dataclasses.asdict(state) for state in state_data]
return output_with_format(state_data, schema=schema, format=format, detail=True)

Expand Down
13 changes: 13 additions & 0 deletions python/ray/tests/test_state_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from unittest.mock import MagicMock

import pytest
from ray.experimental.state.common import Humanify
from ray._private.gcs_utils import GcsAioClient
import yaml
from click.testing import CliRunner
Expand Down Expand Up @@ -1696,6 +1697,18 @@ def ready(self):
assert result.total == 6


def test_humanify():
raw_bytes = 1024
assert Humanify.memory(raw_bytes) == "1.0 KiB"
raw_bytes *= 1024
assert Humanify.memory(raw_bytes) == "1.0 MiB"
raw_bytes *= 1024
assert Humanify.memory(raw_bytes) == "1.0 GiB"
timestamp = 1610000000
assert "1970-01" in Humanify.timestamp(timestamp)
assert Humanify.duration(timestamp) == "18 days, 15:13:20"


@pytest.mark.asyncio
async def test_state_data_source_client_limit_distributed_sources(ray_start_cluster):
cluster = ray_start_cluster
Expand Down