[AIR][Predictor] Enable numpy based predictor #28917
Conversation
…ow batch type from all predictors
…s fix pytorch_tabular_starter
This PR is good for initial review, with pending fixes on one example notebook.
=== edit ===
Some release tests are flaky due to marginal e2e latency assertions, but this PR doesn't touch them (predictor only).
Failed release test
Thanks @jiaodong, overall lgtm! Left some minor comments
python/ray/data/preprocessor.py
Outdated
@@ -183,7 +184,7 @@ def _fit(self, dataset: Dataset) -> "Preprocessor":
         """Sub-classes should override this instead of fit()."""
         raise NotImplementedError()

-    def _determine_transform_to_use(self, data_format: str) -> str:
+    def determine_transform_to_use(self, data_format: BlockFormat) -> BatchFormat:
can we keep this as private still? it should not be a public facing api to users
    elif output_df.dtypes[col] == np.dtype(object) and all(
        isinstance(v, np.ndarray) for v in output_df[col]
    ):
        output_df.loc[:, col] = [v.tolist() for v in output_df[col]]
if numpy arrays are not JSON serializable, is this also a problem if a dict of ndarrays is returned?
this function is currently only called for pandas output.
Serve already knows how to handle it https://sourcegraph.com/github.com/ray-project/ray/-/blob/python/ray/serve/air_integrations.py?L129
This is a very small edge case where we return a DataFrame that happens to have an ndarray in it due to fallback casting. With your ongoing PR we should be able to remove this function and the casting completely.
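For context, a minimal standalone sketch (not code from this PR; the DataFrame contents are made up) of why the cast above is needed: numpy arrays inside an object-dtype column are not JSON serializable, while plain lists are.

```python
import json

import numpy as np
import pandas as pd

# Object-dtype column whose values are numpy arrays, as produced by the fallback casting.
df = pd.DataFrame({"embedding": [np.array([1.0, 2.0]), np.array([3.0, 4.0])]})

try:
    json.dumps(df.to_dict(orient="records"))
except TypeError as err:
    print(f"ndarray values break JSON serialization: {err}")

# Casting each ndarray to a plain Python list, as the snippet above does, fixes it.
df["embedding"] = [v.tolist() for v in df["embedding"]]
print(json.dumps(df.to_dict(orient="records")))
```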
    - Predictor implementation (pandas vs numpy)
    """
    # Got to inline this rather than using @pytest.mark.parametrize to avoid
    # unknown object owner error when running test with python cli.
Following up on this - we use parametrize in test_batch_mapper. Does that not work here?
        else BatchFormat.PANDAS
    )
    # No preprocessor, just use the predictor format.
    return self._predictor_cls._batch_format_to_use()
This function should never be called in the first place if preprocessor is None. Don't think we need this if clause.
@amogkam Hmm, seems like this is still called if preprocessor is None?
ray/python/ray/train/batch_predictor.py
Line 188 in dbc3bd8
self._determine_preprocessor_batch_format(data)
yeah, but it doesn't need to be. But this is a minor point, so looks good to merge.
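For illustration only, a hedged sketch of the behavior discussed here (simplified, with a hypothetical helper; the real logic lives in batch_predictor.py): when no preprocessor is attached, fall back to the predictor's own batch format instead of running the preprocessor-format resolution.

```python
from enum import Enum
from typing import Optional


class BatchFormat(str, Enum):
    PANDAS = "pandas"
    NUMPY = "numpy"


def resolve_batch_format(
    predictor_format: BatchFormat, preprocessor_format: Optional[BatchFormat]
) -> BatchFormat:
    """Hypothetical helper: pick the batch format for the prediction map_batches call."""
    if preprocessor_format is None:
        # No preprocessor attached: only the predictor's preference matters, so the
        # preprocessor-specific resolution never needs to run.
        return predictor_format
    return preprocessor_format


print(resolve_batch_format(BatchFormat.NUMPY, None))                # BatchFormat.NUMPY
print(resolve_batch_format(BatchFormat.NUMPY, BatchFormat.PANDAS))  # BatchFormat.PANDAS
```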
Thanks, lgtm! Can we address the remaining comments before merging?
LGTM, only nits and suggestions for follow-ups, so I think we can merge!
    """
    # We need schema to properly validate, so synchronously
    # fetch it if necessary.
    schema = self.schema(fetch_if_missing=True)
With the pipeline peeking implemented above, this triggering execution should be fine, i.e. we shouldn't hit the double-execution issue. 👍
    """
    from ray.data.extensions import TensorDtype

    for col in output_df.columns:
        # TensorArray requires special handling to numpy array.
I'm assuming that we're leaving this relatively alone in this PR? Just double-checking, what was the decision?
@@ -222,13 +281,19 @@ def __call__(self, batch):
    # Set the in-predictor preprocessing to a no-op when using a separate
    # GPU stage. Otherwise, the preprocessing will be applied twice.
    override_prep = BatchMapper(lambda x: x)
    # preprocessor.transform will break for DatasetPipeline due to
    # missing _dataset_format()
This is no longer true with your addition, we should try unifying these paths in a follow-up PR.
    raise NotImplementedError(
        "None of `_predict_pandas` or `_predict_numpy` are "
        f"implemented for input data batch format `{batch_format}`."
    )
Ah I see that the check is here, nice. This happens upstream of any Predictor._batch_format_to_use() calls, right?
This one is a bit more downstream though, only hit upon seeing data on predict, so I've added your suggestion above to surface the issue earlier.
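To make the "surface the issue earlier" idea concrete, a rough sketch (hypothetical names, not the actual Ray code) of validating at construction time that a predictor overrides at least one of the two predict hooks, instead of raising only once a batch arrives:

```python
class BasePredictor:
    """Hypothetical base class with two optional predict hooks."""

    def _predict_pandas(self, batch):
        raise NotImplementedError

    def _predict_numpy(self, batch):
        raise NotImplementedError

    @classmethod
    def validate_implements_predict(cls) -> None:
        # Fail fast (e.g. in BatchPredictor's constructor) rather than on the
        # first data batch: require at least one hook to be overridden.
        has_pandas = cls._predict_pandas is not BasePredictor._predict_pandas
        has_numpy = cls._predict_numpy is not BasePredictor._predict_numpy
        if not (has_pandas or has_numpy):
            raise NotImplementedError(
                f"{cls.__name__} must implement at least one of "
                "`_predict_pandas` or `_predict_numpy`."
            )


class MyNumpyPredictor(BasePredictor):
    def _predict_numpy(self, batch):
        return {"predictions": batch["x"] * 2}


MyNumpyPredictor.validate_implements_predict()  # passes

try:
    BasePredictor.validate_implements_predict()
except NotImplementedError as err:
    print(err)
```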
    - Predictor implementation (pandas vs numpy)
    """
    # Got to inline this rather than using @pytest.mark.parametrize to avoid
    # unknown object owner error when running test with python cli.
I fixed this issue for the test_batch_mapper tests by ensuring that the fixtures use the ray_start_regular_shared fixture for their Datasets execution; otherwise the fixtures could create the Datasets on a separate Ray cluster from the one the eventual tests run on. If you have this test use the ray_start_regular_shared fixture, and turn these test cases into fixtures depending on the ray_start_regular_shared fixture, it should work:
ray/python/ray/data/tests/conftest.py
Line 292 in c749ad3
def ds_pandas_single_column_format(ray_start_regular_shared):
Can happen in a follow-up PR if you'd like.
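A rough sketch of the suggested shape (illustrative only; the dataset contents are made up and ray_start_regular_shared is assumed to come from Ray's test conftest), where the dataset-producing fixture depends on ray_start_regular_shared so it runs on the same Ray cluster as the test:

```python
import pandas as pd
import pytest

import ray


@pytest.fixture
def ds_pandas_single_column_format(ray_start_regular_shared):
    # Depending on ray_start_regular_shared ensures the Dataset is created on the
    # same shared Ray cluster that the test itself uses.
    yield ray.data.from_pandas(pd.DataFrame({"x": [1, 2, 3]}))


def test_predict_pandas(ray_start_regular_shared, ds_pandas_single_column_format):
    assert ds_pandas_single_column_format.count() == 3
```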
doc changes
Co-authored-by: Clark Zinzow <[email protected]> Signed-off-by: Jiao <[email protected]>
rllib get started is flaky; it also fails on master.
Co-authored-by: Clark Zinzow <[email protected]> Co-authored-by: Amog Kamsetty <[email protected]> Signed-off-by: Weichen Xu <[email protected]>
Why are these changes needed?
Add a numpy-first path for DL predictors such as TensorFlow and PyTorch.
Notable changes:
- Introduce BatchFormat and use it across our codebase instead of raw string values
- predictor.py now chooses the implementation to call based on the input batch data type, the same as preprocessors (see the sketch below)
- predictor.py and batch_predictor.py return the same data type as the input batch / block format
- test_predictor.py removes mocks and tests against all numpy + pandas combinations of {data batch, preprocessor, predictor}
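As a hedged sketch of the dispatch described above (simplified; not the exact code in predictor.py, and the doubling model is made up), a predictor picks _predict_numpy vs _predict_pandas based on the incoming batch type and returns the same type it received:

```python
from typing import Dict, Union

import numpy as np
import pandas as pd

DataBatchType = Union[pd.DataFrame, Dict[str, np.ndarray]]


class SketchPredictor:
    """Illustrative only: dispatch on the batch type and preserve it on output."""

    def predict(self, batch: DataBatchType) -> DataBatchType:
        if isinstance(batch, pd.DataFrame):
            return self._predict_pandas(batch)
        # Numpy path: a dict of column name -> ndarray.
        return self._predict_numpy(batch)

    def _predict_pandas(self, batch: pd.DataFrame) -> pd.DataFrame:
        return pd.DataFrame({"predictions": batch["x"] * 2})

    def _predict_numpy(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        return {"predictions": batch["x"] * 2}


predictor = SketchPredictor()
print(predictor.predict(pd.DataFrame({"x": [1, 2, 3]})))  # returns a pandas DataFrame
print(predictor.predict({"x": np.array([1, 2, 3])}))      # returns a dict of ndarrays
```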
Batch prediction results
TL;DR -- faster, better memory footprint, no GPU memory leak or OOM.
Setup:
Image 1: Pandas narrow-waist prediction, +0.6 GB accumulated GPU memory usage each batch
Image 2: Pandas narrow-waist prediction, extra 3.03 GB GPU memory required to dump final output from batchnorm, which leads to OOM
Image 3: Numpy narrow-waist prediction, constant memory usage; the run finishes.
Related issue number
Related #28346
Closes #28525, #28627, #29003
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.