
Add support for snappy text decompression #22298 #22486

Merged 10 commits into ray-project:master on Mar 15, 2022

Conversation

siddgoel (Contributor)

Why are these changes needed?

Adds a streaming-based read option for Snappy-compressed files. Arrow doesn't support streaming Snappy decompression, since the canonical C++ Snappy library doesn't natively support it. This PR works around that by doing streaming reads of Snappy-compressed files using the streaming decompression API provided by the python-snappy package.

This commit supplies a custom datasource that uses Arrow + python-snappy to read and decompress Snappy-compressed files.
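
For illustration, a minimal standalone sketch of this approach (assuming a hypothetical local file example.snappy written with python-snappy's stream framing format):

import io

import pyarrow.fs
import snappy

# python-snappy can stream-decompress directly from an Arrow NativeFile,
# since NativeFile exposes a file-like read() interface.
fs = pyarrow.fs.LocalFileSystem()
decompressed = io.BytesIO()
with fs.open_input_stream("example.snappy") as f:
    snappy.stream_decompress(src=f, dst=decompressed)
data = decompressed.getvalue()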

Related issue number

Closes #22023

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@clarkzinzow (Contributor) left a comment:

I just realized something that should be really nice!

Since the py-snappy library can do streaming decompression on an Arrow NativeFile, we could push all of this down into BinaryDatasource as an implementation detail, switching on whether Snappy decompression is needed, so we wouldn't even need the new SnappyTextDatasource.

Much as I did here in the other PR, we can check for Snappy compression in FileBasedDatasource.prepare_read() and, if found, make sure that we don't open the Arrow stream with that compression. Instead, we pass the compression arg to FileBasedDatasource._read_file() as one of the **reader_args, allowing subclasses of FileBasedDatasource (such as BinaryDatasource) to implement their own manual streaming decompression of Snappy-compressed files:

# file_based_datasource.py
import pyarrow as pa  # used below for codec detection
class FileBasedDatasource(Datasource):

    def prepare_read(...):
        # ...
        def read_files(
            read_paths: List[str],
            fs: Union["pyarrow.fs.FileSystem", _S3FileSystemWrapper],
        ) -> Iterable[Block]:
            # ...
            for read_path in read_paths:
                compression = open_stream_args.pop("compression", None)
                if compression is None:
                    try:
                        # If no compression manually given, try to detect compression codec from
                        # path.
                        compression = pa.Codec.detect(read_path).name
                    except (ValueError, TypeError):
                        compression = None
                if compression == "snappy":
                    # Pass Snappy compression as a reader arg, so datasource subclasses can
                    # manually handle streaming decompression in self._read_stream().
                    reader_args["compression"] = compression
                elif compression is not None:
                    # Non-Snappy compression, pass as open_input_stream() arg so Arrow can take
                    # care of streaming decompression for us.
                    open_stream_args["compression"] = compression
                with fs.open_input_stream(read_path, **open_stream_args) as f:
                    for data in read_stream(f, read_path, filesystem=fs, **reader_args):
                        output_buffer.add_block(data)
                        if output_buffer.has_next():
                            yield output_buffer.next()
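
For reference, pa.Codec.detect infers the codec from the file extension. A small illustrative check (assuming a pyarrow version whose extension map covers these codecs):

import pyarrow as pa

print(pa.Codec.detect("data.gz").name)      # gzip
print(pa.Codec.detect("data.snappy").name)  # snappy

# A path with no recognized extension raises, hence the try/except above.
try:
    pa.Codec.detect("data.txt")
except (ValueError, TypeError):
    print("no codec detected")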

The BinaryDatasource can then check for compression == "snappy" as well and, if found, use python-snappy for streaming decompression, reusing your current implementation from SnappyTextDatasource:

class BinaryDatasource(FileBasedDatasource):
    def _read_file(
        self,
        f: "pyarrow.NativeFile",
        path: str,
        filesystem: "pyarrow.fs.FileSystem",
        **reader_args
    ):
        import io

        import pyarrow as pa

        include_paths = reader_args.pop("include_paths", False)
        if reader_args.get("compression") == "snappy":
            import snappy

            rawbytes = io.BytesIO()

            if isinstance(filesystem, pa.fs.HadoopFileSystem):
                snappy.hadoop_snappy.stream_decompress(src=f, dst=rawbytes)
            else:
                snappy.stream_decompress(src=f, dst=rawbytes)

            data = rawbytes.getvalue()
        else:
            data = f.readall()
        if include_paths:
            return [(path, data)]
        else:
            return [data]
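
A quick standalone sanity check of the python-snappy streaming round trip (illustrative only, not part of this PR's diff):

import io

import snappy

# Compress with python-snappy's stream framing, then decompress the same
# way _read_file above does.
original = b"hello snappy\n" * 1000
compressed = io.BytesIO()
snappy.stream_compress(src=io.BytesIO(original), dst=compressed)
compressed.seek(0)

decompressed = io.BytesIO()
snappy.stream_decompress(src=compressed, dst=decompressed)
assert decompressed.getvalue() == original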

This should all happen transparently to the user: if compression="snappy" is given manually, or if we infer Snappy compression from the file path, we'll switch to python-snappy-based streaming decompression within the BinaryDatasource implementation.

What do you think? Am I missing anything here, or do you think this could work?

Review threads (outdated, resolved) on:
  • python/ray/data/datasource/binary_datasource.py
  • python/ray/data/datasource/csv_datasource.py
  • python/ray/data/datasource/file_based_datasource.py (3 threads)
  • python/ray/data/datasource/numpy_datasource.py
  • python/requirements.txt
  • python/ray/data/datasource/snappy_text_datasource.py (3 threads)
@clarkzinzow (Contributor) left a comment:

This is looking really good! In addition to the review comments, a few other things:

An inline review thread on the following snippet:

filesystem = reader_args.get("filesystem", None)
rawbytes = BytesIO()

if isinstance(filesystem, pyarrow.fs.HadoopFileSystem):

Suggested change:
-if isinstance(filesystem, pyarrow.fs.HadoopFileSystem):
+if isinstance(filesystem, HadoopFileSystem):
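
Presumably this relies on the module importing the class directly (an assumption; the import itself isn't shown in the diff):

from pyarrow.fs import HadoopFileSystem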

Additional review threads on:
  • python/ray/data/datasource/binary_datasource.py (outdated, resolved)
  • python/ray/data/tests/test_dataset_formats.py (resolved)
@clarkzinzow (Contributor)

@siddgoel Ping on this!

@siddgoel requested a review from jjyao as a code owner on March 11, 2022
@clarkzinzow (Contributor) left a comment:

LGTM, if the tests pass this looks good to merge. Great work!

@ericl self-assigned this on Mar 15, 2022
@scv119 (Contributor) commented Mar 15, 2022:

Looks like a bunch of tests failed. Could you merge master and trigger the tests again?

@scv119 added the @author-action-required label (the PR author is responsible for the next step; remove the tag to send back to the reviewer) on Mar 15, 2022
@siddgoel (Contributor, Author)

@scv119 done, we should be good to go once the tests are finished

@ericl merged commit 0722cbb into ray-project:master on Mar 15, 2022
Labels
@author-action-required

Successfully merging this pull request may close these issues:
  • [Datasets] [Bug] Streaming reads for Snappy-compressed files not supported