[Data] Allow unknown estimate of operator output bundles and `ProgressBar` totals (#46601)

Ray Data initially assumes that each read task produces exactly one
block. Furthermore, one-to-one operators assume that their number of
output blocks matches that of their upstream operator. Neither
assumption is always guaranteed to hold, which results in inaccurate
progress bar estimates and can cause confusion. This PR updates
`PhysicalOperator.num_outputs_total()` to allow the estimated number of
output bundles to be unknown, which is the case when no tasks have
finished yet and it is therefore not possible to provide a reasonable estimate.

For example, given the following reproducible script:
```
import time
import numpy as np
import ray
ray.init(num_cpus=1)

target_block_size = ray.data.DataContext.get_current().target_max_block_size

def sleep(batch):
    for _ in range(100):
        time.sleep(0.1)
        yield {"batch": np.zeros((target_block_size,), dtype=np.uint8)}

ray.data.range(10, override_num_blocks=10).map_batches(
    sleep, batch_size=None
).materialize()
```

We can compare the behavior before and after this PR (video links):
- [Before](https://drive.google.com/file/d/1GiWcw2AqyINXFRpbiVIperWTKNV8EcOB/view?usp=sharing)
- [After](https://drive.google.com/file/d/1jx1oMMuyqBNQLUmMpiJmqMRV8aMOWkOL/view?usp=sharing)

---------

Signed-off-by: Scott Lee <[email protected]>
scottjlee authored Jul 17, 2024
1 parent 593d04a commit abbc6f7
Showing 10 changed files with 74 additions and 41 deletions.

@@ -263,17 +263,18 @@ def progress_str(self) -> str:
         """
         return ""

-    def num_outputs_total(self) -> int:
-        """Returns the total number of output bundles of this operator.
+    def num_outputs_total(self) -> Optional[int]:
+        """Returns the total number of output bundles of this operator,
+        or ``None`` if unable to provide a reasonable estimate (for example,
+        if no tasks have finished yet).
         The value returned may be an estimate based off the consumption so far.
         This is useful for reporting progress.
+        Subclasses should either override this method, or update
+        ``self._estimated_num_output_bundles`` appropriately.
         """
-        if self._estimated_num_output_bundles is not None:
-            return self._estimated_num_output_bundles
-        if len(self.input_dependencies) == 1:
-            return self.input_dependencies[0].num_outputs_total()
-        raise AttributeError
+        return self._estimated_num_output_bundles

     def start(self, options: ExecutionOptions) -> None:
         """Called by the executor when execution starts for an operator.
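
To make the new base-class contract concrete, here is a minimal standalone sketch (the `SketchOperator` class and its methods are invented for illustration, not Ray's actual `PhysicalOperator`): an operator reports `None` until it has enough information, then fills in `_estimated_num_output_bundles`.

```
from typing import List, Optional


class SketchOperator:
    """Illustrative stand-in for a PhysicalOperator subclass."""

    def __init__(self, input_deps: List["SketchOperator"]):
        self.input_dependencies = input_deps
        # None means "no reasonable estimate yet".
        self._estimated_num_output_bundles: Optional[int] = None

    def num_outputs_total(self) -> Optional[int]:
        return self._estimated_num_output_bundles

    def on_task_finished(
        self, outputs_so_far: int, tasks_finished: int, tasks_total: int
    ) -> None:
        # Once at least one task has finished, extrapolate a total.
        if tasks_finished > 0:
            self._estimated_num_output_bundles = round(
                outputs_so_far / tasks_finished * tasks_total
            )


op = SketchOperator(input_deps=[])
assert op.num_outputs_total() is None  # unknown before any task finishes
op.on_task_finished(outputs_so_far=3, tasks_finished=1, tasks_total=10)
assert op.num_outputs_total() == 30  # a rough estimate is now available
```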

@@ -72,7 +72,7 @@ def __init__(
         self._stats: StatsDict = {}
         super().__init__(name, [input_op], target_max_block_size)

-    def num_outputs_total(self) -> int:
+    def num_outputs_total(self) -> Optional[int]:
         return (
             self._num_outputs
             if self._num_outputs

@@ -29,6 +29,7 @@ def __init__(
             num_output_blocks: The number of output blocks. If not specified, progress
                 bars total will be set based on num output bundles instead.
         """
+        super().__init__("Input", [], target_max_block_size=None)
         if input_data is not None:
             assert input_data_factory is None
             # Copy the input data to avoid mutating the original list.
@@ -41,7 +42,6 @@ def __init__(
             self._input_data_factory = input_data_factory
         self._is_input_initialized = False
         self._input_data_index = 0
-        super().__init__("Input", [], target_max_block_size=None)

     def start(self, options: ExecutionOptions) -> None:
         if not self._is_input_initialized:
@@ -66,9 +66,6 @@ def _get_next_inner(self) -> RefBundle:
             self._input_data_index += 1
         return bundle

-    def num_outputs_total(self) -> int:
-        return self._num_output_bundles
-
     def get_stats(self) -> StatsDict:
         return {}

@@ -77,8 +74,8 @@ def _add_input_inner(self, refs, input_index) -> None:

     def _initialize_metadata(self):
         assert self._input_data is not None and self._is_input_initialized
+        self._estimated_num_output_bundles = len(self._input_data)

-        self._num_output_bundles = len(self._input_data)
         block_metadata = []
         for bundle in self._input_data:
             block_metadata.extend(bundle.metadata)
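
A plausible reason for moving `super().__init__(...)` to the top of `__init__` (this is an inference from the diff, not stated in the PR): if the base constructor initializes `_estimated_num_output_bundles` to `None` after the subclass has already computed it, the estimate would be clobbered. A generic sketch of that pitfall, with made-up class names:

```
class Base:
    def __init__(self):
        # Base class initializes the shared estimate to "unknown".
        self._estimated_num_output_bundles = None


class ChildInitLast(Base):
    def __init__(self, data):
        self._estimated_num_output_bundles = len(data)  # computed first...
        super().__init__()  # ...then reset back to None by the base class


class ChildInitFirst(Base):
    def __init__(self, data):
        super().__init__()  # initialize base-class state first
        self._estimated_num_output_bundles = len(data)


assert ChildInitLast([1, 2, 3])._estimated_num_output_bundles is None
assert ChildInitFirst([1, 2, 3])._estimated_num_output_bundles == 3
```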

python/ray/data/_internal/execution/operators/limit_operator.py (16 changes: 7 additions & 9 deletions)

@@ -1,6 +1,6 @@
 import copy
 from collections import deque
-from typing import Deque, List, Tuple
+from typing import Deque, List, Optional, Tuple

 import ray
 from ray.data._internal.execution.interfaces import PhysicalOperator, RefBundle
@@ -81,11 +81,12 @@ def slice_fn(block, metadata, num_rows) -> Tuple[Block, BlockMetadata]:
         if self._limit_reached():
             self.mark_execution_completed()

-        # We cannot estimate if we have only consumed empty blocks
-        if self._consumed_rows > 0:
+        # We cannot estimate if we have only consumed empty blocks,
+        # or if the input dependency's total number of output bundles is unknown.
+        num_inputs = self.input_dependencies[0].num_outputs_total()
+        if self._consumed_rows > 0 and num_inputs is not None:
             # Estimate number of output bundles
             # Check the case where _limit > # of input rows
-            num_inputs = self.input_dependencies[0].num_outputs_total()
             estimated_total_output_rows = min(
                 self._limit, self._consumed_rows / self._cur_output_bundles * num_inputs
             )
@@ -108,15 +109,12 @@ def _get_next_inner(self) -> RefBundle:
     def get_stats(self) -> StatsDict:
         return {self._name: self._output_metadata}

-    def num_outputs_total(self) -> int:
+    def num_outputs_total(self) -> Optional[int]:
         # Before execution is completed, we don't know how many output
         # bundles we will have. We estimate based off the consumption so far.
         if self._execution_completed:
             return self._cur_output_bundles
-        elif self._estimated_num_output_bundles is not None:
-            return self._estimated_num_output_bundles
-        else:
-            return self.input_dependencies[0].num_outputs_total()
+        return self._estimated_num_output_bundles

     def throttling_disabled(self) -> bool:
         return True
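
A minimal standalone sketch of the guard and row estimate shown above (the helper function and the numbers are illustrative, not Ray code):

```
from typing import Optional


def estimated_total_output_rows(
    limit: int,
    consumed_rows: int,
    cur_output_bundles: int,
    num_inputs: Optional[int],
) -> Optional[int]:
    # Mirrors the guard in the diff: no estimate if only empty blocks were
    # consumed or if the upstream bundle count is unknown.
    if consumed_rows <= 0 or num_inputs is None:
        return None
    # Project total input rows from rows-per-bundle so far, capped at the limit.
    return min(limit, int(consumed_rows / cur_output_bundles * num_inputs))


assert estimated_total_output_rows(5, 40, 4, 20) == 5        # 40/4*20 = 200, capped at 5
assert estimated_total_output_rows(1000, 40, 4, 20) == 200   # limit not yet the bound
assert estimated_total_output_rows(5, 40, 4, None) is None   # unknown upstream total
```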

python/ray/data/_internal/execution/operators/map_operator.py (22 changes: 12 additions & 10 deletions)

@@ -317,16 +317,18 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]):
             self._metrics.on_task_finished(task_index, exception)

             # Estimate number of tasks from inputs received and tasks submitted so far
-            estimated_num_tasks = (
-                self.input_dependencies[0].num_outputs_total()
-                / self._metrics.num_inputs_received
-                * self._next_data_task_idx
-            )
-            self._estimated_num_output_bundles = round(
-                estimated_num_tasks
-                * self._metrics.num_outputs_of_finished_tasks
-                / self._metrics.num_tasks_finished
-            )
+            upstream_op_num_outputs = self.input_dependencies[0].num_outputs_total()
+            if upstream_op_num_outputs:
+                estimated_num_tasks = (
+                    upstream_op_num_outputs
+                    / self._metrics.num_inputs_received
+                    * self._next_data_task_idx
+                )
+                self._estimated_num_output_bundles = round(
+                    estimated_num_tasks
+                    * self._metrics.num_outputs_of_finished_tasks
+                    / self._metrics.num_tasks_finished
+                )

             self._data_tasks.pop(task_index)
             # Notify output queue that this task is complete.
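
The estimation in `MapOperator` is a two-step extrapolation, now skipped when the upstream total is unknown. A standalone sketch with illustrative numbers (the helper function is not Ray code):

```
from typing import Optional


def estimate_output_bundles(
    upstream_op_num_outputs: Optional[int],
    num_inputs_received: int,
    next_data_task_idx: int,
    num_outputs_of_finished_tasks: int,
    num_tasks_finished: int,
) -> Optional[int]:
    # Mirrors the new guard: skip the estimate if the upstream total is
    # unknown (None) or zero.
    if not upstream_op_num_outputs:
        return None
    # Step 1: extrapolate the total number of tasks from how many inputs
    # have been turned into submitted tasks so far.
    estimated_num_tasks = (
        upstream_op_num_outputs / num_inputs_received * next_data_task_idx
    )
    # Step 2: scale by the average number of output bundles per finished task.
    return round(
        estimated_num_tasks * num_outputs_of_finished_tasks / num_tasks_finished
    )


# 100 upstream bundles, 10 inputs received, 10 tasks submitted,
# 2 finished tasks that produced 6 bundles in total -> ~300 bundles.
assert estimate_output_bundles(100, 10, 10, 6, 2) == 300
assert estimate_output_bundles(None, 10, 10, 6, 2) is None
```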

@@ -69,6 +69,11 @@ def __init__(
         self._locality_hits = 0
         self._locality_misses = 0

+    def num_outputs_total(self) -> Optional[int]:
+        # OutputSplitter does not change the number of blocks,
+        # so we can return the number of blocks from the input op.
+        return self.input_dependencies[0].num_outputs_total()
+
     def start(self, options: ExecutionOptions) -> None:
         super().start(options)
         # Force disable locality optimization.

@@ -1,4 +1,4 @@
-from typing import List
+from typing import List, Optional

 from ray.data._internal.execution.interfaces import (
     ExecutionOptions,
@@ -46,10 +46,13 @@ def start(self, options: ExecutionOptions):
         self._preserve_order = options.preserve_order
         super().start(options)

-    def num_outputs_total(self) -> int:
+    def num_outputs_total(self) -> Optional[int]:
         num_outputs = 0
         for input_op in self.input_dependencies:
-            num_outputs += input_op.num_outputs_total()
+            input_num_outputs = input_op.num_outputs_total()
+            if input_num_outputs is None:
+                return None
+            num_outputs += input_num_outputs
         return num_outputs

     def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:

python/ray/data/_internal/execution/operators/zip_operator.py (4 changes: 2 additions & 2 deletions)

@@ -1,5 +1,5 @@
 import itertools
-from typing import List, Tuple
+from typing import List, Optional, Tuple

 import ray
 from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
@@ -43,7 +43,7 @@ def __init__(
             "Zip", [left_input_op, right_input_op], target_max_block_size=None
         )

-    def num_outputs_total(self) -> int:
+    def num_outputs_total(self) -> Optional[int]:
         left_num_outputs = self.input_dependencies[0].num_outputs_total()
         right_num_outputs = self.input_dependencies[1].num_outputs_total()
         if left_num_outputs is not None and right_num_outputs is not None:
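
Both `UnionOperator` and `ZipOperator` now follow the same rule: if any input's total is unknown, the combined total is unknown (union sums the known totals; zip's combination is truncated in the hunk above). A tiny standalone sketch of the union case, using a made-up helper:

```
from typing import Optional, Sequence


def combined_total(totals: Sequence[Optional[int]]) -> Optional[int]:
    # If any upstream total is unknown, the combined total is unknown too.
    if any(t is None for t in totals):
        return None
    return sum(totals)


assert combined_total([10, 20, 30]) == 60
assert combined_total([10, None, 30]) is None
```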

python/ray/data/_internal/progress_bar.py (12 changes: 10 additions & 2 deletions)

@@ -38,18 +38,26 @@ def set_progress_bars(enabled: bool) -> bool:


 class ProgressBar:
-    """Thin wrapper around tqdm to handle soft imports."""
+    """Thin wrapper around tqdm to handle soft imports.
+    If `total` is `None` (for example, because no tasks have
+    finished yet), doesn't display the full progress bar.
+    Still displays basic progress stats from tqdm."""

     def __init__(
         self,
         name: str,
-        total: int,
+        total: Optional[int],
         unit: str,
         position: int = 0,
         enabled: Optional[bool] = None,
     ):
         self._desc = name
         self._progress = 0
+        # Prepend a space to the unit for better formatting.
+        if unit[0] != " ":
+            unit = " " + unit
+
         if enabled is None:
             from ray.data import DataContext
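
For reference, tqdm itself already accepts `total=None`: it then omits the percentage and bar and shows only the running count, elapsed time, and rate, which is the fallback behavior the updated docstring describes. A small sketch (requires `tqdm`; the loop and unit name are illustrative):

```
import time

from tqdm import tqdm

# With total=None, tqdm shows only the running count, elapsed time, and rate;
# with an integer total it shows the full bar and percentage.
with tqdm(total=None, desc="unknown total", unit=" bundle") as bar:
    for _ in range(5):
        time.sleep(0.1)
        bar.update(1)

with tqdm(total=5, desc="known total", unit=" bundle") as bar:
    for _ in range(5):
        time.sleep(0.1)
        bar.update(1)
```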

python/ray/data/tests/test_operators.py (23 changes: 21 additions & 2 deletions)

@@ -114,13 +114,18 @@ def dummy_all_transform(bundles: List[RefBundle], ctx):


 def test_num_outputs_total():
+    # The number of outputs is always known for InputDataBuffer.
     input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)]))
+    assert input_op.num_outputs_total() == 100
+
+    # Prior to execution, the number of outputs is unknown
+    # for Map/AllToAllOperator operators.
     op1 = MapOperator.create(
         _mul2_map_data_prcessor,
         input_op=input_op,
         name="TestMapper",
     )
-    assert op1.num_outputs_total() == 100
+    assert op1.num_outputs_total() is None

     def dummy_all_transform(bundles: List[RefBundle]):
         return make_ref_bundles([[1, 2], [3, 4]]), {"FooStats": []}
@@ -131,7 +136,21 @@ def dummy_all_transform(bundles: List[RefBundle]):
         target_max_block_size=DataContext.get_current().target_max_block_size,
         name="TestAll",
     )
-    assert op2.num_outputs_total() == 100
+    assert op2.num_outputs_total() is None
+
+    # Feed data and implement streaming exec.
+    output = []
+    op1.start(ExecutionOptions(actor_locality_enabled=True))
+    while input_op.has_next():
+        op1.add_input(input_op.get_next(), 0)
+    while not op1.has_next():
+        run_one_op_task(op1)
+    while op1.has_next():
+        ref = op1.get_next()
+        assert ref.owns_blocks, ref
+        _get_blocks(ref, output)
+    # After op finishes, num_outputs_total is known.
+    assert op1.num_outputs_total() == 100


 @pytest.mark.parametrize("use_actors", [False, True])
