fix: add correct support for compressing file-like objects #174

Merged: 9 commits (Oct 4, 2023)
10 changes: 7 additions & 3 deletions ibm_cloud_sdk_core/base_service.py
@@ -15,6 +15,7 @@
# limitations under the License.

import gzip
+import io
import json as json_import
import logging
import platform
@@ -39,6 +40,7 @@
    read_external_sources,
    strip_extra_slashes,
    SSLHTTPAdapter,
+    GzipStream,
)
from .version import __version__

@@ -420,10 +422,12 @@ def prepare_request(
        # Compress the request body if applicable
        if self.get_enable_gzip_compression() and 'content-encoding' not in headers and request['data'] is not None:
            headers['content-encoding'] = 'gzip'
-           uncompressed_data = request['data']
-           request_body = gzip.compress(uncompressed_data)
-           request['data'] = request_body
            request['headers'] = headers
+           raw = request['data']
+           # Handle the compression for file-like objects.
+           # We need to use a custom stream/pipe method to prevent
+           # reading the whole file into memory.
+           request['data'] = GzipStream(raw) if isinstance(raw, io.IOBase) else gzip.compress(raw)
Comment (Member Author):
I used this if-else format to avoid the too-many-branches linter error, then renamed the variable to raw so the line doesn't exceed the max line length... :)

Comment (Member):

I think it would add some value if you were to add some additional function to the GzipStream() ctor so that you could also pass in a non-IOBase-type value for raw and it would do the right thing (in that case, perhaps just wrap raw with an appropriate IOBase-type class that could stream the bytes of raw... in other words, beef up GzipStream just a bit so it can handle both file-like objects and static strings/buffers).
This way, the details of how a particular requestBody is gzip-encoded are hidden inside GzipStream and not exposed up in this BaseService method, AND we also get stream-based gzip compression, which might help with large JSON requestBodies.
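
For what it's worth, a rough sketch of that suggestion (purely illustrative; the class name and the exact normalization are assumptions, not what the PR ended up merging):

import io

class FlexibleGzipStream(io.IOBase):  # hypothetical name, not in the SDK
    """Sketch: accept file-like objects as well as static str/bytes payloads."""

    def __init__(self, source):
        # Normalize static buffers into an in-memory stream so the rest of
        # the class can treat every input uniformly as a file-like object.
        if isinstance(source, str):
            source = source.encode()
        if isinstance(source, bytes):
            source = io.BytesIO(source)
        self.uncompressed = source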

Comment (Member, @ricellis, Sep 26, 2023):

FYI, when I was originally investigating this in our SDK I noticed this in the gzip.compress() docs:

> Changed in version 3.11: Speed is improved by compressing all data at once instead of in a streamed fashion.

So there might not be much value in trying to be fully stream-based.

It also says:

> Calls with mtime set to 0 are delegated to zlib.compress() for better speed.

It isn't immediately obvious to me whether that delegation makes any difference to the streaming behaviour, but mtime defaults to the current time, so the delegation won't happen at present anyway (nb. the mtime parameter is only available from 3.8).

Comment (Member Author):

> So there might not be much value in trying to be fully stream-based.

In my opinion it would make the code cleaner (to have all compression-related steps in the helper class), and although the performance is improved in 3.11, the memory usage could still be a problem when the file is large, right? I think that's Phil's main point.

> mtime only available from 3.8

We still support 3.7, so that's not an option, at least for the gzip.compress function. From my understanding, the mtime is only used during decompression, so (I think) streaming shouldn't affect the result.
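
As a side note, the mtime behaviour is easy to check in a REPL (a small sketch, requiring Python 3.8+ for the mtime keyword): the timestamp only changes the gzip header bytes, never the payload that decompression returns.

import gzip

# A fixed mtime makes the output deterministic across calls...
assert gzip.compress(b'payload', mtime=0) == gzip.compress(b'payload', mtime=0)
# ...while the decompressed result is unaffected by the timestamp.
assert gzip.decompress(gzip.compress(b'payload', mtime=0)) == b'payload'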

Comment (Member):

> the memory usage could still be a problem when the file is large, right? I think that's Phil's main point

My understanding is that in 3.11 the entire contents will be in memory inside gzip anyway. So my point was that it might not be worth making changes only for that reason, but as you say there are other benefits.


        # Next, we need to process the 'files' argument to try to fill in
        # any missing filenames where possible.
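To make the dispatch above concrete, here is a minimal standalone sketch (mirroring the merged branch, not additional SDK code) showing that both paths produce a body that decompresses back to the original payload:

import gzip
import io

from ibm_cloud_sdk_core.utils import GzipStream

def compress_body(raw):
    # Same dispatch as in prepare_request: stream file-like objects,
    # compress static buffers in one shot.
    return GzipStream(raw) if isinstance(raw, io.IOBase) else gzip.compress(raw)

# Static bytes are compressed immediately...
assert gzip.decompress(compress_body(b'payload')) == b'payload'
# ...while file-like objects are compressed lazily, on read().
assert gzip.decompress(compress_body(io.BytesIO(b'payload')).read()) == b'payload'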
75 changes: 75 additions & 0 deletions ibm_cloud_sdk_core/utils.py
@@ -15,6 +15,8 @@
# limitations under the License.
# from ibm_cloud_sdk_core.authenticators import Authenticator
import datetime
+import gzip
+import io
import json as json_import
import re
import ssl
@@ -43,6 +45,79 @@ def init_poolmanager(self, connections, maxsize, block):
        super().init_poolmanager(connections, maxsize, block, ssl_context=ssl_context)


class GzipStream(io.IOBase):
    """Compress files on the fly.

    GzipStream is a helper class around the gzip library. It helps to
    compress already opened files (file-like objects) on the fly, so
    there is no need to read everything into memory and call the
    `compress` function on it.
    The GzipFile is opened on the instance itself (fileobj=self), so the
    instance needs to act as a file-like object.

    Args:
        source: a file-like object to be compressed
    """

    def __init__(self, source: io.IOBase):
        self.uncompressed = source
        self.buffer = b''

        self.compressor = gzip.GzipFile(fileobj=self, mode='wb')

    def read(self, size: int = -1):
        """Compresses and returns the requested size of data.

        Args:
            size: how many bytes to return. -1 to read and compress the whole file.
        """
        if (size < 0) or (len(self.buffer) < size):
            for raw in self.uncompressed:
                # We need to encode text-like streams (e.g. TextIOWrapper) to bytes.
                if isinstance(raw, str):
                    raw = raw.encode()

                self.compressor.write(raw)

                # Stop compressing if we reached the max allowed size.
                if 0 < size < len(self.buffer):
                    self.compressor.flush()
                    break
            else:
                self.compressor.close()

        if size < 0:
            # Return all data from the buffer.
            compressed = self.buffer
            self.buffer = b''
        else:
            # If we already have enough data in our buffer,
            # return the desired chunk of bytes
            compressed = self.buffer[:size]
            # then remove them from the buffer.
            self.buffer = self.buffer[size:]

        return compressed

    def flush(self):
        """Not implemented."""
        # Since this "pipe" sits between two other streams (source/read -> target/write),
        # implementing flushing wouldn't be worthwhile.
        pass

    def write(self, compressed: bytes):
        """Append the compressed data to the buffer.

        This happens when the target stream calls the `read` method and
        that triggers the gzip "compressor".
        """
        self.buffer += compressed

    def close(self):
        """Closes the underlying file-like object."""
        self.uncompressed.close()
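
For illustration, a quick standalone check of the class (a sketch, not part of the PR): GzipFile writes its compressed output back through self.write(), so read() drains freshly compressed bytes from the internal buffer.

import gzip
import io

from ibm_cloud_sdk_core.utils import GzipStream

# Wrap an in-memory "file"; nothing is compressed until read() is called.
stream = GzipStream(io.BytesIO(b'example payload'))

# read(-1) compresses the whole source and returns a complete gzip document.
assert gzip.decompress(stream.read()) == b'example payload'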


def has_bad_first_or_last_char(val: str) -> bool:
    """Returns true if a string starts with any of: {," ; or ends with any of: },".
28 changes: 28 additions & 0 deletions test/test_base_service.py
@@ -647,6 +647,34 @@ def test_gzip_compression():
    assert prepped['headers'].get('content-encoding') == 'gzip'


def test_gzip_compression_file_input():
Comment (Member Author):
Had to put the new test cases into a separate function to avoid the too-many-branches linter error.

    service = AnyServiceV1('2018-11-20', authenticator=NoAuthAuthenticator())
    service.set_enable_gzip_compression(True)

    # Should return a file-like object with the compressed data when compression is on
    # and the input is a file, opened for reading in binary mode.
    raw_data = b'rawdata'
    with tempfile.TemporaryFile(mode='w+b') as tmp_file:
        tmp_file.write(raw_data)
        tmp_file.seek(0)

        prepped = service.prepare_request('GET', url='', data=tmp_file)
        assert prepped['data'].read() == gzip.compress(raw_data)
        assert prepped['headers'].get('content-encoding') == 'gzip'

    # Should return a file-like object with the compressed data when compression is on
    # and the input is a file, opened for reading in text mode.
    assert service.get_enable_gzip_compression()
    text_data = 'textdata'
    with tempfile.TemporaryFile(mode='w+') as tmp_file:
        tmp_file.write(text_data)
        tmp_file.seek(0)

        prepped = service.prepare_request('GET', url='', data=tmp_file)
        assert prepped['data'].read() == gzip.compress(text_data.encode())
        assert prepped['headers'].get('content-encoding') == 'gzip'
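
As a complement to the tests above, a hedged sketch (not part of the PR's test suite) of chunked consumption, which is how requests actually drains a file-like body during upload:

import gzip
import io

from ibm_cloud_sdk_core.utils import GzipStream

# Read the compressed stream in small fixed-size chunks.
stream = GzipStream(io.BytesIO(b'rawdata'))
chunks = []
while True:
    chunk = stream.read(5)  # tiny chunk size, just for demonstration
    if not chunk:
        break
    chunks.append(chunk)
assert gzip.decompress(b''.join(chunks)) == b'rawdata'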


def test_gzip_compression_external():
    # Should set gzip compression from external config
    file_path = os.path.join(os.path.dirname(__file__), '../resources/ibm-credentials-gzip.env')