[Datasets] Change map_batches to fetch input blocks on-demand #29289

Conversation
It's kind of hard to add a unit test for this. @stephanie-wang and I tested together in the WIP release test for the AIR benchmark. After the nightly test gets merged, this will be covered there.
I think we can add a unit test like this:
This would be ideal, so we don't need to rely on nightly to catch such issues (nightly is too heavyweight). Seems like a good time to start adding some block splitting-specific tests anyway!
@stephanie-wang - makes sense, let me look into creating a unit test.
This is certainly a good fix for the read + map_batches fusion case! However, I think doing something like the following, with an outer iteration over the input blocks, would be a cleaner implementation:
def transform(
    blocks: Iterable[Block],
    batch_fn: BatchUDF,
    *fn_args,
    **fn_kwargs,
) -> Iterable[Block]:
    DatasetContext._set_current(context)
    output_buffer = BlockOutputBuffer(None, context.target_max_block_size)
    # Ensure that zero-copy batch views are copied so mutating UDFs don't error.
    batcher = Batcher(batch_size, ensure_copy=batch_size is not None)

    def process_next_batch() -> Iterator[Block]:
        batch = batcher.next_batch()
        # Convert to batch format.
        batch = BlockAccessor.for_block(batch).to_batch_format(batch_format)
        # Apply UDF.
        batch = batch_fn(batch, *fn_args, **fn_kwargs)
        if not (
            isinstance(batch, list)
            or isinstance(batch, pa.Table)
            or isinstance(batch, np.ndarray)
            or (
                isinstance(batch, dict)
                and all(isinstance(col, np.ndarray) for col in batch.values())
            )
            or isinstance(batch, pd.core.frame.DataFrame)
        ):
            raise ValueError(
                "The map batches UDF returned the value "
                f"{batch} of type {type(batch)}, "
                "which is not allowed. "
                f"The return type must be one of: {BatchType}"
            )
        # Add output batch to output buffer.
        output_buffer.add_batch(batch)
        if output_buffer.has_next():
            yield output_buffer.next()

    # Process batches for each block.
    for block in blocks:
        batcher.add(block)
        while batcher.has_batch():
            yield from process_next_batch()

    # Process any partial/remainder batches.
    batcher.done_adding()
    if batcher.has_any():
        yield from process_next_batch()

    # Yield partial/remainder blocks from finalized output buffer.
    output_buffer.finalize()
    if output_buffer.has_next():
        yield output_buffer.next()
This PR is ready for review, thanks @clarkzinzow, @stephanie-wang and @jianoaix.
Looks like a nice improvement!
python/ray/data/dataset.py (Outdated)

    batcher.done_adding()
    while batcher.has_any():

    def process_next_batch() -> Iterator[Block]:
For readability, it'd be clearer to pass batch in as an arg, so it reads like process_next_batch(batch) at the call site.
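For illustration, a minimal sketch of that shape, reusing the names from the snippet above (only the changed lines are shown; the actual update may differ):

def process_next_batch(batch: Block) -> Iterator[Block]:
    # The batch is now passed in explicitly instead of being pulled
    # from the enclosing batcher inside the function.
    batch = BlockAccessor.for_block(batch).to_batch_format(batch_format)
    ...

# Call sites then read as:
for block in blocks:
    batcher.add(block)
    while batcher.has_batch():
        yield from process_next_batch(batcher.next_batch())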
@jianoaix - no strong opinion given it's a short internal function. But updated.
# Data source generates multiple 1G random bytes data
class LargeBytesDatasource(Datasource):
    def prepare_read(self, parallelism):
This method is deprecated, prefer using create_reader().
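For reference, a rough sketch of what a create_reader()-based version of this test datasource could look like. The block count, block size, and the random-bytes read function here are illustrative assumptions, not the exact code in this PR:

from typing import List, Optional

import numpy as np

from ray.data.block import BlockMetadata
from ray.data.datasource import Datasource, Reader, ReadTask


class LargeBytesReader(Reader):
    """Emits one read task per block; each task produces ~1 GiB of random bytes."""

    def __init__(self, num_blocks: int = 20, block_size: int = 1024**3):
        self._num_blocks = num_blocks
        self._block_size = block_size

    def estimate_inmemory_data_size(self) -> Optional[int]:
        return self._num_blocks * self._block_size

    def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
        metadata = BlockMetadata(
            num_rows=1,
            size_bytes=self._block_size,
            schema=None,
            input_files=None,
            exec_stats=None,
        )
        # Each read task lazily materializes a single block containing one
        # large random-bytes row, so data is only generated when the task runs.
        return [
            ReadTask(
                lambda size=self._block_size: [[np.random.bytes(size)]],
                metadata,
            )
            for _ in range(self._num_blocks)
        ]


class LargeBytesDatasource(Datasource):
    def create_reader(self, **read_args) -> Reader:
        return LargeBytesReader(**read_args)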
@jianoaix - yeah, forgot about it, updated.
The PR is ready for review again. Let me know if there are any more comments, otherwise it's ready for merge. Thanks! @stephanie-wang, @jianoaix and @clarkzinzow.
    parallelism=1,
)

ds = ds.map_batches(foo, batch_size=None)
Actually do you need to set a memory limit for the cluster? How does this make sure it will OOM without block splitting?
After talking to the Core folks, there is no easy way to set a memory limit for the cluster in a CI unit test right now. As you can see here, we already process 20G of data in one task. We plan to enable dynamic block splitting by default in 2.2, so I am not sure it's in our best interest to chase down getting an OOM without block splitting.
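Putting the fragments above together, the shape of the test is roughly the following. This is a sketch under the assumptions that the datasource emits 20 blocks of ~1 GiB each and that foo is an identity UDF; the test and fixture names are illustrative, not the exact code in this PR:

import ray


def test_map_batches_block_fetching(ray_start_regular_shared):
    # With parallelism=1, all ~20 GiB of input blocks flow through a single
    # map_batches task. Without on-demand fetching, every input block would
    # be buffered before the first batch is produced.
    ds = ray.data.read_datasource(
        LargeBytesDatasource(),
        parallelism=1,
    )

    def foo(batch):
        # Identity UDF; the test exercises block iteration, not the transform.
        return batch

    ds = ds.map_batches(foo, batch_size=None)
    assert ds.count() == 20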
LGTM!
Why are these changes needed?
This fixes an issue we found during the AIR benchmark. When map_batches has multiple input blocks (which can happen when dynamic block splitting is enabled by default, or when multiple input blocks are coalesced together), we previously always fetched and buffered all input blocks before producing the first batch. This is especially bad for dynamic block splitting, because it essentially buffers all of the split blocks in memory again. So in this PR, map_batches is changed to fetch and buffer input blocks on-demand, i.e. only fetch blocks when needed to construct the next required batch.

Related issue number
Checks

I've signed off every commit (by using git commit -s) in this PR.
I've run scripts/format.sh to lint the changes in this PR.