diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 4e9119b0dc4c..3b6ff4090ced 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -320,6 +320,9 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: # Update the progress bar to reflect scheduling decisions. for op_state in topology.values(): op_state.refresh_progress_bar(self._resource_manager) + # Refresh the global progress bar to update elapsed time progress. + if self._global_info: + self._global_info.refresh() self._update_stats_metrics(state="RUNNING") if time.time() - self._last_debug_log_time >= DEBUG_LOG_INTERVAL_SECONDS: diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index fff7c48d3bd1..a5d8cc6276a6 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -254,6 +254,7 @@ def refresh_progress_bar(self, resource_manager: ResourceManager) -> None: """Update the console with the latest operator progress.""" if self.progress_bar: self.progress_bar.set_description(self.summary_str(resource_manager)) + self.progress_bar.refresh() def summary_str(self, resource_manager: ResourceManager) -> str: queued = self.num_queued() + self.op.internal_queue_size() diff --git a/python/ray/data/_internal/progress_bar.py b/python/ray/data/_internal/progress_bar.py index 55b7b969b2bb..3e2eb033e0a5 100644 --- a/python/ray/data/_internal/progress_bar.py +++ b/python/ray/data/_internal/progress_bar.py @@ -174,6 +174,10 @@ def set_description(self, name: str) -> None: self._desc = name self._bar.set_description(self._desc) + def refresh(self): + if self._bar: + self._bar.refresh() + def update(self, i: int = 0, total: Optional[int] = None) -> None: if self._bar and (i != 0 or self._bar.total != total): self._progress += i diff --git a/python/ray/experimental/tqdm_ray.py b/python/ray/experimental/tqdm_ray.py index 42c95ac60867..436cdcb5c8e3 100644 --- a/python/ray/experimental/tqdm_ray.py +++ b/python/ray/experimental/tqdm_ray.py @@ -197,6 +197,7 @@ def update(self, state: ProgressBarState) -> None: delta = state["x"] - self.state["x"] if delta: self.bar.update(delta) + self.bar.refresh() self.state = state def close(self):