[Data] Simplify and consolidate progress bar outputs #47692

Merged (14 commits) on Sep 23, 2024
@@ -390,6 +390,20 @@ def num_active_tasks(self) -> int:
"""
return len(self.get_active_tasks())

def num_active_actors(self) -> int:
"""Return the number of active actors.

This method is used to display active actor info in the progress bar.
"""
return 0

def num_pending_actors(self) -> int:
"""Return the number of pending actors.

This method is used to display pending actor info in the progress bar.
"""
return 0

def throttling_disabled(self) -> bool:
"""Whether to disable resource throttling for this operator.

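For orientation, a minimal standalone sketch (hypothetical classes and counts, not code from this PR) of why the zero-returning defaults matter: progress-bar code can query any operator uniformly, and only actor-based operators need to override the two methods.

    class FakeOperator:
        # Mirrors the base-class defaults added above.
        def num_active_actors(self) -> int:
            return 0

        def num_pending_actors(self) -> int:
            return 0

    class FakeActorOperator(FakeOperator):
        # Mirrors an actor-pool-backed operator overriding the defaults.
        def num_active_actors(self) -> int:
            return 3

        def num_pending_actors(self) -> int:
            return 1

    for op in (FakeOperator(), FakeActorOperator()):
        active, pending = op.num_active_actors(), op.num_pending_actors()
        # Only render an actor section when the operator actually has actors.
        if active or pending:
            print(f"Actors: {active} (pending: {pending})")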
@@ -16,7 +16,6 @@
)
from ray.data._internal.execution.operators.map_operator import MapOperator, _map_task
from ray.data._internal.execution.operators.map_transformer import MapTransformer
from ray.data._internal.execution.util import locality_string
from ray.data._internal.remote_fn import _add_system_error_to_retry_exceptions
from ray.data.block import Block, BlockMetadata
from ray.data.context import DataContext
@@ -270,17 +269,7 @@ def shutdown(self):
)

def progress_str(self) -> str:
base = f"{self._actor_pool.num_running_actors()} actors"
pending = self._actor_pool.num_pending_actors()
if pending:
base += f" ({pending} pending)"
if self._actor_locality_enabled:
base += " " + locality_string(
self._actor_pool._locality_hits, self._actor_pool._locality_misses
)
else:
base += " [locality off]"
return base
return ""

def base_resource_usage(self) -> ExecutionResources:
min_workers = self._actor_pool.min_size()
@@ -304,6 +293,20 @@ def pending_processor_usage(self) -> ExecutionResources:
gpu=self._ray_remote_args.get("num_gpus", 0) * num_pending_workers,
)

def num_active_actors(self) -> int:
"""Return the number of active actors.

This method is used to display active actor info in the progress bar.
"""
return self._actor_pool.num_running_actors()

def num_pending_actors(self) -> int:
"""Return the number of pending actors.

This method is used to display pending actor info in the progress bar.
"""
return self._actor_pool.num_pending_actors()

def incremental_resource_usage(self) -> ExecutionResources:
# Submitting tasks to existing actors doesn't require additional
# CPU/GPU resources.
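A rough before/after sketch of the contract change (the pool sizes are invented for illustration): the operator no longer formats its own actor summary in progress_str, but instead exposes raw counts that the executor-side summary renders.

    # Hypothetical pool with 3 running actors and 1 pending actor.
    # Before this PR: op.progress_str()       -> "3 actors (1 pending) [locality off]"
    # After this PR:  op.progress_str()       -> ""
    #                 op.num_active_actors()  -> 3
    #                 op.num_pending_actors() -> 1
    # The shared summary_str in streaming_executor_state.py (further below) now builds the display string.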
python/ray/data/_internal/execution/resource_manager.py (4 additions & 4 deletions)
@@ -213,11 +213,11 @@ def get_op_usage(self, op: PhysicalOperator) -> ExecutionResources:
def get_op_usage_str(self, op: PhysicalOperator) -> str:
"""Return a human-readable string representation of the resource usage of
the given operator."""
usage_str = f"cpu: {self._op_running_usages[op].cpu:.1f}"
usage_str = f"{self._op_running_usages[op].cpu:.1f} CPU"
if self._op_running_usages[op].gpu:
usage_str += f", gpu: {self._op_running_usages[op].gpu:.1f}"
usage_str += f", {self._op_running_usages[op].gpu:.1f} GPU"
usage_str += (
f", objects: {self._op_running_usages[op].object_store_memory_str()}"
f", {self._op_running_usages[op].object_store_memory_str()} object store"
)
if self._debug:
usage_str += (
@@ -231,7 +231,7 @@ def get_op_usage_str(self, op: PhysicalOperator) -> str:
budget = self._op_resource_allocator._op_budgets[op]
usage_str += f", budget=(cpu={budget.cpu:.1f}"
usage_str += f",gpu={budget.gpu:.1f}"
usage_str += f",objects={budget.object_store_memory_str()})"
usage_str += f",object store={budget.object_store_memory_str()})"
return usage_str

def get_downstream_fraction(self, op: PhysicalOperator) -> float:
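To see the relabeled fields side by side, a self-contained sketch with invented numbers (not values from this PR) that reproduces the old and new usage strings:

    cpu, gpu, object_store = 0.5, 1.0, "512.0MB"  # invented example values

    old_style = f"cpu: {cpu:.1f}, gpu: {gpu:.1f}, objects: {object_store}"
    new_style = f"{cpu:.1f} CPU, {gpu:.1f} GPU, {object_store} object store"

    print(old_style)  # cpu: 0.5, gpu: 1.0, objects: 512.0MB
    print(new_style)  # 0.5 CPU, 1.0 GPU, 512.0MB object store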
python/ray/data/_internal/execution/streaming_executor.py (21 additions & 8 deletions)
@@ -363,15 +363,28 @@ def _report_current_usage(self) -> None:
pending_usage = self._resource_manager.get_global_pending_usage()
limits = self._resource_manager.get_global_limits()
resources_status = (
"Active & requested resources: "
f"{running_usage.cpu:.4g} of {limits.cpu:.4g} available CPU, "
f"{running_usage.gpu:.4g} of {limits.gpu:.4g} available GPU, "
f"{running_usage.object_store_memory_str()} of "
f"{limits.object_store_memory_str()} available object_store_memory "
"(pending: "
f"{pending_usage.cpu:.4g} CPU, "
f"{pending_usage.gpu:.4g} GPU)"
# TODO(scottjlee): Add dataset name/ID to progress bar output.
"Running Dataset. Active & requested resources: "
f"{running_usage.cpu:.4g}/{limits.cpu:.4g} CPU, "
)
if running_usage.gpu > 0:
resources_status += f"{running_usage.gpu:.4g}/{limits.gpu:.4g} GPU, "
resources_status += (
f"{running_usage.object_store_memory_str()}/"
f"{limits.object_store_memory_str()} object store"
)

# Only include pending section when there are pending resources.
if pending_usage.cpu or pending_usage.gpu:
if pending_usage.cpu and pending_usage.gpu:
pending_str = (
f"{pending_usage.cpu:.4g} CPU, {pending_usage.gpu:.4g} GPU"
)
elif pending_usage.cpu:
pending_str = f"{pending_usage.cpu:.4g} CPU"
else:
pending_str = f"{pending_usage.gpu:.4g} GPU"
resources_status += f" (pending: {pending_str})"
if self._global_info:
self._global_info.set_description(resources_status)

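For concreteness, a small self-contained sketch with invented usage numbers (not output captured from this PR) of the string the new branching produces; the GPU and pending segments are omitted entirely when they are zero.

    running_cpu, limit_cpu = 8, 16
    running_gpu, limit_gpu = 0, 0
    obj_used, obj_limit = "2.1GB", "4.0GB"
    pending_cpu, pending_gpu = 4, 0

    status = (
        "Running Dataset. Active & requested resources: "
        f"{running_cpu:.4g}/{limit_cpu:.4g} CPU, "
    )
    if running_gpu > 0:
        status += f"{running_gpu:.4g}/{limit_gpu:.4g} GPU, "
    status += f"{obj_used}/{obj_limit} object store"
    if pending_cpu or pending_gpu:
        if pending_cpu and pending_gpu:
            pending_str = f"{pending_cpu:.4g} CPU, {pending_gpu:.4g} GPU"
        elif pending_cpu:
            pending_str = f"{pending_cpu:.4g} CPU"
        else:
            pending_str = f"{pending_gpu:.4g} GPU"
        status += f" (pending: {pending_str})"

    print(status)
    # Running Dataset. Active & requested resources: 8/16 CPU, 2.1GB/4.0GB object store (pending: 4 CPU)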
python/ray/data/_internal/execution/streaming_executor_state.py (32 additions & 5 deletions)
@@ -26,11 +26,15 @@
OpTask,
Waitable,
)
from ray.data._internal.execution.operators.actor_pool_map_operator import (
ActorPoolMapOperator,
)
from ray.data._internal.execution.operators.base_physical_operator import (
AllToAllOperator,
)
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.execution.resource_manager import ResourceManager
from ray.data._internal.execution.util import locality_string
from ray.data._internal.progress_bar import ProgressBar
from ray.data.context import DataContext

@@ -257,18 +261,41 @@ def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
self.progress_bar.refresh()

def summary_str(self, resource_manager: ResourceManager) -> str:
queued = self.num_queued() + self.op.internal_queue_size()
# Active tasks
active = self.op.num_active_tasks()
desc = f"- {self.op.name}: {active} active, {queued} queued"
desc = f"- {self.op.name}: Tasks: {active}"
if (
self.op._in_task_submission_backpressure
or self.op._in_task_output_backpressure
):
desc += " 🚧"
desc += f", [{resource_manager.get_op_usage_str(self.op)}]"
desc += " [backpressured]"

# Active/pending actors
active = self.op.num_active_actors()
pending = self.op.num_pending_actors()
if active or pending:
assert isinstance(self.op, ActorPoolMapOperator)
actor_str = f"; Actors: {active}"
if pending > 0:
actor_str += f", (pending: {pending})"
if self.op._actor_locality_enabled:
actor_str += " " + locality_string(
self.op._actor_pool._locality_hits, self.op._actor_pool._locality_misses
)

Review comment, scottjlee (Contributor, Author), Sep 23, 2024:
BTW @raulchen, I still had to leave some ActorPoolMapOperator-specific logic in order to add the locality string (it checks self.op._actor_locality_enabled, an attribute that only exists on ActorPoolMapOperator). But we know that if active or pending is nonzero, this must be an operator with actors, so I just use an assert isinstance(self.op, ActorPoolMapOperator) check to satisfy type checking. Let me know if you prefer another approach.

Reply (Contributor):
Oh, I think we can still keep the locality info at the end (still reported by progress_str), because it's operator-specific.
else:
actor_str += " [locality off]"
desc += actor_str

# Queued blocks
queued = self.num_queued() + self.op.internal_queue_size()
desc += f"; Queued blocks: {queued}"
desc += f"; Resources: {resource_manager.get_op_usage_str(self.op)}"

# Any additional operator specific information.
suffix = self.op.progress_str()
if suffix:
desc += f", {suffix}"
desc += f"; {suffix}"

return desc

def dispatch_next_task(self) -> None:
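Putting the whole hunk together, an invented example (operator name and all numbers are made up, not captured from this PR) of the per-operator line the new summary_str emits for a backpressured actor-pool operator with locality off:

    # - MapBatches(fn): Tasks: 4 [backpressured]; Actors: 2, (pending: 1) [locality off]; Queued blocks: 8; Resources: 2.0 CPU, 128.0MB object store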