From 8b1ae6dd9ba59fc0568e370cc6bc64a562855818 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Tue, 2 May 2017 17:14:29 -0700 Subject: [PATCH 1/5] Changing over Blob.upload*() methods to use google-resumable-media. --- storage/google/cloud/storage/blob.py | 356 +++++++++++++++++++++------ storage/setup.py | 2 +- 2 files changed, 285 insertions(+), 73 deletions(-) diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py index 3691b0c8ba19..7905559c47c9 100644 --- a/storage/google/cloud/storage/blob.py +++ b/storage/google/cloud/storage/blob.py @@ -20,17 +20,20 @@ import copy import hashlib from io import BytesIO -from io import UnsupportedOperation -import json import mimetypes import os import time +import warnings import httplib2 from six.moves.urllib.parse import quote import google.auth.transport.requests from google import resumable_media +from google.resumable_media.requests import ChunkedDownload +from google.resumable_media.requests import Download +from google.resumable_media.requests import MultipartUpload +from google.resumable_media.requests import ResumableUpload from google.cloud._helpers import _rfc3339_to_datetime from google.cloud._helpers import _to_bytes @@ -43,7 +46,6 @@ from google.cloud.storage._helpers import _scalar_property from google.cloud.storage.acl import ObjectACL from google.cloud.streaming.http_wrapper import Request -from google.cloud.streaming.http_wrapper import make_api_request from google.cloud.streaming.transfer import RESUMABLE_UPLOAD from google.cloud.streaming.transfer import Upload @@ -52,7 +54,35 @@ _DEFAULT_CONTENT_TYPE = u'application/octet-stream' _DOWNLOAD_URL_TEMPLATE = ( u'https://www.googleapis.com/download/storage/v1{path}?alt=media') -_CONTENT_TYPE = 'contentType' +_BASE_UPLOAD_TEMPLATE = ( + u'https://www.googleapis.com/upload/storage/v1{bucket_path}/o?uploadType=') +_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'multipart' +_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'resumable' +# NOTE: "acl" is also writeable but we defer ACL management to +# the classes in the google.cloud.storage.acl module. +_CONTENT_TYPE_FIELD = 'contentType' +_WRITABLE_FIELDS = ( + 'cacheControl', + 'contentDisposition', + 'contentEncoding', + 'contentLanguage', + _CONTENT_TYPE_FIELD, + 'crc32c', + 'md5Hash', + 'metadata', + 'name', + 'storageClass', +) +_NUM_RETRIES_MESSAGE = ( + 'num_retries is no longer supported. When a transient error occurs, ' + 'such as a 429 Too Many Requests or 500 Internal Server Error, upload ' + 'requests will be automatically retried. Subsequent retries will be ' + 'done after waiting 1, 2, 4, 8, etc. seconds (exponential backoff) until ' + '10 minutes of wait time have elapsed. At that point, there will be no ' + 'more attempts to retry.') +_READ_LESS_THAN_SIZE = ( + 'Size {:d} was specified but the file-like object only had ' + '{:d} bytes remaining.') class Blob(_PropertyMixin): @@ -381,11 +411,11 @@ def _do_download(self, transport, file_obj, download_url, headers): :param headers: Optional headers to be sent with the request(s). 
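A note on the retry behavior described by _NUM_RETRIES_MESSAGE above: the doubling waits are bounded by total elapsed wait time, not by a fixed attempt count. A small illustration of that schedule, assuming the ten-minute budget from the message (this is a sketch of the documented behavior, not the actual backoff code inside google-resumable-media):

def backoff_schedule(max_total_wait=600):
    # Doubling waits (1, 2, 4, 8, ... seconds) until roughly ten
    # minutes of cumulative wait time would be exceeded.
    wait, total, waits = 1, 0, []
    while total + wait <= max_total_wait:
        waits.append(wait)
        total += wait
        wait *= 2
    return waits

print(backoff_schedule())  # [1, 2, 4, 8, 16, 32, 64, 128, 256]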
""" if self.chunk_size is None: - download = resumable_media.Download(download_url, headers=headers) + download = Download(download_url, headers=headers) response = download.consume(transport) file_obj.write(response.content) else: - download = resumable_media.ChunkedDownload( + download = ChunkedDownload( download_url, self.chunk_size, file_obj, headers=headers) while not download.finished: download.consume_next_chunk(transport) @@ -428,10 +458,7 @@ def download_to_file(self, file_obj, client=None): try: self._do_download(transport, file_obj, download_url, headers) except resumable_media.InvalidResponse as exc: - response = exc.response - faux_response = httplib2.Response({'status': response.status_code}) - raise make_exception(faux_response, response.content, - error_info=download_url, use_json=False) + _raise_from_invalid_response(exc, download_url) def download_to_filename(self, filename, client=None): """Download the contents of this blob into a named file. @@ -613,8 +640,213 @@ def _check_response_error(request, http_response): raise make_exception(faux_response, http_response.content, error_info=request.url) + def _get_writable_metadata(self): + """Get the object / blob metadata which is writable. + + This is intended to be used when creating a new object / blob. + + See the `API reference`_ for more information, the fields marked as + writable are: + + * ``acl`` + * ``cacheControl`` + * ``contentDisposition`` + * ``contentEncoding`` + * ``contentLanguage`` + * ``contentType`` + * ``crc32c`` + * ``md5Hash`` + * ``metadata`` + * ``name`` + * ``storageClass`` + + For now, we don't support ``acl``, access control lists should be + managed directly through :class:`ObjectACL` methods. + + .. _API reference: https://cloud.google.com/storage/\ + docs/json_api/v1/objects + """ + # NOTE: This assumes `self.name` is unicode. + object_metadata = {'name': self.name} + for key in self._changes: + if key in _WRITABLE_FIELDS: + object_metadata[key] = self._properties[key] + + return object_metadata + + def _get_upload_arguments(self, client, content_type): + """Get required arguments for performing an upload. + + The content type returned will be determined in order of precedence: + + - The value passed in to this method (if not :data:`None`) + - The value stored on the current blob + - The default value ('application/octet-stream') + + :type client: :class:`~google.cloud.storage.client.Client` + :param client: (Optional) The client to use. If not passed, falls back + to the ``client`` stored on the blob's bucket. + + :type content_type: str + :param content_type: Type of content being uploaded (or :data:`None`). + + :rtype: tuple + :returns: A quadruple of + + * An + :class:`~google.auth.transport.requests.AuthorizedSession` + * A header dictionary + * An object metadata dictionary + * The ``content_type`` as a string (according to precedence) + """ + transport = self._make_transport(client) + headers = _get_encryption_headers(self._encryption_key) + object_metadata = self._get_writable_metadata() + content_type = self._get_content_type(content_type) + return transport, headers, object_metadata, content_type + + def _do_multipart_upload(self, client, stream, content_type, size): + """Perform a multipart upload. + + Assumes ``chunk_size`` is :data:`None` on the current blob. 
+ + The content type of the upload will be determined in order + of precedence: + + - The value passed in to this method (if not :data:`None`) + - The value stored on the current blob + - The default value ('application/octet-stream') + + :type client: :class:`~google.cloud.storage.client.Client` + :param client: (Optional) The client to use. If not passed, falls back + to the ``client`` stored on the blob's bucket. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type content_type: str + :param content_type: Type of content being uploaded (or :data:`None`). + + :type size: int + :param size: The number of bytes to be uploaded (which will be read + from ``stream``). If not provided, the upload will be + concluded once ``stream`` is exhausted (or :data:`None`). + + :rtype: :class:`~requests.Response` + :returns: The "200 OK" response object returned after the multipart + upload request. + :raises: :exc:`ValueError` if ``size`` is not :data:`None` but the + ``stream`` has fewer than ``size`` bytes remaining. + """ + if size is None: + data = stream.read() + else: + data = stream.read(size) + if len(data) < size: + msg = _READ_LESS_THAN_SIZE.format(size, len(data)) + raise ValueError(msg) + + info = self._get_upload_arguments(client, content_type) + transport, headers, object_metadata, content_type = info + + upload_url = _MULTIPART_URL_TEMPLATE.format( + bucket_path=self.bucket.path) + upload = MultipartUpload(upload_url, headers=headers) + response = upload.transmit( + transport, data, object_metadata, content_type) + + return response + + def _do_resumable_upload(self, client, stream, content_type, size): + """Perform a resumable upload. + + Assumes ``chunk_size`` is not :data:`None` on the current blob. + + The content type of the upload will be determined in order + of precedence: + + - The value passed in to this method (if not :data:`None`) + - The value stored on the current blob + - The default value ('application/octet-stream') + + :type client: :class:`~google.cloud.storage.client.Client` + :param client: (Optional) The client to use. If not passed, falls back + to the ``client`` stored on the blob's bucket. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type content_type: str + :param content_type: Type of content being uploaded (or :data:`None`). + + :type size: int + :param size: The number of bytes to be uploaded (which will be read + from ``stream``). If not provided, the upload will be + concluded once ``stream`` is exhausted (or :data:`None`). + + :rtype: :class:`~requests.Response` + :returns: The "200 OK" response object returned after the final chunk + is uploaded. + """ + info = self._get_upload_arguments(client, content_type) + transport, headers, object_metadata, content_type = info + + upload_url = _RESUMABLE_URL_TEMPLATE.format( + bucket_path=self.bucket.path) + upload = ResumableUpload(upload_url, self.chunk_size, headers=headers) + upload.initiate( + transport, stream, object_metadata, content_type, + total_bytes=size, stream_final=False) + while not upload.finished: + response = upload.transmit_next_chunk(transport) + + return response + + def _do_upload(self, client, stream, content_type, size): + """Determine an upload strategy and then perform the upload. + + If the current blob has a ``chunk_size`` set, then a resumable upload + will be used, otherwise the content and the metadata will be uploaded + in a single multipart upload request. 
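The multipart path reads the whole payload into memory before transmitting, which is why it only makes sense when ``chunk_size`` is unset. A standalone sketch of the same flow; ``bucket_path`` (e.g. '/b/my-bucket'), ``transport``, and the object name are hypothetical stand-ins here:

from google.resumable_media.requests import MultipartUpload

def multipart_sketch(transport, bucket_path, stream, content_type, size=None):
    # Mirrors Blob._do_multipart_upload: read the payload up front and
    # fail fast if the stream cannot supply the promised size.
    if size is None:
        data = stream.read()
    else:
        data = stream.read(size)
        if len(data) < size:
            raise ValueError(
                'Size {:d} was specified but the file-like object only '
                'had {:d} bytes remaining.'.format(size, len(data)))
    upload_url = (
        'https://www.googleapis.com/upload/storage/v1'
        '{}/o?uploadType=multipart'.format(bucket_path))
    upload = MultipartUpload(upload_url)
    metadata = {'name': 'example-object'}  # hypothetical object name
    return upload.transmit(transport, data, metadata, content_type)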
+ + The content type of the upload will be determined in order + of precedence: + + - The value passed in to this method (if not :data:`None`) + - The value stored on the current blob + - The default value ('application/octet-stream') + + :type client: :class:`~google.cloud.storage.client.Client` + :param client: (Optional) The client to use. If not passed, falls back + to the ``client`` stored on the blob's bucket. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type content_type: str + :param content_type: Type of content being uploaded (or :data:`None`). + + :type size: int + :param size: The number of bytes to be uploaded (which will be read + from ``stream``). If not provided, the upload will be + concluded once ``stream`` is exhausted (or :data:`None`). + + :rtype: dict + :returns: The parsed JSON from the "200 OK" response. This will be the + **only** response in the multipart case and it will be the + **final** response in the resumable case. + """ + if self.chunk_size is None: + response = self._do_multipart_upload( + client, stream, content_type, size) + else: + response = self._do_resumable_upload( + client, stream, content_type, size) + + return response.json() + def upload_from_file(self, file_obj, rewind=False, size=None, - content_type=None, num_retries=6, client=None): + content_type=None, num_retries=None, client=None): """Upload the contents of this blob from a file-like object. The content type of the upload will be determined in order @@ -655,73 +887,33 @@ def upload_from_file(self, file_obj, rewind=False, size=None, writing the file to Cloud Storage. :type size: int - :param size: The number of bytes to read from the file handle. - If not provided, we'll try to guess the size using - :func:`os.fstat`. (If the file handle is not from the - filesystem this won't be possible.) + :param size: The number of bytes to be uploaded (which will be read + from ``file_obj``). If not provided, the upload will be + concluded once ``file_obj`` is exhausted. :type content_type: str :param content_type: Optional type of content being uploaded. :type num_retries: int - :param num_retries: Number of upload retries. Defaults to 6. + :param num_retries: Number of upload retries. (Deprecated.) - :type client: :class:`~google.cloud.storage.client.Client` or - ``NoneType`` - :param client: Optional. The client to use. If not passed, falls back + :type client: :class:`~google.cloud.storage.client.Client` + :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the blob's bucket. - :raises: :class:`ValueError` if size is not passed in and can not be - determined; :class:`google.cloud.exceptions.GoogleCloudError` + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` if the upload response returns an error status. """ - client = self._require_client(client) - # Use ``_base_connection`` rather ``_connection`` since the current - # connection may be a batch. A batch wraps a client's connection, - # but does not store the ``http`` object. The rest (API_BASE_URL and - # build_api_url) are also defined on the Batch class, but we just - # use the wrapped connection since it has all three (http, - # API_BASE_URL and build_api_url). - connection = client._base_connection + if num_retries is not None: + warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning) _maybe_rewind(file_obj, rewind=rewind) - # Get the basic stats about the file. 
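From a caller's point of view the strategy dispatch in _do_upload is invisible; a usage sketch, assuming a configured environment with credentials, and with hypothetical bucket and object names:

import io

from google.cloud import storage

client = storage.Client()
blob = client.bucket('my-bucket').blob('example.txt')

# chunk_size unset: one multipart request carries metadata and payload.
blob.upload_from_file(io.BytesIO(b'payload'), content_type='text/plain')

# chunk_size set: the same call goes through the resumable path instead.
blob.chunk_size = 256 * 1024  # must be a multiple of 256 KiB
blob.upload_from_file(io.BytesIO(b'payload'), content_type='text/plain')

# Passing num_retries now only triggers a DeprecationWarning; transient
# errors are retried automatically with exponential backoff.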
- total_bytes = size - if total_bytes is None: - if hasattr(file_obj, 'fileno'): - try: - total_bytes = os.fstat(file_obj.fileno()).st_size - except (OSError, UnsupportedOperation): - pass # Assuming fd is not an actual file (maybe socket). - - chunk_size = None - strategy = None - if self.chunk_size is not None: - chunk_size = self.chunk_size - - if total_bytes is None: - strategy = RESUMABLE_UPLOAD - elif total_bytes is None: - raise ValueError('total bytes could not be determined. Please ' - 'pass an explicit size, or supply a chunk size ' - 'for a streaming transfer.') - - upload, request, _ = self._create_upload( - client, file_obj=file_obj, size=total_bytes, - content_type=content_type, chunk_size=chunk_size, - strategy=strategy) - - if upload.strategy == RESUMABLE_UPLOAD: - http_response = upload.stream_file(use_chunks=True) - else: - http_response = make_api_request( - connection.http, request, retries=num_retries) - - self._check_response_error(request, http_response) - response_content = http_response.content - - response_content = _bytes_to_unicode(response_content) - self._set_properties(json.loads(response_content)) + try: + created_json = self._do_upload( + client, file_obj, content_type, size) + self._set_properties(created_json) + except resumable_media.InvalidResponse as exc: + _raise_from_invalid_response(exc) def upload_from_filename(self, filename, content_type=None, client=None): """Upload this blob's contents from the content of a named file. @@ -751,16 +943,17 @@ def upload_from_filename(self, filename, content_type=None, client=None): :type content_type: str :param content_type: Optional type of content being uploaded. - :type client: :class:`~google.cloud.storage.client.Client` or - ``NoneType`` - :param client: Optional. The client to use. If not passed, falls back + :type client: :class:`~google.cloud.storage.client.Client` + :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the blob's bucket. """ content_type = self._get_content_type(content_type, filename=filename) with open(filename, 'rb') as file_obj: + total_bytes = os.fstat(file_obj.fileno()).st_size self.upload_from_file( - file_obj, content_type=content_type, client=client) + file_obj, content_type=content_type, client=client, + size=total_bytes) def upload_from_string(self, data, content_type='text/plain', client=None): """Upload contents of this blob from the provided string. @@ -1121,7 +1314,7 @@ def update_storage_class(self, new_class, client=None): :rtype: str or ``NoneType`` """ - content_type = _scalar_property(_CONTENT_TYPE) + content_type = _scalar_property(_CONTENT_TYPE_FIELD) """HTTP 'Content-Type' header for this object. See: https://tools.ietf.org/html/rfc2616#section-14.17 and @@ -1430,7 +1623,7 @@ def _quote(value): def _maybe_rewind(stream, rewind=False): """Rewind the stream if desired. - :type stream: IO[Bytes] + :type stream: IO[bytes] :param stream: A bytes IO object open for reading. :type rewind: bool @@ -1438,3 +1631,22 @@ def _maybe_rewind(stream, rewind=False): """ if rewind: stream.seek(0, os.SEEK_SET) + + +def _raise_from_invalid_response(error, error_info=None): + """Re-wrap and raise an ``InvalidResponse`` exception. + + :type error: :exc:`google.resumable_media.InvalidResponse` + :param error: A caught exception from the ``google-resumable-media`` + library. + + :type error_info: str + :param error_info: (Optional) Extra information about the failed request. 
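The new ``size=total_bytes`` argument matters because a sized upload can go out as a single request instead of an open-ended stream; a minimal sketch of the same sizing technique, with a hypothetical filename:

import os

def file_size(file_obj):
    # Mirrors upload_from_filename() above: stat the already-open file
    # descriptor rather than the path, so the size matches the handle
    # actually being uploaded.
    return os.fstat(file_obj.fileno()).st_size

with open('example.dat', 'rb') as file_obj:  # hypothetical file
    total_bytes = file_size(file_obj)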
+ + :raises: :class:`~google.cloud.exceptions.GoogleCloudError` corresponding + to the failed status code + """ + response = error.response + faux_response = httplib2.Response({'status': response.status_code}) + raise make_exception(faux_response, response.content, + error_info=error_info, use_json=False) diff --git a/storage/setup.py b/storage/setup.py index b1531bf5ed6e..600c499df067 100644 --- a/storage/setup.py +++ b/storage/setup.py @@ -53,7 +53,7 @@ REQUIREMENTS = [ 'google-cloud-core >= 0.24.1, < 0.25dev', 'google-auth >= 1.0.0', - 'google-resumable-media == 0.0.2', + 'google-resumable-media == 0.0.5', 'requests >= 2.0.0', ] From 113f4bbc1b203437e7b0658af201370d28f173c1 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Wed, 3 May 2017 19:24:20 -0700 Subject: [PATCH 2/5] Adding unit tests for Blob.upload*() changes. In addition, switched over Blob.create_resumable_upload_session() to use google-resumable-media instead of using the vendored in `google.cloud.streaming` package. --- storage/google/cloud/storage/blob.py | 227 ++--- storage/setup.py | 2 +- storage/tests/unit/test_blob.py | 1333 ++++++++++++-------------- 3 files changed, 688 insertions(+), 874 deletions(-) diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py index 7905559c47c9..457587cbb276 100644 --- a/storage/google/cloud/storage/blob.py +++ b/storage/google/cloud/storage/blob.py @@ -45,9 +45,6 @@ from google.cloud.storage._helpers import _PropertyMixin from google.cloud.storage._helpers import _scalar_property from google.cloud.storage.acl import ObjectACL -from google.cloud.streaming.http_wrapper import Request -from google.cloud.streaming.transfer import RESUMABLE_UPLOAD -from google.cloud.streaming.transfer import Upload _API_ACCESS_ENDPOINT = 'https://storage.googleapis.com' @@ -417,6 +414,7 @@ def _do_download(self, transport, file_obj, download_url, headers): else: download = ChunkedDownload( download_url, self.chunk_size, file_obj, headers=headers) + while not download.finished: download.consume_next_chunk(transport) @@ -527,119 +525,6 @@ def _get_content_type(self, content_type, filename=None): return content_type - def _create_upload( - self, client, file_obj=None, size=None, content_type=None, - chunk_size=None, strategy=None, extra_headers=None): - """Helper for upload methods. - - Creates a :class:`google.cloud.core.streaming.Upload` object to handle - the details of uploading a file to Cloud Storage. - - :type client: :class:`~google.cloud.storage.client.Client` or - ``NoneType`` - :param client: Optional. The client to use. If not passed, falls back - to the ``client`` stored on the blob's bucket. - - :type file_obj: file - :param file_obj: A file handle open for reading. - - :type size: int - :param size: The size of the upload, in bytes. - - :type content_type: str - :param content_type: Optional type of content being uploaded. - - :type chunk_size: int - :param chunk_size: The size of each chunk when doing resumable and - media uploads. - - :type strategy: str - :param strategy: Either - :attr:`google.cloud.core.streaming.transfer.SIMPLE_UPLOAD` or - :attr:`google.cloud.core.streaming.transfer.RESUMABLE_UPLOAD`. - - :type extra_headers: dict - :param extra_headers: Additional headers to be sent with the upload - initiation request. - - :rtype: Tuple[google.cloud.core.streaming.Upload, - google.cloud.core.streaming.Request, - google.cloud.core.streaming.Response] - :returns: The Upload object, the upload HTTP request, and the upload - initiation response. 
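One consequence of _raise_from_invalid_response above is that callers keep seeing google.cloud exceptions rather than google-resumable-media ones. A sketch of that caller-level contract, assuming ``blob`` is an existing Blob:

import io

from google.cloud import exceptions

def download_or_none(blob):
    # InvalidResponse raised inside google-resumable-media is re-wrapped,
    # so a missing object still surfaces as google.cloud NotFound.
    stream = io.BytesIO()
    try:
        blob.download_to_file(stream)
    except exceptions.NotFound:
        return None
    return stream.getvalue()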
- """ - - client = self._require_client(client) - - # Use ``_base_connection`` rather ``_connection`` since the current - # connection may be a batch. A batch wraps a client's connection, - # but does not store the ``http`` object. The rest (API_BASE_URL and - # build_api_url) are also defined on the Batch class, but we just - # use the wrapped connection since it has all three (http, - # API_BASE_URL and build_api_url). - connection = client._base_connection - - content_type = self._get_content_type(content_type) - - headers = { - 'Accept': 'application/json', - 'Accept-Encoding': 'gzip, deflate', - 'User-Agent': connection.USER_AGENT, - } - - if extra_headers: - headers.update(extra_headers) - - headers.update(_get_encryption_headers(self._encryption_key)) - - # Use apitools' Upload functionality - upload = Upload( - file_obj, content_type, total_size=size, auto_transfer=False) - - if chunk_size is not None: - upload.chunksize = chunk_size - - if strategy is not None: - upload.strategy = RESUMABLE_UPLOAD - - url_builder = _UrlBuilder( - bucket_name=self.bucket.name, - object_name=self.name) - upload_config = _UploadConfig() - - # Temporary URL until strategy is determined. - base_url = connection.API_BASE_URL + '/upload' - upload_url = connection.build_api_url( - api_base_url=base_url, - path=self.bucket.path + '/o') - - # Configure the upload request parameters. - request = Request(upload_url, 'POST', headers) - upload.configure_request(upload_config, request, url_builder) - - # Configure final URL - query_params = url_builder.query_params - base_url = connection.API_BASE_URL + '/upload' - request.url = connection.build_api_url( - api_base_url=base_url, - path=self.bucket.path + '/o', - query_params=query_params) - - # Start the upload session - response = upload.initialize_upload(request, connection.http) - - return upload, request, response - - @staticmethod - def _check_response_error(request, http_response): - """Helper for :meth:`upload_from_file`.""" - info = http_response.info - status = int(info['status']) - if not 200 <= status < 300: - faux_response = httplib2.Response({'status': status}) - raise make_exception(faux_response, http_response.content, - error_info=request.url) - def _get_writable_metadata(self): """Get the object / blob metadata which is writable. @@ -757,8 +642,9 @@ def _do_multipart_upload(self, client, stream, content_type, size): return response - def _do_resumable_upload(self, client, stream, content_type, size): - """Perform a resumable upload. + def _initiate_resumable_upload(self, client, stream, content_type, + size, extra_headers=None): + """Initiate a resumable upload. Assumes ``chunk_size`` is not :data:`None` on the current blob. @@ -784,12 +670,22 @@ def _do_resumable_upload(self, client, stream, content_type, size): from ``stream``). If not provided, the upload will be concluded once ``stream`` is exhausted (or :data:`None`). - :rtype: :class:`~requests.Response` - :returns: The "200 OK" response object returned after the final chunk - is uploaded. + :type extra_headers: dict + :param extra_headers: (Optional) Extra headers to add to standard + headers. + + :rtype: tuple + :returns: + Pair of + + * The :class:`~google.resumable_media.requests.ResumableUpload` + that was created + * The ``transport`` used to initiate the upload. 
""" info = self._get_upload_arguments(client, content_type) transport, headers, object_metadata, content_type = info + if extra_headers is not None: + headers.update(extra_headers) upload_url = _RESUMABLE_URL_TEMPLATE.format( bucket_path=self.bucket.path) @@ -797,6 +693,43 @@ def _do_resumable_upload(self, client, stream, content_type, size): upload.initiate( transport, stream, object_metadata, content_type, total_bytes=size, stream_final=False) + + return upload, transport + + def _do_resumable_upload(self, client, stream, content_type, size): + """Perform a resumable upload. + + Assumes ``chunk_size`` is not :data:`None` on the current blob. + + The content type of the upload will be determined in order + of precedence: + + - The value passed in to this method (if not :data:`None`) + - The value stored on the current blob + - The default value ('application/octet-stream') + + :type client: :class:`~google.cloud.storage.client.Client` + :param client: (Optional) The client to use. If not passed, falls back + to the ``client`` stored on the blob's bucket. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type content_type: str + :param content_type: Type of content being uploaded (or :data:`None`). + + :type size: int + :param size: The number of bytes to be uploaded (which will be read + from ``stream``). If not provided, the upload will be + concluded once ``stream`` is exhausted (or :data:`None`). + + :rtype: :class:`~requests.Response` + :returns: The "200 OK" response object returned after the final chunk + is uploaded. + """ + upload, transport = self._initiate_resumable_upload( + client, stream, content_type, size) + while not upload.finished: response = upload.transmit_next_chunk(transport) @@ -1033,52 +966,54 @@ def create_resumable_upload_session( encryption#customer-supplied :type size: int - :param size: Optional, the maximum number of bytes that can be - uploaded using this session. If the size is not known when creating - the session, this should be left blank. + :param size: (Optional). The maximum number of bytes that can be + uploaded using this session. If the size is not known + when creating the session, this should be left blank. :type content_type: str - :param content_type: Optional type of content being uploaded. This can - be used to restrict the allowed file type that can be uploaded - to the size. + :param content_type: (Optional) Type of content being uploaded. :type origin: str - :param origin: Optional origin. If set, the upload can only be - completed by a user-agent that uploads from the given origin. This - can be useful when passing the session to a web client. + :param origin: (Optional) If set, the upload can only be completed + by a user-agent that uploads from the given origin. This + can be useful when passing the session to a web client. - :type client: :class:`~google.cloud.storage.client.Client` or - ``NoneType`` - :param client: Optional. The client to use. If not passed, falls back + :type client: :class:`~google.cloud.storage.client.Client` + :param client: (Optional) The client to use. If not passed, falls back to the ``client`` stored on the blob's bucket. :rtype: str :returns: The resumable upload session URL. The upload can be - completed by making an HTTP PUT request with the file's contents. + completed by making an HTTP PUT request with the + file's contents. :raises: :class:`google.cloud.exceptions.GoogleCloudError` if the session creation response returns an error status. 
""" - extra_headers = {} - if origin is not None: # This header is specifically for client-side uploads, it # determines the origins allowed for CORS. extra_headers['Origin'] = origin - _, _, start_response = self._create_upload( - client, - size=size, - content_type=content_type, - strategy=RESUMABLE_UPLOAD, - extra_headers=extra_headers) + curr_chunk_size = self.chunk_size + try: + # Temporarily patch the chunk size. A user should still be able + # to initiate an upload session without setting the chunk size. + # The chunk size only matters when **sending** bytes to an upload. + self.chunk_size = self._CHUNK_SIZE_MULTIPLE - # The location header contains the session URL. This can be used - # to continue the upload. - resumable_upload_session_url = start_response.info['location'] + dummy_stream = BytesIO(b'') + upload, _ = self._initiate_resumable_upload( + client, dummy_stream, content_type, size, + extra_headers=extra_headers) - return resumable_upload_session_url + return upload.resumable_url + except resumable_media.InvalidResponse as exc: + _raise_from_invalid_response(exc) + finally: + # Put back the original chunk size. + self.chunk_size = curr_chunk_size def get_iam_policy(self, client=None): """Retrieve the IAM policy for the object. diff --git a/storage/setup.py b/storage/setup.py index 600c499df067..88ebfcbe853e 100644 --- a/storage/setup.py +++ b/storage/setup.py @@ -53,7 +53,7 @@ REQUIREMENTS = [ 'google-cloud-core >= 0.24.1, < 0.25dev', 'google-auth >= 1.0.0', - 'google-resumable-media == 0.0.5', + 'google-resumable-media >= 0.1.0', 'requests >= 2.0.0', ] diff --git a/storage/tests/unit/test_blob.py b/storage/tests/unit/test_blob.py index f1d761d79b12..4e3e7c3eac17 100644 --- a/storage/tests/unit/test_blob.py +++ b/storage/tests/unit/test_blob.py @@ -12,10 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import datetime +import io +import json import os import unittest import mock +from six.moves import http_client def _make_credentials(): @@ -291,10 +295,8 @@ def test_generate_signed_url_w_method_arg(self): self.assertEqual(SIGNER._signed, [(EXPECTED_ARGS, EXPECTED_KWARGS)]) def test_exists_miss(self): - from six.moves.http_client import NOT_FOUND - NONESUCH = 'nonesuch' - not_found_response = ({'status': NOT_FOUND}, b'') + not_found_response = ({'status': http_client.NOT_FOUND}, b'') connection = _Connection(not_found_response) client = _Client(connection) bucket = _Bucket(client) @@ -302,10 +304,8 @@ def test_exists_miss(self): self.assertFalse(blob.exists()) def test_exists_hit(self): - from six.moves.http_client import OK - BLOB_NAME = 'blob-name' - found_response = ({'status': OK}, b'') + found_response = ({'status': http_client.OK}, b'') connection = _Connection(found_response) client = _Client(connection) bucket = _Bucket(client) @@ -314,9 +314,8 @@ def test_exists_hit(self): self.assertTrue(blob.exists()) def test_delete(self): - from six.moves.http_client import NOT_FOUND BLOB_NAME = 'blob-name' - not_found_response = ({'status': NOT_FOUND}, b'') + not_found_response = ({'status': http_client.NOT_FOUND}, b'') connection = _Connection(not_found_response) client = _Client(connection) bucket = _Bucket(client) @@ -379,9 +378,7 @@ def _mock_requests_response(status_code, headers, content=b''): content=content, headers=headers, status_code=status_code, spec=['content', 'headers', 'status_code']) - def _mock_transport(self): - from six.moves import http_client - + def _mock_download_transport(self): fake_transport = mock.Mock(spec=['request']) # Give the transport two fake responses. chunk1_response = self._mock_requests_response( @@ -413,9 +410,6 @@ def _check_session_mocks(self, client, fake_session_factory, self.assertEqual(fake_transport.request.mock_calls, [call, call]) def test__do_download_simple(self): - from io import BytesIO - from six.moves import http_client - blob_name = 'blob-name' # Create a fake client/bucket and use them in the Blob() constructor. client = mock.Mock( @@ -431,7 +425,7 @@ def test__do_download_simple(self): http_client.OK, {'content-length': '6', 'content-range': 'bytes 0-5/6'}, content=b'abcdef') - file_obj = BytesIO() + file_obj = io.BytesIO() download_url = 'http://test.invalid' headers = {} blob._do_download(transport, file_obj, download_url, headers) @@ -442,8 +436,6 @@ def test__do_download_simple(self): 'GET', download_url, data=None, headers=headers) def test__do_download_chunked(self): - from io import BytesIO - blob_name = 'blob-name' # Create a fake client/bucket and use them in the Blob() constructor. 
client = mock.Mock( @@ -455,8 +447,8 @@ def test__do_download_chunked(self): blob._CHUNK_SIZE_MULTIPLE = 1 blob.chunk_size = 3 - transport = self._mock_transport() - file_obj = BytesIO() + transport = self._mock_download_transport() + file_obj = io.BytesIO() download_url = 'http://test.invalid' headers = {} blob._do_download(transport, file_obj, download_url, headers) @@ -473,8 +465,6 @@ def test__do_download_chunked(self): @mock.patch('google.auth.transport.requests.AuthorizedSession') def test_download_to_file_with_failure(self, fake_session_factory): - from io import BytesIO - from six.moves import http_client from google.cloud import exceptions blob_name = 'blob-name' @@ -494,7 +484,7 @@ def test_download_to_file_with_failure(self, fake_session_factory): # Set the media link on the blob blob._properties['mediaLink'] = 'http://test.invalid' - file_obj = BytesIO() + file_obj = io.BytesIO() with self.assertRaises(exceptions.NotFound): blob.download_to_file(file_obj) @@ -507,10 +497,8 @@ def test_download_to_file_with_failure(self, fake_session_factory): @mock.patch('google.auth.transport.requests.AuthorizedSession') def test_download_to_file_wo_media_link(self, fake_session_factory): - from io import BytesIO - blob_name = 'blob-name' - fake_session_factory.return_value = self._mock_transport() + fake_session_factory.return_value = self._mock_download_transport() # Create a fake client/bucket and use them in the Blob() constructor. client = mock.Mock( _credentials=_make_credentials(), spec=['_credentials']) @@ -520,7 +508,7 @@ def test_download_to_file_wo_media_link(self, fake_session_factory): blob._CHUNK_SIZE_MULTIPLE = 1 blob.chunk_size = 3 - file_obj = BytesIO() + file_obj = io.BytesIO() blob.download_to_file(file_obj) self.assertEqual(file_obj.getvalue(), b'abcdef') # Make sure the media link is still unknown. @@ -533,11 +521,8 @@ def test_download_to_file_wo_media_link(self, fake_session_factory): @mock.patch('google.auth.transport.requests.AuthorizedSession') def _download_to_file_helper(self, fake_session_factory, use_chunks=False): - from io import BytesIO - from six.moves.http_client import OK - blob_name = 'blob-name' - fake_transport = self._mock_transport() + fake_transport = self._mock_download_transport() fake_session_factory.return_value = fake_transport # Create a fake client/bucket and use them in the Blob() constructor. client = mock.Mock( @@ -553,12 +538,12 @@ def _download_to_file_helper(self, fake_session_factory, use_chunks=False): else: # Modify the response. single_chunk_response = self._mock_requests_response( - OK, + http_client.OK, {'content-length': '6', 'content-range': 'bytes 0-5/6'}, content=b'abcdef') fake_transport.request.side_effect = [single_chunk_response] - file_obj = BytesIO() + file_obj = io.BytesIO() blob.download_to_file(file_obj) self.assertEqual(file_obj.getvalue(), b'abcdef') @@ -582,7 +567,7 @@ def _download_to_filename_helper(self, fake_session_factory, updated=None): from google.cloud._testing import _NamedTemporaryFile blob_name = 'blob-name' - fake_session_factory.return_value = self._mock_transport() + fake_session_factory.return_value = self._mock_download_transport() # Create a fake client/bucket and use them in the Blob() constructor. 
client = mock.Mock( _credentials=_make_credentials(), spec=['_credentials']) @@ -629,7 +614,7 @@ def test_download_to_filename_w_key(self, fake_session_factory): from google.cloud._testing import _NamedTemporaryFile blob_name = 'blob-name' - fake_session_factory.return_value = self._mock_transport() + fake_session_factory.return_value = self._mock_download_transport() # Create a fake client/bucket and use them in the Blob() constructor. client = mock.Mock( _credentials=_make_credentials(), spec=['_credentials']) @@ -667,7 +652,7 @@ def test_download_to_filename_w_key(self, fake_session_factory): @mock.patch('google.auth.transport.requests.AuthorizedSession') def test_download_as_string(self, fake_session_factory): blob_name = 'blob-name' - fake_session_factory.return_value = self._mock_transport() + fake_session_factory.return_value = self._mock_download_transport() # Create a fake client/bucket and use them in the Blob() constructor. client = mock.Mock( _credentials=_make_credentials(), spec=['_credentials']) @@ -710,720 +695,597 @@ def test__get_content_type_default(self): return_value = blob._get_content_type(None) self.assertEqual(return_value, u'application/octet-stream') - def test_upload_from_file_size_failure(self): - BLOB_NAME = 'blob-name' - connection = _Connection() - client = _Client(connection) - bucket = _Bucket(client) - blob = self._make_one(BLOB_NAME, bucket=bucket) - file_obj = object() - with self.assertRaises(ValueError): - blob.upload_from_file(file_obj, size=None) - - def _upload_from_file_simple_test_helper(self, properties=None, - content_type_arg=None, - expected_content_type=None, - chunk_size=5, - status=None): - from six.moves.http_client import OK - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - from google.cloud._testing import _NamedTemporaryFile + def test__get_writable_metadata_no_changes(self): + name = u'blob-name' + blob = self._make_one(name, bucket=None) - BLOB_NAME = 'blob-name' - DATA = b'ABCDEF' - if status is None: - status = OK - response = {'status': status} - connection = _Connection( - (response, b'{}'), - ) - client = _Client(connection) - bucket = _Bucket(client) - blob = self._make_one(BLOB_NAME, bucket=bucket, properties=properties) - blob._CHUNK_SIZE_MULTIPLE = 1 - blob.chunk_size = chunk_size + object_metadata = blob._get_writable_metadata() + expected = {'name': name} + self.assertEqual(object_metadata, expected) - with _NamedTemporaryFile() as temp: - with open(temp.name, 'wb') as file_obj: - file_obj.write(DATA) + def test__get_writable_metadata_with_changes(self): + name = u'blob-name' + blob = self._make_one(name, bucket=None) + blob.storage_class = 'NEARLINE' + blob.cache_control = 'max-age=3600' + blob.metadata = {'color': 'red'} + + object_metadata = blob._get_writable_metadata() + expected = { + 'cacheControl': blob.cache_control, + 'metadata': blob.metadata, + 'name': name, + 'storageClass': blob.storage_class, + } + self.assertEqual(object_metadata, expected) - with open(temp.name, 'rb') as file_obj: - blob.upload_from_file(file_obj, rewind=True, - content_type=content_type_arg) - - rq = connection.http._requested - self.assertEqual(len(rq), 1) - self.assertEqual(rq[0]['method'], 'POST') - uri = rq[0]['uri'] - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/b/name/o') - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'media', 'name': BLOB_NAME}) - headers = { - x.title(): 
str(y) for x, y in rq[0]['headers'].items()} - self.assertEqual(headers['Content-Length'], '6') - self.assertEqual(headers['Content-Type'], expected_content_type) + def test__get_writable_metadata_unwritable_field(self): + name = u'blob-name' + properties = {'updated': '2016-10-16T18:18:18.181Z'} + blob = self._make_one(name, bucket=None, properties=properties) + # Fake that `updated` is in changes. + blob._changes.add('updated') - def test_upload_from_file_stream(self): - from six.moves.http_client import OK - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - from google.cloud.streaming import http_wrapper + object_metadata = blob._get_writable_metadata() + expected = {'name': name} + self.assertEqual(object_metadata, expected) - BLOB_NAME = 'blob-name' - UPLOAD_URL = 'http://example.com/upload/name/key' - DATA = b'ABCDE' - loc_response = {'status': OK, 'location': UPLOAD_URL} - chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE, - 'range': 'bytes 0-4'} - chunk2_response = {'status': OK} - # Need valid JSON on last response, since resumable. - connection = _Connection( - (loc_response, b''), - (chunk1_response, b''), - (chunk2_response, b'{}'), - ) - client = _Client(connection) - bucket = _Bucket(client) - blob = self._make_one(BLOB_NAME, bucket=bucket) - blob._CHUNK_SIZE_MULTIPLE = 1 - blob.chunk_size = 5 + def test__get_upload_arguments(self): + name = u'blob-name' + key = b'[pXw@,p@@AfBfrR3x-2b2SCHR,.?YwRO' + blob = self._make_one(name, bucket=None, encryption_key=key) + blob._make_transport = mock.Mock(spec=[]) + blob.content_disposition = 'inline' + + client = mock.sentinel.mock + content_type = u'image/jpeg' + info = blob._get_upload_arguments(client, content_type) + + transport, headers, object_metadata, new_content_type = info + self.assertIs(transport, blob._make_transport.return_value) + header_key_value = 'W3BYd0AscEBAQWZCZnJSM3gtMmIyU0NIUiwuP1l3Uk8=' + header_key_hash_value = 'G0++dxF4q5rG4o9kE8gvEKn15RH6wLm0wXV1MgAlXOg=' + expected_headers = { + 'X-Goog-Encryption-Algorithm': 'AES256', + 'X-Goog-Encryption-Key': header_key_value, + 'X-Goog-Encryption-Key-Sha256': header_key_hash_value, + } + self.assertEqual(headers, expected_headers) + expected_metadata = { + 'contentDisposition': blob.content_disposition, + 'name': name, + } + self.assertEqual(object_metadata, expected_metadata) + self.assertEqual(new_content_type, content_type) - file_obj = _Stream(DATA) + blob._make_transport.assert_called_once_with(client) - # Mock stream closes at end of data, like a socket might - def is_stream_closed(stream): - if stream.tell() < len(DATA): - return stream._closed - else: - return stream.close() or True + def _mock_transport(self, status_code, headers, content=b''): + fake_transport = mock.Mock(spec=['request']) + fake_response = self._mock_requests_response( + status_code, headers, content=content) + fake_transport.request.return_value = fake_response + return fake_transport - _Stream.closed = property(is_stream_closed) + def _do_multipart_success(self, mock_get_boundary, size=None): + bucket = mock.Mock(path='/b/w00t', spec=[u'path']) + blob = self._make_one(u'blob-name', bucket=bucket) + self.assertIsNone(blob.chunk_size) - def fileno_mock(): - from io import UnsupportedOperation - raise UnsupportedOperation() + # Create mocks to be checked for doing transport. + fake_transport = self._mock_transport(http_client.OK, {}) + blob._make_transport = mock.Mock(return_value=fake_transport, spec=[]) + + # Create some mock arguments. 
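The header literals asserted in test__get_upload_arguments above come from the customer-supplied encryption scheme: the key is sent base64-encoded, alongside a base64-encoded SHA-256 digest of the raw key. A short derivation using the test's own fixture key:

import base64
import hashlib

key = b'[pXw@,p@@AfBfrR3x-2b2SCHR,.?YwRO'  # the test fixture key
header_key = base64.b64encode(key).decode('utf-8')
digest = hashlib.sha256(key).digest()
header_key_hash = base64.b64encode(digest).decode('utf-8')
# These match the literals asserted in test__get_upload_arguments:
assert header_key == 'W3BYd0AscEBAQWZCZnJSM3gtMmIyU0NIUiwuP1l3Uk8='
assert header_key_hash == 'G0++dxF4q5rG4o9kE8gvEKn15RH6wLm0wXV1MgAlXOg='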
+ client = mock.sentinel.mock + data = b'data here hear hier' + stream = io.BytesIO(data) + content_type = u'application/xml' + response = blob._do_multipart_upload( + client, stream, content_type, size) + + # Check the mocks and the returned value. + self.assertIs(response, fake_transport.request.return_value) + if size is None: + data_read = data + self.assertEqual(stream.tell(), len(data)) + else: + data_read = data[:size] + self.assertEqual(stream.tell(), size) + + blob._make_transport.assert_called_once_with(client) + mock_get_boundary.assert_called_once_with() + + upload_url = ( + 'https://www.googleapis.com/upload/storage/v1' + + bucket.path + + '/o?uploadType=multipart') + payload = ( + b'--==0==\r\n' + + b'content-type: application/json; charset=UTF-8\r\n\r\n' + + b'{"name": "blob-name"}\r\n' + + b'--==0==\r\n' + + b'content-type: application/xml\r\n\r\n' + + data_read + + b'\r\n--==0==--') + headers = {'content-type': b'multipart/related; boundary="==0=="'} + fake_transport.request.assert_called_once_with( + 'POST', upload_url, data=payload, headers=headers) + + @mock.patch(u'google.resumable_media._upload.get_boundary', + return_value=b'==0==') + def test__do_multipart_upload_no_size(self, mock_get_boundary): + self._do_multipart_success(mock_get_boundary) + + @mock.patch(u'google.resumable_media._upload.get_boundary', + return_value=b'==0==') + def test__do_multipart_upload_with_size(self, mock_get_boundary): + self._do_multipart_success(mock_get_boundary, size=10) + + def test__do_multipart_upload_bad_size(self): + blob = self._make_one(u'blob-name', bucket=None) - file_obj.fileno = fileno_mock + data = b'data here hear hier' + stream = io.BytesIO(data) + size = 50 + self.assertGreater(size, len(data)) + + with self.assertRaises(ValueError) as exc_info: + blob._do_multipart_upload(None, stream, None, size) + + exc_contents = str(exc_info.exception) + self.assertIn( + 'was specified but the file-like object only had', exc_contents) + self.assertEqual(stream.tell(), len(data)) + + def _initiate_resumable_helper(self, size=None, extra_headers=None): + from google.resumable_media.requests import ResumableUpload + + bucket = mock.Mock(path='/b/whammy', spec=[u'path']) + blob = self._make_one(u'blob-name', bucket=bucket) + blob.metadata = {'rook': 'takes knight'} + blob.chunk_size = 3 * blob._CHUNK_SIZE_MULTIPLE + self.assertIsNotNone(blob.chunk_size) + + # Need to make sure **same** dict is used because ``json.dumps()`` + # will depend on the hash order. + object_metadata = blob._get_writable_metadata() + blob._get_writable_metadata = mock.Mock( + return_value=object_metadata, spec=[]) + + # Create mocks to be checked for doing transport. + resumable_url = 'http://test.invalid?upload_id=hey-you' + response_headers = {'location': resumable_url} + fake_transport = self._mock_transport( + http_client.OK, response_headers) + blob._make_transport = mock.Mock(return_value=fake_transport, spec=[]) + + # Create some mock arguments and call the method under test. + client = mock.sentinel.mock + data = b'hello hallo halo hi-low' + stream = io.BytesIO(data) + content_type = u'text/plain' + upload, transport = blob._initiate_resumable_upload( + client, stream, content_type, size, extra_headers=extra_headers) + + # Check the returned values. 
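The payload bytes asserted in _do_multipart_success above follow the multipart/related layout: a JSON metadata part, then the media part, framed by the boundary (``==0==`` is only the mocked test boundary; real boundaries are generated inside google-resumable-media). A helper that reproduces the same layout:

import json

def build_multipart_payload(metadata, data, content_type, boundary=b'==0=='):
    # Recreates the payload the test asserts: JSON metadata part, then
    # the media part, each introduced by '--<boundary>' and closed by
    # '--<boundary>--'.
    delimiter = b'--' + boundary
    return (
        delimiter + b'\r\n' +
        b'content-type: application/json; charset=UTF-8\r\n\r\n' +
        json.dumps(metadata).encode('utf-8') + b'\r\n' +
        delimiter + b'\r\n' +
        b'content-type: ' + content_type.encode('utf-8') + b'\r\n\r\n' +
        data + b'\r\n' +
        delimiter + b'--')

# build_multipart_payload({'name': 'blob-name'}, b'data here hear hier',
#                         'application/xml') equals the expected payload.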
+ self.assertIsInstance(upload, ResumableUpload) + upload_url = ( + 'https://www.googleapis.com/upload/storage/v1' + + bucket.path + + '/o?uploadType=resumable') + self.assertEqual(upload.upload_url, upload_url) + if extra_headers is None: + self.assertEqual(upload._headers, {}) + else: + self.assertEqual(upload._headers, extra_headers) + self.assertIsNot(upload._headers, extra_headers) + self.assertFalse(upload.finished) + self.assertEqual(upload._chunk_size, blob.chunk_size) + self.assertIs(upload._stream, stream) + if size is None: + self.assertIsNone(upload._total_bytes) + else: + self.assertEqual(upload._total_bytes, size) + self.assertEqual(upload._content_type, content_type) + self.assertEqual(upload.resumable_url, resumable_url) + self.assertIs(transport, fake_transport) + # Make sure we never read from the stream. + self.assertEqual(stream.tell(), 0) + + # Check the mocks. + blob._get_writable_metadata.assert_called_once_with() + blob._make_transport.assert_called_once_with(client) + payload = json.dumps(object_metadata).encode('utf-8') + expected_headers = { + 'content-type': 'application/json; charset=UTF-8', + 'x-upload-content-type': content_type, + } + if size is not None: + expected_headers['x-upload-content-length'] = str(size) + if extra_headers is not None: + expected_headers.update(extra_headers) + fake_transport.request.assert_called_once_with( + 'POST', upload_url, data=payload, headers=expected_headers) - blob.upload_from_file(file_obj) + def test__initiate_resumable_upload_no_size(self): + self._initiate_resumable_helper() - # Remove the temp property - delattr(_Stream, "closed") + def test__initiate_resumable_upload_with_size(self): + self._initiate_resumable_helper(size=10000) - rq = connection.http._requested - self.assertEqual(len(rq), 3) + def test__initiate_resumable_upload_with_extra_headers(self): + extra_headers = {'origin': 'http://not-in-kansas-anymore.invalid'} + self._initiate_resumable_helper(extra_headers=extra_headers) - # Requested[0] - headers = { - x.title(): str(y) for x, y in rq[0].pop('headers').items()} - self.assertEqual(headers['Content-Length'], '0') - self.assertEqual(headers['X-Upload-Content-Type'], - 'application/octet-stream') - - uri = rq[0].pop('uri') - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/b/name/o') - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'resumable', 'name': BLOB_NAME}) - self.assertEqual(rq[0], { - 'method': 'POST', - 'body': '', - 'connection_type': None, - 'redirections': 5, - }) + def _make_resumable_transport(self, headers1, headers2, + headers3, total_bytes): + from google import resumable_media - # Requested[1] - headers = { - x.title(): str(y) for x, y in rq[1].pop('headers').items()} - self.assertEqual(headers['Content-Range'], 'bytes 0-4/*') - self.assertEqual(rq[1], { - 'method': 'PUT', - 'uri': UPLOAD_URL, - 'body': DATA[:5], - 'connection_type': None, - 'redirections': 5, - }) - - # Requested[2] - headers = { - x.title(): str(y) for x, y in rq[2].pop('headers').items()} - self.assertEqual(headers['Content-Range'], 'bytes */5') - self.assertEqual(rq[2], { - 'method': 'PUT', - 'uri': UPLOAD_URL, - 'body': DATA[5:], - 'connection_type': None, - 'redirections': 5, - }) - - def test_upload_from_file_simple(self): - self._upload_from_file_simple_test_helper( - expected_content_type='application/octet-stream') - - def test_upload_from_file_simple_not_found(self): - from six.moves.http_client import 
NOT_FOUND - from google.cloud.exceptions import NotFound + fake_transport = mock.Mock(spec=['request']) - with self.assertRaises(NotFound): - self._upload_from_file_simple_test_helper(status=NOT_FOUND) - - def test_upload_from_file_simple_w_chunk_size_None(self): - self._upload_from_file_simple_test_helper( - expected_content_type='application/octet-stream', - chunk_size=None) - - def test_upload_from_file_simple_with_content_type(self): - EXPECTED_CONTENT_TYPE = 'foo/bar' - self._upload_from_file_simple_test_helper( - properties={'contentType': EXPECTED_CONTENT_TYPE}, - expected_content_type=EXPECTED_CONTENT_TYPE) - - def test_upload_from_file_simple_with_content_type_passed(self): - EXPECTED_CONTENT_TYPE = 'foo/bar' - self._upload_from_file_simple_test_helper( - content_type_arg=EXPECTED_CONTENT_TYPE, - expected_content_type=EXPECTED_CONTENT_TYPE) - - def test_upload_from_file_simple_both_content_type_sources(self): - EXPECTED_CONTENT_TYPE = 'foo/bar' - ALT_CONTENT_TYPE = 'foo/baz' - self._upload_from_file_simple_test_helper( - properties={'contentType': ALT_CONTENT_TYPE}, - content_type_arg=EXPECTED_CONTENT_TYPE, - expected_content_type=EXPECTED_CONTENT_TYPE) - - def test_upload_from_file_resumable(self): - from six.moves.http_client import OK - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - from google.cloud._testing import _NamedTemporaryFile - from google.cloud.streaming import http_wrapper + fake_response1 = self._mock_requests_response( + http_client.OK, headers1) + fake_response2 = self._mock_requests_response( + resumable_media.PERMANENT_REDIRECT, headers2) + json_body = '{{"size": "{:d}"}}'.format(total_bytes) + fake_response3 = self._mock_requests_response( + http_client.OK, headers3, content=json_body) - BLOB_NAME = 'blob-name' - UPLOAD_URL = 'http://example.com/upload/name/key' - DATA = b'ABCDEF' - loc_response = {'status': OK, 'location': UPLOAD_URL} - chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE, - 'range': 'bytes 0-4'} - chunk2_response = {'status': OK} - # Need valid JSON on last response, since resumable. - connection = _Connection( - (loc_response, b''), - (chunk1_response, b''), - (chunk2_response, b'{}'), - ) - client = _Client(connection) - bucket = _Bucket(client) - blob = self._make_one(BLOB_NAME, bucket=bucket) - blob._CHUNK_SIZE_MULTIPLE = 1 - blob.chunk_size = 5 + responses = [fake_response1, fake_response2, fake_response3] + fake_transport.request.side_effect = responses + return fake_transport, responses - # Set the threshhold low enough that we force a resumable upload. - patch = mock.patch( - 'google.cloud.streaming.transfer.RESUMABLE_UPLOAD_THRESHOLD', - new=5) + @staticmethod + def _do_resumable_upload_call0(blob, content_type, size=None): + # First mock transport.request() does initiates upload. 
+ upload_url = ( + 'https://www.googleapis.com/upload/storage/v1' + + blob.bucket.path + + '/o?uploadType=resumable') + expected_headers = { + 'content-type': 'application/json; charset=UTF-8', + 'x-upload-content-type': content_type, + } + if size is not None: + expected_headers['x-upload-content-length'] = str(size) + payload = json.dumps({'name': blob.name}).encode('utf-8') + return mock.call( + 'POST', upload_url, data=payload, headers=expected_headers) - with patch: - with _NamedTemporaryFile() as temp: - with open(temp.name, 'wb') as file_obj: - file_obj.write(DATA) - with open(temp.name, 'rb') as file_obj: - blob.upload_from_file(file_obj, rewind=True) + @staticmethod + def _do_resumable_upload_call1(blob, content_type, data, + resumable_url, size=None): + # Second mock transport.request() does sends first chunk. + if size is None: + content_range = 'bytes 0-{:d}/*'.format(blob.chunk_size - 1) + else: + content_range = 'bytes 0-{:d}/{:d}'.format( + blob.chunk_size - 1, size) - rq = connection.http._requested - self.assertEqual(len(rq), 3) + expected_headers = { + 'content-type': content_type, + 'content-range': content_range, + } + payload = data[:blob.chunk_size] + return mock.call( + 'PUT', resumable_url, data=payload, headers=expected_headers) - # Requested[0] - headers = { - x.title(): str(y) for x, y in rq[0].pop('headers').items()} - self.assertEqual(headers['X-Upload-Content-Length'], '6') - self.assertEqual(headers['X-Upload-Content-Type'], - 'application/octet-stream') - - uri = rq[0].pop('uri') - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/b/name/o') - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'resumable', 'name': BLOB_NAME}) - self.assertEqual(rq[0], { - 'method': 'POST', - 'body': '', - 'connection_type': None, - 'redirections': 5, - }) + @staticmethod + def _do_resumable_upload_call2(blob, content_type, data, + resumable_url, total_bytes): + # Third mock transport.request() does sends last chunk. + content_range = 'bytes {:d}-{:d}/{:d}'.format( + blob.chunk_size, total_bytes - 1, total_bytes) + expected_headers = { + 'content-type': content_type, + 'content-range': content_range, + } + payload = data[blob.chunk_size:] + return mock.call( + 'PUT', resumable_url, data=payload, headers=expected_headers) + + def _do_resumable_helper(self, use_size=False): + bucket = mock.Mock(path='/b/yesterday', spec=[u'path']) + blob = self._make_one(u'blob-name', bucket=bucket) + blob.chunk_size = blob._CHUNK_SIZE_MULTIPLE + self.assertIsNotNone(blob.chunk_size) + + # Data to be uploaded. + data = b'' + (b'A' * blob.chunk_size) + b'' + total_bytes = len(data) + if use_size: + size = total_bytes + else: + size = None + + # Create mocks to be checked for doing transport. + resumable_url = 'http://test.invalid?upload_id=and-then-there-was-1' + headers1 = {'location': resumable_url} + headers2 = {'range': 'bytes=0-{:d}'.format(blob.chunk_size - 1)} + fake_transport, responses = self._make_resumable_transport( + headers1, headers2, {}, total_bytes) + blob._make_transport = mock.Mock(return_value=fake_transport, spec=[]) + + # Create some mock arguments and call the method under test. + client = mock.sentinel.mock + stream = io.BytesIO(data) + content_type = u'text/html' + response = blob._do_resumable_upload( + client, stream, content_type, size) + + # Check the returned values. 
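The Content-Range strings computed in the helpers above follow one format: 'bytes <first>-<last>/<total>', with '*' for the total while the stream length is still unknown. A small helper capturing that rule:

def content_range(start, chunk_size, total_bytes=None):
    # Header format asserted in the _do_resumable_upload_call* helpers.
    last = start + chunk_size - 1
    if total_bytes is None:
        return 'bytes {:d}-{:d}/*'.format(start, last)
    return 'bytes {:d}-{:d}/{:d}'.format(start, last, total_bytes)

# e.g. first chunk of an unknown-size upload with 256 KiB chunks:
# content_range(0, 262144) == 'bytes 0-262143/*'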
+ self.assertIs(response, responses[2]) + self.assertEqual(stream.tell(), total_bytes) + + # Check the mocks. + blob._make_transport.assert_called_once_with(client) + call0 = self._do_resumable_upload_call0(blob, content_type, size=size) + call1 = self._do_resumable_upload_call1( + blob, content_type, data, resumable_url, size=size) + call2 = self._do_resumable_upload_call2( + blob, content_type, data, resumable_url, total_bytes) + self.assertEqual( + fake_transport.request.mock_calls, [call0, call1, call2]) - # Requested[1] - headers = { - x.title(): str(y) for x, y in rq[1].pop('headers').items()} - self.assertEqual(headers['Content-Range'], 'bytes 0-4/6') - self.assertEqual(rq[1], { - 'method': 'PUT', - 'uri': UPLOAD_URL, - 'body': DATA[:5], - 'connection_type': None, - 'redirections': 5, - }) - - # Requested[2] - headers = { - x.title(): str(y) for x, y in rq[2].pop('headers').items()} - self.assertEqual(headers['Content-Range'], 'bytes 5-5/6') - self.assertEqual(rq[2], { - 'method': 'PUT', - 'uri': UPLOAD_URL, - 'body': DATA[5:], - 'connection_type': None, - 'redirections': 5, - }) - - def test_upload_from_file_resumable_w_error(self): - from six.moves.http_client import NOT_FOUND - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - from google.cloud._testing import _NamedTemporaryFile - from google.cloud.streaming.exceptions import HttpError + def test__do_resumable_upload_no_size(self): + self._do_resumable_helper() - BLOB_NAME = 'blob-name' - DATA = b'ABCDEF' - loc_response = {'status': NOT_FOUND} - connection = _Connection( - (loc_response, b'{"error": "no such bucket"}'), - ) - client = _Client(connection) - bucket = _Bucket(client) - blob = self._make_one(BLOB_NAME, bucket=bucket) - blob._CHUNK_SIZE_MULTIPLE = 1 - blob.chunk_size = 5 + def test__do_resumable_upload_with_size(self): + self._do_resumable_helper(use_size=True) - # Set the threshhold low enough that we force a resumable upload. - patch = mock.patch( - 'google.cloud.streaming.transfer.RESUMABLE_UPLOAD_THRESHOLD', - new=5) + def _do_upload_helper(self, chunk_size=None): + blob = self._make_one(u'blob-name', bucket=None) - with patch: - with _NamedTemporaryFile() as temp: - with open(temp.name, 'wb') as file_obj: - file_obj.write(DATA) - with open(temp.name, 'rb') as file_obj: - with self.assertRaises(HttpError): - blob.upload_from_file(file_obj, rewind=True) + # Create a fake response. + response = mock.Mock(spec=[u'json']) + response.json.return_value = mock.sentinel.json + # Mock **both** helpers. + blob._do_multipart_upload = mock.Mock(return_value=response, spec=[]) + blob._do_resumable_upload = mock.Mock(return_value=response, spec=[]) - rq = connection.http._requested - self.assertEqual(len(rq), 1) + if chunk_size is None: + self.assertIsNone(blob.chunk_size) + else: + blob.chunk_size = chunk_size + self.assertIsNotNone(blob.chunk_size) + + client = mock.sentinel.client + stream = mock.sentinel.stream + content_type = u'video/mp4' + size = 12345654321 + + # Make the request and check the mocks. 
+ created_json = blob._do_upload(client, stream, content_type, size) + self.assertIs(created_json, mock.sentinel.json) + response.json.assert_called_once_with() + if chunk_size is None: + blob._do_multipart_upload.assert_called_once_with( + client, stream, content_type, size) + blob._do_resumable_upload.assert_not_called() + else: + blob._do_multipart_upload.assert_not_called() + blob._do_resumable_upload.assert_called_once_with( + client, stream, content_type, size) - # Requested[0] - headers = { - x.title(): str(y) for x, y in rq[0].pop('headers').items()} - self.assertEqual(headers['X-Upload-Content-Length'], '6') - self.assertEqual(headers['X-Upload-Content-Type'], - 'application/octet-stream') - - uri = rq[0].pop('uri') - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/b/name/o') - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'resumable', 'name': BLOB_NAME}) - self.assertEqual(rq[0], { - 'method': 'POST', - 'body': '', - 'connection_type': None, - 'redirections': 5, - }) - - def test_upload_from_file_w_slash_in_name(self): - from six.moves.http_client import OK - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - from google.cloud._testing import _NamedTemporaryFile - from google.cloud.streaming import http_wrapper + def test__do_upload_without_chunk_size(self): + self._do_upload_helper() - BLOB_NAME = 'parent/child' - UPLOAD_URL = 'http://example.com/upload/name/parent%2Fchild' - DATA = b'ABCDEF' - loc_response = {'status': OK, 'location': UPLOAD_URL} - chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE, - 'range': 'bytes 0-4'} - chunk2_response = {'status': OK} - connection = _Connection( - (loc_response, '{}'), - (chunk1_response, ''), - (chunk2_response, ''), - ) - client = _Client(connection) - bucket = _Bucket(client) - blob = self._make_one(BLOB_NAME, bucket=bucket) - blob._CHUNK_SIZE_MULTIPLE = 1 - blob.chunk_size = 5 + def test__do_upload_with_chunk_size(self): + chunk_size = 1024 * 1024 * 1024 # 1GB + self._do_upload_helper(chunk_size=chunk_size) - with _NamedTemporaryFile() as temp: - with open(temp.name, 'wb') as file_obj: - file_obj.write(DATA) - with open(temp.name, 'rb') as file_obj: - blob.upload_from_file(file_obj, rewind=True) - self.assertEqual(file_obj.tell(), len(DATA)) - - rq = connection.http._requested - self.assertEqual(len(rq), 1) - self.assertEqual(rq[0]['redirections'], 5) - self.assertEqual(rq[0]['body'], DATA) - self.assertIsNone(rq[0]['connection_type']) - self.assertEqual(rq[0]['method'], 'POST') - uri = rq[0]['uri'] - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/b/name/o') - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'media', 'name': 'parent/child'}) - headers = { - x.title(): str(y) for x, y in rq[0]['headers'].items()} - self.assertEqual(headers['Content-Length'], '6') - self.assertEqual(headers['Content-Type'], 'application/octet-stream') + def _upload_from_file_helper(self, side_effect=None, **kwargs): + from google.cloud._helpers import UTC - def test_upload_from_filename_w_key(self): - from six.moves.http_client import OK - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - from google.cloud._testing import _NamedTemporaryFile - from google.cloud.streaming import http_wrapper + blob = self._make_one('blob-name', bucket=None) - 
BLOB_NAME = 'blob-name' - UPLOAD_URL = 'http://example.com/upload/name/key' - DATA = b'ABCDEF' - KEY = b'aa426195405adee2c8081bb9e7e74b19' - HEADER_KEY_VALUE = 'YWE0MjYxOTU0MDVhZGVlMmM4MDgxYmI5ZTdlNzRiMTk=' - HEADER_KEY_HASH_VALUE = 'V3Kwe46nKc3xLv96+iJ707YfZfFvlObta8TQcx2gpm0=' - EXPECTED_CONTENT_TYPE = 'foo/bar' - properties = {'contentType': EXPECTED_CONTENT_TYPE} - loc_response = {'status': OK, 'location': UPLOAD_URL} - chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE, - 'range': 'bytes 0-4'} - chunk2_response = {'status': OK} - connection = _Connection( - (loc_response, '{}'), - (chunk1_response, ''), - (chunk2_response, ''), - ) - client = _Client(connection) - bucket = _Bucket(client) - blob = self._make_one(BLOB_NAME, bucket=bucket, - properties=properties, encryption_key=KEY) - blob._CHUNK_SIZE_MULTIPLE = 1 - blob.chunk_size = 5 + # Mock low-level upload helper on blob (it is tested elsewhere). + created_json = {'updated': '2017-01-01T09:09:09.081Z'} + blob._do_upload = mock.Mock(return_value=created_json, spec=[]) + if side_effect is not None: + blob._do_upload.side_effect = side_effect + # Make sure `updated` is empty before the request. + self.assertIsNone(blob.updated) - with _NamedTemporaryFile(suffix='.jpeg') as temp: - with open(temp.name, 'wb') as file_obj: - file_obj.write(DATA) - blob.upload_from_filename(temp.name, - content_type=EXPECTED_CONTENT_TYPE) - - rq = connection.http._requested - self.assertEqual(len(rq), 1) - self.assertEqual(rq[0]['method'], 'POST') - uri = rq[0]['uri'] - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/b/name/o') - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'media', 'name': BLOB_NAME}) - headers = { - x.title(): str(y) for x, y in rq[0]['headers'].items()} - self.assertEqual(headers['X-Goog-Encryption-Algorithm'], 'AES256') - self.assertEqual(headers['X-Goog-Encryption-Key'], HEADER_KEY_VALUE) - self.assertEqual(headers['X-Goog-Encryption-Key-Sha256'], - HEADER_KEY_HASH_VALUE) - self.assertEqual(headers['Content-Length'], '6') - self.assertEqual(headers['Content-Type'], 'foo/bar') - - def _upload_from_filename_test_helper(self, properties=None, - content_type_arg=None, - expected_content_type=None): - from six.moves.http_client import OK - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - from google.cloud._testing import _NamedTemporaryFile - from google.cloud.streaming import http_wrapper + data = b'data is here' + stream = io.BytesIO(data) + stream.seek(2) # Not at zero. + content_type = u'font/woff' + client = mock.sentinel.client + ret_val = blob.upload_from_file( + stream, size=len(data), content_type=content_type, + client=client, **kwargs) - BLOB_NAME = 'blob-name' - UPLOAD_URL = 'http://example.com/upload/name/key' - DATA = b'ABCDEF' - loc_response = {'status': OK, 'location': UPLOAD_URL} - chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE, - 'range': 'bytes 0-4'} - chunk2_response = {'status': OK} - connection = _Connection( - (loc_response, '{}'), - (chunk1_response, ''), - (chunk2_response, ''), - ) - client = _Client(connection) - bucket = _Bucket(client) - blob = self._make_one(BLOB_NAME, bucket=bucket, - properties=properties) - blob._CHUNK_SIZE_MULTIPLE = 1 - blob.chunk_size = 5 + # Check the response and side-effects. 
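+        # NOTE: ``upload_from_file`` returns ``None``; success shows up as
+        # blob properties refreshed from the server's JSON response.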
+ self.assertIsNone(ret_val) + new_updated = datetime.datetime( + 2017, 1, 1, 9, 9, 9, 81000, tzinfo=UTC) + self.assertEqual(blob.updated, new_updated) - with _NamedTemporaryFile(suffix='.jpeg') as temp: - with open(temp.name, 'wb') as file_obj: - file_obj.write(DATA) - blob.upload_from_filename(temp.name, - content_type=content_type_arg) - - rq = connection.http._requested - self.assertEqual(len(rq), 1) - self.assertEqual(rq[0]['method'], 'POST') - uri = rq[0]['uri'] - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/b/name/o') - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'media', 'name': BLOB_NAME}) - headers = { - x.title(): str(y) for x, y in rq[0]['headers'].items()} - self.assertEqual(headers['Content-Length'], '6') - self.assertEqual(headers['Content-Type'], expected_content_type) + # Check the mock. + blob._do_upload.assert_called_once_with( + client, stream, content_type, len(data)) + + return stream + + def test_upload_from_file_success(self): + stream = self._upload_from_file_helper() + assert stream.tell() == 2 + + @mock.patch('warnings.warn') + def test_upload_from_file_with_retries(self, mock_warn): + from google.cloud.storage import blob as blob_module + + self._upload_from_file_helper(num_retries=20) + mock_warn.assert_called_once_with( + blob_module._NUM_RETRIES_MESSAGE, DeprecationWarning) + + def test_upload_from_file_with_rewind(self): + stream = self._upload_from_file_helper(rewind=True) + assert stream.tell() == 0 + + def test_upload_from_file_failure(self): + from google.resumable_media import InvalidResponse + from google.cloud import exceptions + + message = u'Someone is already in this spot.' + response = mock.Mock( + content=message, status_code=http_client.CONFLICT, + spec=[u'content', u'status_code']) + side_effect = InvalidResponse(response) + + with self.assertRaises(exceptions.Conflict) as exc_info: + self._upload_from_file_helper(side_effect=side_effect) + + self.assertEqual(exc_info.exception.message, message) + self.assertEqual(exc_info.exception.errors, []) + + def _do_upload_mock_call_helper(self, blob, client, content_type, size): + self.assertEqual(blob._do_upload.call_count, 1) + mock_call = blob._do_upload.mock_calls[0] + call_name, pos_args, kwargs = mock_call + self.assertEqual(call_name, '') + self.assertEqual(len(pos_args), 4) + self.assertEqual(pos_args[0], client) + self.assertEqual(pos_args[2], content_type) + self.assertEqual(pos_args[3], size) + self.assertEqual(kwargs, {}) + + return pos_args[1] def test_upload_from_filename(self): - self._upload_from_filename_test_helper( - expected_content_type='image/jpeg') - - def test_upload_from_filename_with_content_type(self): - EXPECTED_CONTENT_TYPE = 'foo/bar' - self._upload_from_filename_test_helper( - properties={'contentType': EXPECTED_CONTENT_TYPE}, - expected_content_type=EXPECTED_CONTENT_TYPE) - - def test_upload_from_filename_with_content_type_passed(self): - EXPECTED_CONTENT_TYPE = 'foo/bar' - self._upload_from_filename_test_helper( - content_type_arg=EXPECTED_CONTENT_TYPE, - expected_content_type=EXPECTED_CONTENT_TYPE) - - def test_upload_from_filename_both_content_type_sources(self): - EXPECTED_CONTENT_TYPE = 'foo/bar' - ALT_CONTENT_TYPE = 'foo/baz' - self._upload_from_filename_test_helper( - properties={'contentType': ALT_CONTENT_TYPE}, - content_type_arg=EXPECTED_CONTENT_TYPE, - expected_content_type=EXPECTED_CONTENT_TYPE) + from google.cloud._testing import 
_NamedTemporaryFile - def test_upload_from_string_w_bytes(self): - from six.moves.http_client import OK - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - from google.cloud.streaming import http_wrapper + blob = self._make_one('blob-name', bucket=None) + # Mock low-level upload helper on blob (it is tested elsewhere). + created_json = {'metadata': {'mint': 'ice-cream'}} + blob._do_upload = mock.Mock(return_value=created_json, spec=[]) + # Make sure `metadata` is empty before the request. + self.assertIsNone(blob.metadata) - BLOB_NAME = 'blob-name' - UPLOAD_URL = 'http://example.com/upload/name/key' - DATA = b'ABCDEF' - loc_response = {'status': OK, 'location': UPLOAD_URL} - chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE, - 'range': 'bytes 0-4'} - chunk2_response = {'status': OK} - connection = _Connection( - (loc_response, '{}'), - (chunk1_response, ''), - (chunk2_response, ''), - ) - client = _Client(connection) - bucket = _Bucket(client) - blob = self._make_one(BLOB_NAME, bucket=bucket) - blob._CHUNK_SIZE_MULTIPLE = 1 - blob.chunk_size = 5 - blob.upload_from_string(DATA) - rq = connection.http._requested - self.assertEqual(len(rq), 1) - self.assertEqual(rq[0]['method'], 'POST') - uri = rq[0]['uri'] - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/b/name/o') - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'media', 'name': BLOB_NAME}) - headers = { - x.title(): str(y) for x, y in rq[0]['headers'].items()} - self.assertEqual(headers['Content-Length'], '6') - self.assertEqual(headers['Content-Type'], 'text/plain') - self.assertEqual(rq[0]['body'], DATA) + data = b'soooo much data' + content_type = u'image/svg+xml' + client = mock.sentinel.client + with _NamedTemporaryFile() as temp: + with open(temp.name, 'wb') as file_obj: + file_obj.write(data) - def test_upload_from_string_w_text(self): - from six.moves.http_client import OK - from six.moves.urllib.parse import parse_qsl - from six.moves.urllib.parse import urlsplit - from google.cloud.streaming import http_wrapper + ret_val = blob.upload_from_filename( + temp.name, content_type=content_type, client=client) - BLOB_NAME = 'blob-name' - UPLOAD_URL = 'http://example.com/upload/name/key' - DATA = u'ABCDEF\u1234' - ENCODED = DATA.encode('utf-8') - loc_response = {'status': OK, 'location': UPLOAD_URL} - chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE, - 'range': 'bytes 0-4'} - chunk2_response = {'status': OK} - connection = _Connection( - (loc_response, '{}'), - (chunk1_response, ''), - (chunk2_response, ''), - ) - client = _Client(connection) - bucket = _Bucket(client=client) - blob = self._make_one(BLOB_NAME, bucket=bucket) - blob._CHUNK_SIZE_MULTIPLE = 1 - blob.chunk_size = 5 - blob.upload_from_string(DATA) - rq = connection.http._requested - self.assertEqual(len(rq), 1) - self.assertEqual(rq[0]['method'], 'POST') - uri = rq[0]['uri'] - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/b/name/o') - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'media', 'name': BLOB_NAME}) - headers = { - x.title(): str(y) for x, y in rq[0]['headers'].items()} - self.assertEqual(headers['Content-Length'], str(len(ENCODED))) - self.assertEqual(headers['Content-Type'], 'text/plain') - self.assertEqual(rq[0]['body'], ENCODED) - - def test_upload_from_string_text_w_key(self): - from 
six.moves.http_client import OK
-    from six.moves.urllib.parse import parse_qsl
-    from six.moves.urllib.parse import urlsplit
-    from google.cloud.streaming import http_wrapper
+        # Check the response and side-effects.
+        self.assertIsNone(ret_val)
+        self.assertEqual(blob.metadata, created_json['metadata'])
-        BLOB_NAME = 'blob-name'
-        KEY = b'aa426195405adee2c8081bb9e7e74b19'
-        HEADER_KEY_VALUE = 'YWE0MjYxOTU0MDVhZGVlMmM4MDgxYmI5ZTdlNzRiMTk='
-        HEADER_KEY_HASH_VALUE = 'V3Kwe46nKc3xLv96+iJ707YfZfFvlObta8TQcx2gpm0='
-        UPLOAD_URL = 'http://example.com/upload/name/key'
-        DATA = u'ABCDEF\u1234'
-        ENCODED = DATA.encode('utf-8')
-        loc_response = {'status': OK, 'location': UPLOAD_URL}
-        chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
-                           'range': 'bytes 0-4'}
-        chunk2_response = {'status': OK}
-        connection = _Connection(
-            (loc_response, '{}'),
-            (chunk1_response, ''),
-            (chunk2_response, ''),
-        )
-        client = _Client(connection)
-        bucket = _Bucket(client=client)
-        blob = self._make_one(BLOB_NAME, bucket=bucket, encryption_key=KEY)
-        blob._CHUNK_SIZE_MULTIPLE = 1
-        blob.chunk_size = 5
-        blob.upload_from_string(DATA)
-        rq = connection.http._requested
-        self.assertEqual(len(rq), 1)
-        self.assertEqual(rq[0]['method'], 'POST')
-        uri = rq[0]['uri']
-        scheme, netloc, path, qs, _ = urlsplit(uri)
-        self.assertEqual(scheme, 'http')
-        self.assertEqual(netloc, 'example.com')
-        self.assertEqual(path, '/b/name/o')
-        self.assertEqual(dict(parse_qsl(qs)),
-                         {'uploadType': 'media', 'name': BLOB_NAME})
-        headers = {
-            x.title(): str(y) for x, y in rq[0]['headers'].items()}
+        # Check the mock.
+        stream = self._do_upload_mock_call_helper(
+            blob, client, content_type, len(data))
+        self.assertTrue(stream.closed)
+        self.assertEqual(stream.mode, 'rb')
+        self.assertEqual(stream.name, temp.name)
-        self.assertEqual(headers['X-Goog-Encryption-Algorithm'], 'AES256')
-        self.assertEqual(headers['X-Goog-Encryption-Key'], HEADER_KEY_VALUE)
-        self.assertEqual(headers['X-Goog-Encryption-Key-Sha256'],
-                         HEADER_KEY_HASH_VALUE)
-        self.assertEqual(headers['Content-Length'], str(len(ENCODED)))
-        self.assertEqual(headers['Content-Type'], 'text/plain')
-        self.assertEqual(rq[0]['body'], ENCODED)
+    def _upload_from_string_helper(self, data, **kwargs):
+        from google.cloud._helpers import _to_bytes
-    def test_create_resumable_upload_session(self):
-        from six.moves.http_client import OK
-        from six.moves.urllib.parse import parse_qsl
-        from six.moves.urllib.parse import urlsplit
+        blob = self._make_one('blob-name', bucket=None)
-        BLOB_NAME = 'blob-name'
-        UPLOAD_URL = 'http://example.com/upload/name/key'
-        loc_response = {'status': OK, 'location': UPLOAD_URL}
-        connection = _Connection(
-            (loc_response, '{}'),
-        )
-        client = _Client(connection)
-        bucket = _Bucket(client=client)
-        blob = self._make_one(BLOB_NAME, bucket=bucket)
+        # Mock low-level upload helper on blob (it is tested elsewhere).
+        created_json = {'componentCount': '5'}
+        blob._do_upload = mock.Mock(return_value=created_json, spec=[])
+        # Make sure `component_count` is empty before the request.
+        self.assertIsNone(blob.component_count)
-        resumable_url = blob.create_resumable_upload_session()
+        client = mock.sentinel.client
+        ret_val = blob.upload_from_string(data, client=client, **kwargs)
-        self.assertEqual(resumable_url, UPLOAD_URL)
+        # Check the response and side-effects.
+        self.assertIsNone(ret_val)
+        self.assertEqual(blob.component_count, 5)
-        rq = connection.http._requested
-        self.assertEqual(len(rq), 1)
-        self.assertEqual(rq[0]['method'], 'POST')
+        # Check the mock.
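+        # NOTE: ``upload_from_string`` encodes text input as UTF-8, so the
+        # expected payload is computed the same way before comparing.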
+ payload = _to_bytes(data, encoding='utf-8') + stream = self._do_upload_mock_call_helper( + blob, client, 'text/plain', len(payload)) + self.assertIsInstance(stream, io.BytesIO) + self.assertEqual(stream.getvalue(), payload) - uri = rq[0]['uri'] - scheme, netloc, path, qs, _ = urlsplit(uri) - self.assertEqual(scheme, 'http') - self.assertEqual(netloc, 'example.com') - self.assertEqual(path, '/b/name/o') - self.assertEqual(dict(parse_qsl(qs)), - {'uploadType': 'resumable', 'name': BLOB_NAME}) - headers = { - key.title(): str(value) for key, value in rq[0]['headers'].items()} - self.assertEqual(headers['Content-Length'], '0') - self.assertEqual( - headers['X-Upload-Content-Type'], 'application/octet-stream') + def test_upload_from_string_w_bytes(self): + data = b'XB]jb\xb8tad\xe0' + self._upload_from_string_helper(data) - def test_create_resumable_upload_session_args(self): - from six.moves.http_client import OK + def test_upload_from_string_w_text(self): + data = u'\N{snowman} \N{sailboat}' + self._upload_from_string_helper(data) + + def _create_resumable_upload_session_helper(self, origin=None, + side_effect=None): + bucket = mock.Mock(path='/b/alex-trebek', spec=[u'path']) + blob = self._make_one('blob-name', bucket=bucket) + blob.chunk_size = 99 * blob._CHUNK_SIZE_MULTIPLE + + # Create mocks to be checked for doing transport. + resumable_url = 'http://test.invalid?upload_id=clean-up-everybody' + response_headers = {'location': resumable_url} + fake_transport = self._mock_transport( + http_client.OK, response_headers) + blob._make_transport = mock.Mock(return_value=fake_transport, spec=[]) + if side_effect is not None: + fake_transport.request.side_effect = side_effect + + # Create some mock arguments and call the method under test. + content_type = u'text/plain' + size = 10000 + client = mock.sentinel.mock + new_url = blob.create_resumable_upload_session( + content_type=content_type, size=size, + origin=origin, client=client) + + # Check the returned value. + self.assertEqual(new_url, resumable_url) + + # Check the mocks. 
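+        # NOTE: Initiating a session sends a single POST carrying only the
+        # object metadata as JSON; no object bytes are transmitted yet.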
+ blob._make_transport.assert_called_once_with(client) + upload_url = ( + 'https://www.googleapis.com/upload/storage/v1' + + bucket.path + + '/o?uploadType=resumable') + payload = b'{"name": "blob-name"}' + expected_headers = { + 'content-type': 'application/json; charset=UTF-8', + 'x-upload-content-length': str(size), + 'x-upload-content-type': content_type, + } + if origin is not None: + expected_headers['Origin'] = origin + fake_transport.request.assert_called_once_with( + 'POST', upload_url, data=payload, headers=expected_headers) - BLOB_NAME = 'blob-name' - UPLOAD_URL = 'http://example.com/upload/name/key' - CONTENT_TYPE = 'text/plain' - SIZE = 1024 - ORIGIN = 'http://google.com' - - loc_response = {'status': OK, 'location': UPLOAD_URL} - connection = _Connection( - (loc_response, '{}'), - ) - client = _Client(connection) - bucket = _Bucket(client=client) - blob = self._make_one(BLOB_NAME, bucket=bucket) + def test_create_resumable_upload_session(self): + self._create_resumable_upload_session_helper() - resumable_url = blob.create_resumable_upload_session( - content_type=CONTENT_TYPE, - size=SIZE, - origin=ORIGIN) + def test_create_resumable_upload_session_with_origin(self): + self._create_resumable_upload_session_helper( + origin='http://google.com') - self.assertEqual(resumable_url, UPLOAD_URL) + def test_create_resumable_upload_session_with_failure(self): + from google.resumable_media import InvalidResponse + from google.cloud import exceptions - rq = connection.http._requested - self.assertEqual(len(rq), 1) - self.assertEqual(rq[0]['method'], 'POST') + message = u'5-oh-3 woe is me.' + response = mock.Mock( + content=message, status_code=http_client.SERVICE_UNAVAILABLE, + spec=[u'content', u'status_code']) + side_effect = InvalidResponse(response) - headers = { - key.title(): str(value) for key, value in rq[0]['headers'].items()} - self.assertEqual(headers['Content-Length'], '0') - self.assertEqual(headers['X-Upload-Content-Length'], str(SIZE)) - self.assertEqual( - headers['X-Upload-Content-Type'], 'text/plain') - self.assertEqual( - headers['Origin'], ORIGIN) + with self.assertRaises(exceptions.ServiceUnavailable) as exc_info: + self._create_resumable_upload_session_helper( + side_effect=side_effect) + + self.assertEqual(exc_info.exception.message, message) + self.assertEqual(exc_info.exception.errors, []) def test_get_iam_policy(self): - from six.moves.http_client import OK from google.cloud.storage.iam import STORAGE_OWNER_ROLE from google.cloud.storage.iam import STORAGE_EDITOR_ROLE from google.cloud.storage.iam import STORAGE_VIEWER_ROLE @@ -1449,7 +1311,7 @@ def test_get_iam_policy(self): {'role': STORAGE_VIEWER_ROLE, 'members': [VIEWER1, VIEWER2]}, ], } - after = ({'status': OK}, RETURNED) + after = ({'status': http_client.OK}, RETURNED) EXPECTED = { binding['role']: set(binding['members']) for binding in RETURNED['bindings']} @@ -1472,7 +1334,6 @@ def test_get_iam_policy(self): def test_set_iam_policy(self): import operator - from six.moves.http_client import OK from google.cloud.storage.iam import STORAGE_OWNER_ROLE from google.cloud.storage.iam import STORAGE_EDITOR_ROLE from google.cloud.storage.iam import STORAGE_VIEWER_ROLE @@ -1498,7 +1359,7 @@ def test_set_iam_policy(self): 'version': VERSION, 'bindings': BINDINGS, } - after = ({'status': OK}, RETURNED) + after = ({'status': http_client.OK}, RETURNED) policy = Policy() for binding in BINDINGS: policy[binding['role']] = binding['members'] @@ -1530,7 +1391,6 @@ def test_set_iam_policy(self): sorted(found['members']), 
sorted(expected['members'])) def test_test_iam_permissions(self): - from six.moves.http_client import OK from google.cloud.storage.iam import STORAGE_OBJECTS_LIST from google.cloud.storage.iam import STORAGE_BUCKETS_GET from google.cloud.storage.iam import STORAGE_BUCKETS_UPDATE @@ -1544,7 +1404,7 @@ def test_test_iam_permissions(self): ] ALLOWED = PERMISSIONS[1:] RETURNED = {'permissions': ALLOWED} - after = ({'status': OK}, RETURNED) + after = ({'status': http_client.OK}, RETURNED) connection = _Connection(after) client = _Client(connection) bucket = _Bucket(client=client) @@ -1561,12 +1421,11 @@ def test_test_iam_permissions(self): self.assertEqual(kw[0]['query_params'], {'permissions': PERMISSIONS}) def test_make_public(self): - from six.moves.http_client import OK from google.cloud.storage.acl import _ACLEntity BLOB_NAME = 'blob-name' permissive = [{'entity': 'allUsers', 'role': _ACLEntity.READER_ROLE}] - after = ({'status': OK}, {'acl': permissive}) + after = ({'status': http_client.OK}, {'acl': permissive}) connection = _Connection(after) client = _Client(connection) bucket = _Bucket(client=client) @@ -1596,15 +1455,13 @@ def test_compose_wo_content_type_set(self): destination.compose(sources=[source_1, source_2]) def test_compose_minimal(self): - from six.moves.http_client import OK - SOURCE_1 = 'source-1' SOURCE_2 = 'source-2' DESTINATION = 'destinaton' RESOURCE = { 'etag': 'DEADBEEF' } - after = ({'status': OK}, RESOURCE) + after = ({'status': http_client.OK}, RESOURCE) connection = _Connection(after) client = _Client(connection) bucket = _Bucket(client=client) @@ -1633,15 +1490,13 @@ def test_compose_minimal(self): self.assertEqual(kw[0]['data'], SENT) def test_compose_w_additional_property_changes(self): - from six.moves.http_client import OK - SOURCE_1 = 'source-1' SOURCE_2 = 'source-2' DESTINATION = 'destinaton' RESOURCE = { 'etag': 'DEADBEEF' } - after = ({'status': OK}, RESOURCE) + after = ({'status': http_client.OK}, RESOURCE) connection = _Connection(after) client = _Client(connection) bucket = _Bucket(client=client) @@ -1676,8 +1531,6 @@ def test_compose_w_additional_property_changes(self): self.assertEqual(kw[0]['data'], SENT) def test_rewrite_response_without_resource(self): - from six.moves.http_client import OK - SOURCE_BLOB = 'source' DEST_BLOB = 'dest' DEST_BUCKET = 'other-bucket' @@ -1688,7 +1541,7 @@ def test_rewrite_response_without_resource(self): 'done': False, 'rewriteToken': TOKEN, } - response = ({'status': OK}, RESPONSE) + response = ({'status': http_client.OK}, RESPONSE) connection = _Connection(response) client = _Client(connection) source_bucket = _Bucket(client=client) @@ -1703,8 +1556,6 @@ def test_rewrite_response_without_resource(self): self.assertEqual(size, 42) def test_rewrite_other_bucket_other_name_no_encryption_partial(self): - from six.moves.http_client import OK - SOURCE_BLOB = 'source' DEST_BLOB = 'dest' DEST_BUCKET = 'other-bucket' @@ -1716,7 +1567,7 @@ def test_rewrite_other_bucket_other_name_no_encryption_partial(self): 'rewriteToken': TOKEN, 'resource': {'etag': 'DEADBEEF'}, } - response = ({'status': OK}, RESPONSE) + response = ({'status': http_client.OK}, RESPONSE) connection = _Connection(response) client = _Client(connection) source_bucket = _Bucket(client=client) @@ -1752,7 +1603,6 @@ def test_rewrite_other_bucket_other_name_no_encryption_partial(self): def test_rewrite_same_name_no_old_key_new_key_done(self): import base64 import hashlib - from six.moves.http_client import OK KEY = b'01234567890123456789012345678901' # 32 bytes 
KEY_B64 = base64.b64encode(KEY).rstrip().decode('ascii') @@ -1765,7 +1615,7 @@ def test_rewrite_same_name_no_old_key_new_key_done(self): 'done': True, 'resource': {'etag': 'DEADBEEF'}, } - response = ({'status': OK}, RESPONSE) + response = ({'status': http_client.OK}, RESPONSE) connection = _Connection(response) client = _Client(connection) bucket = _Bucket(client=client) @@ -1800,7 +1650,6 @@ def test_rewrite_same_name_no_old_key_new_key_done(self): def test_rewrite_same_name_no_key_new_key_w_token(self): import base64 import hashlib - from six.moves.http_client import OK SOURCE_KEY = b'01234567890123456789012345678901' # 32 bytes SOURCE_KEY_B64 = base64.b64encode(SOURCE_KEY).rstrip().decode('ascii') @@ -1820,7 +1669,7 @@ def test_rewrite_same_name_no_key_new_key_w_token(self): 'done': True, 'resource': {'etag': 'DEADBEEF'}, } - response = ({'status': OK}, RESPONSE) + response = ({'status': http_client.OK}, RESPONSE) connection = _Connection(response) client = _Client(connection) bucket = _Bucket(client=client) @@ -1868,13 +1717,12 @@ def test_update_storage_class_invalid(self): blob.update_storage_class(u'BOGUS') def test_update_storage_class_wo_encryption_key(self): - from six.moves.http_client import OK BLOB_NAME = 'blob-name' STORAGE_CLASS = u'NEARLINE' RESPONSE = { 'resource': {'storageClass': STORAGE_CLASS}, } - response = ({'status': OK}, RESPONSE) + response = ({'status': http_client.OK}, RESPONSE) connection = _Connection(response) client = _Client(connection) bucket = _Bucket(client=client) @@ -1906,7 +1754,6 @@ def test_update_storage_class_wo_encryption_key(self): def test_update_storage_class_w_encryption_key(self): import base64 import hashlib - from six.moves.http_client import OK BLOB_NAME = 'blob-name' BLOB_KEY = b'01234567890123456789012345678901' # 32 bytes @@ -1918,7 +1765,7 @@ def test_update_storage_class_w_encryption_key(self): RESPONSE = { 'resource': {'storageClass': STORAGE_CLASS}, } - response = ({'status': OK}, RESPONSE) + response = ({'status': http_client.OK}, RESPONSE) connection = _Connection(response) client = _Client(connection) bucket = _Bucket(client=client) @@ -2230,7 +2077,6 @@ def test_storage_class_setter(self): self.assertEqual(blob._properties, {'storageClass': storage_class}) def test_time_deleted(self): - import datetime from google.cloud._helpers import _RFC3339_MICROS from google.cloud._helpers import UTC @@ -2248,7 +2094,6 @@ def test_time_deleted_unset(self): self.assertIsNone(blob.time_deleted) def test_time_created(self): - import datetime from google.cloud._helpers import _RFC3339_MICROS from google.cloud._helpers import UTC @@ -2266,7 +2111,6 @@ def test_time_created_unset(self): self.assertIsNone(blob.time_created) def test_updated(self): - import datetime from google.cloud._helpers import _RFC3339_MICROS from google.cloud._helpers import UTC @@ -2336,6 +2180,44 @@ def test_do_rewind(self): stream.seek.assert_called_once_with(0, os.SEEK_SET) +class Test__raise_from_invalid_response(unittest.TestCase): + + @staticmethod + def _call_fut(*args, **kwargs): + from google.cloud.storage.blob import _raise_from_invalid_response + + return _raise_from_invalid_response(*args, **kwargs) + + def _helper(self, message, **kwargs): + from google.resumable_media import InvalidResponse + from google.cloud import exceptions + + response = mock.Mock( + content=message, status_code=http_client.BAD_REQUEST, + spec=[u'content', u'status_code']) + error = InvalidResponse(response) + + with self.assertRaises(exceptions.BadRequest) as exc_info: + 
self._call_fut(error, **kwargs) + + return exc_info + + def test_default(self): + message = u'Failure' + exc_info = self._helper(message) + self.assertEqual(exc_info.exception.message, message) + self.assertEqual(exc_info.exception.errors, []) + + def test_with_error_info(self): + message = u'Eeek bad.' + error_info = 'http://test.invalid' + exc_info = self._helper(message, error_info=error_info) + + full_message = u'{} ({})'.format(message, error_info) + self.assertEqual(exc_info.exception.message, full_message) + self.assertEqual(exc_info.exception.errors, []) + + class _Responder(object): def __init__(self, *responses): @@ -2360,11 +2242,10 @@ def __init__(self, *responses): self.http = _HTTP(*responses) def api_request(self, **kw): - from six.moves.http_client import NOT_FOUND from google.cloud.exceptions import NotFound info, content = self._respond(**kw) - if info.get('status') == NOT_FOUND: + if info.get('status') == http_client.NOT_FOUND: raise NotFound(info) return content @@ -2438,8 +2319,6 @@ class _Stream(object): _closed = False def __init__(self, to_read=b''): - import io - self._written = [] self._to_read = io.BytesIO(to_read) From 9d229fa4828b3795191012416b54c219c32bd027 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Wed, 3 May 2017 19:33:21 -0700 Subject: [PATCH 3/5] Removing unused stub classes (previously used for `google.cloud.streaming`). --- storage/google/cloud/storage/blob.py | 22 ---------------------- 1 file changed, 22 deletions(-) diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py index 457587cbb276..bd3d12cba08d 100644 --- a/storage/google/cloud/storage/blob.py +++ b/storage/google/cloud/storage/blob.py @@ -1482,28 +1482,6 @@ def updated(self): return _rfc3339_to_datetime(value) -class _UploadConfig(object): - """Faux message FBO apitools' 'configure_request'. - - Values extracted from apitools - 'samples/storage_sample/storage/storage_v1_client.py' - """ - accept = ['*/*'] - max_size = None - resumable_multipart = True - resumable_path = u'/resumable/upload/storage/v1/b/{bucket}/o' - simple_multipart = True - simple_path = u'/upload/storage/v1/b/{bucket}/o' - - -class _UrlBuilder(object): - """Faux builder FBO apitools' 'configure_request'""" - def __init__(self, bucket_name, object_name): - self.query_params = {'name': object_name} - self._bucket_name = bucket_name - self._relative_path = '' - - def _get_encryption_headers(key, source=False): """Builds customer encryption key headers From 92be78f911b6b499b81d543f3bc0dedd9fa5daed Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Wed, 3 May 2017 19:47:31 -0700 Subject: [PATCH 4/5] Cleaning up some unused unit test code in test_blob. 
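
A minimal sketch of how the surviving ``_Connection`` double is driven
(names as used in this module; responses are queued ``(info, content)``
pairs):

    connection = _Connection(({'status': http_client.OK}, {'foo': 'bar'}))
    client = _Client(connection)
    # api_request() records its keyword arguments, pops the next queued
    # pair, and raises NotFound when the queued status is a 404.
    content = connection.api_request(method='GET', path='/b/name/o/blob')
    assert content == {'foo': 'bar'}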
--- storage/tests/unit/test_blob.py | 61 ++++----------------------------- 1 file changed, 6 insertions(+), 55 deletions(-) diff --git a/storage/tests/unit/test_blob.py b/storage/tests/unit/test_blob.py index 4e3e7c3eac17..fe755496cc4e 100644 --- a/storage/tests/unit/test_blob.py +++ b/storage/tests/unit/test_blob.py @@ -2218,29 +2218,22 @@ def test_with_error_info(self): self.assertEqual(exc_info.exception.errors, []) -class _Responder(object): +class _Connection(object): + + API_BASE_URL = 'http://example.com' + USER_AGENT = 'testing 1.2.3' + credentials = object() def __init__(self, *responses): self._responses = responses[:] self._requested = [] + self._signed = [] def _respond(self, **kw): self._requested.append(kw) response, self._responses = self._responses[0], self._responses[1:] return response - -class _Connection(_Responder): - - API_BASE_URL = 'http://example.com' - USER_AGENT = 'testing 1.2.3' - credentials = object() - - def __init__(self, *responses): - super(_Connection, self).__init__(*responses) - self._signed = [] - self.http = _HTTP(*responses) - def api_request(self, **kw): from google.cloud.exceptions import NotFound @@ -2249,28 +2242,6 @@ def api_request(self, **kw): raise NotFound(info) return content - def build_api_url(self, path, query_params=None, - api_base_url=API_BASE_URL): - from six.moves.urllib.parse import urlencode - from six.moves.urllib.parse import urlsplit - from six.moves.urllib.parse import urlunsplit - - # Mimic the build_api_url interface. - qs = urlencode(query_params or {}) - scheme, netloc, _, _, _ = urlsplit(api_base_url) - return urlunsplit((scheme, netloc, path, qs, '')) - - -class _HTTP(_Responder): - - connections = {} # For google-apitools debugging. - - def request(self, uri, method, headers, body, **kw): - if hasattr(body, 'read'): - body = body.read() - return self._respond(uri=uri, method=method, headers=headers, - body=body, **kw) - class _Bucket(object): @@ -2313,23 +2284,3 @@ def _connection(self): @property def _credentials(self): return self._base_connection.credentials - - -class _Stream(object): - _closed = False - - def __init__(self, to_read=b''): - self._written = [] - self._to_read = io.BytesIO(to_read) - - def seek(self, offset, whence=0): - self._to_read.seek(offset, whence) - - def read(self, size): - return self._to_read.read(size) - - def tell(self): - return self._to_read.tell() - - def close(self): - self._closed = True From 4fcbe8a4b536b053027535f345889ea73fd16444 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Thu, 4 May 2017 10:17:43 -0700 Subject: [PATCH 5/5] Adding chunk_size to Blob._initiate_resumable_upload. This is to avoid monkey-patching the instance when "pure" behavior will suffice. Also removed the transport from Blob._get_upload_arguments(). --- storage/google/cloud/storage/blob.py | 54 ++++++++++++++-------------- storage/tests/unit/test_blob.py | 31 +++++++++------- 2 files changed, 45 insertions(+), 40 deletions(-) diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py index bd3d12cba08d..3bd9fe3edb0a 100644 --- a/storage/google/cloud/storage/blob.py +++ b/storage/google/cloud/storage/blob.py @@ -559,7 +559,7 @@ def _get_writable_metadata(self): return object_metadata - def _get_upload_arguments(self, client, content_type): + def _get_upload_arguments(self, content_type): """Get required arguments for performing an upload. 
The content type returned will be determined in order of precedence: @@ -568,27 +568,20 @@ def _get_upload_arguments(self, client, content_type): - The value stored on the current blob - The default value ('application/octet-stream') - :type client: :class:`~google.cloud.storage.client.Client` - :param client: (Optional) The client to use. If not passed, falls back - to the ``client`` stored on the blob's bucket. - :type content_type: str :param content_type: Type of content being uploaded (or :data:`None`). :rtype: tuple - :returns: A quadruple of + :returns: A triple of - * An - :class:`~google.auth.transport.requests.AuthorizedSession` * A header dictionary * An object metadata dictionary * The ``content_type`` as a string (according to precedence) """ - transport = self._make_transport(client) headers = _get_encryption_headers(self._encryption_key) object_metadata = self._get_writable_metadata() content_type = self._get_content_type(content_type) - return transport, headers, object_metadata, content_type + return headers, object_metadata, content_type def _do_multipart_upload(self, client, stream, content_type, size): """Perform a multipart upload. @@ -631,8 +624,9 @@ def _do_multipart_upload(self, client, stream, content_type, size): msg = _READ_LESS_THAN_SIZE.format(size, len(data)) raise ValueError(msg) - info = self._get_upload_arguments(client, content_type) - transport, headers, object_metadata, content_type = info + transport = self._make_transport(client) + info = self._get_upload_arguments(content_type) + headers, object_metadata, content_type = info upload_url = _MULTIPART_URL_TEMPLATE.format( bucket_path=self.bucket.path) @@ -643,11 +637,9 @@ def _do_multipart_upload(self, client, stream, content_type, size): return response def _initiate_resumable_upload(self, client, stream, content_type, - size, extra_headers=None): + size, extra_headers=None, chunk_size=None): """Initiate a resumable upload. - Assumes ``chunk_size`` is not :data:`None` on the current blob. - The content type of the upload will be determined in order of precedence: @@ -674,6 +666,13 @@ def _initiate_resumable_upload(self, client, stream, content_type, :param extra_headers: (Optional) Extra headers to add to standard headers. + :type chunk_size: int + :param chunk_size: + (Optional) Chunk size to use when creating a + :class:`~google.resumable_media.requests.ResumableUpload`. + If not passed, will fall back to the chunk size on the + current blob. + :rtype: tuple :returns: Pair of @@ -682,14 +681,18 @@ def _initiate_resumable_upload(self, client, stream, content_type, that was created * The ``transport`` used to initiate the upload. """ - info = self._get_upload_arguments(client, content_type) - transport, headers, object_metadata, content_type = info + if chunk_size is None: + chunk_size = self.chunk_size + + transport = self._make_transport(client) + info = self._get_upload_arguments(content_type) + headers, object_metadata, content_type = info if extra_headers is not None: headers.update(extra_headers) upload_url = _RESUMABLE_URL_TEMPLATE.format( bucket_path=self.bucket.path) - upload = ResumableUpload(upload_url, self.chunk_size, headers=headers) + upload = ResumableUpload(upload_url, chunk_size, headers=headers) upload.initiate( transport, stream, object_metadata, content_type, total_bytes=size, stream_final=False) @@ -996,24 +999,19 @@ def create_resumable_upload_session( # determines the origins allowed for CORS. 
             extra_headers['Origin'] = origin

-        curr_chunk_size = self.chunk_size
         try:
-            # Temporarily patch the chunk size. A user should still be able
-            # to initiate an upload session without setting the chunk size.
-            # The chunk size only matters when **sending** bytes to an upload.
-            self.chunk_size = self._CHUNK_SIZE_MULTIPLE
-            dummy_stream = BytesIO(b'')
+            dummy_stream = BytesIO(b'')
+            # Send a fake chunk size which we **know** will be acceptable
+            # to the `ResumableUpload` constructor. The chunk size only
+            # matters when **sending** bytes to an upload.
             upload, _ = self._initiate_resumable_upload(
                 client, dummy_stream, content_type, size,
-                extra_headers=extra_headers)
+                extra_headers=extra_headers,
+                chunk_size=self._CHUNK_SIZE_MULTIPLE)

             return upload.resumable_url
         except resumable_media.InvalidResponse as exc:
             _raise_from_invalid_response(exc)
-        finally:
-            # Put back the original chunk size.
-            self.chunk_size = curr_chunk_size

     def get_iam_policy(self, client=None):
         """Retrieve the IAM policy for the object.
diff --git a/storage/tests/unit/test_blob.py b/storage/tests/unit/test_blob.py
index fe755496cc4e..bbe67047fbff 100644
--- a/storage/tests/unit/test_blob.py
+++ b/storage/tests/unit/test_blob.py
@@ -734,15 +734,12 @@ def test__get_upload_arguments(self):
         name = u'blob-name'
         key = b'[pXw@,p@@AfBfrR3x-2b2SCHR,.?YwRO'
         blob = self._make_one(name, bucket=None, encryption_key=key)
-        blob._make_transport = mock.Mock(spec=[])
         blob.content_disposition = 'inline'

-        client = mock.sentinel.mock
         content_type = u'image/jpeg'
-        info = blob._get_upload_arguments(client, content_type)
+        info = blob._get_upload_arguments(content_type)

-        transport, headers, object_metadata, new_content_type = info
-        self.assertIs(transport, blob._make_transport.return_value)
+        headers, object_metadata, new_content_type = info
         header_key_value = 'W3BYd0AscEBAQWZCZnJSM3gtMmIyU0NIUiwuP1l3Uk8='
         header_key_hash_value = 'G0++dxF4q5rG4o9kE8gvEKn15RH6wLm0wXV1MgAlXOg='
         expected_headers = {
@@ -758,8 +755,6 @@ def test__get_upload_arguments(self):
         self.assertEqual(object_metadata, expected_metadata)
         self.assertEqual(new_content_type, content_type)

-        blob._make_transport.assert_called_once_with(client)
-
     def _mock_transport(self, status_code, headers, content=b''):
         fake_transport = mock.Mock(spec=['request'])
         fake_response = self._mock_requests_response(
@@ -838,7 +833,8 @@ def test__do_multipart_upload_bad_size(self):
             'was specified but the file-like object only had', exc_contents)
         self.assertEqual(stream.tell(), len(data))

-    def _initiate_resumable_helper(self, size=None, extra_headers=None):
+    def _initiate_resumable_helper(self, size=None, extra_headers=None,
+                                   chunk_size=None):
         from google.resumable_media.requests import ResumableUpload

         bucket = mock.Mock(path='/b/whammy', spec=[u'path'])
@@ -866,7 +862,8 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None):
         stream = io.BytesIO(data)
         content_type = u'text/plain'
         upload, transport = blob._initiate_resumable_upload(
-            client, stream, content_type, size, extra_headers=extra_headers)
+            client, stream, content_type, size,
+            extra_headers=extra_headers, chunk_size=chunk_size)

         # Check the returned values.
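+        # NOTE: The assertions below reach into ``ResumableUpload``
+        # internals (``_chunk_size``, ``_stream``, ``_headers``) to check
+        # how the upload was constructed; the transport itself is mocked.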
self.assertIsInstance(upload, ResumableUpload) @@ -881,7 +878,11 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None): self.assertEqual(upload._headers, extra_headers) self.assertIsNot(upload._headers, extra_headers) self.assertFalse(upload.finished) - self.assertEqual(upload._chunk_size, blob.chunk_size) + if chunk_size is None: + self.assertEqual(upload._chunk_size, blob.chunk_size) + else: + self.assertNotEqual(blob.chunk_size, chunk_size) + self.assertEqual(upload._chunk_size, chunk_size) self.assertIs(upload._stream, stream) if size is None: self.assertIsNone(upload._total_bytes) @@ -914,6 +915,10 @@ def test__initiate_resumable_upload_no_size(self): def test__initiate_resumable_upload_with_size(self): self._initiate_resumable_helper(size=10000) + def test__initiate_resumable_upload_with_chunk_size(self): + one_mb = 1048576 + self._initiate_resumable_helper(chunk_size=one_mb) + def test__initiate_resumable_upload_with_extra_headers(self): extra_headers = {'origin': 'http://not-in-kansas-anymore.invalid'} self._initiate_resumable_helper(extra_headers=extra_headers) @@ -1222,7 +1227,8 @@ def _create_resumable_upload_session_helper(self, origin=None, side_effect=None): bucket = mock.Mock(path='/b/alex-trebek', spec=[u'path']) blob = self._make_one('blob-name', bucket=bucket) - blob.chunk_size = 99 * blob._CHUNK_SIZE_MULTIPLE + chunk_size = 99 * blob._CHUNK_SIZE_MULTIPLE + blob.chunk_size = chunk_size # Create mocks to be checked for doing transport. resumable_url = 'http://test.invalid?upload_id=clean-up-everybody' @@ -1241,8 +1247,9 @@ def _create_resumable_upload_session_helper(self, origin=None, content_type=content_type, size=size, origin=origin, client=client) - # Check the returned value. + # Check the returned value and (lack of) side-effect. self.assertEqual(new_url, resumable_url) + self.assertEqual(blob.chunk_size, chunk_size) # Check the mocks. blob._make_transport.assert_called_once_with(client)
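
With patch 5/5 applied, a resumable session can be initiated without
monkey-patching ``blob.chunk_size``. A minimal sketch of the new call
shape (bucket and object names are illustrative; assumes default
credentials are available):

    import io

    from google.cloud import storage

    client = storage.Client()
    blob = client.bucket('some-bucket').blob('some-object')
    # chunk_size is passed straight through to ResumableUpload; the
    # blob's own chunk_size attribute is left untouched.
    upload, transport = blob._initiate_resumable_upload(
        client, io.BytesIO(b''), u'application/octet-stream', None,
        chunk_size=blob._CHUNK_SIZE_MULTIPLE)
    print(upload.resumable_url)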