From 5cd9a15907ac95a0841f33073cb6b32121e569c7 Mon Sep 17 00:00:00 2001 From: Scott Lee Date: Wed, 7 Aug 2024 11:28:52 -0700 Subject: [PATCH] [Data] Correctly refresh elapsed time in `ProgressBar` output (#46974) When using Ray Data with a slow UDF for map operators, the progress bar's "elapsed time" item is only updated each time a task finishes. If the task is slow, this means that the elapsed time is left "hanging" and appears to be stuck, when this is not necessarily the case. This PR fixes the refreshing of the progress bar, so that the elapsed time ticks up as expected. For the following example code, we can see the change in behavior in the ticking of the progress bar: ``` import time import ray def f(batch): time.sleep(10) return batch ds = ray.data.range(50, override_num_blocks=50).map(f) ds.materialize() ``` - Before (ticks every 10 seconds): https://github.com/user-attachments/assets/d46f0d6f-dacc-4148-a12c-001fcf44f008 - After (ticks every 1 second): https://github.com/user-attachments/assets/88807215-9e81-434f-b186-b5d597bc3c59 --------- Signed-off-by: Scott Lee --- python/ray/data/_internal/execution/streaming_executor.py | 3 +++ .../ray/data/_internal/execution/streaming_executor_state.py | 1 + python/ray/data/_internal/progress_bar.py | 4 ++++ python/ray/experimental/tqdm_ray.py | 1 + 4 files changed, 9 insertions(+) diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 4e9119b0dc4c..3b6ff4090ced 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -320,6 +320,9 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: # Update the progress bar to reflect scheduling decisions. for op_state in topology.values(): op_state.refresh_progress_bar(self._resource_manager) + # Refresh the global progress bar to update elapsed time progress. + if self._global_info: + self._global_info.refresh() self._update_stats_metrics(state="RUNNING") if time.time() - self._last_debug_log_time >= DEBUG_LOG_INTERVAL_SECONDS: diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index fff7c48d3bd1..a5d8cc6276a6 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -254,6 +254,7 @@ def refresh_progress_bar(self, resource_manager: ResourceManager) -> None: """Update the console with the latest operator progress.""" if self.progress_bar: self.progress_bar.set_description(self.summary_str(resource_manager)) + self.progress_bar.refresh() def summary_str(self, resource_manager: ResourceManager) -> str: queued = self.num_queued() + self.op.internal_queue_size() diff --git a/python/ray/data/_internal/progress_bar.py b/python/ray/data/_internal/progress_bar.py index 55b7b969b2bb..3e2eb033e0a5 100644 --- a/python/ray/data/_internal/progress_bar.py +++ b/python/ray/data/_internal/progress_bar.py @@ -174,6 +174,10 @@ def set_description(self, name: str) -> None: self._desc = name self._bar.set_description(self._desc) + def refresh(self): + if self._bar: + self._bar.refresh() + def update(self, i: int = 0, total: Optional[int] = None) -> None: if self._bar and (i != 0 or self._bar.total != total): self._progress += i diff --git a/python/ray/experimental/tqdm_ray.py b/python/ray/experimental/tqdm_ray.py index 42c95ac60867..436cdcb5c8e3 100644 --- a/python/ray/experimental/tqdm_ray.py +++ b/python/ray/experimental/tqdm_ray.py @@ -197,6 +197,7 @@ def update(self, state: ProgressBarState) -> None: delta = state["x"] - self.state["x"] if delta: self.bar.update(delta) + self.bar.refresh() self.state = state def close(self):