[Data] Correctly refresh elapsed time in ProgressBar output (#46974)
When using Ray Data with a slow UDF in a map operator, the progress
bar's "elapsed time" field is only updated when a task finishes. If
tasks are slow, the elapsed time stops advancing between task
completions and the job appears to be stuck, even though it may still
be running normally.

This PR refreshes the progress bars on every scheduling loop step, so
that the elapsed time ticks up as expected. The following example code
shows the change in how the progress bar ticks:
```
import time
import ray

def f(batch):
    time.sleep(10)
    return batch

ds = ray.data.range(50, override_num_blocks=50).map(f)
ds.materialize()
```

- Before (ticks every 10 seconds):

https://github.com/user-attachments/assets/d46f0d6f-dacc-4148-a12c-001fcf44f008


- After (ticks every 1 second):

https://github.com/user-attachments/assets/88807215-9e81-434f-b186-b5d597bc3c59
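
The difference between the two recordings can be reproduced without Ray or a terminal. The following is a minimal sketch, not Ray's implementation: `FakeClock`, `TinyBar`, and `simulate` are hypothetical names introduced here, and the bar is assumed to recompute its elapsed time only when it is redrawn, as tqdm-style bars do. The loop advances a fake clock by one second per step and completes a task every 10 steps.

```python
class FakeClock:
    """Manually advanced clock so the simulation runs instantly."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def advance(self, seconds):
        self.now += seconds


class TinyBar:
    """Hypothetical stand-in for a tqdm-style bar (not Ray's ProgressBar)."""

    def __init__(self, total, clock):
        self.total = total
        self.n = 0
        self._clock = clock
        self._start = clock()
        self.displayed_elapsed = 0  # the value currently visible on screen

    def _render(self):
        # Elapsed time is only recomputed when the bar is redrawn.
        self.displayed_elapsed = int(self._clock() - self._start)

    def update(self, delta):
        self.n += delta
        self._render()

    def refresh(self):
        self._render()


def simulate(refresh_every_step, task_seconds=10, steps=25):
    """Return the elapsed value shown after each one-second loop step."""
    clock = FakeClock()
    bar = TinyBar(total=50, clock=clock)
    shown = []
    for step in range(1, steps + 1):
        clock.advance(1)
        if step % task_seconds == 0:
            bar.update(1)  # a task finished during this step
        if refresh_every_step:
            bar.refresh()  # the extra redraw this change introduces
        shown.append(bar.displayed_elapsed)
    return shown


before = simulate(refresh_every_step=False)
after = simulate(refresh_every_step=True)
print(before[8:12])  # [0, 10, 10, 10]
print(after[8:12])   # [9, 10, 11, 12]
```

Without the per-step refresh, the displayed value jumps only when a task completes. With it, the value follows the clock even while no task has finished.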

---------

Signed-off-by: Scott Lee <[email protected]>
scottjlee authored Aug 7, 2024
1 parent 9844797 commit 5cd9a15
Showing 4 changed files with 9 additions and 0 deletions.
3 changes: 3 additions & 0 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -320,6 +320,9 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
         # Update the progress bar to reflect scheduling decisions.
         for op_state in topology.values():
             op_state.refresh_progress_bar(self._resource_manager)
+        # Refresh the global progress bar to update elapsed time progress.
+        if self._global_info:
+            self._global_info.refresh()

         self._update_stats_metrics(state="RUNNING")
         if time.time() - self._last_debug_log_time >= DEBUG_LOG_INTERVAL_SECONDS:
@@ -254,6 +254,7 @@ def refresh_progress_bar(self, resource_manager: ResourceManager) -> None:
         """Update the console with the latest operator progress."""
         if self.progress_bar:
             self.progress_bar.set_description(self.summary_str(resource_manager))
+            self.progress_bar.refresh()

     def summary_str(self, resource_manager: ResourceManager) -> str:
         queued = self.num_queued() + self.op.internal_queue_size()
4 changes: 4 additions & 0 deletions python/ray/data/_internal/progress_bar.py
@@ -174,6 +174,10 @@ def set_description(self, name: str) -> None:
             self._desc = name
             self._bar.set_description(self._desc)

+    def refresh(self):
+        if self._bar:
+            self._bar.refresh()
+
     def update(self, i: int = 0, total: Optional[int] = None) -> None:
         if self._bar and (i != 0 or self._bar.total != total):
             self._progress += i
1 change: 1 addition & 0 deletions python/ray/experimental/tqdm_ray.py
@@ -197,6 +197,7 @@ def update(self, state: ProgressBarState) -> None:
         delta = state["x"] - self.state["x"]
         if delta:
             self.bar.update(delta)
+        self.bar.refresh()
         self.state = state

     def close(self):
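
Taken together, the hunks above pass a refresh call from the scheduling loop through a wrapper to the underlying bar, with a guard at each level so that a missing bar is a no-op. The following is a rough sketch of that delegation pattern under stated assumptions: `RecordingBar`, `BarWrapper`, and `loop_step` are hypothetical names, and the backend records calls instead of drawing, so this is not the Ray or tqdm code itself.

```python
from typing import Dict, List, Optional, Tuple


class RecordingBar:
    """Hypothetical backend that records calls instead of drawing."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, ...]] = []

    def set_description(self, desc: str) -> None:
        self.calls.append(("set_description", desc))

    def refresh(self) -> None:
        self.calls.append(("refresh",))


class BarWrapper:
    """Illustrative wrapper; the backend is None when bars are disabled."""

    def __init__(self, backend: Optional[RecordingBar]) -> None:
        self._bar = backend

    def set_description(self, desc: str) -> None:
        if self._bar is not None:
            self._bar.set_description(desc)

    def refresh(self) -> None:
        # Guarded delegation: does nothing when there is no backend.
        if self._bar is not None:
            self._bar.refresh()


def loop_step(bars: Dict[str, BarWrapper], global_bar: Optional[BarWrapper]) -> None:
    """One scheduling step: redraw each operator bar, then the global bar."""
    for name, bar in bars.items():
        bar.set_description(f"{name}: running")
        bar.refresh()
    if global_bar is not None:
        global_bar.refresh()


op_backend = RecordingBar()
global_backend = RecordingBar()
bars = {"Map(f)": BarWrapper(op_backend), "disabled": BarWrapper(None)}

loop_step(bars, BarWrapper(global_backend))  # both levels are redrawn
loop_step(bars, None)                        # no global bar: only operator bars
print(len(op_backend.calls), len(global_backend.calls))  # 4 1
```

Keeping the `None` checks inside the wrapper means the scheduling loop can call `refresh()` on every step without checking whether progress bars are enabled.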
