Skip to content

Commit

Permalink
[State Observability] Warn if callsite is disabled when `ray list obj…
Browse files Browse the repository at this point in the history
…ects` + raise exception on missing output (#26880)

This PR does 3 things.
1. Warn if callsite is disabled when `ray list objects` and `ray summary objects`
2. Decode owner_id for ray list actors
3. Support raise_on_missing_output
  • Loading branch information
rkooo567 authored Jul 25, 2022
1 parent 1ac2a87 commit 15b711a
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 21 deletions.
33 changes: 29 additions & 4 deletions dashboard/state_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from itertools import islice
from typing import List, Tuple

from ray._private.ray_constants import env_integer

import ray.dashboard.memory_utils as memory_utils
import ray.dashboard.utils as dashboard_utils
from ray._private.utils import binary_to_hex
Expand Down Expand Up @@ -196,7 +198,9 @@ async def list_actors(self, *, option: ListApiOptions) -> ListApiResponse:

result = []
for message in reply.actor_table_data:
data = self._message_to_dict(message=message, fields_to_decode=["actor_id"])
data = self._message_to_dict(
message=message, fields_to_decode=["actor_id", "owner_id"]
)
result.append(data)

result = self._filter(result, option.filters, ActorState, option.detail)
Expand Down Expand Up @@ -456,6 +460,17 @@ async def list_objects(self, *, option: ListApiOptions) -> ListApiResponse:
del data["node_ip_address"]
result.append(data)

# Add callsite warnings if it is not configured.
callsite_warning = []
callsite_enabled = env_integer("RAY_record_ref_creation_sites", 0)
if not callsite_enabled:
callsite_warning.append(
"Callsite is not being recorded. "
"To record callsite information for each ObjectRef created, set "
"env variable RAY_record_ref_creation_sites=1 during `ray start` "
"and `ray.init`."
)

result = self._filter(result, option.filters, ObjectState, option.detail)
# Sort to make the output deterministic.
result.sort(key=lambda entry: entry["object_id"])
Expand All @@ -464,6 +479,7 @@ async def list_objects(self, *, option: ListApiOptions) -> ListApiResponse:
result=result,
partial_failure_warning=partial_failure_warning,
total=total_objects,
warnings=callsite_warning,
)

async def list_runtime_envs(self, *, option: ListApiOptions) -> ListApiResponse:
Expand Down Expand Up @@ -553,7 +569,10 @@ async def summarize_tasks(self, option: SummaryApiOptions) -> SummaryApiResponse
}
)
return SummaryApiResponse(
result=summary, partial_failure_warning=result.partial_failure_warning
total=result.total,
result=summary,
partial_failure_warning=result.partial_failure_warning,
warnings=result.warnings,
)

async def summarize_actors(self, option: SummaryApiOptions) -> SummaryApiResponse:
Expand All @@ -567,7 +586,10 @@ async def summarize_actors(self, option: SummaryApiOptions) -> SummaryApiRespons
}
)
return SummaryApiResponse(
result=summary, partial_failure_warning=result.partial_failure_warning
total=result.total,
result=summary,
partial_failure_warning=result.partial_failure_warning,
warnings=result.warnings,
)

async def summarize_objects(self, option: SummaryApiOptions) -> SummaryApiResponse:
Expand All @@ -581,7 +603,10 @@ async def summarize_objects(self, option: SummaryApiOptions) -> SummaryApiRespon
}
)
return SummaryApiResponse(
result=summary, partial_failure_warning=result.partial_failure_warning
total=result.total,
result=summary,
partial_failure_warning=result.partial_failure_warning,
warnings=result.warnings,
)

def _message_to_dict(
Expand Down
4 changes: 3 additions & 1 deletion dashboard/tests/test_dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -961,7 +961,9 @@ def test_dashboard_requests_fail_on_missing_deps(ray_start_with_dashboard):

with pytest.raises(ServerUnavailable):
client = StateApiClient(address=DEFAULT_DASHBOARD_ADDRESS)
response = client.list(StateResource.NODES, options=ListApiOptions())
response = client.list(
StateResource.NODES, options=ListApiOptions(), raise_on_missing_output=False
)

# Response should not be populated
assert response is None
Expand Down
Loading

0 comments on commit 15b711a

Please sign in to comment.