From 326376b09b9747e477a86ddd8565d59385340177 Mon Sep 17 00:00:00 2001 From: amogkam Date: Tue, 10 Jan 2023 15:03:40 -0800 Subject: [PATCH 01/20] add Signed-off-by: amogkam --- python/ray/data/_internal/block_batching.py | 58 +++++++++++- python/ray/data/dataset.py | 5 ++ python/ray/data/dataset_pipeline.py | 2 + python/ray/data/tests/test_block_batching.py | 92 ++++++++++++++++++++ 4 files changed, 156 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/block_batching.py b/python/ray/data/_internal/block_batching.py index 9d387bbeda42..10eb4a10f8fe 100644 --- a/python/ray/data/_internal/block_batching.py +++ b/python/ray/data/_internal/block_batching.py @@ -1,7 +1,9 @@ import collections import itertools +import queue import sys -from typing import Iterator, Optional, Union +import threading +from typing import Iterator, Optional, TypeVar, Union import ray from ray.actor import ActorHandle @@ -13,6 +15,7 @@ from ray.types import ObjectRef from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy +T = TypeVar("T") if sys.version_info >= (3, 7): from contextlib import nullcontext @@ -39,6 +42,7 @@ def batch_block_refs( shuffle_buffer_min_size: Optional[int] = None, shuffle_seed: Optional[int] = None, ensure_copy: bool = False, + prefetch_batches: int = 0, ) -> Iterator[DataBatch]: """Create formatted batches of data from 1 or more block object references. @@ -71,6 +75,12 @@ def batch_block_refs( shuffle_seed: The seed to use for the local random shuffle. ensure_copy: Whether batches are always copied from the underlying base blocks (not zero-copy views). + prefetch_batches: The number of batches to fetch ahead of the current batch to + process. If set to greater than 0, a separate thread will be used to fetch + the specified amount of formatted batches from blocks. This improves + performance for non-CPU bound UDFs, allowing batch fetching compute and + formatting to be overlapped with the UDF. Defaults to 0 (no prefetching + enabled). Returns: An iterator over record batches. @@ -107,6 +117,7 @@ def batch_block_refs( shuffle_buffer_min_size=shuffle_buffer_min_size, shuffle_seed=shuffle_seed, ensure_copy=ensure_copy, + prefetch_batches=prefetch_batches, ) @@ -120,6 +131,7 @@ def batch_blocks( shuffle_buffer_min_size: Optional[int] = None, shuffle_seed: Optional[int] = None, ensure_copy: bool = False, + prefetch_batches: int = 0, ) -> Iterator[DataBatch]: """Create formatted batches of data from 1 or more blocks. @@ -142,12 +154,56 @@ def batch_blocks( stats=stats, ) + if prefetch_batches > 0: + batch_iter = _make_async_gen(batch_iter, prefetch_buffer_size=prefetch_batches) + for formatted_batch in batch_iter: user_timer = stats.iter_user_s.timer() if stats else nullcontext() with user_timer: yield formatted_batch +def _make_async_gen( + base_iterator: Iterator[T], prefetch_buffer_size: int = 1 +) -> Iterator[T]: + """Returns a new iterator with elements fetched from the base_iterator + in an sync fashion using a background thread. + + Args: + base_iterator: The iterator to asynchronously fetch from. + prefetch_buffer_size: The maximum number of items to prefetch. Increasing the + size allows for more computation overlap for very expensive downstream udfs. + However it comes at the cost of additional memory overhead. + + Returns: + An iterator with the same elements as the base_iterator. + """ + + # TODO: Add other async implementations: ThreadPool, Ray Actors. 
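+    # The bounded queue below provides backpressure: once prefetch_buffer_size
+    # items are waiting, the producer thread blocks in put() instead of
+    # buffering an unbounded amount of data in memory.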
+
+    fetch_queue = queue.Queue(maxsize=prefetch_buffer_size)
+
+    def _async_fetch():
+        for item in base_iterator:
+            fetch_queue.put(item, block=True)
+        # sentinel value.
+        fetch_queue.put(None, block=True)
+
+    fetch_thread = threading.Thread(target=_async_fetch)
+    fetch_thread.start()
+
+    while True:
+        next_item = fetch_queue.get(block=True)
+        if next_item is not None:
+            yield next_item
+            fetch_queue.task_done()
+        if next_item is None:
+            break
+
+    fetch_queue.join()
+    fetch_thread.join()
+
+
 def _resolve_blocks(
     block_ref_iter: Iterator[ObjectRef[Block]],
     stats: Optional[Union[DatasetStats, DatasetPipelineStats]] = None,
diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 103f4cf54bef..4d1609dd06f3 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -325,6 +325,7 @@ def map_batches(
         batch_size: Optional[Union[int, Literal["default"]]] = "default",
         compute: Optional[Union[str, ComputeStrategy]] = None,
         batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
+        prefetch_batches: int = 0,
         zero_copy_batch: bool = False,
         fn_args: Optional[Iterable[Any]] = None,
         fn_kwargs: Optional[Dict[str, Any]] = None,
@@ -469,6 +470,14 @@ def map_batches(
             ``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or
             ``"numpy"`` to select ``numpy.ndarray`` for tensor datasets and
             ``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "default".
+            prefetch_batches: The number of batches to fetch ahead of the current batch
+                to process. If set to greater than 0, a separate thread will be used
+                to fetch the specified amount of formatted batches from blocks. This
+                improves performance for non-CPU bound UDFs, allowing batch fetching
+                compute and formatting to be overlapped with the UDF. Defaults to 0
+                (no prefetching enabled). Increasing the number of batches to prefetch
+                can result in higher throughput, at the expense of requiring more heap
+                memory to buffer the batches.
             zero_copy_batch: Whether ``fn`` should be provided zero-copy, read-only batches.
If this is ``True`` and no copy is required for the ``batch_format`` conversion, the batch will be a zero-copy, read-only @@ -622,6 +626,7 @@ def process_next_batch(batch: DataBatch) -> Iterator[Block]: batch_size=batch_size, batch_format=batch_format, ensure_copy=not zero_copy_batch and batch_size is not None, + prefetch_batches=prefetch_batches, ) for batch in formatted_batch_iter: diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index 9a3e41016da3..39522b6f0e29 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -765,6 +765,7 @@ def map_batches( batch_size: Optional[Union[int, Literal["default"]]] = "default", compute: Optional[Union[str, ComputeStrategy]] = None, batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default", + prefetch_batches: int = 0, fn_args: Optional[Iterable[Any]] = None, fn_kwargs: Optional[Dict[str, Any]] = None, fn_constructor_args: Optional[Iterable[Any]] = None, @@ -779,6 +780,7 @@ def map_batches( batch_size=batch_size, compute=compute, batch_format=batch_format, + prefetch_batches=prefetch_batches, fn_args=fn_args, fn_kwargs=fn_kwargs, fn_constructor_args=fn_constructor_args, diff --git a/python/ray/data/tests/test_block_batching.py b/python/ray/data/tests/test_block_batching.py index 1bb8e0a907d6..80a831359e37 100644 --- a/python/ray/data/tests/test_block_batching.py +++ b/python/ray/data/tests/test_block_batching.py @@ -1,4 +1,5 @@ import pytest +import time from typing import List from unittest import mock @@ -14,6 +15,7 @@ _prefetch_blocks, _blocks_to_batches, _format_batches, + _make_async_gen, ) @@ -121,6 +123,96 @@ def test_format_batches(batch_format): assert isinstance(batch["foo"], np.ndarray) +def test_async_batch_fetching(): + blocks = block_generator(num_blocks=5, num_rows=8) + + def sleep_batch_format(batch_iter, *args, **kwargs): + for batch in batch_iter: + time.sleep(2) + yield batch + + with mock.patch( + "ray.data._internal.block_batching._format_batches", sleep_batch_format + ): + batch_iter = batch_blocks(blocks=blocks, prefetch_batches=1) + outputs = [] + start_time = time.time() + for batch in batch_iter: + time.sleep(3) + outputs.append(batch) + end_time = time.time() + + total_time = end_time - start_time + # Total time should be based on number of times the udf is called + # (which is equal to len(outputs)). + # The 2 seconds sleep in next_batch is overlapped, so does not count + # towards total time. + assert total_time < len(outputs) * 3 + 3 + + # There should be no dropped rows. + assert sum(len(output_batch) for output_batch in outputs) == 40, sum( + len(output_batch) for output_batch in outputs + ) # 5 blocks with 8 rows each. 
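
The timing bound in the test above encodes the overlap arithmetic. A back-of-the-envelope model of it (a standalone sketch, not part of the patch; the parameter names are illustrative):

    # Expected wall clock for n batches, with format_s seconds of background
    # formatting and udf_s seconds of foreground work per batch. Assumes
    # format_s <= udf_s, so the producer thread always stays one batch ahead.
    def overlapped_time(n: int, format_s: float, udf_s: float) -> float:
        # Only the first format is exposed; the rest hide behind the UDF.
        return format_s + n * udf_s

    def serial_time(n: int, format_s: float, udf_s: float) -> float:
        return n * (format_s + udf_s)

    # With the test's numbers (5 batches, format_s=2, udf_s=3):
    # serial_time(5, 2, 3) == 25, overlapped_time(5, 2, 3) == 17,
    # comfortably under the asserted bound of len(outputs) * 3 + 3 == 18.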
+ + +def test_make_async_gen(): + """Tests that make_async_gen overlaps compute.""" + + num_items = 10 + + def gen(): + for i in range(num_items): + time.sleep(2) + yield i + + def sleep_udf(item): + time.sleep(3) + return item + + iterator = _make_async_gen(gen()) + + start_time = time.time() + outputs = [] + for item in iterator: + outputs.append(sleep_udf(item)) + end_time = time.time() + + assert outputs == list(range(num_items)) + + assert end_time - start_time < num_items * 3 + 3 + + +def test_make_async_gen_buffer_size(): + """Tests that multiple items can be prefetched at a time + with larger buffer size.""" + + num_items = 5 + + def gen(): + for i in range(num_items): + time.sleep(1) + yield i + + def sleep_udf(item): + time.sleep(5) + return item + + iterator = _make_async_gen(gen(), prefetch_buffer_size=4) + + start_time = time.time() + + # Only sleep for first item. + sleep_udf(next(iterator)) + + # All subsequent items should already be prefetched and should be ready. + for _ in iterator: + pass + end_time = time.time() + + # 1 second for first item, 5 seconds for udf, 0.5 seconds buffer + assert end_time - start_time < 6.5 + + if __name__ == "__main__": import sys From 58d592f5296c30589bd8251dda10d5631bc7b656 Mon Sep 17 00:00:00 2001 From: amogkam Date: Tue, 10 Jan 2023 15:16:15 -0800 Subject: [PATCH 02/20] bundling Signed-off-by: amogkam --- python/ray/data/_internal/compute.py | 38 +++++++++++++++++++++++++--- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index eed0337a8e59..abc8e04bfc5f 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -258,11 +258,41 @@ def _apply( # Bin blocks by target block size. if target_block_size is not None: _check_batch_size(blocks_in, target_block_size, name) - block_bundles = _bundle_blocks_up_to_size( - blocks_in, target_block_size, name - ) + block_bundles: List[ + Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]] + ] = _bundle_blocks_up_to_size(blocks_in, target_block_size, name) else: - block_bundles = [((b,), (m,)) for b, m in blocks_in] + block_bundles: List[ + Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]] + ] = [((b,), (m,)) for b, m in blocks_in] + + # If the max number of the actor pool is set, the number of bundles should + # always be less than this max_size. + # Otherwise, it leads to inefficiencies with creating extra actor tasks and + # prevents the actor task from doing optimizations such as batch or block prefetching. + if self.max_size and len(block_bundles) > self.max_size: + + def chunkify(bundles: List, num_chunks: int): + return [bundles[i::num_chunks] for i in range(num_chunks)] + + bundle_groups: List[ + List[Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]]] + ] = chunkify(block_bundles, num_chunks=self.max_size) + + final_bundles = [] + for bundle_group in bundle_groups: + # bundle_group is a list of tuples of lists. 
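+            # Concretely, each element is ((block_ref, ...), (metadata, ...));
+            # the loop below flattens the group into one consolidated bundle.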
+ blocks = [] + metadata = [] + for bundle in bundle_group: + blocks.extend(list(bundle[0])) + metadata.extend(list(bundle[1])) + + consolidated_bundle = (tuple(blocks), tuple(metadata)) + + final_bundles.append(consolidated_bundle) + block_bundles = final_bundles + del blocks_in owned_by_consumer = block_list._owned_by_consumer From 8f53df8b45f2fd53e0a368423c6597baefa06a76 Mon Sep 17 00:00:00 2001 From: amogkam Date: Tue, 10 Jan 2023 15:17:30 -0800 Subject: [PATCH 03/20] batch predictor Signed-off-by: amogkam --- python/ray/train/batch_predictor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/train/batch_predictor.py b/python/ray/train/batch_predictor.py index 302adc67e5fc..9ca2bf534ab8 100644 --- a/python/ray/train/batch_predictor.py +++ b/python/ray/train/batch_predictor.py @@ -306,6 +306,7 @@ def __call__(self, input_batch: DataBatchType) -> DataBatchType: if self.get_preprocessor() is not None else predict_stage_batch_format, batch_size=batch_size, + prefetch_batches=num_gpus_per_worker > 0, **ray_remote_args, ) From 2952c581b644e4c58aaee55727ad24516edac93d Mon Sep 17 00:00:00 2001 From: amogkam Date: Tue, 10 Jan 2023 15:30:04 -0800 Subject: [PATCH 04/20] update Signed-off-by: amogkam --- python/ray/data/tests/test_block_batching.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_block_batching.py b/python/ray/data/tests/test_block_batching.py index 80a831359e37..830d4f95589a 100644 --- a/python/ray/data/tests/test_block_batching.py +++ b/python/ray/data/tests/test_block_batching.py @@ -145,7 +145,7 @@ def sleep_batch_format(batch_iter, *args, **kwargs): total_time = end_time - start_time # Total time should be based on number of times the udf is called # (which is equal to len(outputs)). - # The 2 seconds sleep in next_batch is overlapped, so does not count + # The 2 seconds sleep in sleep_batch_format is overlapped, so does not count # towards total time. assert total_time < len(outputs) * 3 + 3 From adb942af2655bfeba91e3511a58a102a0c50ae50 Mon Sep 17 00:00:00 2001 From: amogkam Date: Wed, 11 Jan 2023 12:08:52 -0800 Subject: [PATCH 05/20] address comments Signed-off-by: amogkam --- python/ray/data/_internal/block_batching.py | 4 +--- python/ray/data/_internal/compute.py | 3 ++- python/ray/train/batch_predictor.py | 2 +- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/python/ray/data/_internal/block_batching.py b/python/ray/data/_internal/block_batching.py index 10eb4a10f8fe..4ee8ae18b5ad 100644 --- a/python/ray/data/_internal/block_batching.py +++ b/python/ray/data/_internal/block_batching.py @@ -173,14 +173,12 @@ def _make_async_gen( base_iterator: The iterator to asynchronously fetch from. prefetch_buffer_size: The maximum number of items to prefetch. Increasing the size allows for more computation overlap for very expensive downstream udfs. - However it comes at the cost of additional memory overhead. + However it comes at the cost of additional memory overhead. Defaults to 1. Returns: An iterator with the same elements as the base_iterator. """ - # TODO: Add other async implementations: ThreadPool, Ray Actors. 
- fetch_queue = queue.Queue(maxsize=prefetch_buffer_size) def _async_fetch(): diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index abc8e04bfc5f..505ae196412e 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -269,7 +269,8 @@ def _apply( # If the max number of the actor pool is set, the number of bundles should # always be less than this max_size. # Otherwise, it leads to inefficiencies with creating extra actor tasks and - # prevents the actor task from doing optimizations such as batch or block prefetching. + # prevents the actor task from doing optimizations + # such as batch or block prefetching. if self.max_size and len(block_bundles) > self.max_size: def chunkify(bundles: List, num_chunks: int): diff --git a/python/ray/train/batch_predictor.py b/python/ray/train/batch_predictor.py index 9ca2bf534ab8..0176881e307b 100644 --- a/python/ray/train/batch_predictor.py +++ b/python/ray/train/batch_predictor.py @@ -306,7 +306,7 @@ def __call__(self, input_batch: DataBatchType) -> DataBatchType: if self.get_preprocessor() is not None else predict_stage_batch_format, batch_size=batch_size, - prefetch_batches=num_gpus_per_worker > 0, + prefetch_batches=int(num_gpus_per_worker > 0), **ray_remote_args, ) From b761421f99e8b976a2b967b5b57b82898f600128 Mon Sep 17 00:00:00 2001 From: amogkam Date: Wed, 11 Jan 2023 12:11:18 -0800 Subject: [PATCH 06/20] update sentinel value Signed-off-by: amogkam --- python/ray/data/_internal/block_batching.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/block_batching.py b/python/ray/data/_internal/block_batching.py index 4ee8ae18b5ad..8906efea2795 100644 --- a/python/ray/data/_internal/block_batching.py +++ b/python/ray/data/_internal/block_batching.py @@ -181,11 +181,14 @@ def _make_async_gen( fetch_queue = queue.Queue(maxsize=prefetch_buffer_size) + sentinel = object() + def _async_fetch(): for item in base_iterator: fetch_queue.put(item, block=True) - # sentinel value. - fetch_queue.put(None, block=True) + + # Indicate done adding items. 
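+        # (A unique object() is used rather than None so that a base_iterator
+        # which legitimately yields None is not mistaken for the end of data.)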
+ fetch_queue.put(sentinel, block=True) fetch_thread = threading.Thread(target=_async_fetch) fetch_thread.start() @@ -195,7 +198,7 @@ def _async_fetch(): if next_item is not None: yield next_item fetch_queue.task_done() - if next_item is None: + if next_item is sentinel: break fetch_queue.join() From ddf78b8ab09b1d60794a18e7a4d5101417ad8b39 Mon Sep 17 00:00:00 2001 From: amogkam Date: Wed, 11 Jan 2023 12:20:25 -0800 Subject: [PATCH 07/20] add tests Signed-off-by: amogkam --- python/ray/data/_internal/block_batching.py | 2 +- python/ray/data/tests/test_block_batching.py | 71 +++++++++++--------- 2 files changed, 40 insertions(+), 33 deletions(-) diff --git a/python/ray/data/_internal/block_batching.py b/python/ray/data/_internal/block_batching.py index 8906efea2795..321b26773e0d 100644 --- a/python/ray/data/_internal/block_batching.py +++ b/python/ray/data/_internal/block_batching.py @@ -195,7 +195,7 @@ def _async_fetch(): while True: next_item = fetch_queue.get(block=True) - if next_item is not None: + if next_item is not sentinel: yield next_item fetch_queue.task_done() if next_item is sentinel: diff --git a/python/ray/data/tests/test_block_batching.py b/python/ray/data/tests/test_block_batching.py index 830d4f95589a..3cfdf80eef61 100644 --- a/python/ray/data/tests/test_block_batching.py +++ b/python/ray/data/tests/test_block_batching.py @@ -123,38 +123,6 @@ def test_format_batches(batch_format): assert isinstance(batch["foo"], np.ndarray) -def test_async_batch_fetching(): - blocks = block_generator(num_blocks=5, num_rows=8) - - def sleep_batch_format(batch_iter, *args, **kwargs): - for batch in batch_iter: - time.sleep(2) - yield batch - - with mock.patch( - "ray.data._internal.block_batching._format_batches", sleep_batch_format - ): - batch_iter = batch_blocks(blocks=blocks, prefetch_batches=1) - outputs = [] - start_time = time.time() - for batch in batch_iter: - time.sleep(3) - outputs.append(batch) - end_time = time.time() - - total_time = end_time - start_time - # Total time should be based on number of times the udf is called - # (which is equal to len(outputs)). - # The 2 seconds sleep in sleep_batch_format is overlapped, so does not count - # towards total time. - assert total_time < len(outputs) * 3 + 3 - - # There should be no dropped rows. - assert sum(len(output_batch) for output_batch in outputs) == 40, sum( - len(output_batch) for output_batch in outputs - ) # 5 blocks with 8 rows each. - - def test_make_async_gen(): """Tests that make_async_gen overlaps compute.""" @@ -213,6 +181,45 @@ def sleep_udf(item): assert end_time - start_time < 6.5 +# Test for 3 cases +# 1. Batch size is less than block size +# 2. Batch size is more than block size +# 3. Block size is not divisble by batch size +@pytest.mark.parametrize("batch_size", [4, 10, 7]) +def test_async_batch_fetching(batch_size): + blocks = block_generator(num_blocks=5, num_rows=8) + + def sleep_batch_format(batch_iter, *args, **kwargs): + for batch in batch_iter: + time.sleep(2) + yield batch + + with mock.patch( + "ray.data._internal.block_batching._format_batches", sleep_batch_format + ): + batch_iter = batch_blocks( + batch_size=batch_size, blocks=blocks, prefetch_batches=1 + ) + outputs = [] + start_time = time.time() + for batch in batch_iter: + time.sleep(3) + outputs.append(batch) + end_time = time.time() + + total_time = end_time - start_time + # Total time should be based on number of times the udf is called + # (which is equal to len(outputs)). 
+ # The 2 seconds sleep in sleep_batch_format is overlapped, so does not count + # towards total time. + assert total_time < len(outputs) * 3 + 3 + + # There should be no dropped rows. + assert sum(len(output_batch) for output_batch in outputs) == 40, sum( + len(output_batch) for output_batch in outputs + ) # 5 blocks with 8 rows each. + + if __name__ == "__main__": import sys From 68590129ba2712c618bfe54912f4e15a955c6c5e Mon Sep 17 00:00:00 2001 From: amogkam Date: Thu, 12 Jan 2023 17:28:03 -0800 Subject: [PATCH 08/20] update release test Signed-off-by: amogkam --- .../air_tests/air_benchmarks/workloads/gpu_batch_prediction.py | 2 ++ release/release_tests.yaml | 2 ++ 2 files changed, 4 insertions(+) diff --git a/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py b/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py index 8f1cc456f531..a3f52ca4e7d9 100644 --- a/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py +++ b/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py @@ -60,6 +60,8 @@ def to_tensor(batch: np.ndarray) -> torch.Tensor: predictor.predict( dataset, num_gpus_per_worker=int(not smoke_test), + min_scoring_workers=1, + max_scoring_workers=ray.cluster_resources()["GPU"], batch_size=512, ) total_time_s = round(time.time() - start, 2) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 1c6d26d10831..e41d2299c85a 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -327,6 +327,8 @@ script: python workloads/gpu_batch_prediction.py --data-size-gb 100 type: job + wait_for_nodes: + num_nodes: 4 alert: default From 973fe12c0ea7848f9ba8bcb20e63e70ecfc1323e Mon Sep 17 00:00:00 2001 From: amogkam Date: Fri, 13 Jan 2023 14:26:03 -0800 Subject: [PATCH 09/20] int Signed-off-by: amogkam --- .../air_tests/air_benchmarks/workloads/gpu_batch_prediction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py b/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py index a3f52ca4e7d9..fe54b5cd1a95 100644 --- a/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py +++ b/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py @@ -61,7 +61,7 @@ def to_tensor(batch: np.ndarray) -> torch.Tensor: dataset, num_gpus_per_worker=int(not smoke_test), min_scoring_workers=1, - max_scoring_workers=ray.cluster_resources()["GPU"], + max_scoring_workers=int(ray.cluster_resources()["GPU"]), batch_size=512, ) total_time_s = round(time.time() - start, 2) From e37e46bf2a25f9a18dc381477d93076c9935a05a Mon Sep 17 00:00:00 2001 From: amogkam Date: Wed, 18 Jan 2023 16:05:26 -0800 Subject: [PATCH 10/20] address comments Signed-off-by: amogkam --- python/ray/data/_internal/block_batching.py | 4 ++-- python/ray/data/_internal/compute.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/block_batching.py b/python/ray/data/_internal/block_batching.py index 321b26773e0d..e0b1e2d2f83f 100644 --- a/python/ray/data/_internal/block_batching.py +++ b/python/ray/data/_internal/block_batching.py @@ -167,12 +167,12 @@ def _make_async_gen( base_iterator: Iterator[T], prefetch_buffer_size: int = 1 ) -> Iterator[T]: """Returns a new iterator with elements fetched from the base_iterator - in an sync fashion using a background thread. + in an async fashion using a background thread. Args: base_iterator: The iterator to asynchronously fetch from. 
prefetch_buffer_size: The maximum number of items to prefetch. Increasing the - size allows for more computation overlap for very expensive downstream udfs. + size allows for more computation overlap for very expensive downstream UDFs. However it comes at the cost of additional memory overhead. Defaults to 1. Returns: diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index 505ae196412e..9e97945523e2 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -271,7 +271,7 @@ def _apply( # Otherwise, it leads to inefficiencies with creating extra actor tasks and # prevents the actor task from doing optimizations # such as batch or block prefetching. - if self.max_size and len(block_bundles) > self.max_size: + if len(block_bundles) > self.max_size: def chunkify(bundles: List, num_chunks: int): return [bundles[i::num_chunks] for i in range(num_chunks)] From 3cf414b206a4262c43cfdf691d05ff3ddf7e4a94 Mon Sep 17 00:00:00 2001 From: amogkam Date: Thu, 19 Jan 2023 16:13:41 -0800 Subject: [PATCH 11/20] update Signed-off-by: amogkam --- python/ray/data/_internal/compute.py | 62 +++++++++------------------- 1 file changed, 20 insertions(+), 42 deletions(-) diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index 9e97945523e2..8be34e0c8648 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -91,7 +91,7 @@ def _apply( # Bin blocks by target block size. if target_block_size is not None: _check_batch_size(blocks, target_block_size, name) - block_bundles = _bundle_blocks_up_to_size(blocks, target_block_size, name) + block_bundles = _bundle_blocks_up_to_size(blocks, target_block_size) else: block_bundles = [((b,), (m,)) for b, m in blocks] del blocks @@ -254,45 +254,24 @@ def _apply( if name is None: name = "map" - blocks_in = block_list.get_blocks_with_metadata() - # Bin blocks by target block size. + blocks_in: List[ + Tuple[ObjectRef[Block], BlockMetadata] + ] = block_list.get_blocks_with_metadata() + + # We bin blocks according to the following rules: + # 1. Attempt to bin up to the target block size. + # 2. If the max concurrency of the ActorPool is set, then + # cap the number of bundles to match the size of the ActorPool. + # This avoids additional overhead in submitting new actor tasks and allows + # the actor task to do optimizations such as batch prefetching. + total_size = sum(metadata.num_rows for _, metadata in blocks_in) if target_block_size is not None: _check_batch_size(blocks_in, target_block_size, name) - block_bundles: List[ - Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]] - ] = _bundle_blocks_up_to_size(blocks_in, target_block_size, name) - else: - block_bundles: List[ - Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]] - ] = [((b,), (m,)) for b, m in blocks_in] - - # If the max number of the actor pool is set, the number of bundles should - # always be less than this max_size. - # Otherwise, it leads to inefficiencies with creating extra actor tasks and - # prevents the actor task from doing optimizations - # such as batch or block prefetching. 
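+        # (Note: self.max_size defaults to float("inf") for an unbounded pool,
+        # in which case this comparison is always False.)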
- if len(block_bundles) > self.max_size: - - def chunkify(bundles: List, num_chunks: int): - return [bundles[i::num_chunks] for i in range(num_chunks)] - - bundle_groups: List[ - List[Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]]] - ] = chunkify(block_bundles, num_chunks=self.max_size) - - final_bundles = [] - for bundle_group in bundle_groups: - # bundle_group is a list of tuples of lists. - blocks = [] - metadata = [] - for bundle in bundle_group: - blocks.extend(list(bundle[0])) - metadata.extend(list(bundle[1])) - - consolidated_bundle = (tuple(blocks), tuple(metadata)) - - final_bundles.append(consolidated_bundle) - block_bundles = final_bundles + target_num_bundles = min(self.max_size, total_size / target_block_size) + target_block_size = total_size // target_num_bundles + block_bundles: List[ + Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]] + ] = _bundle_blocks_up_to_size(blocks_in, target_block_size) del blocks_in owned_by_consumer = block_list._owned_by_consumer @@ -533,13 +512,12 @@ def _map_block_nosplit( def _bundle_blocks_up_to_size( blocks: List[Tuple[ObjectRef[Block], BlockMetadata]], target_size: int, - name: str, -) -> List[Tuple[List[ObjectRef[Block]], List[BlockMetadata]]]: +) -> List[Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]]]: """Group blocks into bundles that are up to (but not exceeding) the provided target size. """ - block_bundles = [] - curr_bundle = [] + block_bundles: List[List[Tuple[ObjectRef[Block], BlockMetadata]]] = [] + curr_bundle: List[Tuple[ObjectRef[Block], BlockMetadata]] = [] curr_bundle_size = 0 for b, m in blocks: num_rows = m.num_rows From 9273b01c156eab413bb9eb5e03d4024a246c5713 Mon Sep 17 00:00:00 2001 From: amogkam Date: Thu, 19 Jan 2023 16:16:49 -0800 Subject: [PATCH 12/20] reword Signed-off-by: amogkam --- python/ray/data/_internal/compute.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index 8be34e0c8648..9a3b91872425 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -258,8 +258,8 @@ def _apply( Tuple[ObjectRef[Block], BlockMetadata] ] = block_list.get_blocks_with_metadata() - # We bin blocks according to the following rules: - # 1. Attempt to bin up to the target block size. + # We bundle blocks according to the following rules: + # 1. Attempt to bundle up to the target block size. # 2. If the max concurrency of the ActorPool is set, then # cap the number of bundles to match the size of the ActorPool. # This avoids additional overhead in submitting new actor tasks and allows From bc3b39ca5c0fdcd8294091f4ae72f6d7004d4553 Mon Sep 17 00:00:00 2001 From: amogkam Date: Thu, 19 Jan 2023 16:21:23 -0800 Subject: [PATCH 13/20] update Signed-off-by: amogkam --- python/ray/data/_internal/compute.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index 9a3b91872425..4ed35048c640 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -264,14 +264,19 @@ def _apply( # cap the number of bundles to match the size of the ActorPool. # This avoids additional overhead in submitting new actor tasks and allows # the actor task to do optimizations such as batch prefetching. 
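+        # (Example: 10 single-row blocks with max_size=2 give a target of
+        # 10 // 2 = 5 rows per bundle, i.e. at most one bundle per actor.)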
- total_size = sum(metadata.num_rows for _, metadata in blocks_in) + target_num_bundles = float("inf") if target_block_size is not None: _check_batch_size(blocks_in, target_block_size, name) - target_num_bundles = min(self.max_size, total_size / target_block_size) - target_block_size = total_size // target_num_bundles - block_bundles: List[ - Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]] - ] = _bundle_blocks_up_to_size(blocks_in, target_block_size) + total_size = sum(metadata.num_rows for _, metadata in blocks_in) + target_num_bundles = min(target_num_bundles, total_size / target_block_size) + target_num_bundles = min(target_num_bundles, self.max_size) + if not math.isinf(target_num_bundles): + target_block_size = total_size // target_num_bundles + block_bundles: List[ + Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]] + ] = _bundle_blocks_up_to_size(blocks_in, target_block_size) + else: + block_bundles = [((b,), (m,)) for b, m in blocks_in] del blocks_in owned_by_consumer = block_list._owned_by_consumer From a5c918462cb789932178c17240a3305a68f5f43a Mon Sep 17 00:00:00 2001 From: amogkam Date: Thu, 19 Jan 2023 16:38:46 -0800 Subject: [PATCH 14/20] update Signed-off-by: amogkam --- python/ray/data/_internal/compute.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index 4ed35048c640..7fc977b15cd4 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -267,7 +267,10 @@ def _apply( target_num_bundles = float("inf") if target_block_size is not None: _check_batch_size(blocks_in, target_block_size, name) - total_size = sum(metadata.num_rows for _, metadata in blocks_in) + total_size = sum( + metadata.num_rows if metadata.num_rows is not None else float("inf") + for _, metadata in blocks_in + ) target_num_bundles = min(target_num_bundles, total_size / target_block_size) target_num_bundles = min(target_num_bundles, self.max_size) if not math.isinf(target_num_bundles): From 1c18dd774276177f948b8294137fe7e1947a32eb Mon Sep 17 00:00:00 2001 From: amogkam Date: Thu, 19 Jan 2023 17:15:07 -0800 Subject: [PATCH 15/20] add comment Signed-off-by: amogkam --- python/ray/data/_internal/block_batching.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/ray/data/_internal/block_batching.py b/python/ray/data/_internal/block_batching.py index e0b1e2d2f83f..23fafc160ef4 100644 --- a/python/ray/data/_internal/block_batching.py +++ b/python/ray/data/_internal/block_batching.py @@ -190,6 +190,11 @@ def _async_fetch(): # Indicate done adding items. fetch_queue.put(sentinel, block=True) + # Start a background thread which iterates through the base iterator, + # triggering execution and adding results to the queue until it is full. + # Iterating through the iterator returned by this function pulls + # ready items from the queue, allowing the background thread to continue execution. 
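+    # The thread is joined, together with the queue, once the generator is
+    # fully consumed; until then it blocks whenever the buffer is full.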
+ fetch_thread = threading.Thread(target=_async_fetch) fetch_thread.start() From 1e1a49a26408b33c31085288d3e00b69e3165015 Mon Sep 17 00:00:00 2001 From: amogkam Date: Thu, 19 Jan 2023 17:22:45 -0800 Subject: [PATCH 16/20] cleanup Signed-off-by: amogkam --- python/ray/data/_internal/compute.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index 7fc977b15cd4..dd62ecd311d2 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -264,17 +264,20 @@ def _apply( # cap the number of bundles to match the size of the ActorPool. # This avoids additional overhead in submitting new actor tasks and allows # the actor task to do optimizations such as batch prefetching. - target_num_bundles = float("inf") + target_size = -1 + # First bundle up to the provided target block size. if target_block_size is not None: _check_batch_size(blocks_in, target_block_size, name) + target_size = max(target_block_size, target_size) + # If the max size of the actor pool is set, then we bundle up even more + # if necessary. + if not math.isinf(self.max_size): total_size = sum( metadata.num_rows if metadata.num_rows is not None else float("inf") for _, metadata in blocks_in ) - target_num_bundles = min(target_num_bundles, total_size / target_block_size) - target_num_bundles = min(target_num_bundles, self.max_size) - if not math.isinf(target_num_bundles): - target_block_size = total_size // target_num_bundles + target_size = max(target_size, total_size // self.max_size) + if target_size >= 0: block_bundles: List[ Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]] ] = _bundle_blocks_up_to_size(blocks_in, target_block_size) From adc206e1a8b251af6e64b7bc86da1bb3a3683ebf Mon Sep 17 00:00:00 2001 From: amogkam Date: Fri, 20 Jan 2023 13:20:14 -0800 Subject: [PATCH 17/20] comment and test Signed-off-by: amogkam --- python/ray/data/_internal/compute.py | 19 ++++++++----------- python/ray/data/tests/test_dataset.py | 16 ++++++++++++++++ 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/python/ray/data/_internal/compute.py b/python/ray/data/_internal/compute.py index dd62ecd311d2..ceb93d0ebc69 100644 --- a/python/ray/data/_internal/compute.py +++ b/python/ray/data/_internal/compute.py @@ -264,20 +264,17 @@ def _apply( # cap the number of bundles to match the size of the ActorPool. # This avoids additional overhead in submitting new actor tasks and allows # the actor task to do optimizations such as batch prefetching. - target_size = -1 - # First bundle up to the provided target block size. - if target_block_size is not None: - _check_batch_size(blocks_in, target_block_size, name) - target_size = max(target_block_size, target_size) - # If the max size of the actor pool is set, then we bundle up even more - # if necessary. 
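+        # (Blocks whose row count is unknown are counted as 0 rows here, so
+        # they never force extra bundling on their own.)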
+ if target_block_size is None: + target_block_size = 0 if not math.isinf(self.max_size): total_size = sum( - metadata.num_rows if metadata.num_rows is not None else float("inf") - for _, metadata in blocks_in + meta.num_rows if meta.num_rows is not None else 0 + for _, meta in blocks_in ) - target_size = max(target_size, total_size // self.max_size) - if target_size >= 0: + pool_max_block_size = total_size // self.max_size + target_block_size = max(target_block_size, pool_max_block_size) + if target_block_size > 0: + _check_batch_size(blocks_in, target_block_size, name) block_bundles: List[ Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]] ] = _bundle_blocks_up_to_size(blocks_in, target_block_size) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index d509939ffb4b..973c9632104d 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -5441,6 +5441,22 @@ def f(x): ), "Number of actors is out of the expected bound" +def test_actor_pool_strategy_bundles_to_max_actors(shutdown_only): + """Tests that blocks are bundled up to the specified max number of actors.""" + + def f(x): + return x + + compute_strategy = ray.data.ActorPoolStrategy(max_size=1) + ds = ( + ray.data.range(10, parallelism=10) + .map_batches(f, batch_size=None, compute=compute_strategy) + .fully_executed() + ) + + assert "1/1 blocks" in ds.stats() + + def test_default_batch_format(shutdown_only): ds = ray.data.range(100) assert ds.default_batch_format() == list From 2424de4ddf467609f260f3757784041b1876e2b6 Mon Sep 17 00:00:00 2001 From: amogkam Date: Fri, 20 Jan 2023 13:21:48 -0800 Subject: [PATCH 18/20] update test Signed-off-by: amogkam --- python/ray/data/tests/test_dataset.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 973c9632104d..2101c9402a38 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -5447,14 +5447,15 @@ def test_actor_pool_strategy_bundles_to_max_actors(shutdown_only): def f(x): return x - compute_strategy = ray.data.ActorPoolStrategy(max_size=1) + max_size = 2 + compute_strategy = ray.data.ActorPoolStrategy(max_size=max_size) ds = ( ray.data.range(10, parallelism=10) .map_batches(f, batch_size=None, compute=compute_strategy) .fully_executed() ) - assert "1/1 blocks" in ds.stats() + assert f"{max_size}/{max_size} blocks" in ds.stats() def test_default_batch_format(shutdown_only): From 346bb49929af179b321e9f8cf38738c80dc2d59a Mon Sep 17 00:00:00 2001 From: amogkam Date: Fri, 20 Jan 2023 13:22:53 -0800 Subject: [PATCH 19/20] update test Signed-off-by: amogkam --- python/ray/data/tests/test_dataset.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 2101c9402a38..132f4aeb957e 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -5457,6 +5457,15 @@ def f(x): assert f"{max_size}/{max_size} blocks" in ds.stats() + # Check batch size is still respected. 
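+    # (batch_size=10 spans all 10 rows, so everything collapses into a single
+    # bundle even though the pool allows 2 actors.)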
+ ds = ( + ray.data.range(10, parallelism=10) + .map_batches(f, batch_size=10, compute=compute_strategy) + .fully_executed() + ) + + assert "1/1 blocks" in ds.stats() + def test_default_batch_format(shutdown_only): ds = ray.data.range(100) From ef60f1131533a78ff765edb45ee901785e87f35e Mon Sep 17 00:00:00 2001 From: amogkam Date: Fri, 20 Jan 2023 13:49:11 -0800 Subject: [PATCH 20/20] fix Signed-off-by: amogkam --- .../air_tests/air_benchmarks/workloads/gpu_batch_prediction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py b/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py index fe54b5cd1a95..9f36133c1a34 100644 --- a/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py +++ b/release/air_tests/air_benchmarks/workloads/gpu_batch_prediction.py @@ -61,7 +61,7 @@ def to_tensor(batch: np.ndarray) -> torch.Tensor: dataset, num_gpus_per_worker=int(not smoke_test), min_scoring_workers=1, - max_scoring_workers=int(ray.cluster_resources()["GPU"]), + max_scoring_workers=1 if smoke_test else int(ray.cluster_resources()["GPU"]), batch_size=512, ) total_time_s = round(time.time() - start, 2)
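
Taken together, the series threads `prefetch_batches` from `Dataset.map_batches` down to `batch_blocks` and caps block bundling at the actor pool's maximum size. A minimal usage sketch of the API as it stands after these commits (the dataset shape, pool sizes, and UDF below are illustrative assumptions, not part of the patch):

    import numpy as np
    import ray

    def add_one(batch: np.ndarray) -> np.ndarray:
        # Non-CPU-bound UDF; with prefetch_batches > 0 the fetching and
        # formatting of the next batch overlaps with this call.
        return batch + 1

    ds = ray.data.range_tensor(1000, parallelism=10)
    ds = ds.map_batches(
        add_one,
        batch_size=128,
        batch_format="numpy",
        # Blocks are bundled into at most 4 groups, one per potential actor.
        compute=ray.data.ActorPoolStrategy(2, 4),
        prefetch_batches=1,
    )
    ds.fully_executed()

This mirrors why `BatchPredictor` turns prefetching on only when GPUs are used for scoring: the CPU-side batch formatting then overlaps with GPU inference instead of competing with a CPU-bound UDF.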