Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup old compression workarounds #6259

Merged
merged 8 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.10.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ dependencies:
- toolz
- tornado
- zict # overridden by git tip below
- zstandard
- zstandard >=0.9.0
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/dask/s3fs
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.8.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ dependencies:
- toolz
- tornado
- zict
- zstandard
- zstandard >=0.9.0
- pip:
- git+https://github.com/dask/dask
- git+https://github.com/jcrist/crick # Only tested here
Expand Down
6 changes: 3 additions & 3 deletions continuous_integration/environment-3.9.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies:
- ipywidgets
- jinja2
- locket >=1.0
- lz4 # Only tested here
- lz4 >=0.23.1 # Only tested here
- msgpack-python
- netcdf4
- paramiko
Expand All @@ -32,7 +32,7 @@ dependencies:
- pytest-repeat
- pytest-rerunfailures
- pytest-timeout
- python-snappy # Only tested here
- python-snappy >=0.5.3 # Only tested here
- pytorch # Only tested here
- requests
- s3fs
Expand All @@ -44,7 +44,7 @@ dependencies:
- torchvision # Only tested here
- tornado
- zict
- zstandard
- zstandard >=0.9.0
- pip:
- git+https://github.com/dask/dask
- keras
63 changes: 24 additions & 39 deletions distributed/protocol/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from contextlib import suppress
from typing import Literal

from packaging.version import parse as parse_version
from tlz import identity

import dask
Expand Down Expand Up @@ -39,63 +40,47 @@
with suppress(ImportError):
import snappy

def _fixed_snappy_decompress(data):
# snappy.decompress() doesn't accept memoryviews
if isinstance(data, (memoryview, bytearray)):
data = bytes(data)
return snappy.decompress(data)
Comment on lines -42 to -46
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Support for the Python Buffer Protocol was added in 0.5.3 with PR ( intake/python-snappy#72 ). This version was released Jul 2018 so should be old enough to rely upon. With Python Buffer Protocol support, there is no longer a need to copy too bytes here and we can just hand data to snappy.decompress.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

# In python-snappy 0.5.3, support for the Python Buffer Protocol was added.
# This is needed to handle other objects (like `memoryview`s) without
# copying to `bytes` first.
#
# Note: `snappy.__version__` doesn't exist in a release yet.
# So do a little test that will fail if snappy is not 0.5.3 or later.
try:
snappy.compress(memoryview(b""))
except TypeError:
raise ImportError("Need snappy >= 0.5.3")
Comment on lines +49 to +52
Copy link
Member Author

@jakirkham jakirkham May 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recently snappy.__version__ was added with PR ( intake/python-snappy#119 ). Though it is not currently in a release and hasn't been around long enough to rely upon.

So just do a simple test that will fail without python-snappy version 0.5.3 or later. If python-snappy is not new enough (unlikely given how long this release has been out), simply skip using python-snappy. Test results on 0.5.2 & 0.5.3 below:

python-snappy 0.5.2:

In [1]: import snappy

In [2]: snappy.compress(memoryview(b""))
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-2-077d330e1c70> in <module>
----> 1 snappy.compress(memoryview(b""))

~/miniconda/envs/snap52/lib/python3.6/site-packages/snappy/snappy.py in compress(data, encoding)
     82         data = data.encode(encoding)
     83 
---> 84     return _compress(data)
     85 
     86 def uncompress(data, decoding=None):

TypeError: argument 1 must be read-only bytes-like object, not memoryview

python-snappy 0.5.3:

In [1]: import snappy

In [2]: snappy.compress(memoryview(b""))
Out[2]: b'\x00'

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

(do we want to use cramjam for all of the de/compressors??)


compressions["snappy"] = {
"compress": snappy.compress,
"decompress": _fixed_snappy_decompress,
"decompress": snappy.decompress,
}
default_compression = "snappy"

with suppress(ImportError):
import lz4

try:
# try using the new lz4 API
import lz4.block

lz4_compress = lz4.block.compress
lz4_decompress = lz4.block.decompress
except ImportError:
# fall back to old one
lz4_compress = lz4.LZ4_compress
lz4_decompress = lz4.LZ4_uncompress
Comment on lines -57 to -66
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lz4.block APIs were added in commit ( python-lz4/python-lz4@bb2c7ea ) and have been around since 0.23.1, which was released Feb 2018. So should be old enough to rely upon.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


# helper to bypass missing memoryview support in current lz4
# (fixed in later versions)

def _fixed_lz4_compress(data):
try:
return lz4_compress(data)
except TypeError:
if isinstance(data, (memoryview, bytearray)):
return lz4_compress(bytes(data))
else:
raise

def _fixed_lz4_decompress(data):
try:
return lz4_decompress(data)
except (ValueError, TypeError):
if isinstance(data, (memoryview, bytearray)):
return lz4_decompress(bytes(data))
else:
raise
Comment on lines -68 to -87
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also Python Buffer Protocol support was added in PR ( python-lz4/python-lz4#38 ), which was also included in 0.23.1. So drop these workarounds as these objects should work without copying to bytes first.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

# Required to use `lz4.block` APIs and Python Buffer Protocol support.
if parse_version(lz4.__version__) < parse_version("0.23.1"):
raise ImportError("Need lz4 >= 0.23.1")
Comment on lines +63 to +65
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error is a bit more explicit, but this arguably would happen by just trying the import line below.


from lz4.block import compress as lz4_compress
from lz4.block import decompress as lz4_decompress

compressions["lz4"] = {
"compress": _fixed_lz4_compress,
"decompress": _fixed_lz4_decompress,
"compress": lz4_compress,
"decompress": lz4_decompress,
}
default_compression = "lz4"


with suppress(ImportError):
import zstandard

# Required for Python Buffer Protocol support.
if parse_version(zstandard.__version__) < parse_version("0.9.0"):
raise ImportError("Need zstandard >= 0.9.0")
Comment on lines +80 to +82
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the other compressors already support the Python Buffer Protocol, make sure we support it here too. This was added in a series of commits referenced in issue ( indygreg/python-zstandard#26 ) that were included in 0.9.0, which came out Apr 2018. Thus has been around as long as the other releases being required here. So seems like a reasonable minimum.


zstd_compressor = zstandard.ZstdCompressor(
level=dask.config.get("distributed.comm.zstd.level"),
threads=dask.config.get("distributed.comm.zstd.threads"),
Expand Down