Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add configurable checksum support for uploads #139

Merged
merged 8 commits into from
Jul 23, 2020
Merged
197 changes: 196 additions & 1 deletion google/resumable_media/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

"""Shared utilities used by both downloads and uploads."""


import base64
import hashlib
import logging
import random
import time
import warnings

from six.moves import http_client

Expand All @@ -33,6 +36,18 @@
http_client.GATEWAY_TIMEOUT,
)

# Warning text emitted (once per checksum object) when crcmod is running in
# its pure-python form rather than its C extension.
_SLOW_CRC32C_WARNING = (
    "Currently using crcmod in pure python form. This is a slow "
    "implementation. Python 3 has a faster implementation, `google-crc32c`, "
    "which will be used if it is installed."
)
# Response header carrying the server-computed checksums, e.g.
# ``crc32c=n03x6A==,md5=Ojk9c3dhfxgoKVVHYwFbHQ==``.
_HASH_HEADER = u"x-goog-hash"
# Log template used when no checksum of the requested type is returned.
# The positional ``{}`` field receives the media URL; ``{checksum_type}``
# receives the upper-cased checksum name.
_MISSING_CHECKSUM = u"""\
No {checksum_type} checksum was returned from the service while downloading {}
(which happens for composite objects), so client-side content integrity
checking is not being performed."""
# Module-level logger for checksum-related informational messages.
_LOGGER = logging.getLogger(__name__)


def do_nothing():
"""Simple default callback."""
Expand Down Expand Up @@ -164,3 +179,183 @@ def wait_and_retry(func, get_status_code, retry_strategy):
return response

return response


def _get_crc32c_object():
    """Get a crc32c checksum object.

    Prefer the ``google-crc32c`` package (a C extension). When it is not
    installed, fall back to ``crcmod``; if that fallback is the slow
    pure-python variant, a ``RuntimeWarning`` is emitted.

    Returns:
        object: A crc32c checksum object exposing ``update``/``digest``.

    Raises:
        ImportError: If neither ``google-crc32c`` nor ``crcmod`` is
            available.
    """
    try:
        import crc32c

        return crc32c.Checksum()
    except ImportError:
        pass

    try:
        import crcmod
    except ImportError:
        raise ImportError("Failed to import either `google-crc32c` or `crcmod`")

    checksum = crcmod.predefined.Crc("crc-32c")
    # Warn the caller if the slow pure-python crcmod implementation is active.
    _is_fast_crcmod()
    return checksum


def _is_fast_crcmod():
    """Return True if crcmod is using its C extension; warn otherwise."""
    # Import the nested ``crcmod.crcmod`` submodule so we can read its private
    # ``_usingExtension`` flag, which records whether the C extension loaded.
    nested_crcmod = __import__(
        "crcmod.crcmod", globals(), locals(), ["_usingExtension"], 0,
    )
    # Default to False (slow) if the attribute is missing in this crcmod build.
    fast_crc = getattr(nested_crcmod, "_usingExtension", False)
    if not fast_crc:
        # stacklevel=2 attributes the warning to this function's caller.
        warnings.warn(_SLOW_CRC32C_WARNING, RuntimeWarning, stacklevel=2)
    return fast_crc


def _get_metadata_key(checksum_type):
if checksum_type == "md5":
return "md5Hash"
else:
return checksum_type


def prepare_checksum_digest(digest_bytestring):
    """Convert a checksum digest into a value encoded for an HTTP header.

    Args:
        digest_bytestring (bytes): A checksum digest bytestring.

    Returns:
        str: A base64 string representation of the input.
    """
    # ``b64encode`` returns ``bytes``, but HTTP headers expect ``str``.
    return base64.b64encode(digest_bytestring).decode(u"utf-8")


def _get_expected_checksum(response, get_headers, media_url, checksum_type):
"""Get the expected checksum and checksum object for the download response.

Args:
response (~requests.Response): The HTTP response object.
get_headers (callable: response->dict): returns response headers.
media_url (str): The URL containing the media to be downloaded.
checksum_type Optional(str): The checksum type to read from the headers,
exactly as it will appear in the headers (case-sensitive). Must be
"md5", "crc32c" or None.

Returns:
Tuple (Optional[str], object): The expected checksum of the response,
if it can be detected from the ``X-Goog-Hash`` header, and the
appropriate checksum object for the expected checksum.
"""
if checksum_type not in ["md5", "crc32c", None]:
raise ValueError("checksum must be ``'md5'``, ``'crc32c'`` or ``None``")
elif checksum_type in ["md5", "crc32c"]:
headers = get_headers(response)
expected_checksum = _parse_checksum_header(
headers.get(_HASH_HEADER), response, checksum_label=checksum_type
)

if expected_checksum is None:
msg = _MISSING_CHECKSUM.format(
media_url, checksum_type=checksum_type.upper()
)
_LOGGER.info(msg)
checksum_object = _DoNothingHash()
else:
if checksum_type == "md5":
checksum_object = hashlib.md5()
else:
checksum_object = _get_crc32c_object()
else:
expected_checksum = None
checksum_object = _DoNothingHash()

return (expected_checksum, checksum_object)


def _parse_checksum_header(header_value, response, checksum_label):
"""Parses the checksum header from an ``X-Goog-Hash`` value.

.. _header reference: https://cloud.google.com/storage/docs/\
xml-api/reference-headers#xgooghash

Expects ``header_value`` (if not :data:`None`) to be in one of the three
following formats:

* ``crc32c=n03x6A==``
* ``md5=Ojk9c3dhfxgoKVVHYwFbHQ==``
* ``crc32c=n03x6A==,md5=Ojk9c3dhfxgoKVVHYwFbHQ==``

See the `header reference`_ for more information.

Args:
header_value (Optional[str]): The ``X-Goog-Hash`` header from
a download response.
response (~requests.Response): The HTTP response object.
checksum_label (str): The label of the header value to read, as in the
examples above. Typically "md5" or "crc32c"

Returns:
Optional[str]: The expected checksum of the response, if it
can be detected from the ``X-Goog-Hash`` header; otherwise, None.

Raises:
~google.resumable_media.common.InvalidResponse: If there are
multiple checksums of the requested type in ``header_value``.
"""
if header_value is None:
return None

matches = []
for checksum in header_value.split(u","):
name, value = checksum.split(u"=", 1)
if name == checksum_label:
matches.append(value)

if len(matches) == 0:
return None
elif len(matches) == 1:
return matches[0]
else:
raise common.InvalidResponse(
response,
u"X-Goog-Hash header had multiple ``{}`` values.".format(checksum_label),
header_value,
matches,
)


def _get_checksum_object(checksum_type):
"""Respond with a checksum object for a supported type, if not None.

Raises ValueError if checksum_type is unsupported.
"""
if checksum_type == "md5":
return hashlib.md5()
elif checksum_type == "crc32c":
return _get_crc32c_object()
elif checksum_type is None:
return None
else:
raise ValueError("checksum must be ``'md5'``, ``'crc32c'`` or ``None``")


class _DoNothingHash(object):
"""Do-nothing hash object.

Intended as a stand-in for ``hashlib.md5`` or a crc32c checksum
implementation in cases where it isn't necessary to compute the hash.
"""

def update(self, unused_chunk):
"""Do-nothing ``update`` method.

Intended to match the interface of ``hashlib.md5`` and other checksums.

Args:
unused_chunk (bytes): A chunk of data.
"""
96 changes: 95 additions & 1 deletion google/resumable_media/_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@
)
_POST = u"POST"
_PUT = u"PUT"
# Message for the DataCorruption raised on upload checksum mismatch; the
# fields are (checksum type, locally computed digest, remote digest).
_UPLOAD_CHECKSUM_MISMATCH_MESSAGE = (
    "The computed ``{}`` checksum, ``{}``, and the checksum reported by the "
    "remote host, ``{}``, did not match."
)
# Message for the InvalidResponse raised when the finalized upload's metadata
# lacks the expected checksum field; the field is the metadata key name.
_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE = (
    "Response metadata had no ``{}`` value; checksum could not be validated."
)


class UploadBase(object):
Expand Down Expand Up @@ -231,11 +238,20 @@ class MultipartUpload(UploadBase):
upload_url (str): The URL where the content will be uploaded.
headers (Optional[Mapping[str, str]]): Extra headers that should
be sent with the request, e.g. headers for encrypted data.
checksum (Optional[str]): The type of checksum to compute to verify
the integrity of the object. The request metadata will be amended
to include the computed value. Using this option will override a
manually-set checksum value. Supported values are "md5", "crc32c"
and None. The default is None.

Attributes:
upload_url (str): The URL where the content will be uploaded.
"""

def __init__(self, upload_url, headers=None, checksum=None):
    # Delegate URL/header bookkeeping to the UploadBase constructor.
    super(MultipartUpload, self).__init__(upload_url, headers=headers)
    # Checksum type ("md5", "crc32c" or None) to compute over the payload
    # and inject into the request metadata before transmission.
    self._checksum_type = checksum

def _prepare_request(self, data, metadata, content_type):
"""Prepare the contents of an HTTP request.

Expand Down Expand Up @@ -274,11 +290,20 @@ def _prepare_request(self, data, metadata, content_type):

if not isinstance(data, six.binary_type):
raise TypeError(u"`data` must be bytes, received", type(data))

checksum_object = _helpers._get_checksum_object(self._checksum_type)
if checksum_object:
checksum_object.update(data)
actual_checksum = _helpers.prepare_checksum_digest(checksum_object.digest())
metadata_key = _helpers._get_metadata_key(self._checksum_type)
metadata[metadata_key] = actual_checksum
andrewsg marked this conversation as resolved.
Show resolved Hide resolved

content, multipart_boundary = construct_multipart_request(
data, metadata, content_type
)
multipart_content_type = _RELATED_HEADER + multipart_boundary + b'"'
self._headers[_CONTENT_TYPE_HEADER] = multipart_content_type

return _POST, self.upload_url, content, self._headers

def transmit(self, transport, data, metadata, content_type, timeout=None):
Expand Down Expand Up @@ -321,6 +346,13 @@ class ResumableUpload(UploadBase):
be sent with the :meth:`initiate` request, e.g. headers for
encrypted data. These **will not** be sent with
:meth:`transmit_next_chunk` or :meth:`recover` requests.
checksum (Optional[str]): The type of checksum to compute to verify
the integrity of the object. After the upload is complete, the
server-computed checksum of the resulting object will be read
and google.resumable_media.common.DataCorruption will be raised on
a mismatch. The corrupted file will not be deleted from the remote
host automatically. Supported values are "md5", "crc32c" and None.
The default is None.

Attributes:
upload_url (str): The URL where the content will be uploaded.
Expand All @@ -330,7 +362,7 @@ class ResumableUpload(UploadBase):
:data:`.UPLOAD_CHUNK_SIZE`.
"""

def __init__(self, upload_url, chunk_size, headers=None):
def __init__(self, upload_url, chunk_size, checksum=None, headers=None):
super(ResumableUpload, self).__init__(upload_url, headers=headers)
if chunk_size % resumable_media.UPLOAD_CHUNK_SIZE != 0:
raise ValueError(
Expand All @@ -342,6 +374,9 @@ def __init__(self, upload_url, chunk_size, headers=None):
self._stream = None
self._content_type = None
self._bytes_uploaded = 0
self._bytes_checksummed = 0
self._checksum_type = checksum
self._checksum_object = None
self._total_bytes = None
self._resumable_url = None
self._invalid = False
Expand Down Expand Up @@ -576,12 +611,36 @@ def _prepare_request(self):
msg = _STREAM_ERROR_TEMPLATE.format(start_byte, self.bytes_uploaded)
raise ValueError(msg)

self._update_checksum(start_byte, payload)

headers = {
_CONTENT_TYPE_HEADER: self._content_type,
_helpers.CONTENT_RANGE_HEADER: content_range,
}
return _PUT, self.resumable_url, payload, headers

def _update_checksum(self, start_byte, payload):
"""Update the checksum with the payload if not already updated.

Because error recovery can result in bytes being transmitted more than
once, the checksum tracks the number of bytes checked in
self._bytes_checksummed and skips bytes that have already been summed.
"""
if not self._checksum_type:
return

if not self._checksum_object:
self._checksum_object = _helpers._get_checksum_object(self._checksum_type)

if start_byte < self._bytes_checksummed:
offset = self._bytes_checksummed - start_byte
data = payload[offset:]
else:
data = payload

self._checksum_object.update(data)
self._bytes_checksummed += len(data)

def _make_invalid(self):
"""Simple setter for ``invalid``.

Expand Down Expand Up @@ -630,6 +689,8 @@ def _process_response(self, response, bytes_sent):
self._bytes_uploaded = self._bytes_uploaded + bytes_sent
# Tombstone the current upload so it cannot be used again.
self._finished = True
# Validate the checksum. This can raise an exception on failure.
self._validate_checksum(response)
else:
bytes_range = _helpers.header_required(
response,
Expand All @@ -648,6 +709,39 @@ def _process_response(self, response, bytes_sent):
)
self._bytes_uploaded = int(match.group(u"end_byte")) + 1

def _validate_checksum(self, response):
"""Check the computed checksum, if any, against the response headers.

Args:
response (object): The HTTP response object.

Raises:
~google.resumable_media.common.DataCorruption: If the checksum
computed locally and the checksum reported by the remote host do
not match.
"""
if self._checksum_type is None:
return
metadata_key = _helpers._get_metadata_key(self._checksum_type)
metadata = response.json()
remote_checksum = metadata.get(metadata_key)
if remote_checksum is None:
raise common.InvalidResponse(
response,
_UPLOAD_METADATA_NO_APPROPRIATE_CHECKSUM_MESSAGE.format(metadata_key),
self._get_headers(response),
)
local_checksum = _helpers.prepare_checksum_digest(
self._checksum_object.digest()
)
if local_checksum != remote_checksum:
raise common.DataCorruption(
response,
_UPLOAD_CHECKSUM_MISMATCH_MESSAGE.format(
self._checksum_type.upper(), local_checksum, remote_checksum
),
)

def transmit_next_chunk(self, transport, timeout=None):
"""Transmit the next chunk of the resource to be uploaded.

Expand Down
Loading