[Data] Async batch fetching for map_batches
#31576
Conversation
Signed-off-by: amogkam <[email protected]>
python/ray/data/_internal/compute.py (Outdated)

-block_bundles = [((b,), (m,)) for b, m in blocks_in]
+block_bundles: List[
+    Tuple[Tuple[ObjectRef[Block]], Tuple[BlockMetadata]]
+] = [((b,), (m,)) for b, m in blocks_in]
This change is necessary to get the performance improvements for batch prediction.
Before, we would only bundle blocks up to the batch size and submit each bundle as a separate actor task. This meant we could not prefetch when the batch size is greater than the block size, since each bundle is a separate task.
Instead, if the max actor pool size is set, then we bundle up to min(batch size, max actor pool size).
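The bundling policy described above can be sketched with hypothetical helpers (illustrative only, not Ray's actual internals; block sizes are row counts):

```python
def bundle_up_to_batch_size(block_sizes, batch_size):
    """Greedily group consecutive block row-counts into bundles of at
    least `batch_size` rows (hypothetical helper for illustration)."""
    bundles, current, rows = [], [], 0
    for n in block_sizes:
        current.append(n)
        rows += n
        if rows >= batch_size:
            bundles.append(current)
            current, rows = [], 0
    if current:
        # Leftover blocks that didn't fill a whole batch.
        bundles.append(current)
    return bundles


def cap_num_bundles(bundles, max_pool_size):
    """Merge adjacent bundles until at most `max_pool_size` remain, so each
    actor gets one task and can prefetch across batches within it. A real
    implementation would balance bundle sizes; this naively merges from the
    front."""
    bundles = list(bundles)
    while len(bundles) > max_pool_size:
        bundles[0:2] = [bundles[0] + bundles[1]]
    return bundles
```

For example, five 2-row blocks with batch size 4 yield three bundles, which a pool of two actors would then collapse to two bundles.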
Hopefully once we switch to a fully iterator-based implementation, these types of special cases will no longer be necessary.
python/ray/data/_internal/compute.py (Outdated)

    # always be less than this max_size.
    # Otherwise, it leads to inefficiencies with creating extra actor tasks and
    # prevents the actor task from doing optimizations such as batch or block
    # prefetching.
    if self.max_size and len(block_bundles) > self.max_size:
This code will be deprecated with the new executor backend, cc @clarkzinzow.
@@ -121,6 +123,96 @@ def test_format_batches(batch_format):
    assert isinstance(batch["foo"], np.ndarray)


def test_async_batch_fetching():
Shall we add a test for map_batches as well?
I tried it but there's too much time variance for a deterministic small-scale map_batches CI test. I'll confirm the performance improvements via running the batch inference release tests.
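Independent of Ray, the pattern being exercised here — fetching the next batch on a background thread while the current batch is being processed — can be sketched as follows (an illustrative helper, not the PR's implementation):

```python
import queue
import threading


def fetch_batches_async(batch_iter, prefetch_batches=1):
    """Yield batches from `batch_iter`, prefetching up to `prefetch_batches`
    batches ahead on a background thread so fetch latency overlaps with the
    consumer's (UDF's) compute time."""
    q = queue.Queue(maxsize=max(1, prefetch_batches))
    sentinel = object()  # Marks the end of the iterator.

    def producer():
        for batch in batch_iter:
            q.put(batch)  # Blocks once the prefetch buffer is full.
        q.put(sentinel)

    threading.Thread(target=producer, daemon=True).start()
    while True:
        batch = q.get()
        if batch is sentinel:
            break
        yield batch
```

The bounded queue is what caps how far ahead fetching can run, which is the role the `prefetch_batches` argument plays in this PR.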
LGTM except one comment - #31576 (comment). cc @clarkzinzow.
LGTM overall, one big thing that we need to resolve is that the actor pool rebundling will break block ordering, which I don't think we'll want to do.
python/ray/data/_internal/compute.py (Outdated)

    if self.max_size and len(block_bundles) > self.max_size:

        def chunkify(bundles: List, num_chunks: int):
            return [bundles[i::num_chunks] for i in range(num_chunks)]
Just to make sure that I understand the motivation: this is giving us stratified chunking, where a given chunk consists of an equal number of blocks from each of the original bundles (modulo the number of chunks), right? Might be worth leaving a comment saying as much for those who are less familiar with this pattern.
Two potential issues with this chunking scheme:

- This breaks block, and therefore row, ordering; the previous block bundling and actor compute strategy made sure to preserve it. This doesn't matter for batch prediction workloads, but may matter for other workloads that use the actor compute strategy.
- There are pathological cases of skewed blocks/bundles that could pop up. E.g. suppose we had bundles = [[1, 2, 3], [4, 5, 6], [7, 8, 9]] (pretend the numbers are block IDs) and num_chunks = 2, and suppose that blocks with odd IDs are much larger than blocks with even IDs; this chunking would produce bundles [[1, 3, 5, 7, 9], [2, 4, 6, 8]], where the first bundle is way, way larger than the second bundle.
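To make the pathological case concrete, here is the chunking applied to the block IDs (flattened into one list for illustration); it reproduces exactly the skewed, order-breaking split described:

```python
def chunkify(bundles, num_chunks):
    # Stratified chunking: chunk i takes every num_chunks-th element
    # starting at offset i, so chunks interleave the original order.
    return [bundles[i::num_chunks] for i in range(num_chunks)]


chunks = chunkify([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)
# All odd block IDs land in the first chunk, all even IDs in the second,
# and the original ordering 1..9 is no longer recoverable per chunk pair.
```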
Could solve (1) by changing the rechunking to merge adjacent chunks without breaking ordering, but (2) would require rebundling while taking the block sizes into account. I think that (1) is probably a blocker but (2) is not, cc @matthewdeng @c21 for more opinions.
If we only want to solve (1) for now, we could do the simple thing of merging adjacent bundles until we either (a) reach the specified number of chunks (max pool size), or (b) all would-be merged bundles would exceed the max target block size threshold (currently 512 MiB by default).
Could do something like the following progressive merging of adjacent bundles, which should preserve block/row order:

def rebundle_to_size(bundles: list, num_bundles: int):
    if len(bundles) <= num_bundles:
        # Already done.
        return bundles
    max_bundle_size = DatasetContext.get_current().target_max_block_size
    # Carry out multiple rounds of merging adjacent bundles, until we have
    # scaled down to num_bundles bundles or we've stopped making merging
    # progress.
    while len(bundles) > num_bundles:
        new_bundles = []
        num_merges = 0
        i = 0
        while i + 1 < len(bundles):
            if len(bundles) - num_merges == num_bundles:
                # This merging round has already brought us to the requisite
                # number of bundles, so we short-circuit.
                break
            left, right = bundles[i], bundles[i + 1]
            left_size = sum(meta.size_bytes for _, meta in left)
            right_size = sum(meta.size_bytes for _, meta in right)
            if left_size + right_size <= max_bundle_size:
                # The merged bundle would be under the max bundle size, so we
                # merge these bundles.
                new_bundles.append(left + right)
                num_merges += 1
            else:
                new_bundles.extend([left, right])
            i += 2
        if num_merges == 0:
            # No adjacent pair can be merged without exceeding the size cap.
            break
        # Add leftover bundles (odd number of bundles or short-circuiting)
        # unchanged, preserving order.
        new_bundles.extend(bundles[i:])
        bundles = new_bundles
    return bundles
python/ray/data/_internal/compute.py (Outdated)

    # always be less than this max_size.
    # Otherwise, it leads to inefficiencies with creating extra actor tasks and
    # prevents the actor task from doing optimizations
    # such as batch or block prefetching.
A more orthogonal, future-looking thing: a target bundle size that might be better than the user-provided batch_size is probably something like the following:

target_size = min(
    max((prefetch_batches + 1) * batch_size_in_bytes, ctx.target_min_block_size),
    ctx.target_max_block_size,
)

I.e. we bundle up to at least ctx.target_min_block_size (default is 1 MiB), since that's what we consider to be the smallest "reasonable" block to make the task overhead worth it; if the number of desired concurrent batches requires more than this (e.g. the batch size is larger than this and/or aggressive prefetching is specified), then we use that as the bundling target. And all of this is capped by ctx.target_max_block_size.
We'd probably still have the max actor pool size serve as a cap on the number of block bundles as well, but I'd imagine that the initial actor pool size (i.e. actors started at the beginning of execution) and the scale-up rate would be influenced by the number of block bundles.
We should experiment with a few of these hints/policies in the new execution model, and try to ensure good performance with the default configuration. cc @ericl @c21
@amogkam - can you rebase to latest master? It should fix the CI failures.
@@ -480,6 +481,9 @@ def map_batches(
            ``pandas.DataFrame``, "pyarrow" to select ``pyarrow.Table``, or
            ``"numpy"`` to select ``numpy.ndarray`` for tensor datasets and
            ``Dict[str, numpy.ndarray]`` for tabular datasets. Default is "default".
        prefetch_batches: The number of batches to fetch ahead of the current batch
When porting this to the new executor, we should try to consolidate prefetch_batches and prefetch_blocks into a single prefetch_batches argument, where we always prefetch enough blocks to satisfy prefetch_batches. This should be simple enough to implement, since we have the size of each to-be-fetched block on hand.
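A minimal sketch of that consolidation, assuming per-block row counts are known (`blocks_to_prefetch` is a hypothetical helper for illustration, not an existing Ray API):

```python
def blocks_to_prefetch(block_num_rows, batch_size, prefetch_batches):
    """Return how many leading blocks must be fetched so that
    `prefetch_batches` whole batches of `batch_size` rows are covered."""
    rows_needed = batch_size * prefetch_batches
    fetched = 0
    for i, n in enumerate(block_num_rows):
        if fetched >= rows_needed:
            # Enough rows are already covered by the first i blocks.
            return i
        fetched += n
    # Every block is needed (or the request exceeds the dataset).
    return len(block_num_rows)
```

With three 10-row blocks, a 15-row batch, and one batch of prefetch, two blocks must be fetched ahead; the block-count knob disappears from the user-facing API.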
yep +1!
python/ray/data/_internal/compute.py (Outdated)

    block_bundles = _bundle_blocks_up_to_size(
        blocks_in, target_block_size, name
    )
    total_size = sum(metadata.num_rows for _, metadata in blocks_in)
metadata.num_rows could technically be None, but that shouldn't happen in practice.
Updated to handle None the same way as _bundle_blocks_up_to_size.
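One common way to handle this is to propagate the unknown rather than crash on None (a sketch under that assumption; the actual _bundle_blocks_up_to_size behavior may differ):

```python
def total_rows(num_rows_per_block):
    """Sum row counts across block metadata, treating None as 'unknown'
    and returning None if any block's row count is unknown."""
    total = 0
    for num_rows in num_rows_per_block:
        if num_rows is None:
            # One unknown block makes the total unknown.
            return None
        total += num_rows
    return total
```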
LGTM! Nice work! 👏
Implements batch fetching in a separate thread for GPU UDFs in map_batches. This allows CPU-based batch fetching to be overlapped with the UDF computation. prefetch_batches is added as an argument to map_batches; by default, this is set to 0.

We do not add it to DatasetContext, as this functionality needs to be configured for each map_batches call independently and not globally for the entire dataset. This is because the Dataset workflow might contain some transformations that run on GPU and others that run on CPU.

We see GPU prediction throughput increase from ~260 images/sec to ~300 images/sec:
[Screenshots omitted: GPU utilization traces without prefetching vs. with prefetching]
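A back-of-the-envelope model (an illustrative assumption, not a measurement from this PR) of why the overlap helps: without prefetching, each batch pays fetch plus compute time; with prefetching, the slower of the two dominates.

```python
def throughput(batch_size, fetch_s, compute_s, prefetch):
    """Idealized images/sec for one batch pipeline stage.

    Without prefetching, fetch and compute run sequentially; with
    prefetching, they overlap, so the slower stage bounds the batch time.
    """
    per_batch = max(fetch_s, compute_s) if prefetch else fetch_s + compute_s
    return batch_size / per_batch
```

For example, with a hypothetical 0.02 s fetch and 0.1 s compute per 32-image batch, the model gives roughly 267 images/sec without prefetching and 320 with it — the same ballpark as the reported ~260 to ~300 improvement.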
Why are these changes needed?

Related issue number

Checks

- I've signed off every commit (git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.