
[data] Add pickle support for PyArrow CSV WriteOptions #19378

Merged · 2 commits into ray-project:master on Oct 21, 2021

Conversation

@pdames (Member) commented Oct 14, 2021

Why are these changes needed?

Users cannot specify custom Dataset write options when writing output to CSV via dataset.write_csv(), because the pyarrow.csv.WriteOptions instance retrieved from writer_args["write_options"] in CSVDatasource._write_block() cannot be pickled by Ray.

This PR presents a solution to this problem via a _WriteOptionsWrapper class that wraps WriteOptions while implementing the required __getstate__ and __setstate__ methods.
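
A minimal sketch of that approach, assuming the wrapper only needs to capture the include_header and batch_size fields (the actual wrapper in this PR may capture more of WriteOptions' state):

    import pyarrow.csv

    class _WriteOptionsWrapper:
        """Pickle-friendly wrapper around pyarrow.csv.WriteOptions."""

        def __init__(self, write_options: pyarrow.csv.WriteOptions):
            self.write_options = write_options

        def __getstate__(self):
            # Capture only plain-Python constructor kwargs, since the
            # WriteOptions object itself cannot be pickled.
            return {
                "include_header": self.write_options.include_header,
                "batch_size": self.write_options.batch_size,
            }

        def __setstate__(self, state):
            # Rebuild the unpicklable WriteOptions on deserialization.
            self.write_options = pyarrow.csv.WriteOptions(**state)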

Related issue number

Closes #19366

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@ericl (Contributor) commented Oct 14, 2021

Hmm, what if we also accepted a lambda that returned write args, in addition to the write args dict? That might be a lighter-weight interface than adding a new wrapper class.

@ericl added the @external-author-action-required label (alternate tag for PRs where the author doesn't have labeling permission) on Oct 14, 2021
@pdames (Member, Author) commented Oct 19, 2021

> Hmm, what if we also accepted a lambda that returned write args, in addition to the write args dict? That might be a lighter-weight interface than adding a new wrapper class.

I've added a new commit that updates the relevant write-side APIs to accept a write kwargs supplier lambda (i.e. a lambda that takes no input arguments and returns Dict[str, Any]). The current implementation merges the two dictionaries via a simple kwargs.update(lambda_supplied_kwargs) call, which gives precedence to the supplier lambda's kwargs over conflicting **kwargs, and leverages the _resolve_kwargs() helper function for consistency.
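
In other words, the merge behaves like the following sketch (_merge_write_args is an illustrative name; the PR routes this through _resolve_kwargs()):

    from typing import Any, Callable, Dict

    def _merge_write_args(write_args_fn: Callable[[], Dict[str, Any]],
                          **write_args: Any) -> Dict[str, Any]:
        # Start from the plain **kwargs, then overlay the supplier's kwargs
        # so that lambda-supplied values win on conflicting keys.
        kwargs = dict(write_args)
        kwargs.update(write_args_fn())
        return kwargs

    # The supplier's include_header overrides the conflicting keyword value:
    merged = _merge_write_args(lambda: {"include_header": False},
                               include_header=True, batch_size=1024)
    assert merged == {"include_header": False, "batch_size": 1024}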

If we're happy with the current changes, then I could also update the read-side APIs to accept a read kwargs supplier lambda that works in a similar way. This would help both with code consistency and with heading off similar read-kwarg pickling problems that may pop up on the read side, now or in the future.

@ericl removed the @external-author-action-required label on Oct 19, 2021
@ericl (Contributor) left a comment

Looks pretty good. Could we also rename the functions to be more consistent with the kwargs (e.g., arrow_parquet_args => arrow_parquet_args_fn, writer_args => writer_args_fn, etc.)? I made a suggestion.

    filesystem: Optional["pyarrow.fs.FileSystem"] = None,
    try_create_dir: bool = True,
    arrow_open_stream_args: Optional[Dict[str, Any]] = None,
    write_args_provider: Optional[Callable[[], Dict[str, Any]]] = None,
@ericl (Contributor):

Suggested change:

-   write_args_provider: Optional[Callable[[], Dict[str, Any]]] = None,
+   arrow_parquet_args_fn: Callable[[], Dict[str, Any]] = lambda: {},

@pdames (Member, Author):

Updated in the latest commit to use names consistent with the kwargs and to default to an empty-dict supplier everywhere.

@ericl (Contributor) commented Oct 19, 2021

> If we're happy with the current changes, then I could also update the read-side APIs to accept a read kwargs supplier lambda that works in a similar way. This would help both with code consistency and with heading off similar read-kwarg pickling problems that may pop up on the read side, now or in the future.

Also sounds good.

@ericl added the @external-author-action-required label on Oct 19, 2021
@ericl (Contributor) commented Oct 20, 2021

Some test failures:

(pid=3787) 2021-10-20 05:08:21,870 INFO worker.py:425 -- Task failed with retryable exception: TaskID(a42cc390bccfe367ffffffffffffffffffffffff01000000).
(pid=3787) Traceback (most recent call last):
(pid=3787)   File "python/ray/_raylet.pyx", line 565, in ray._raylet.execute_task
(pid=3787)   File "python/ray/_raylet.pyx", line 569, in ray._raylet.execute_task
(pid=3787)   File "/ray/python/ray/data/datasource/file_based_datasource.py", line 150, in write_block
(pid=3787)     write_args_fn, **write_args)
(pid=3787) TypeError: _write_block() got multiple values for argument 'column'
[the same TypeError repeats for TaskIDs 6f77e31f..., 11361861..., and 3fc9bc8b...]
(pid=5462) 2021-10-20 05:09:39,883 INFO worker.py:425 -- Task failed with retryable exception: TaskID(5caaf4fa024cdd1dffffffffffffffffffffffff01000000).
::test_json_write[local_fs-local_path-None] FAILED [ 84%]
::test_json_write[s3_fs-s3_path-s3_server] FAILED [ 85%]
::test_json_roundtrip[None-local_path] -- Test timed out at 2021-10-20 05:10:32 UTC --
================================================================================
 


@ericl (Contributor) commented Oct 20, 2021

Still timing out on that JSON datasource test.

@clarkzinzow (Contributor) left a comment

Nice, thanks for adding this!

I personally find the "args provider" route less appealing since:

  1. It adds an extra API arg that duplicates behavior of an existing API arg in a slightly confusing way for the user, who will ask: why are there two ways to give args, and when do I have to use the lambda?
  2. The user having to package up their unserializable args into a provider lambda is worse UX than being able to pass them directly, with us taking care of making them serializable under the hood.
  3. Once Arrow fixes the serializability issues upstream, we'd need to break the API in order to remove this now unnecessary arg provider, compared to the wrapper class route which is an internal change.
  4. The code impact is much larger overall compared to the wrapper, which only needed to wrap/unwrap at serialization boundaries.

But I don't think this is worth blocking the PR from getting merged.

@@ -1019,6 +1021,9 @@ def write_parquet(self,
if True. Does nothing if all directories already exist.
arrow_open_stream_args: kwargs passed to
pyarrow.fs.FileSystem.open_output_stream
arrow_parquet_args_fn: Callable that returns a dictionary of write
arguments to use when writing each block to a file. Overrides
any duplicate keys from arrow_parquet_args.
@clarkzinzow (Contributor):

Nit: Maybe mention why this exists in addition to arrow_parquet_args? Ditto for the other functions as well.

@pdames (Member, Author) replied Oct 21, 2021:

I agree with your arguments against the args provider lambda approach, since I had many of the same thoughts while mulling over these changes. Still, I wound up warming to the lambda approach the more I thought about it, for the following reasons:

> 1. It adds an extra API arg that duplicates behavior of an existing API arg in a slightly confusing way for the user, who will ask: why are there two ways to give args, and when do I have to use the lambda?

Agreed. In the presence of both options, docs can help clarify when to use each, but I think the worry-free approach here is to simply recommend always using the lambda instead of kwargs. To make the APIs clearer, we could either (1) remove the existing dataset write API **kwargs, or (2) rename them and document them as reserved for future (or internal/developer) use. The main con of the lambda is that it is less idiomatic than **kwargs, but given that it covers us against any args that can't be pickled, now or in the future, without having to exhaustively test and/or wrap all of them, I think it is worth pursuing for long-term clarity and maintainability. I would also err slightly on the side of option (2) for backwards compatibility.

> 2. The user having to package up their unserializable args into a provider lambda is worse UX than being able to pass them directly, with us taking care of making them serializable under the hood.

Agreed. I'd like to keep these details hidden from the user, and the wrapper is an effective way of doing so, but I also worry about the long-term maintainability of wrappers or custom serializers. If there are any corner cases that we miss, now or in the future, then waiting for a new wrapper to be developed is also a bad user experience. The odds of missing cases will also continue to grow as new datasources are developed and as the file IO APIs we depend on continue to evolve. If we make the above changes to stop documenting (or allowing) the use of write option kwargs, then there's no longer any question about which args go where, since the lambda is the only option.

> 3. Once Arrow fixes the serializability issues upstream, we'd need to break the API in order to remove this now unnecessary arg provider, compared to the wrapper class route which is an internal change.

Even if we ignore the pickling problem, I've been thinking that a lambda's ability to lazily resolve distinct write options for each block is quite powerful. For example, I've been thinking that we should actually change the lambda signature from Callable[[], Dict[str, Any]] to Callable[[BlockAccessor], Dict[str, Any]]. What do you and @ericl think? This would allow advanced users to intelligently determine per-block write arguments based on the properties of the block being written (e.g. adjusting parquet data_page_size or compression_level based on the size of the block, or dictionary encoding based on column cardinalities), instead of being locked into one static set of write arguments for the entire dataset.
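
For illustration, a per-block supplier under that proposed signature might look like the sketch below (the size threshold and option values are hypothetical, not from this PR):

    from typing import Any, Dict

    from ray.data.block import BlockAccessor

    def per_block_parquet_args(block: BlockAccessor) -> Dict[str, Any]:
        # Hypothetical policy: spend more CPU compressing large blocks.
        is_large = block.size_bytes() > 128 * 1024 * 1024
        return {
            "compression": "zstd",
            "compression_level": 9 if is_large else 3,
        }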

> 4. The code impact is much larger overall compared to the wrapper, which only needed to wrap/unwrap at serialization boundaries.

Agreed. The immediate code impact of this PR spread across multiple API layers and more than doubled the LOC changed. However, over the long term, I wouldn't be surprised to see the code and developer-time impact of adding/removing wrappers end up being greater. Granted, this is purely speculation on my part, but it is a risk.

@clarkzinzow (Contributor) replied Oct 21, 2021:

@pdames All great counter-points, thank you for taking the time to write those up! The fact that the provider (1) future-proofs against future unserializable args and (2) gives a path to per-block write args and per-file read args makes the UX hit more reasonable.

> To make the APIs clearer, we could either (1) remove the existing dataset write API **kwargs, or (2) rename them and document them as reserved for future (or internal/developer) use. The main con of the lambda is that it is less idiomatic than **kwargs, but given that it covers us against any args that can't be pickled, now or in the future, without having to exhaustively test and/or wrap all of them, I think it is worth pursuing for long-term clarity and maintainability. I would also err slightly on the side of option (2) for backwards compatibility.

I think a happy path here, from a UX perspective, would be a third option: remove the catch-all **kwargs and have a single arrow_parquet_args arg that can be either a plain options dictionary or a provider callable:

from typing import Any, Callable, Dict, Optional, Union

from ray.data.block import BlockAccessor

# ...

    def write_parquet(
            self,
            path: str,
            *,
            filesystem: Optional["pyarrow.fs.FileSystem"] = None,
            try_create_dir: bool = True,
            arrow_open_stream_args: Optional[Dict[str, Any]] = None,
            arrow_parquet_args: Optional[
                Union[Dict[str, Any],
                      Callable[[BlockAccessor], Dict[str, Any]]]] = None
    ) -> None:
        """Write the dataset to parquet.
        ...
        Args:
            ...
            arrow_open_stream_args: kwargs passed to
                pyarrow.fs.FileSystem.open_output_stream
            arrow_parquet_args: A dictionary (or callable that creates a
                dictionary) of write options to use when writing each block
                to a file. Use a callable if you want to vary the write
                options based on the block, or if one or more of the write
                options are not serializable.
        """

# ...

    def _write_block(self,
                     f: "pyarrow.NativeFile",
                     block: BlockAccessor,
                     writer_args: Optional[
                         Union[Dict[str, Any],
                               Callable[[BlockAccessor],
                                        Dict[str, Any]]]] = None) -> None:
        import pyarrow.parquet as pq

        writer_args = _resolve_kwargs(writer_args, block)
        pq.write_table(block.to_arrow(), f, **writer_args)

# ...

def _resolve_kwargs(
    kwargs: Optional[Union[Dict[str, Any],
                           Callable[[BlockAccessor], Dict[str, Any]]]],
    block: BlockAccessor,
) -> Dict[str, Any]:
    if kwargs is None:
        kwargs = {}
    elif callable(kwargs):
        kwargs = kwargs(block)
    return kwargs

You have a more complicated type for arrow_parquet_args, but all three of the following options are available to the user without exposing multiple arguments that step on each other (see the usage sketch after this list):

  1. Happy/easy path: The common case of a plain dict of write options can be passed.
  2. Unserializable args: Unserializable args can be generated by a callable.
  3. Per-block options: The options can vary for each block via a callable.
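
As a usage sketch under this proposed (not merged) signature, with illustrative paths and option values:

    import ray

    ds = ray.data.range(1000)

    # 1. Happy/easy path: a plain dict of write options.
    ds.write_parquet("/tmp/out", arrow_parquet_args={"compression": "snappy"})

    # 2./3. Callable: built lazily on the worker (so nothing unserializable
    # crosses the pickle boundary) and free to vary per block.
    ds.write_parquet(
        "/tmp/out",
        arrow_parquet_args=lambda block: {
            "compression": "zstd",
            "compression_level": 9 if block.size_bytes() > 128 * 1024 * 1024
            else 3,
        },
    )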

@ericl removed the @external-author-action-required label on Oct 21, 2021
@ericl merged commit 20d4787 into ray-project:master on Oct 21, 2021

@pdames deleted the dataset-csv-write-options branch on October 21, 2021