
[AIR] Add batch_size arg for BatchMapper. #29193

Merged

Conversation

@clarkzinzow (Contributor) commented Oct 7, 2022

The default batch_size of 4096 at the Datasets level doesn't suffice for all use cases: it can be too large for wide tables and large images, leading to DRAM/GRAM OOMs, and it can be too small for narrow tables, leading to unnecessary batch slicing overhead and suboptimal vectorized operations in users' UDFs. We should allow users to configure the batch_size at the AIR level.

Closes #29168
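
For illustration, a minimal usage sketch of the new argument, assuming the Ray 2.x AIR API of the time (ray.data.preprocessors.BatchMapper with a pandas UDF); exact import paths and defaults may differ:

import pandas as pd
import ray
from ray.data.preprocessors import BatchMapper

def add_one(batch: pd.DataFrame) -> pd.DataFrame:
    # Runs once per batch; the batch's row count is bounded by batch_size below.
    batch["value"] = batch["value"] + 1
    return batch

ds = ray.data.from_pandas(pd.DataFrame({"value": list(range(10))}))
# Override the Datasets-level default of 4096, e.g. for wide rows that
# would otherwise OOM.
mapper = BatchMapper(fn=add_one, batch_format="pandas", batch_size=2)
out = mapper.transform(ds)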

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@jiaodong (Member) left a comment

Makes sense to me to get closer to map_batches; a few nits:

@@ -152,6 +152,28 @@ def add_and_modify_udf_numpy(data: Union[np.ndarray, Dict[str, np.ndarray]]):
    assert_frame_equal(out_df, expected_numpy_df)


def test_batch_mapper_batch_size():
Member

Just for better coverage, can we parametrize this with other combinations of data formats and a numpy UDF as well?
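
A sketch of the parametrization being requested (the test body is elided; names follow the test above and are otherwise hypothetical):

import pytest

@pytest.mark.parametrize("batch_format", ["pandas", "numpy"])
@pytest.mark.parametrize("batch_size", [1, 2, 4])
def test_batch_mapper_batch_size(batch_format, batch_size):
    # Build a BatchMapper with the given format/size and assert that each
    # UDF invocation sees at most batch_size rows (body elided here).
    ...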

 if transform_type == "pandas":
-    return dataset.map_batches(self._transform_pandas, batch_format="pandas")
+    return dataset.map_batches(
+        self._transform_pandas, batch_format="pandas", **kwargs
Member

Can we narrow this function down to explicit fields rather than an arbitrary kwargs dict?
There are only batch_size, compute, and batch_format in map_batches that likely require explicit values; ** is used for ray_remote_args, which ideally we don't want mixed with the param values above.

def map_batches(
    self,
    fn: BatchUDF,
    *,
    batch_size: Optional[int] = 4096,
    compute: Optional[Union[str, ComputeStrategy]] = None,
    batch_format: Literal["default", "pandas", "pyarrow", "numpy"] = "default",
    fn_args: Optional[Iterable[Any]] = None,
    fn_kwargs: Optional[Dict[str, Any]] = None,
    fn_constructor_args: Optional[Iterable[Any]] = None,
    fn_constructor_kwargs: Optional[Dict[str, Any]] = None,
    **ray_remote_args,
) -> "Dataset[Any]":

Contributor Author

The point of the **kwargs passthrough is to make this friendly to current and future .map_batches() arguments, letting subclasses of Preprocessor, including custom preprocessors, opt in to passing any of these args. Otherwise, we'll need to update the base Preprocessor whenever we expose a new parameter, and devs/advanced users will either be blocked from using that parameter until the next release or will have to override Preprocessor._transform(). Explicitly enumerating each potential field that may or may not exist on the preprocessor will also be more complex than this kwargs passthrough.

> ** is used for ray_remote_args, which ideally we don't want mixed with the param values above.

What if we (or a user) have a preprocessor that we (they) want to run on a GPU, or that's mostly I/O-bound so we (they) want to request a fractional CPU, or we (they) want to specify custom retry logic? We do want to expose these.

Since this kwarg passthrough isn't exposed to any users except for advanced users implementing custom preprocessors, is this really an issue? I feel like the future-proofing + simplicity advantages outweigh the disadvantages.
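
For context, a minimal sketch of the passthrough pattern being defended here; the class shapes and the _get_transform_kwargs name are illustrative, not the PR's actual code:

from typing import Any, Callable, Dict

class Preprocessor:
    # Base class: forwards whatever map_batches() kwargs a subclass opts
    # into, so newly added Dataset.map_batches() parameters (batch_size,
    # compute, num_gpus, ...) need no base-class changes.
    def _get_transform_kwargs(self) -> Dict[str, Any]:
        return {}

    def _transform(self, dataset):
        return dataset.map_batches(
            self._transform_pandas,
            batch_format="pandas",
            **self._get_transform_kwargs(),
        )

class BatchMapper(Preprocessor):
    def __init__(self, fn: Callable, batch_size: int = 4096):
        self.fn = fn
        self.batch_size = batch_size

    def _get_transform_kwargs(self) -> Dict[str, Any]:
        # Opt in to just the args this preprocessor cares about;
        # ray_remote_args like num_gpus could be forwarded the same way.
        return {"batch_size": self.batch_size}

    def _transform_pandas(self, df):
        return self.fn(df)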

Contributor Author

I'll take a stab at blowing these out into batch_size, compute, batch_format, and ray_remote_args getters that can be overridden, but I don't know if it will be a net benefit.

Contributor Author

Another disadvantage of making each of these kwargs explicit is that supporting batch_size=None is more difficult, since we need to delineate between a preprocessor that doesn't set batch_size and a preprocessor that wants to disable batching with batch_size=None. So we either need some other "not specified" indicator or start duplicating default values, which isn't great.
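
One common way to make the distinction described above is a module-level sentinel as the default; this is a sketch with illustrative names, not the PR's implementation:

from typing import Any, Dict

_UNSPECIFIED = object()  # sentinel: "caller did not set batch_size at all"

def build_map_batches_kwargs(batch_size: Any = _UNSPECIFIED) -> Dict[str, Any]:
    kwargs: Dict[str, Any] = {}
    if batch_size is not _UNSPECIFIED:
        # An explicit None is forwarded (disabling batching), while leaving
        # the argument unset defers to map_batches()'s own default.
        kwargs["batch_size"] = batch_size
    return kwargs

assert build_map_batches_kwargs() == {}
assert build_map_batches_kwargs(batch_size=None) == {"batch_size": None}
assert build_map_batches_kwargs(batch_size=256) == {"batch_size": 256}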

Member

Ah, I mean it's less error-prone for us to explicitly call with args, e.g. dataset.map_batches(self._transform_numpy, batch_format="numpy", batch_size={field}, **kwargs), since technically we can also hide the batch_format kwarg completely.

Contributor Author

Hmm, what error do you see arising there? Could you give an example?

> since technically we can also hide the batch_format kwarg completely

batch_format is hardcoded since we have generic logic in Preprocessor to determine the appropriate batch format, while the rest (including batch_size) are determined entirely by the subclasses, so I think that difference in treatment is reasonable.

I can pop batch_size out of the kwargs and pass it explicitly, but if we're still passing the rest as a **kwargs passthrough, I'm not sure what that would give us.

Member

I think I might have missed some context you had, or future plans regarding how we handle batch sizes, as always being explicit with the batch_size value when calling each map_batches is more appealing to me. But let's consolidate this in the API discussion below :)

@amogkam (Contributor) left a comment

Can we hold off on merging this until we reach consensus on the API decision here: #29229?

Just want to make sure we are very intentional about our API changes so that we don't change them around too frequently.

cc @matthewdeng

@clarkzinzow (Contributor Author)

@amogkam Sounds good!

@clarkzinzow (Contributor Author) commented Oct 31, 2022

@amogkam Any particular reason you removed the added test coverage from test_dataset_pandas.py?

@clarkzinzow (Contributor Author)

@amogkam Also, why were these changes necessary? f46e683

@clarkzinzow (Contributor Author)

@amogkam Ah got it, I forgot that I pulled out the .to_pandas() fix into another PR! Thanks for bringing this PR in line with current master.

@stephanie-wang (Contributor) left a comment

Thanks for making this change!

@c21 (Contributor) left a comment

LGTM with one question. Thanks @clarkzinzow.

@@ -68,6 +76,7 @@ def __init__(
             ],
         ],
         batch_format: Optional[str] = None,
+        batch_size: int = DEFAULT_BATCH_SIZE,
Contributor

Do we want to declare batch_size: Optional[int], so users have the flexibility to pass batch_size=None to consume a full block as one batch?

I guess it's not a big deal for now, but I want to avoid future changes to public APIs when we find more edge cases to support.
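
A sketch of the suggested widening; DEFAULT_BATCH_SIZE is assumed to be the existing 4096 constant, and the class body is abbreviated:

from typing import Callable, Optional

DEFAULT_BATCH_SIZE = 4096  # assumed value of the existing constant

class BatchMapper:
    def __init__(
        self,
        fn: Callable,
        batch_format: Optional[str] = None,
        # Optional[int] so callers can pass batch_size=None to consume a
        # full block as one batch, per the reviewer's suggestion.
        batch_size: Optional[int] = DEFAULT_BATCH_SIZE,
    ):
        self.fn = fn
        self.batch_format = batch_format
        self.batch_size = batch_size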

Contributor

yes, good point!

        return batch

    batch_mapper = BatchMapper(
        fn=check_batch_size, batch_size=batch_size, batch_format="pandas"
Member

Have one for batch_format="numpy" as well?
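
A hedged sketch of what that numpy counterpart might look like (batch_size is fixed here for illustration; in the real test it is parametrized):

import numpy as np
from ray.data.preprocessors import BatchMapper

batch_size = 2  # fixed for illustration

def check_batch_size(batch: np.ndarray) -> np.ndarray:
    # Mirror of the pandas check quoted above, for the numpy batch format.
    assert len(batch) == batch_size
    return batch

batch_mapper = BatchMapper(
    fn=check_batch_size, batch_size=batch_size, batch_format="numpy"
)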

@jiaodong (Member) left a comment

LG; only comments about enhancing test coverage of batch_size in various cases.

@amogkam amogkam merged commit 28a2959 into ray-project:master Nov 1, 2022
WeichenXu123 pushed a commit to WeichenXu123/ray that referenced this pull request Dec 19, 2022
Successfully merging this pull request may close these issues.

[AIR] Dynamic block splitting does not work for BatchMapper