[Data] Async batch fetching for map_batches #31576

Merged 23 commits on Jan 21, 2023
Changes from 3 commits
58 changes: 57 additions & 1 deletion python/ray/data/_internal/block_batching.py
@@ -1,7 +1,9 @@
import collections
import itertools
import queue
import sys
from typing import Iterator, Optional, Union
import threading
from typing import Iterator, Optional, TypeVar, Union

import ray
from ray.actor import ActorHandle
@@ -13,6 +15,7 @@
from ray.types import ObjectRef
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

T = TypeVar("T")

if sys.version_info >= (3, 7):
from contextlib import nullcontext
@@ -39,6 +42,7 @@ def batch_block_refs(
shuffle_buffer_min_size: Optional[int] = None,
shuffle_seed: Optional[int] = None,
ensure_copy: bool = False,
prefetch_batches: int = 0,
) -> Iterator[DataBatch]:
"""Create formatted batches of data from 1 or more block object references.

@@ -71,6 +75,12 @@
shuffle_seed: The seed to use for the local random shuffle.
ensure_copy: Whether batches are always copied from the underlying base
blocks (not zero-copy views).
prefetch_batches: The number of batches to fetch ahead of the current batch to
process. If set to greater than 0, a separate thread will be used to fetch
the specified number of formatted batches from blocks. This improves
performance for non-CPU-bound UDFs by allowing batch fetching and formatting
to be overlapped with UDF execution. Defaults to 0 (no prefetching
enabled).

Returns:
An iterator over record batches.
@@ -107,6 +117,7 @@ def batch_block_refs(
shuffle_buffer_min_size=shuffle_buffer_min_size,
shuffle_seed=shuffle_seed,
ensure_copy=ensure_copy,
prefetch_batches=prefetch_batches,
)


@@ -120,6 +131,7 @@ def batch_blocks(
shuffle_buffer_min_size: Optional[int] = None,
shuffle_seed: Optional[int] = None,
ensure_copy: bool = False,
prefetch_batches: int = 0,
) -> Iterator[DataBatch]:
"""Create formatted batches of data from 1 or more blocks.

@@ -142,12 +154,56 @@
stats=stats,
)

    if prefetch_batches > 0:
        batch_iter = _make_async_gen(
            batch_iter, prefetch_buffer_size=prefetch_batches
        )

    for formatted_batch in batch_iter:
        user_timer = stats.iter_user_s.timer() if stats else nullcontext()
        with user_timer:
            yield formatted_batch


def _make_async_gen(
    base_iterator: Iterator[T], prefetch_buffer_size: int = 1
) -> Iterator[T]:
    """Returns a new iterator with elements fetched from the base_iterator
    in an async fashion using a background thread.

    Args:
        base_iterator: The iterator to asynchronously fetch from.
        prefetch_buffer_size: The maximum number of items to prefetch. Increasing
            the size allows for more computation overlap for very expensive
            downstream UDFs. However, it comes at the cost of additional memory
            overhead.

    Returns:
        An iterator with the same elements as the base_iterator.
    """

    # TODO: Add other async implementations: ThreadPool, Ray Actors.

    fetch_queue = queue.Queue(maxsize=prefetch_buffer_size)

    def _async_fetch():
        for item in base_iterator:
            fetch_queue.put(item, block=True)
        # Sentinel value signaling that the base iterator is exhausted.
        fetch_queue.put(None, block=True)

    fetch_thread = threading.Thread(target=_async_fetch)
    fetch_thread.start()

    while True:
        next_item = fetch_queue.get(block=True)
        if next_item is not None:
            yield next_item
        fetch_queue.task_done()
        if next_item is None:
            break

    fetch_queue.join()
    fetch_thread.join()


def _resolve_blocks(
block_ref_iter: Iterator[ObjectRef[Block]],
stats: Optional[Union[DatasetStats, DatasetPipelineStats]] = None,
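
To illustrate the prefetching pattern used by _make_async_gen above (a background thread feeding a bounded queue.Queue, with None as the end-of-stream sentinel), here is a small standalone sketch. It mirrors the helper but is not the Ray implementation itself; the names async_gen and slow_source and the sleep durations are illustrative only.

import queue
import threading
import time
from typing import Iterator, TypeVar

T = TypeVar("T")


def async_gen(base_iterator: Iterator[T], buffer_size: int = 1) -> Iterator[T]:
    # Bounded queue so the producer stays at most `buffer_size` items ahead.
    fetch_queue: queue.Queue = queue.Queue(maxsize=buffer_size)

    def producer():
        for item in base_iterator:
            fetch_queue.put(item, block=True)
        fetch_queue.put(None, block=True)  # Sentinel: no more items.

    threading.Thread(target=producer, daemon=True).start()

    while True:
        item = fetch_queue.get(block=True)
        if item is None:
            break
        yield item


def slow_source():
    for i in range(5):
        time.sleep(0.2)  # Simulate slow block fetching/formatting.
        yield i


start = time.time()
for item in async_gen(slow_source()):
    time.sleep(0.3)  # Simulate the user's UDF.
print(f"Elapsed: {time.time() - start:.2f}s")  # ~1.7s overlapped vs. ~2.5s sequential.
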
38 changes: 34 additions & 4 deletions python/ray/data/_internal/compute.py
@@ -258,11 +258,41 @@ def _apply(
# Bin blocks by target block size.
if target_block_size is not None:
_check_batch_size(blocks_in, target_block_size, name)
block_bundles = _bundle_blocks_up_to_size(
blocks_in, target_block_size, name
)
block_bundles: List[
Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]]
] = _bundle_blocks_up_to_size(blocks_in, target_block_size, name)
else:
block_bundles = [((b,), (m,)) for b, m in blocks_in]
block_bundles: List[
Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]]
] = [((b,), (m,)) for b, m in blocks_in]
Comment from the PR author:

This is a necessary change needed to get the performance improvements for batch prediction.

Before, we would only bundle blocks up to batch size and submit each bundle as a separate actor task. This means we cannot do prefetching when batch size is greater than block size since each bundle is a separate task.

Instead, if the max actor pool size is set, then we bundle up to min(batch size, max actor pool size).

Comment from the PR author:

Hopefully once we switch to a fully iterator-based implementation, these types of special cases will no longer be necessary.


# If the max size of the actor pool is set, the number of bundles should
# not exceed this max_size. Otherwise, it leads to inefficiencies by
# creating extra actor tasks and prevents the actor tasks from doing
# optimizations such as batch or block prefetching.
if self.max_size and len(block_bundles) > self.max_size:
Comment from a contributor:

This code will become deprecated with the new executor backend, cc @clarkzinzow.


def chunkify(bundles: List, num_chunks: int):
return [bundles[i::num_chunks] for i in range(num_chunks)]
Comment from @clarkzinzow (Contributor), Jan 13, 2023:

Just to make sure that I understand the motivation: this is giving us stratified chunking, where a given chunk consists of an equal number of blocks from each of the original bundles (modulo the number of chunks), right? Might be worth leaving a comment for readers who are less familiar with this pattern.

Two potential issues with this chunking scheme:

  1. This breaks block and therefore row ordering; the previous block bundling and actor compute strategy made sure to preserve it. This doesn't matter for batch prediction workloads but may matter for other workloads that use the actor compute strategy.
  2. There are pathological cases of skewed blocks/bundles that could pop up. E.g. suppose we had bundles = [[1, 2, 3], [4, 5, 6], [7, 8, 9]] (pretend the numbers are block IDs) and num_chunks = 2, and suppose that blocks with odd IDs are much larger than blocks with even IDs; this chunking would produce bundles [[1, 3, 5, 7, 9], [2, 4, 6, 8]], where the first bundle is way, way larger than the second bundle.

Could solve (1) by changing the rechunking to merge adjacent chunks without breaking ordering, but (2) would require rebundling while taking the block sizes into account. I think that (1) is probably a blocker but (2) is not, cc @matthewdeng @c21 for more opinions.

If we only want to solve (1) for now, we could do the simple thing of merging adjacent bundles until we either (1) reach the specified number of chunks (max pool size), or (2) all would-be merged bundles exceed the max target block size threshold (currently 512 MiB by default).

Could do something like the following progressive merging of adjacent bundles, which should preserve block/row order:

def rebundle_to_size(bundles: list, num_bundles: int):
    if len(bundles) <= num_bundles:
        # Already done.
        return bundles
    max_bundle_size = DatasetContext.get_current().target_max_block_size
    # Carry out multiple rounds of merging adjacent blocks, until we have scaled down
    # to num_bundles bundles, or we've stopped making merging progress.
    while len(bundles) > num_bundles:
        new_bundles = []
        num_merges = 0
        for i in range(len(bundles) // 2):
            left, right = bundles[2 * i], bundles[2 * i + 1]
            left_size = sum(meta.size_bytes for _, meta in left)
            right_size = sum(meta.size_bytes for _, meta in right)
            if left_size + right_size <= max_bundle_size:
                # Merging these bundles would be under the max bundle size, so we merge them.
                new_bundles.append(left + right)
                num_merges += 1
                if len(bundles) - num_merges == num_bundles:
                    # This merging round has already brought us to the requisite number of bundles,
                    # so we short-circuit.
                    break
            else:
                new_bundles.extend([left, right])
        if num_merges == 0:
            break
        # Add leftover bundles due to odd number of bundles or short-circuiting to new bundles.
        for j in range(2 * i + 2, len(bundles)):
            new_bundles.append(bundles[j])
        bundles = new_bundles
    return bundles


bundle_groups: List[
List[Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]]]
] = chunkify(block_bundles, num_chunks=self.max_size)

final_bundles = []
for bundle_group in bundle_groups:
# bundle_group is a list of tuples of lists.
blocks = []
metadata = []
for bundle in bundle_group:
blocks.extend(list(bundle[0]))
metadata.extend(list(bundle[1]))

consolidated_bundle = (tuple(blocks), tuple(metadata))

final_bundles.append(consolidated_bundle)
block_bundles = final_bundles

del blocks_in
owned_by_consumer = block_list._owned_by_consumer

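As a quick standalone illustration of the stratified chunking discussed in the review thread above (treating each integer as a single-block bundle, per the reviewer's example), the snippet below reproduces the interleaving behavior of the chunkify helper and shows why block ordering is not preserved. It is illustrative only and not part of the change.

def chunkify(bundles, num_chunks):
    return [bundles[i::num_chunks] for i in range(num_chunks)]


bundles = list(range(1, 10))  # Pretend each integer is a single-block bundle.
print(chunkify(bundles, num_chunks=2))
# [[1, 3, 5, 7, 9], [2, 4, 6, 8]] -- blocks are interleaved across chunks, so
# the original block (and therefore row) ordering is lost, and if odd-numbered
# blocks happen to be much larger, the first chunk ends up heavily skewed.
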
5 changes: 5 additions & 0 deletions python/ray/data/dataset.py
@@ -325,6 +325,7 @@ def map_batches(
batch_size: Optional[Union[int, Literal["default"]]] = "default",
compute: Optional[Union[str, ComputeStrategy]] = None,
batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
prefetch_batches: int = 0,
zero_copy_batch: bool = False,
fn_args: Optional[Iterable[Any]] = None,
fn_kwargs: Optional[Dict[str, Any]] = None,
@@ -469,6 +470,9 @@
``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or
``"numpy"`` to select ``numpy.ndarray`` for tensor datasets and
``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "default".
prefetch_batches: The number of batches to fetch ahead of the current batch
Comment from a contributor:
When porting this to the new executor, we should try to consolidate prefetch_batches and prefetch_blocks into a single prefetch_batches argument, where we always prefetch enough blocks to satisfy prefetch_batches, which should be simple enough to implement since we have the size for each to-be-fetched block on hand.

Comment from the PR author:
yep +1!

to process. If set to greater than 0, a separate thread will be used to
fetch the specified number of formatted batches from blocks. This improves
performance for non-CPU-bound UDFs by allowing batch fetching and
formatting to be overlapped with UDF execution. Defaults to 0 (no
prefetching enabled). Increasing the number of batches to prefetch can
result in higher throughput, at the expense of requiring more heap memory
to buffer the batches.
zero_copy_batch: Whether ``fn`` should be provided zero-copy, read-only
batches. If this is ``True`` and no copy is required for the
``batch_format`` conversion, the batch will be a zero-copy, read-only
@@ -622,6 +626,7 @@ def process_next_batch(batch: DataBatch) -> Iterator[Block]:
batch_size=batch_size,
batch_format=batch_format,
ensure_copy=not zero_copy_batch and batch_size is not None,
prefetch_batches=prefetch_batches,
)

for batch in formatted_batch_iter:
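
A minimal usage sketch of the new prefetch_batches argument added in this PR (the UDF below is a stand-in that simulates a slow, non-CPU-bound step; the dataset size and sleep duration are arbitrary):

import time

import ray


def slow_udf(batch):
    # Stand-in for a non-CPU-bound UDF (e.g. a GPU model or an I/O call).
    time.sleep(0.1)
    return batch


ds = ray.data.range(1000)

# With prefetch_batches > 0, batch fetching and formatting run on a background
# thread and overlap with slow_udf; prefetch_batches=0 (the default) keeps the
# previous fully synchronous behavior.
ds = ds.map_batches(slow_udf, batch_size=100, prefetch_batches=2)
print(ds.take(5))
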
2 changes: 2 additions & 0 deletions python/ray/data/dataset_pipeline.py
@@ -765,6 +765,7 @@ def map_batches(
batch_size: Optional[Union[int, Literal["default"]]] = "default",
compute: Optional[Union[str, ComputeStrategy]] = None,
batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
prefetch_batches: int = 0,
fn_args: Optional[Iterable[Any]] = None,
fn_kwargs: Optional[Dict[str, Any]] = None,
fn_constructor_args: Optional[Iterable[Any]] = None,
@@ -779,6 +780,7 @@
batch_size=batch_size,
compute=compute,
batch_format=batch_format,
prefetch_batches=prefetch_batches,
fn_args=fn_args,
fn_kwargs=fn_kwargs,
fn_constructor_args=fn_constructor_args,
92 changes: 92 additions & 0 deletions python/ray/data/tests/test_block_batching.py
@@ -1,4 +1,5 @@
import pytest
import time
from typing import List
from unittest import mock

@@ -14,6 +15,7 @@
_prefetch_blocks,
_blocks_to_batches,
_format_batches,
_make_async_gen,
)


@@ -121,6 +123,96 @@ def test_format_batches(batch_format):
assert isinstance(batch["foo"], np.ndarray)


def test_async_batch_fetching():
Comment from a contributor:

Shall we add a test for map_batches as well?

Comment from the PR author:

I tried it, but there's too much time variance for a deterministic small-scale map_batches CI test. I'll confirm the performance improvements by running the batch inference release tests.

    blocks = block_generator(num_blocks=5, num_rows=8)

    def sleep_batch_format(batch_iter, *args, **kwargs):
        for batch in batch_iter:
            time.sleep(2)
            yield batch

    with mock.patch(
        "ray.data._internal.block_batching._format_batches", sleep_batch_format
    ):
        batch_iter = batch_blocks(blocks=blocks, prefetch_batches=1)
        outputs = []
        start_time = time.time()
        for batch in batch_iter:
            time.sleep(3)
            outputs.append(batch)
        end_time = time.time()

    total_time = end_time - start_time
    # Total time should be based on the number of times the UDF is called
    # (which is equal to len(outputs)).
    # The 2-second sleep in the mocked batch formatting is overlapped, so it
    # does not count towards the total time.
    assert total_time < len(outputs) * 3 + 3

    # There should be no dropped rows.
    assert sum(len(output_batch) for output_batch in outputs) == 40, sum(
        len(output_batch) for output_batch in outputs
    )  # 5 blocks with 8 rows each.


def test_make_async_gen():
    """Tests that make_async_gen overlaps compute."""

    num_items = 10

    def gen():
        for i in range(num_items):
            time.sleep(2)
            yield i

    def sleep_udf(item):
        time.sleep(3)
        return item

    iterator = _make_async_gen(gen())

    start_time = time.time()
    outputs = []
    for item in iterator:
        outputs.append(sleep_udf(item))
    end_time = time.time()

    assert outputs == list(range(num_items))

    assert end_time - start_time < num_items * 3 + 3


def test_make_async_gen_buffer_size():
    """Tests that multiple items can be prefetched at a time
    with a larger buffer size."""

    num_items = 5

    def gen():
        for i in range(num_items):
            time.sleep(1)
            yield i

    def sleep_udf(item):
        time.sleep(5)
        return item

    iterator = _make_async_gen(gen(), prefetch_buffer_size=4)

    start_time = time.time()

    # Only sleep for the first item.
    sleep_udf(next(iterator))

    # All subsequent items should already be prefetched and should be ready.
    for _ in iterator:
        pass
    end_time = time.time()

    # 1 second for the first item, 5 seconds for the UDF, 0.5 seconds of buffer.
    assert end_time - start_time < 6.5


if __name__ == "__main__":
    import sys

1 change: 1 addition & 0 deletions python/ray/train/batch_predictor.py
@@ -306,6 +306,7 @@ def __call__(self, input_batch: DataBatchType) -> DataBatchType:
if self.get_preprocessor() is not None
else predict_stage_batch_format,
batch_size=batch_size,
prefetch_batches=num_gpus_per_worker > 0,
**ray_remote_args,
)

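One note on the batch_predictor.py change: num_gpus_per_worker > 0 evaluates to a bool, and Python treats bool as an int subclass, so this effectively passes prefetch_batches=1 for GPU prediction and prefetch_batches=0 otherwise. A quick sanity check of that assumption:

# bool is an int subclass, so True/False coerce to 1/0 when used as a count.
assert isinstance(True, int)
assert int(True) == 1 and int(False) == 0
assert (2 > 0) + 0 == 1  # e.g. num_gpus_per_worker=2 yields prefetch_batches=1 here.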