
[Data] Allow unknown estimate of operator output bundles and ProgressBar totals #46601

Merged (11 commits) on Jul 17, 2024
@@ -263,17 +263,18 @@ def progress_str(self) -> str:
"""
return ""

def num_outputs_total(self) -> int:
"""Returns the total number of output bundles of this operator.
def num_outputs_total(self) -> Optional[int]:
"""Returns the total number of output bundles of this operator,
or ``None`` if unable to provide a reasonable estimate (for example,
if no tasks have finished yet).

The value returned may be an estimate based off the consumption so far.
This is useful for reporting progress.

Subclasses should either override this method, or update
``self._estimated_num_output_bundles`` appropriately.
"""
if self._estimated_num_output_bundles is not None:
return self._estimated_num_output_bundles
if len(self.input_dependencies) == 1:
return self.input_dependencies[0].num_outputs_total()
raise AttributeError
return self._estimated_num_output_bundles
Contributor comment: can you add a comment that subclasses should either override this method or update _estimated_num_output_bundles?


def start(self, options: ExecutionOptions) -> None:
"""Called by the executor when execution starts for an operator.
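As context for the `num_outputs_total` contract above, here is a minimal, hypothetical sketch (not part of this PR; the class and method names are made up) of the two ways an operator can satisfy the new `Optional[int]` return, and how a caller handles the `None` case:

```python
from typing import Optional


class EstimatingOperator:
    """Hypothetical operator that updates its estimate as tasks finish."""

    def __init__(self) -> None:
        self._estimated_num_output_bundles: Optional[int] = None

    def on_task_finished(
        self, outputs_so_far: int, tasks_finished: int, total_tasks: int
    ) -> None:
        # Update the estimate from observed throughput.
        self._estimated_num_output_bundles = round(
            outputs_so_far / tasks_finished * total_tasks
        )

    def num_outputs_total(self) -> Optional[int]:
        # None until the first task finishes.
        return self._estimated_num_output_bundles


# A caller (e.g. progress reporting) must now handle None.
op = EstimatingOperator()
assert op.num_outputs_total() is None
op.on_task_finished(outputs_so_far=20, tasks_finished=2, total_tasks=10)
assert op.num_outputs_total() == 100
```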
@@ -72,7 +72,7 @@ def __init__(
self._stats: StatsDict = {}
super().__init__(name, [input_op], target_max_block_size)

def num_outputs_total(self) -> int:
def num_outputs_total(self) -> Optional[int]:
return (
self._num_outputs
if self._num_outputs
@@ -29,6 +29,7 @@ def __init__(
num_output_blocks: The number of output blocks. If not specified, progress
bars total will be set based on num output bundles instead.
"""
super().__init__("Input", [], target_max_block_size=None)
if input_data is not None:
assert input_data_factory is None
# Copy the input data to avoid mutating the original list.
@@ -41,7 +42,6 @@ def __init__(
self._input_data_factory = input_data_factory
self._is_input_initialized = False
self._input_data_index = 0
super().__init__("Input", [], target_max_block_size=None)

def start(self, options: ExecutionOptions) -> None:
if not self._is_input_initialized:
@@ -66,9 +66,6 @@ def _get_next_inner(self) -> RefBundle:
self._input_data_index += 1
return bundle

def num_outputs_total(self) -> int:
return self._num_output_bundles

def get_stats(self) -> StatsDict:
return {}

@@ -77,8 +74,8 @@ def _add_input_inner(self, refs, input_index) -> None:

def _initialize_metadata(self):
assert self._input_data is not None and self._is_input_initialized
self._estimated_num_output_bundles = len(self._input_data)

self._num_output_bundles = len(self._input_data)
block_metadata = []
for bundle in self._input_data:
block_metadata.extend(bundle.metadata)
@@ -1,6 +1,6 @@
import copy
from collections import deque
from typing import Deque, List, Tuple
from typing import Deque, List, Optional, Tuple

import ray
from ray.data._internal.execution.interfaces import PhysicalOperator, RefBundle
@@ -81,11 +81,12 @@ def slice_fn(block, metadata, num_rows) -> Tuple[Block, BlockMetadata]:
if self._limit_reached():
self.mark_execution_completed()

# We cannot estimate if we have only consumed empty blocks
if self._consumed_rows > 0:
# We cannot estimate if we have only consumed empty blocks,
# or if the input dependency's total number of output bundles is unknown.
num_inputs = self.input_dependencies[0].num_outputs_total()
if self._consumed_rows > 0 and num_inputs is not None:
# Estimate number of output bundles
# Check the case where _limit > # of input rows
num_inputs = self.input_dependencies[0].num_outputs_total()
estimated_total_output_rows = min(
self._limit, self._consumed_rows / self._cur_output_bundles * num_inputs
)
@@ -108,15 +109,12 @@ def _get_next_inner(self) -> RefBundle:
def get_stats(self) -> StatsDict:
return {self._name: self._output_metadata}

def num_outputs_total(self) -> int:
def num_outputs_total(self) -> Optional[int]:
# Before execution is completed, we don't know how many output
# bundles we will have. We estimate based off the consumption so far.
if self._execution_completed:
return self._cur_output_bundles
elif self._estimated_num_output_bundles is not None:
return self._estimated_num_output_bundles
else:
return self.input_dependencies[0].num_outputs_total()
return self._estimated_num_output_bundles

def throttling_disabled(self) -> bool:
return True
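As a worked illustration of the LimitOperator row estimate above (all values are made up, not taken from this PR):

```python
# Hypothetical snapshot of LimitOperator state mid-execution.
_limit = 10              # user-requested row limit
_consumed_rows = 5       # rows consumed so far
_cur_output_bundles = 2  # output bundles produced so far
num_inputs = 8           # input op's num_outputs_total(); must not be None

# Extrapolate rows from consumption so far, capped at the limit
# (covers the case where _limit > number of input rows).
estimated_total_output_rows = min(
    _limit, _consumed_rows / _cur_output_bundles * num_inputs
)
assert estimated_total_output_rows == 10
```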
22 changes: 12 additions & 10 deletions python/ray/data/_internal/execution/operators/map_operator.py
@@ -317,16 +317,18 @@ def _task_done_callback(task_index: int, exception: Optional[Exception]):
self._metrics.on_task_finished(task_index, exception)

# Estimate number of tasks from inputs received and tasks submitted so far
estimated_num_tasks = (
self.input_dependencies[0].num_outputs_total()
/ self._metrics.num_inputs_received
* self._next_data_task_idx
)
self._estimated_num_output_bundles = round(
estimated_num_tasks
* self._metrics.num_outputs_of_finished_tasks
/ self._metrics.num_tasks_finished
)
upstream_op_num_outputs = self.input_dependencies[0].num_outputs_total()
if upstream_op_num_outputs:
estimated_num_tasks = (
upstream_op_num_outputs
/ self._metrics.num_inputs_received
* self._next_data_task_idx
)
self._estimated_num_output_bundles = round(
estimated_num_tasks
* self._metrics.num_outputs_of_finished_tasks
/ self._metrics.num_tasks_finished
)

self._data_tasks.pop(task_index)
# Notify output queue that this task is complete.
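To make the two-step MapOperator estimate above concrete, a small worked example with hypothetical metric values (not taken from this PR):

```python
# Hypothetical metrics part-way through a MapOperator's execution.
upstream_op_num_outputs = 100       # upstream op's num_outputs_total()
num_inputs_received = 20            # input bundles received so far
next_data_task_idx = 10             # data tasks submitted so far
num_outputs_of_finished_tasks = 30  # output bundles from finished tasks
num_tasks_finished = 5

# Step 1: scale tasks submitted so far up to the expected total task count.
estimated_num_tasks = (
    upstream_op_num_outputs / num_inputs_received * next_data_task_idx
)
# Step 2: multiply by the average number of outputs per finished task.
estimated_num_output_bundles = round(
    estimated_num_tasks * num_outputs_of_finished_tasks / num_tasks_finished
)
assert estimated_num_tasks == 50
assert estimated_num_output_bundles == 300
```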
@@ -69,6 +69,11 @@ def __init__(
self._locality_hits = 0
self._locality_misses = 0

def num_outputs_total(self) -> Optional[int]:
# OutputSplitter does not change the number of blocks,
# so we can return the number of blocks from the input op.
return self.input_dependencies[0].num_outputs_total()

def start(self, options: ExecutionOptions) -> None:
super().start(options)
# Force disable locality optimization.
@@ -1,4 +1,4 @@
from typing import List
from typing import List, Optional

from ray.data._internal.execution.interfaces import (
ExecutionOptions,
@@ -46,10 +46,13 @@ def start(self, options: ExecutionOptions):
self._preserve_order = options.preserve_order
super().start(options)

def num_outputs_total(self) -> int:
def num_outputs_total(self) -> Optional[int]:
num_outputs = 0
for input_op in self.input_dependencies:
num_outputs += input_op.num_outputs_total()
input_num_outputs = input_op.num_outputs_total()
if input_num_outputs is None:
return None
num_outputs += input_num_outputs
return num_outputs

def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
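A tiny standalone sketch (hypothetical helper, mirroring the union logic above) of how an unknown input total propagates to the union's total:

```python
from typing import Iterable, Optional


def union_outputs_total(input_totals: Iterable[Optional[int]]) -> Optional[int]:
    # Sum the inputs' totals; if any input cannot estimate, neither can the union.
    total = 0
    for t in input_totals:
        if t is None:
            return None
        total += t
    return total


assert union_outputs_total([3, 4]) == 7
assert union_outputs_total([3, None]) is None
```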
@@ -1,5 +1,5 @@
import itertools
from typing import List, Tuple
from typing import List, Optional, Tuple

import ray
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
@@ -43,7 +43,7 @@ def __init__(
"Zip", [left_input_op, right_input_op], target_max_block_size=None
)

def num_outputs_total(self) -> int:
def num_outputs_total(self) -> Optional[int]:
left_num_outputs = self.input_dependencies[0].num_outputs_total()
right_num_outputs = self.input_dependencies[1].num_outputs_total()
if left_num_outputs is not None and right_num_outputs is not None:
12 changes: 10 additions & 2 deletions python/ray/data/_internal/progress_bar.py
@@ -38,18 +38,26 @@ def set_progress_bars(enabled: bool) -> bool:


class ProgressBar:
"""Thin wrapper around tqdm to handle soft imports."""
"""Thin wrapper around tqdm to handle soft imports.

If `total` is `None` (for example, because no tasks have finished
yet and the total is not yet known), doesn't display the full
progress bar. Still displays basic progress stats from tqdm."""

def __init__(
self,
name: str,
total: int,
total: Optional[int],
unit: str,
position: int = 0,
enabled: Optional[bool] = None,
):
self._desc = name
self._progress = 0
# Prepend a space to the unit for better formatting.
if unit[0] != " ":
unit = " " + unit

if enabled is None:
from ray.data import DataContext

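For reference, a standalone sketch (not part of this diff) of what tqdm itself does with an unknown total: with `total=None` it still prints the item count, elapsed time, and rate, just without a percentage bar or ETA, which is the behavior the wrapper relies on here.

```python
import time

from tqdm import tqdm

# With total=None, tqdm renders e.g. "Unknown total: 5 bundle [00:00, 97.3 bundle/s]"
# instead of a percentage bar.
bar = tqdm(total=None, desc="Unknown total", unit=" bundle")
for _ in range(5):
    time.sleep(0.01)
    bar.update(1)
bar.close()
```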
23 changes: 21 additions & 2 deletions python/ray/data/tests/test_operators.py
@@ -114,13 +114,18 @@ def dummy_all_transform(bundles: List[RefBundle], ctx):


def test_num_outputs_total():
# The number of outputs is always known for InputDataBuffer.
input_op = InputDataBuffer(make_ref_bundles([[i] for i in range(100)]))
assert input_op.num_outputs_total() == 100

# Prior to execution, the number of outputs is unknown
# for Map/AllToAllOperator operators.
op1 = MapOperator.create(
_mul2_map_data_prcessor,
input_op=input_op,
name="TestMapper",
)
assert op1.num_outputs_total() == 100
assert op1.num_outputs_total() is None

def dummy_all_transform(bundles: List[RefBundle]):
return make_ref_bundles([[1, 2], [3, 4]]), {"FooStats": []}
@@ -131,7 +136,21 @@ def dummy_all_transform(bundles: List[RefBundle]):
target_max_block_size=DataContext.get_current().target_max_block_size,
name="TestAll",
)
assert op2.num_outputs_total() == 100
assert op2.num_outputs_total() is None

# Feed data and drive streaming execution.
output = []
op1.start(ExecutionOptions(actor_locality_enabled=True))
while input_op.has_next():
op1.add_input(input_op.get_next(), 0)
while not op1.has_next():
run_one_op_task(op1)
while op1.has_next():
ref = op1.get_next()
assert ref.owns_blocks, ref
_get_blocks(ref, output)
# After op finishes, num_outputs_total is known.
assert op1.num_outputs_total() == 100


@pytest.mark.parametrize("use_actors", [False, True])