Skip to content

Commit

Permalink
[data] Only dump state once on progress bar close (ray-project#46928)
Browse files Browse the repository at this point in the history
Close ray-project#44979. This also
addresses an issue where the bars were misordered in jupyter notebooks
(the overall dataset bar showed up last).

---------

Signed-off-by: Matthew Owen <[email protected]>
Signed-off-by: Dev <[email protected]>
  • Loading branch information
omatthew98 authored and dev-goyal committed Aug 8, 2024
1 parent 9a5b922 commit 5521609
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
18 changes: 10 additions & 8 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ def execute(

logger.debug("Execution config: %s", self._options)

# Note: DAG must be initialized in order to query num_outputs_total.
# Note: Initialize global progress bar before building the streaming
# topology so bars are created in the same order as they should be
# displayed. This is done to ensure correct ordering within notebooks.
# TODO(zhilong): Implement num_output_rows_total for all
# AllToAllOperators
self._global_info = ProgressBar(
"Running", dag.num_output_rows_total(), unit="row"
)

# Setup the streaming DAG topology and start the runner thread.
self._topology, _ = build_streaming_topology(dag, self._options)
self._resource_manager = ResourceManager(
Expand All @@ -126,14 +136,6 @@ def execute(

self._has_op_completed = {op: False for op in self._topology}

if not isinstance(dag, InputDataBuffer):
# Note: DAG must be initialized in order to query num_outputs_total.
# TODO(zhilong): Implement num_output_rows_total for all
# AllToAllOperators
self._global_info = ProgressBar(
"Running", dag.num_output_rows_total(), unit="row"
)

self._output_node: OpState = self._topology[dag]
StatsManager.register_dataset_to_stats_actor(
self._dataset_tag,
Expand Down
4 changes: 4 additions & 0 deletions python/ray/experimental/tqdm_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,12 @@ def update_bar(self, state: ProgressBarState) -> None:
def close_bar(self, state: ProgressBarState) -> None:
"""Remove a bar from this group."""
bar = self.bars_by_uuid[state["uuid"]]
# Note: Hide and then unhide bars to prevent flashing of the
# last bar when we are closing multiple bars sequentially.
instance().hide_bars()
bar.close()
del self.bars_by_uuid[state["uuid"]]
instance().unhide_bars()

def slots_required(self):
"""Return the number of pos slots we need to accomodate bars in this group."""
Expand Down

0 comments on commit 5521609

Please sign in to comment.