[Datasets] [Arrow 7+ Support - 2/N] Add support for Arrow 7 by fixing Arrow serialization bug. #29993
Conversation
Can we make the PR title more specific about this fix? Or this could be done in two steps: 1) fix the serialization bug; 2) enable Arrow 7?
This Arrow serialization bug was the core blocker for the Arrow 7 upgrade, and in addition to the unit tests for our serialization workaround, I think the best validation that the serialization bug is fixed is the Arrow 7 CI job running the Datasets/AIR test suite. Since the success criteria for the serialization bug fixes are strongly tied to whether Arrow 7 works, I think keeping them in the same PR makes sense?
Force-pushed from 41f77b6 to 66ad41c
Force-pushed from 8337b79 to abc0bb0
    # slices.
    sliced = sliced[0:1]
    return sliced
return super().__getitem__(key)
The Arrow extension array contract is generally to return another extension array for a slice, and the element type (in our case an ndarray) when getting a single row.
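For context, a minimal illustration of that contract, using a plain pyarrow array as a stand-in for the tensor extension array (the tensor case would return an ndarray rather than a pyarrow scalar; this is not Ray's actual implementation):

```python
import pyarrow as pa

arr = pa.array([1, 2, 3])

# Slicing yields another (zero-copy) array of the same type.
assert isinstance(arr[0:1], pa.Array)
# Single-row access yields the element type (a scalar here; an ndarray
# for the tensor extension array discussed above).
assert isinstance(arr[0], pa.Scalar)
```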
@@ -434,7 +434,9 @@ def _sample_piece(
     # Only sample the first row group.
     piece = piece.subset(row_group_ids=[0])
-    batch_size = min(piece.metadata.num_rows, PARQUET_ENCODING_RATIO_ESTIMATE_NUM_ROWS)
+    batch_size = max(
+        min(piece.metadata.num_rows, PARQUET_ENCODING_RATIO_ESTIMATE_NUM_ROWS), 1
Arrow doesn't allow a batch size of 0; this is needed to handle empty Parquet files.
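A tiny sketch of why the clamp matters; the 10 below stands in for the sampling constant and is not its actual value:

```python
# An empty Parquet piece reports zero rows; clamping keeps the batch size
# Arrow-legal (>= 1) while still capping it for non-empty pieces.
num_rows = 0  # e.g., an empty Parquet file
batch_size = max(min(num_rows, 10), 1)
assert batch_size == 1
```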
@clarkzinzow - thanks for the fix! I am thinking that for reading Parquet, we should filter out all empty files before the read. WDYT? I can do it as a follow-up.
Yep, filtering out empty files when sampling is a good idea! It should be easy enough to do before constructing the sample space: https://github.com/ray-project/ray/blob/abc0bb0a625e01aee9f2ccd777248892db2a5f6c/python/ray/data/datasource/parquet_datasource.py#L303-L308
Agreed on doing it as a follow-up.
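A rough sketch of that follow-up, reusing the same metadata accessor as in the diff above (the function and variable names are illustrative, not the datasource's actual ones):

```python
def filter_empty_pieces(pieces):
    # Drop empty pieces before building the sample space, so sampling
    # never sees a piece with zero rows.
    return [piece for piece in pieces if piece.metadata.num_rows > 0]
```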
LGTM with some minor questions/comments.
@@ -59,6 +59,7 @@ def build(self) -> Block:
     if self._builder is None:
         if self._empty_block is not None:
             self._builder = BlockAccessor.for_block(self._empty_block).builder()
+            self._builder.add_block(self._empty_block)
Wondering why this is required now? Did we uncover some bugs from the tests?
Yes, I believe that this was also for the empty Parquet file case, and should be covered by the added test!
schema = self._row.schema
if isinstance(
    schema.field(schema.get_field_index(key)).type,
nit: can it just be schema.field(key).type? See https://arrow.apache.org/docs/python/generated/pyarrow.Schema.html#pyarrow.Schema.field
Hmm, I had thought that this support was added in a post-Arrow 6 version, but it looks like it's already supported. I'll make this change!
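A quick check of the suggested simplification (pyarrow's Schema.field accepts a field name directly):

```python
import pyarrow as pa

schema = pa.schema([("a", pa.int64()), ("b", pa.string())])
# Both forms resolve to the same field, so the get_field_index() hop
# can be dropped.
assert schema.field("a").type == schema.field(schema.get_field_index("a")).type
```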
""" | ||
import pyarrow as pa | ||
|
||
if os.environ.get(RAY_DISABLE_CUSTOM_ARROW_DATA_SERIALIZATION, "0") == "1": |
Glad that we added an environment variable here!
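For reference, how a user could opt out of the custom serializer, assuming the constant's value is an environment variable of the same name (that mapping is my assumption, not confirmed by this diff), set before Ray starts:

```python
import os

# Assumed env var name; check the constant's definition for the real value.
os.environ["RAY_DISABLE_CUSTOM_ARROW_DATA_SERIALIZATION"] = "1"
```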
Looks good, thanks for making it simpler with Arrow IPC.
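A minimal sketch of the IPC-based idea, not Ray's actual serializer hook: round-tripping a zero-copy slice through an Arrow IPC stream writes only the truncated buffers.

```python
import pyarrow as pa


def serialize_table(table: pa.Table) -> bytes:
    # Write the table through an IPC stream; sliced data is written with
    # truncated buffers rather than dragging the parent buffers along.
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        writer.write_table(table)
    return sink.getvalue().to_pybytes()


def deserialize_table(data: bytes) -> pa.Table:
    return pa.ipc.open_stream(data).read_all()


big = pa.table({"x": list(range(1_000_000))})
tiny = big.slice(0, 1)  # zero-copy one-row view of the big table
restored = deserialize_table(serialize_table(tiny))
assert restored.num_rows == 1
```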
Force-pushed from abc0bb0 to bfdbd7e
@ericl @scv119 @jjyao @richardliaw Could I get a rubber stamp for the
Ah sorry @scv119, I thought that you had the magic powers, but it looks like it needs to be one of @ericl, @richardliaw, or @edoakes!
dep spec LGTM
…nd nightly. (#29999) This PR adds support for Arrow 8, 9, 10, and nightly in Ray, and is the third PR in a set of stacked PRs making up this mono-PR for Arrow 7+ support (#29161); it is stacked on top of a PR fixing task cancellation in Ray Core (#29984) and a PR adding support for Arrow 7 (#29993). The last two commits are the relevant commits for review.
Summary of Changes
This PR:
- For Arrow 9+, adds allow_bucket_creation=true to S3 URIs for the Ray Core Storage API and for the Datasets S3 write API ([Datasets] In Arrow 9+, creating S3 buckets requires explicit opt-in. #29815).
- For Arrow 9+, creates an ExtensionScalar subclass for tensor extension types that returns an ndarray view from .as_py() ([Datasets] For Arrow 8+, tensor column element access returns an ExtensionScalar. #29816).
- For Arrow 8.*, manually converts the ExtensionScalar to an ndarray for tensor extension types, since the ExtensionScalar type exists but isn't subclassable in Arrow 8 ([Datasets] For Arrow 8+, tensor column element access returns an ExtensionScalar. #29816).
- For Arrow 10+, matches on other potential error messages when encountering permission issues while interacting with S3 ([Datasets] In Arrow 10+, S3 errors raised due to permission issues can vary beyond our current pattern matching. #29994).
- Adds CI jobs for Arrow 8, 9, 10, and nightly.
- Removes the pyarrow version upper bound.
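A hedged illustration of the Arrow 9+ S3 opt-in mentioned above; the bucket and path are placeholders, and how Arrow parses the query string is not shown here.

```python
# Ray's fix appends the bucket-creation opt-in to the S3 URI it hands to Arrow.
uri = "s3://my-bucket/output"
uri_with_opt_in = uri + "?allow_bucket_creation=true"
```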
… Arrow serialization bug. (ray-project#29993) This PR adds support for Arrow 7 in Ray, and is the second PR in a set of stacked PRs making up this mono-PR for Arrow 7+ support (ray-project#29161); it is stacked on top of a PR fixing task cancellation in Ray Core (ray-project#29984).
This PR:
- Fixes a serialization bug in Arrow with a custom serializer for Arrow data ([Datasets] Arrow data buffers aren't truncated when pickling zero-copy slice views, leading to huge serialization bloat. ray-project#29814).
- Removes a bunch of defensive copying of Arrow data, which was a workaround for the aforementioned Arrow serialization bug.
- Adds a CI job for Arrow 7.
- Bumps the pyarrow upper bound to 8.0.0.
Signed-off-by: Weichen Xu <[email protected]>
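For illustration, the bloat described in ray-project#29814 can be seen with plain pickle on affected Arrow versions; with the custom serializer (or on fixed Arrow versions), the payload is roughly the size of the one-row slice instead.

```python
import pickle

import pyarrow as pa

big = pa.table({"x": list(range(1_000_000))})
tiny = big.slice(0, 1)  # zero-copy one-row view
# On affected Arrow versions this payload carries the full parent buffers,
# so it is orders of magnitude larger than one row's worth of data.
print(len(pickle.dumps(tiny)))
```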
Toward fixing #38300. PR #29993 added a local Ray fix for issue apache/arrow#26685, but at the time Windows failed the tests with pyarrow 7. In issue #38300 the suggested fix was to release the pin. Signed-off-by: mattip <[email protected]> Co-authored-by: Edward Oakes <[email protected]>
This PR adds support for Arrow 7 in Ray. It is the second PR in a set of stacked PRs making up the mono-PR for Arrow 7+ support (#29161), and is stacked on top of a PR fixing task cancellation in Ray Core (#29984).
Summary of Changes
This PR:
- fixes a serialization bug in Arrow with a custom serializer for Arrow data (#29814)
- removes a bunch of defensive copying of Arrow data, which was a workaround for the aforementioned serialization bug
- adds a CI job for Arrow 7
- bumps the pyarrow upper bound to 8.0.0
Related issue number
Closes #29992, closes #29814
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.