From 9876ad66e9d1c2b365b30acea9e32949a68ff1b3 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Mon, 9 Oct 2017 17:10:08 -0400 Subject: [PATCH] bigquery: add Client.load_table_from_file (#4136) Move the method from Table to Client. --- bigquery/google/cloud/bigquery/client.py | 221 +++++++++- bigquery/google/cloud/bigquery/job.py | 10 +- bigquery/google/cloud/bigquery/table.py | 410 ------------------- bigquery/tests/system.py | 50 +-- bigquery/tests/unit/test_client.py | 475 ++++++++++++++++++++++ bigquery/tests/unit/test_table.py | 497 ----------------------- 6 files changed, 726 insertions(+), 937 deletions(-) diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py index f460202a3631..ce41824996bd 100644 --- a/bigquery/google/cloud/bigquery/client.py +++ b/bigquery/google/cloud/bigquery/client.py @@ -17,11 +17,17 @@ from __future__ import absolute_import import collections +import os import uuid import six +from google import resumable_media +from google.resumable_media.requests import MultipartUpload +from google.resumable_media.requests import ResumableUpload + from google.api.core import page_iterator +from google.cloud import exceptions from google.cloud.client import ClientWithProject from google.cloud.bigquery._http import Connection from google.cloud.bigquery.dataset import Dataset @@ -37,6 +43,20 @@ from google.cloud.bigquery._helpers import _rows_page_start +_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB +_MAX_MULTIPART_SIZE = 5 * 1024 * 1024 +_DEFAULT_NUM_RETRIES = 6 +_BASE_UPLOAD_TEMPLATE = ( + u'https://www.googleapis.com/upload/bigquery/v2/projects/' + u'{project}/jobs?uploadType=') +_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'multipart' +_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'resumable' +_GENERIC_CONTENT_TYPE = u'*/*' +_READ_LESS_THAN_SIZE = ( + 'Size {:d} was specified but the file-like object only had ' + '{:d} bytes remaining.') + + class Project(object): """Wrapper for resource describing a BigQuery project. @@ -535,7 +555,7 @@ def load_table_from_storage(self, source_uris, destination, :param destination: Table into which data is to be loaded. :type job_id: str - :param job_id: Name of the job. + :param job_id: (Optional) Name of the job. :type job_config: :class:`google.cloud.bigquery.job.LoadJobConfig` :param job_config: (Optional) Extra configuration options for the job. @@ -550,6 +570,171 @@ def load_table_from_storage(self, source_uris, destination, job.begin() return job + def load_table_from_file(self, file_obj, destination, + rewind=False, + size=None, + num_retries=_DEFAULT_NUM_RETRIES, + job_id=None, job_config=None): + """Upload the contents of this table from a file-like object. + + Like load_table_from_storage, this creates, starts and returns + a ``LoadJob``. + + :type file_obj: file + :param file_obj: A file handle opened in binary mode for reading. + + :type destination: :class:`google.cloud.bigquery.table.TableReference` + :param destination: Table into which data is to be loaded. + + :type rewind: bool + :param rewind: If True, seek to the beginning of the file handle before + reading the file. + + :type size: int + :param size: The number of bytes to read from the file handle. + If size is ``None`` or large, resumable upload will be + used. Otherwise, multipart upload will be used. + + :type num_retries: int + :param num_retries: Number of upload retries. Defaults to 6. + + :type job_id: str + :param job_id: (Optional) Name of the job. 
+ + :type job_config: :class:`google.cloud.bigquery.job.LoadJobConfig` + :param job_config: (Optional) Extra configuration options for the job. + + :rtype: :class:`~google.cloud.bigquery.jobs.LoadJob` + + :returns: the job instance used to load the data (e.g., for + querying status). Note that the job is already started: + do not call ``job.begin()``. + :raises: :class:`ValueError` if ``size`` is not passed in and can not + be determined, or if the ``file_obj`` can be detected to be + a file opened in text mode. + """ + job_id = _make_job_id(job_id) + job = LoadJob(job_id, None, destination, self, job_config) + job_resource = job._build_resource() + if rewind: + file_obj.seek(0, os.SEEK_SET) + _check_mode(file_obj) + try: + if size is None or size >= _MAX_MULTIPART_SIZE: + response = self._do_resumable_upload( + file_obj, job_resource, num_retries) + else: + response = self._do_multipart_upload( + file_obj, job_resource, size, num_retries) + except resumable_media.InvalidResponse as exc: + raise exceptions.from_http_response(exc.response) + return self.job_from_resource(response.json()) + + def _do_resumable_upload(self, stream, metadata, num_retries): + """Perform a resumable upload. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type metadata: dict + :param metadata: The metadata associated with the upload. + + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + + :rtype: :class:`~requests.Response` + :returns: The "200 OK" response object returned after the final chunk + is uploaded. + """ + upload, transport = self._initiate_resumable_upload( + stream, metadata, num_retries) + + while not upload.finished: + response = upload.transmit_next_chunk(transport) + + return response + + def _initiate_resumable_upload(self, stream, metadata, num_retries): + """Initiate a resumable upload. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type metadata: dict + :param metadata: The metadata associated with the upload. + + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) + + :rtype: tuple + :returns: + Pair of + + * The :class:`~google.resumable_media.requests.ResumableUpload` + that was created + * The ``transport`` used to initiate the upload. + """ + chunk_size = _DEFAULT_CHUNKSIZE + transport = self._http + headers = _get_upload_headers(self._connection.USER_AGENT) + upload_url = _RESUMABLE_URL_TEMPLATE.format(project=self.project) + upload = ResumableUpload(upload_url, chunk_size, headers=headers) + + if num_retries is not None: + upload._retry_strategy = resumable_media.RetryStrategy( + max_retries=num_retries) + + upload.initiate( + transport, stream, metadata, _GENERIC_CONTENT_TYPE, + stream_final=False) + + return upload, transport + + def _do_multipart_upload(self, stream, metadata, size, num_retries): + """Perform a multipart upload. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :type metadata: dict + :param metadata: The metadata associated with the upload. + + :type size: int + :param size: The number of bytes to be uploaded (which will be read + from ``stream``). If not provided, the upload will be + concluded once ``stream`` is exhausted (or :data:`None`). + + :type num_retries: int + :param num_retries: Number of upload retries. (Deprecated: This + argument will be removed in a future release.) 
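# --- Usage sketch (not part of the patch): calling the new load_table_from_file method above. ---
# A minimal example assuming application default credentials; the project, dataset
# and table names are placeholders. Passing an explicit size below 5 MB sends a
# single multipart request; leaving size=None streams the data through the
# chunked resumable path instead.
import io
from google.cloud import bigquery
from google.cloud.bigquery.dataset import DatasetReference

client = bigquery.Client(project='my-project')
table_ref = DatasetReference('my-project', 'my_dataset').table('my_table')

config = bigquery.LoadJobConfig()
config.source_format = 'CSV'

data = b'full_name,age\nPhred Phlyntstone,32\n'
job = client.load_table_from_file(
    io.BytesIO(data), table_ref, size=len(data), num_retries=10,
    job_config=config)
job.result()  # the job is already started; this just waits for completion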
+ + :rtype: :class:`~requests.Response` + :returns: The "200 OK" response object returned after the multipart + upload request. + :raises: :exc:`ValueError` if the ``stream`` has fewer than ``size`` + bytes remaining. + """ + data = stream.read(size) + if len(data) < size: + msg = _READ_LESS_THAN_SIZE.format(size, len(data)) + raise ValueError(msg) + + headers = _get_upload_headers(self._connection.USER_AGENT) + + upload_url = _MULTIPART_URL_TEMPLATE.format(project=self.project) + upload = MultipartUpload(upload_url, headers=headers) + + if num_retries is not None: + upload._retry_strategy = resumable_media.RetryStrategy( + max_retries=num_retries) + + response = upload.transmit( + self._http, data, metadata, _GENERIC_CONTENT_TYPE) + + return response + def copy_table(self, sources, destination, job_id=None, job_config=None): """Start a job for copying one or more tables into another table. @@ -832,3 +1017,37 @@ def _make_job_id(job_id): if job_id is None: return str(uuid.uuid4()) return job_id + + +def _check_mode(stream): + """Check that a stream was opened in read-binary mode. + + :type stream: IO[bytes] + :param stream: A bytes IO object open for reading. + + :raises: :exc:`ValueError` if the ``stream.mode`` is a valid attribute + and is not among ``rb``, ``r+b`` or ``rb+``. + """ + mode = getattr(stream, 'mode', None) + + if mode is not None and mode not in ('rb', 'r+b', 'rb+'): + raise ValueError( + "Cannot upload files opened in text mode: use " + "open(filename, mode='rb') or open(filename, mode='r+b')") + + +def _get_upload_headers(user_agent): + """Get the headers for an upload request. + + :type user_agent: str + :param user_agent: The user-agent for requests. + + :rtype: dict + :returns: The headers to be used for the request. + """ + return { + 'Accept': 'application/json', + 'Accept-Encoding': 'gzip, deflate', + 'User-Agent': user_agent, + 'content-type': 'application/json', + } diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py index af3b1997f177..4f9c005a883e 100644 --- a/bigquery/google/cloud/bigquery/job.py +++ b/bigquery/google/cloud/bigquery/job.py @@ -684,11 +684,11 @@ class LoadJob(_AsyncJob): :type job_id: str :param job_id: the job's ID - :type source_uris: sequence of string + :type source_uris: sequence of string or ``NoneType`` :param source_uris: URIs of one or more data files to be loaded. See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.sourceUris - for supported URI formats. + for supported URI formats. Pass None for jobs that load from a file. :type destination: :class:`google.cloud.bigquery.table.TableReference` :param destination: reference to table into which data is to be loaded. @@ -856,7 +856,8 @@ def output_rows(self): def _build_resource(self): """Generate a resource for :meth:`begin`.""" configuration = self._configuration.to_api_repr() - configuration['sourceUris'] = self.source_uris + if self.source_uris is not None: + configuration['sourceUris'] = self.source_uris configuration['destinationTable'] = self.destination.to_api_repr() return { @@ -898,8 +899,7 @@ def from_api_repr(cls, resource, client): ds_ref = DatasetReference(dest_config['projectId'], dest_config['datasetId'],) destination = TableReference(ds_ref, dest_config['tableId']) - # TODO(jba): sourceUris should not be absent if there are no LoadJobs - # for file uploads. + # sourceUris will be absent if this is a file upload. 
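# --- Illustrative sketch (not part of the patch): the resource shape produced for a ---
# --- file-based load; project, dataset and table names are placeholders.           ---
# Because file uploads construct the LoadJob with source_uris=None, the generated
# configuration simply omits the 'sourceUris' key.
from google.cloud import bigquery
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.job import LoadJob

client = bigquery.Client(project='my-project')  # assumes default credentials
dest = DatasetReference('my-project', 'my_dataset').table('my_table')
config = bigquery.LoadJobConfig()
config.source_format = 'CSV'

job = LoadJob('job-id', None, dest, client, job_config=config)
resource = job._build_resource()
assert 'sourceUris' not in resource['configuration']['load']
assert resource['configuration']['load']['sourceFormat'] == 'CSV'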
source_uris = config_resource.get('sourceUris') job = cls(job_id, source_uris, destination, client, config) job._set_properties(resource) diff --git a/bigquery/google/cloud/bigquery/table.py b/bigquery/google/cloud/bigquery/table.py index e4814ae16c8e..8f56dffd18bf 100644 --- a/bigquery/google/cloud/bigquery/table.py +++ b/bigquery/google/cloud/bigquery/table.py @@ -17,14 +17,9 @@ from __future__ import absolute_import import datetime -import os import six -from google import resumable_media -from google.resumable_media.requests import MultipartUpload -from google.resumable_media.requests import ResumableUpload - from google.cloud import exceptions from google.cloud._helpers import _datetime_from_microseconds from google.cloud._helpers import _millis_from_datetime @@ -34,17 +29,6 @@ _TABLE_HAS_NO_SCHEMA = "Table has no schema: call 'client.get_table()'" _MARKER = object() -_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB -_BASE_UPLOAD_TEMPLATE = ( - u'https://www.googleapis.com/upload/bigquery/v2/projects/' - u'{project}/jobs?uploadType=') -_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'multipart' -_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'resumable' -_GENERIC_CONTENT_TYPE = u'*/*' -_READ_LESS_THAN_SIZE = ( - 'Size {:d} was specified but the file-like object only had ' - '{:d} bytes remaining.') -_DEFAULT_NUM_RETRIES = 6 class TableReference(object): @@ -826,353 +810,6 @@ def insert_data(self, return errors - def _get_transport(self, client): - """Return the client's transport. - - :type client: :class:`~google.cloud.bigquery.client.Client` - :param client: The client to use. - - :rtype transport: - :class:`~google.auth.transport.requests.AuthorizedSession` - :returns: The transport (with credentials) that will - make authenticated requests. - """ - return client._http - - def _initiate_resumable_upload(self, client, stream, - metadata, num_retries): - """Initiate a resumable upload. - - :type client: :class:`~google.cloud.bigquery.client.Client` - :param client: The client to use. - - :type stream: IO[bytes] - :param stream: A bytes IO object open for reading. - - :type metadata: dict - :param metadata: The metadata associated with the upload. - - :type num_retries: int - :param num_retries: Number of upload retries. (Deprecated: This - argument will be removed in a future release.) - - :rtype: tuple - :returns: - Pair of - - * The :class:`~google.resumable_media.requests.ResumableUpload` - that was created - * The ``transport`` used to initiate the upload. - """ - chunk_size = _DEFAULT_CHUNKSIZE - transport = self._get_transport(client) - headers = _get_upload_headers(client._connection.USER_AGENT) - upload_url = _RESUMABLE_URL_TEMPLATE.format(project=self.project) - upload = ResumableUpload(upload_url, chunk_size, headers=headers) - - if num_retries is not None: - upload._retry_strategy = resumable_media.RetryStrategy( - max_retries=num_retries) - - upload.initiate( - transport, stream, metadata, _GENERIC_CONTENT_TYPE, - stream_final=False) - - return upload, transport - - def _do_resumable_upload(self, client, stream, metadata, num_retries): - """Perform a resumable upload. - - :type client: :class:`~google.cloud.bigquery.client.Client` - :param client: The client to use. - - :type stream: IO[bytes] - :param stream: A bytes IO object open for reading. - - :type metadata: dict - :param metadata: The metadata associated with the upload. - - :type num_retries: int - :param num_retries: Number of upload retries. 
(Deprecated: This - argument will be removed in a future release.) - - :rtype: :class:`~requests.Response` - :returns: The "200 OK" response object returned after the final chunk - is uploaded. - """ - upload, transport = self._initiate_resumable_upload( - client, stream, metadata, num_retries) - - while not upload.finished: - response = upload.transmit_next_chunk(transport) - - return response - - def _do_multipart_upload(self, client, stream, metadata, - size, num_retries): - """Perform a multipart upload. - - :type client: :class:`~google.cloud.bigquery.client.Client` - :param client: The client to use. - - :type stream: IO[bytes] - :param stream: A bytes IO object open for reading. - - :type metadata: dict - :param metadata: The metadata associated with the upload. - - :type size: int - :param size: The number of bytes to be uploaded (which will be read - from ``stream``). If not provided, the upload will be - concluded once ``stream`` is exhausted (or :data:`None`). - - :type num_retries: int - :param num_retries: Number of upload retries. (Deprecated: This - argument will be removed in a future release.) - - :rtype: :class:`~requests.Response` - :returns: The "200 OK" response object returned after the multipart - upload request. - :raises: :exc:`ValueError` if the ``stream`` has fewer than ``size`` - bytes remaining. - """ - data = stream.read(size) - if len(data) < size: - msg = _READ_LESS_THAN_SIZE.format(size, len(data)) - raise ValueError(msg) - - transport = self._get_transport(client) - headers = _get_upload_headers(client._connection.USER_AGENT) - - upload_url = _MULTIPART_URL_TEMPLATE.format(project=self.project) - upload = MultipartUpload(upload_url, headers=headers) - - if num_retries is not None: - upload._retry_strategy = resumable_media.RetryStrategy( - max_retries=num_retries) - - response = upload.transmit( - transport, data, metadata, _GENERIC_CONTENT_TYPE) - - return response - - def _do_upload(self, client, stream, metadata, size, num_retries): - """Determine an upload strategy and then perform the upload. - - If ``size`` is :data:`None`, then a resumable upload will be used, - otherwise the content and the metadata will be uploaded - in a single multipart upload request. - - :type client: :class:`~google.cloud.bigquery.client.Client` - :param client: The client to use. - - :type stream: IO[bytes] - :param stream: A bytes IO object open for reading. - - :type metadata: dict - :param metadata: The metadata associated with the upload. - - :type size: int - :param size: The number of bytes to be uploaded (which will be read - from ``stream``). If not provided, the upload will be - concluded once ``stream`` is exhausted (or :data:`None`). - - :type num_retries: int - :param num_retries: Number of upload retries. (Deprecated: This - argument will be removed in a future release.) - - :rtype: dict - :returns: The parsed JSON from the "200 OK" response. This will be the - **only** response in the multipart case and it will be the - **final** response in the resumable case. 
- """ - if size is None: - response = self._do_resumable_upload( - client, stream, metadata, num_retries) - else: - response = self._do_multipart_upload( - client, stream, metadata, size, num_retries) - - return response.json() - - # pylint: disable=too-many-arguments,too-many-locals - def upload_from_file(self, - file_obj, - source_format, - rewind=False, - size=None, - num_retries=_DEFAULT_NUM_RETRIES, - allow_jagged_rows=None, - allow_quoted_newlines=None, - create_disposition=None, - encoding=None, - field_delimiter=None, - ignore_unknown_values=None, - max_bad_records=None, - quote_character=None, - skip_leading_rows=None, - write_disposition=None, - client=None, - job_name=None, - null_marker=None): - """Upload the contents of this table from a file-like object. - - :type file_obj: file - :param file_obj: A file handle opened in binary mode for reading. - - :type source_format: str - :param source_format: Any supported format. The full list of supported - formats is documented under the - ``configuration.extract.destinationFormat`` property on this page: - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs - - :type rewind: bool - :param rewind: If True, seek to the beginning of the file handle before - writing the file. - - :type size: int - :param size: The number of bytes to read from the file handle. - If not provided, we'll try to guess the size using - :func:`os.fstat`. (If the file handle is not from the - filesystem this won't be possible.) - - :type num_retries: int - :param num_retries: Number of upload retries. Defaults to 6. - - :type allow_jagged_rows: bool - :param allow_jagged_rows: job configuration option; see - :meth:`google.cloud.bigquery.job.LoadJob`. - - :type allow_quoted_newlines: bool - :param allow_quoted_newlines: job configuration option; see - :meth:`google.cloud.bigquery.job.LoadJob`. - - :type create_disposition: str - :param create_disposition: job configuration option; see - :meth:`google.cloud.bigquery.job.LoadJob`. - - :type encoding: str - :param encoding: job configuration option; see - :meth:`google.cloud.bigquery.job.LoadJob`. - - :type field_delimiter: str - :param field_delimiter: job configuration option; see - :meth:`google.cloud.bigquery.job.LoadJob`. - - :type ignore_unknown_values: bool - :param ignore_unknown_values: job configuration option; see - :meth:`google.cloud.bigquery.job.LoadJob`. - - :type max_bad_records: int - :param max_bad_records: job configuration option; see - :meth:`google.cloud.bigquery.job.LoadJob`. - - :type quote_character: str - :param quote_character: job configuration option; see - :meth:`google.cloud.bigquery.job.LoadJob`. - - :type skip_leading_rows: int - :param skip_leading_rows: job configuration option; see - :meth:`google.cloud.bigquery.job.LoadJob`. - - :type write_disposition: str - :param write_disposition: job configuration option; see - :meth:`google.cloud.bigquery.job.LoadJob`. - - :type client: :class:`~google.cloud.bigquery.client.Client` - :param client: (Optional) The client to use. If not passed, falls back - to the ``client`` stored on the current table. - - :type job_name: str - :param job_name: Optional. The id of the job. Generated if not - explicitly passed in. - - :type null_marker: str - :param null_marker: Optional. A custom null marker (example: "\\N") - - :rtype: :class:`~google.cloud.bigquery.jobs.LoadJob` - - :returns: the job instance used to load the data (e.g., for - querying status). Note that the job is already started: - do not call ``job.begin()``. 
- :raises: :class:`ValueError` if ``size`` is not passed in and can not - be determined, or if the ``file_obj`` can be detected to be - a file opened in text mode. - """ - client = self._require_client(client) - _maybe_rewind(file_obj, rewind=rewind) - _check_mode(file_obj) - metadata = _get_upload_metadata( - source_format, self._schema, self._project, - self._dataset_id, self.table_id) - _configure_job_metadata(metadata, allow_jagged_rows, - allow_quoted_newlines, create_disposition, - encoding, field_delimiter, - ignore_unknown_values, max_bad_records, - quote_character, skip_leading_rows, - write_disposition, job_name, null_marker) - - try: - created_json = self._do_upload( - client, file_obj, metadata, size, num_retries) - return client.job_from_resource(created_json) - except resumable_media.InvalidResponse as exc: - raise exceptions.from_http_response(exc.response) - # pylint: enable=too-many-arguments,too-many-locals - - -def _configure_job_metadata(metadata, # pylint: disable=too-many-arguments - allow_jagged_rows, - allow_quoted_newlines, - create_disposition, - encoding, - field_delimiter, - ignore_unknown_values, - max_bad_records, - quote_character, - skip_leading_rows, - write_disposition, - job_name, - null_marker): - """Helper for :meth:`Table.upload_from_file`.""" - load_config = metadata['configuration']['load'] - - if allow_jagged_rows is not None: - load_config['allowJaggedRows'] = allow_jagged_rows - - if allow_quoted_newlines is not None: - load_config['allowQuotedNewlines'] = allow_quoted_newlines - - if create_disposition is not None: - load_config['createDisposition'] = create_disposition - - if encoding is not None: - load_config['encoding'] = encoding - - if field_delimiter is not None: - load_config['fieldDelimiter'] = field_delimiter - - if ignore_unknown_values is not None: - load_config['ignoreUnknownValues'] = ignore_unknown_values - - if max_bad_records is not None: - load_config['maxBadRecords'] = max_bad_records - - if quote_character is not None: - load_config['quote'] = quote_character - - if skip_leading_rows is not None: - load_config['skipLeadingRows'] = skip_leading_rows - - if write_disposition is not None: - load_config['writeDisposition'] = write_disposition - - if job_name is not None: - load_config['jobReference'] = {'jobId': job_name} - - if null_marker is not None: - load_config['nullMarker'] = null_marker - def _parse_schema_resource(info): """Parse a resource fragment into a schema field. @@ -1222,53 +859,6 @@ def _build_schema_resource(fields): # pylint: enable=unused-argument -def _maybe_rewind(stream, rewind=False): - """Rewind the stream if desired. - - :type stream: IO[bytes] - :param stream: A bytes IO object open for reading. - - :type rewind: bool - :param rewind: Indicates if we should seek to the beginning of the stream. - """ - if rewind: - stream.seek(0, os.SEEK_SET) - - -def _check_mode(stream): - """Check that a stream was opened in read-binary mode. - - :type stream: IO[bytes] - :param stream: A bytes IO object open for reading. - - :raises: :exc:`ValueError` if the ``stream.mode`` is a valid attribute - and is not among ``rb``, ``r+b`` or ``rb+``. - """ - mode = getattr(stream, 'mode', None) - - if mode is not None and mode not in ('rb', 'r+b', 'rb+'): - raise ValueError( - "Cannot upload files opened in text mode: use " - "open(filename, mode='rb') or open(filename, mode='r+b')") - - -def _get_upload_headers(user_agent): - """Get the headers for an upload request. 
- - :type user_agent: str - :param user_agent: The user-agent for requests. - - :rtype: dict - :returns: The headers to be used for the request. - """ - return { - 'Accept': 'application/json', - 'Accept-Encoding': 'gzip, deflate', - 'User-Agent': user_agent, - 'content-type': 'application/json', - } - - def _get_upload_metadata(source_format, schema, project, dataset_id, table_id): """Get base metadata for creating a table. diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py index ce49e88177e7..1bf00a9b57ef 100644 --- a/bigquery/tests/system.py +++ b/bigquery/tests/system.py @@ -372,7 +372,8 @@ def test_load_table_from_local_file_then_dump_table(self): full_name = bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED') age = bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED') - table_arg = Table(dataset.table(TABLE_NAME), schema=[full_name, age], + table_ref = dataset.table(TABLE_NAME) + table_arg = Table(table_ref, schema=[full_name, age], client=Config.CLIENT) table = retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) @@ -384,13 +385,14 @@ def test_load_table_from_local_file_then_dump_table(self): writer.writerows(ROWS) with open(temp.name, 'rb') as csv_read: - job = table.upload_from_file( - csv_read, - source_format='CSV', - skip_leading_rows=1, - create_disposition='CREATE_NEVER', - write_disposition='WRITE_EMPTY', - ) + config = bigquery.LoadJobConfig() + config.source_format = 'CSV' + config.skip_leading_rows = 1 + config.create_disposition = 'CREATE_NEVER' + config.write_disposition = 'WRITE_EMPTY' + config.schema = table.schema + job = Config.CLIENT.load_table_from_file( + csv_read, table_ref, job_config=config) # Retry until done. job.result(timeout=JOB_TIMEOUT) @@ -414,16 +416,16 @@ def test_load_table_from_local_avro_file_then_dump_table(self): ("red", 650)] dataset = self.temp_dataset(_make_dataset_id('load_local_then_dump')) - table = Table(dataset.table(TABLE_NAME), client=Config.CLIENT) + table_ref = dataset.table(TABLE_NAME) + table = Table(table_ref, client=Config.CLIENT) self.to_delete.insert(0, table) with open(os.path.join(WHERE, 'data', 'colors.avro'), 'rb') as avrof: - job = table.upload_from_file( - avrof, - source_format='AVRO', - write_disposition='WRITE_TRUNCATE' - ) - + config = bigquery.LoadJobConfig() + config.source_format = 'AVRO' + config.write_disposition = 'WRITE_TRUNCATE' + job = Config.CLIENT.load_table_from_file( + avrof, table_ref, job_config=config) # Retry until done. 
job.result(timeout=JOB_TIMEOUT) @@ -889,8 +891,8 @@ def _load_table_for_dml(self, rows, dataset_id, table_id): dataset = self.temp_dataset(dataset_id) greeting = bigquery.SchemaField( 'greeting', 'STRING', mode='NULLABLE') - table_arg = Table(dataset.table(table_id), schema=[greeting], - client=Config.CLIENT) + table_ref = dataset.table(table_id) + table_arg = Table(table_ref, schema=[greeting], client=Config.CLIENT) table = retry_403(Config.CLIENT.create_table)(table_arg) self.to_delete.insert(0, table) @@ -901,13 +903,13 @@ def _load_table_for_dml(self, rows, dataset_id, table_id): writer.writerows(rows) with open(temp.name, 'rb') as csv_read: - job = table.upload_from_file( - csv_read, - source_format='CSV', - skip_leading_rows=1, - create_disposition='CREATE_NEVER', - write_disposition='WRITE_EMPTY', - ) + config = bigquery.LoadJobConfig() + config.source_format = 'CSV' + config.skip_leading_rows = 1 + config.create_disposition = 'CREATE_NEVER' + config.write_disposition = 'WRITE_EMPTY' + job = Config.CLIENT.load_table_from_file( + csv_read, table_ref, job_config=config) # Retry until done. job.result(timeout=JOB_TIMEOUT) diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py index 4f509da53f1c..f4537f3fba8b 100644 --- a/bigquery/tests/unit/test_client.py +++ b/bigquery/tests/unit/test_client.py @@ -13,10 +13,17 @@ # limitations under the License. import copy +import email +import io +import json import unittest import mock import six +from six.moves import http_client +import pytest + +from google.cloud.bigquery.dataset import DatasetReference def _make_credentials(): @@ -1442,6 +1449,154 @@ def test_load_table_from_storage(self): self.assertEqual(list(job.source_uris), [SOURCE_URI]) self.assertIs(job.destination, destination) + @staticmethod + def _mock_requests_response(status_code, headers, content=b''): + return mock.Mock( + content=content, headers=headers, status_code=status_code, + spec=['content', 'headers', 'status_code']) + + def _mock_transport(self, status_code, headers, content=b''): + fake_transport = mock.Mock(spec=['request']) + fake_response = self._mock_requests_response( + status_code, headers, content=content) + fake_transport.request.return_value = fake_response + return fake_transport + + def _initiate_resumable_upload_helper(self, num_retries=None): + from google.resumable_media.requests import ResumableUpload + from google.cloud.bigquery.client import _DEFAULT_CHUNKSIZE + from google.cloud.bigquery.client import _GENERIC_CONTENT_TYPE + from google.cloud.bigquery.client import _get_upload_headers + from google.cloud.bigquery.job import LoadJob, LoadJobConfig + + PROJECT = 'PROJECT' + DS_ID = 'DATASET_ID' + TABLE_ID = 'TABLE_ID' + + # Create mocks to be checked for doing transport. + resumable_url = 'http://test.invalid?upload_id=hey-you' + response_headers = {'location': resumable_url} + fake_transport = self._mock_transport( + http_client.OK, response_headers) + client = self._make_one(project=PROJECT, _http=fake_transport) + conn = client._connection = _Connection() + table_ref = DatasetReference(PROJECT, DS_ID).table(TABLE_ID) + + # Create some mock arguments and call the method under test. + data = b'goodbye gudbi gootbee' + stream = io.BytesIO(data) + config = LoadJobConfig() + config.source_format = 'CSV' + job = LoadJob(None, None, table_ref, client, job_config=config) + metadata = job._build_resource() + upload, transport = client._initiate_resumable_upload( + stream, metadata, num_retries) + + # Check the returned values. 
+ self.assertIsInstance(upload, ResumableUpload) + upload_url = ( + 'https://www.googleapis.com/upload/bigquery/v2/projects/' + + PROJECT + + '/jobs?uploadType=resumable') + self.assertEqual(upload.upload_url, upload_url) + expected_headers = _get_upload_headers(conn.USER_AGENT) + self.assertEqual(upload._headers, expected_headers) + self.assertFalse(upload.finished) + self.assertEqual(upload._chunk_size, _DEFAULT_CHUNKSIZE) + self.assertIs(upload._stream, stream) + self.assertIsNone(upload._total_bytes) + self.assertEqual(upload._content_type, _GENERIC_CONTENT_TYPE) + self.assertEqual(upload.resumable_url, resumable_url) + + retry_strategy = upload._retry_strategy + self.assertEqual(retry_strategy.max_sleep, 64.0) + if num_retries is None: + self.assertEqual(retry_strategy.max_cumulative_retry, 600.0) + self.assertIsNone(retry_strategy.max_retries) + else: + self.assertIsNone(retry_strategy.max_cumulative_retry) + self.assertEqual(retry_strategy.max_retries, num_retries) + self.assertIs(transport, fake_transport) + # Make sure we never read from the stream. + self.assertEqual(stream.tell(), 0) + + # Check the mocks. + request_headers = expected_headers.copy() + request_headers['x-upload-content-type'] = _GENERIC_CONTENT_TYPE + fake_transport.request.assert_called_once_with( + 'POST', + upload_url, + data=json.dumps(metadata).encode('utf-8'), + headers=request_headers, + ) + + def test__initiate_resumable_upload(self): + self._initiate_resumable_upload_helper() + + def test__initiate_resumable_upload_with_retry(self): + self._initiate_resumable_upload_helper(num_retries=11) + + def _do_multipart_upload_success_helper( + self, get_boundary, num_retries=None): + from google.cloud.bigquery.client import _get_upload_headers + from google.cloud.bigquery.job import LoadJob, LoadJobConfig + + PROJECT = 'PROJECT' + DS_ID = 'DATASET_ID' + TABLE_ID = 'TABLE_ID' + + fake_transport = self._mock_transport(http_client.OK, {}) + client = self._make_one(project=PROJECT, _http=fake_transport) + conn = client._connection = _Connection() + table_ref = DatasetReference(PROJECT, DS_ID).table(TABLE_ID) + + # Create some mock arguments. + data = b'Bzzzz-zap \x00\x01\xf4' + stream = io.BytesIO(data) + config = LoadJobConfig() + config.source_format = 'CSV' + job = LoadJob(None, None, table_ref, client, job_config=config) + metadata = job._build_resource() + size = len(data) + response = client._do_multipart_upload( + stream, metadata, size, num_retries) + + # Check the mocks and the returned value. 
+ self.assertIs(response, fake_transport.request.return_value) + self.assertEqual(stream.tell(), size) + get_boundary.assert_called_once_with() + + upload_url = ( + 'https://www.googleapis.com/upload/bigquery/v2/projects/' + + PROJECT + + '/jobs?uploadType=multipart') + payload = ( + b'--==0==\r\n' + + b'content-type: application/json; charset=UTF-8\r\n\r\n' + + json.dumps(metadata).encode('utf-8') + b'\r\n' + + b'--==0==\r\n' + + b'content-type: */*\r\n\r\n' + + data + b'\r\n' + + b'--==0==--') + headers = _get_upload_headers(conn.USER_AGENT) + headers['content-type'] = b'multipart/related; boundary="==0=="' + fake_transport.request.assert_called_once_with( + 'POST', + upload_url, + data=payload, + headers=headers, + ) + + @mock.patch(u'google.resumable_media._upload.get_boundary', + return_value=b'==0==') + def test__do_multipart_upload(self, get_boundary): + self._do_multipart_upload_success_helper(get_boundary) + + @mock.patch(u'google.resumable_media._upload.get_boundary', + return_value=b'==0==') + def test__do_multipart_upload_with_retry(self, get_boundary): + self._do_multipart_upload_success_helper(get_boundary, num_retries=8) + def test_copy_table(self): from google.cloud.bigquery.job import CopyJob @@ -2180,8 +2335,328 @@ def test_list_rows_errors(self): client.list_rows(1) +class TestClientUpload(object): + # NOTE: This is a "partner" to `TestClient` meant to test some of the + # "load_table_from_file" portions of `Client`. It also uses + # `pytest`-style tests rather than `unittest`-style. + + TABLE_REF = DatasetReference( + 'project_id', 'test_dataset').table('test_table') + + @staticmethod + def _make_client(transport=None): + from google.cloud.bigquery import _http + from google.cloud.bigquery import client + + cl = client.Client(project='project_id', + credentials=_make_credentials(), + _http=transport) + cl._connection = mock.create_autospec(_http.Connection, instance=True) + return cl + + @staticmethod + def _make_response(status_code, content='', headers={}): + """Make a mock HTTP response.""" + import requests + response = requests.Response() + response.request = requests.Request( + 'POST', 'http://example.com').prepare() + response._content = content.encode('utf-8') + response.headers.update(headers) + response.status_code = status_code + return response + + @classmethod + def _make_do_upload_patch(cls, client, method, + resource={}, side_effect=None): + """Patches the low-level upload helpers.""" + if side_effect is None: + side_effect = [cls._make_response( + http_client.OK, + json.dumps(resource), + {'Content-Type': 'application/json'})] + return mock.patch.object( + client, method, side_effect=side_effect, autospec=True) + + EXPECTED_CONFIGURATION = { + 'jobReference': {'projectId': 'project_id', 'jobId': 'job_id'}, + 'configuration': { + 'load': { + 'sourceFormat': 'CSV', + 'destinationTable': { + 'projectId': 'project_id', + 'datasetId': 'test_dataset', + 'tableId': 'test_table' + } + } + } + } + + @staticmethod + def _make_file_obj(): + return io.BytesIO(b'hello, is it me you\'re looking for?') + + @staticmethod + def _make_config(): + from google.cloud.bigquery.job import LoadJobConfig + + config = LoadJobConfig() + config.source_format = 'CSV' + return config + + # High-level tests + + def test_load_table_from_file_resumable(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + + client = self._make_client() + file_obj = self._make_file_obj() + + do_upload_patch = self._make_do_upload_patch( + client, '_do_resumable_upload', 
self.EXPECTED_CONFIGURATION) + with do_upload_patch as do_upload: + client.load_table_from_file(file_obj, self.TABLE_REF, + job_id='job_id', + job_config=self._make_config()) + + do_upload.assert_called_once_with( + file_obj, + self.EXPECTED_CONFIGURATION, + _DEFAULT_NUM_RETRIES) + + def test_load_table_from_file_resumable_metadata(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + + client = self._make_client() + file_obj = self._make_file_obj() + + config = self._make_config() + config.allow_jagged_rows = False + config.allow_quoted_newlines = False + config.create_disposition = 'CREATE_IF_NEEDED' + config.encoding = 'utf8' + config.field_delimiter = ',' + config.ignore_unknown_values = False + config.max_bad_records = 0 + config.quote_character = '"' + config.skip_leading_rows = 1 + config.write_disposition = 'WRITE_APPEND' + config.null_marker = r'\N' + + expected_config = { + 'jobReference': {'projectId': 'project_id', 'jobId': 'job_id'}, + 'configuration': { + 'load': { + 'destinationTable': { + 'projectId': self.TABLE_REF.project, + 'datasetId': self.TABLE_REF.dataset_id, + 'tableId': self.TABLE_REF.table_id, + }, + 'sourceFormat': config.source_format, + 'allowJaggedRows': config.allow_jagged_rows, + 'allowQuotedNewlines': config.allow_quoted_newlines, + 'createDisposition': config.create_disposition, + 'encoding': config.encoding, + 'fieldDelimiter': config.field_delimiter, + 'ignoreUnknownValues': config.ignore_unknown_values, + 'maxBadRecords': config.max_bad_records, + 'quote': config.quote_character, + 'skipLeadingRows': str(config.skip_leading_rows), + 'writeDisposition': config.write_disposition, + 'nullMarker': config.null_marker, + }, + }, + } + + do_upload_patch = self._make_do_upload_patch( + client, '_do_resumable_upload', expected_config) + with do_upload_patch as do_upload: + client.load_table_from_file( + file_obj, self.TABLE_REF, job_id='job_id', job_config=config) + + do_upload.assert_called_once_with( + file_obj, + expected_config, + _DEFAULT_NUM_RETRIES) + + def test_load_table_from_file_multipart(self): + from google.cloud.bigquery.client import _DEFAULT_NUM_RETRIES + + client = self._make_client() + file_obj = self._make_file_obj() + file_obj_size = 10 + config = self._make_config() + + do_upload_patch = self._make_do_upload_patch( + client, '_do_multipart_upload', self.EXPECTED_CONFIGURATION) + with do_upload_patch as do_upload: + client.load_table_from_file( + file_obj, self.TABLE_REF, job_id='job_id', job_config=config, + size=file_obj_size) + + do_upload.assert_called_once_with( + file_obj, + self.EXPECTED_CONFIGURATION, + file_obj_size, + _DEFAULT_NUM_RETRIES) + + def test_load_table_from_file_with_retries(self): + client = self._make_client() + file_obj = self._make_file_obj() + num_retries = 20 + + do_upload_patch = self._make_do_upload_patch( + client, '_do_resumable_upload', self.EXPECTED_CONFIGURATION) + with do_upload_patch as do_upload: + client.load_table_from_file( + file_obj, self.TABLE_REF, num_retries=num_retries, + job_id='job_id', job_config=self._make_config()) + + do_upload.assert_called_once_with( + file_obj, + self.EXPECTED_CONFIGURATION, + num_retries) + + def test_load_table_from_file_with_rewind(self): + client = self._make_client() + file_obj = self._make_file_obj() + file_obj.seek(2) + + with self._make_do_upload_patch( + client, '_do_resumable_upload', self.EXPECTED_CONFIGURATION): + client.load_table_from_file( + file_obj, self.TABLE_REF, rewind=True) + + assert file_obj.tell() == 0 + + def 
test_load_table_from_file_failure(self): + from google.resumable_media import InvalidResponse + from google.cloud import exceptions + + client = self._make_client() + file_obj = self._make_file_obj() + + response = self._make_response( + content='Someone is already in this spot.', + status_code=http_client.CONFLICT) + + do_upload_patch = self._make_do_upload_patch( + client, '_do_resumable_upload', + side_effect=InvalidResponse(response)) + + with do_upload_patch, pytest.raises(exceptions.Conflict) as exc_info: + client.load_table_from_file( + file_obj, self.TABLE_REF, rewind=True) + + assert response.text in exc_info.value.message + assert exc_info.value.errors == [] + + def test_load_table_from_file_bad_mode(self): + client = self._make_client() + file_obj = mock.Mock(spec=['mode']) + file_obj.mode = 'x' + + with pytest.raises(ValueError): + client.load_table_from_file(file_obj, self.TABLE_REF) + + # Low-level tests + + @classmethod + def _make_resumable_upload_responses(cls, size): + """Make a series of responses for a successful resumable upload.""" + from google import resumable_media + + resumable_url = 'http://test.invalid?upload_id=and-then-there-was-1' + initial_response = cls._make_response( + http_client.OK, '', {'location': resumable_url}) + data_response = cls._make_response( + resumable_media.PERMANENT_REDIRECT, + '', {'range': 'bytes=0-{:d}'.format(size - 1)}) + final_response = cls._make_response( + http_client.OK, + json.dumps({'size': size}), + {'Content-Type': 'application/json'}) + return [initial_response, data_response, final_response] + + @staticmethod + def _make_transport(responses=None): + import google.auth.transport.requests + + transport = mock.create_autospec( + google.auth.transport.requests.AuthorizedSession, instance=True) + transport.request.side_effect = responses + return transport + + def test__do_resumable_upload(self): + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + transport = self._make_transport( + self._make_resumable_upload_responses(file_obj_len)) + client = self._make_client(transport) + + result = client._do_resumable_upload( + file_obj, + self.EXPECTED_CONFIGURATION, + None) + + content = result.content.decode('utf-8') + assert json.loads(content) == {'size': file_obj_len} + + # Verify that configuration data was passed in with the initial + # request. + transport.request.assert_any_call( + 'POST', + mock.ANY, + data=json.dumps(self.EXPECTED_CONFIGURATION).encode('utf-8'), + headers=mock.ANY) + + def test__do_multipart_upload(self): + transport = self._make_transport([self._make_response(http_client.OK)]) + client = self._make_client(transport) + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + + client._do_multipart_upload( + file_obj, + self.EXPECTED_CONFIGURATION, + file_obj_len, + None) + + # Verify that configuration data was passed in with the initial + # request. + request_args = transport.request.mock_calls[0][2] + request_data = request_args['data'].decode('utf-8') + request_headers = request_args['headers'] + + request_content = email.message_from_string( + 'Content-Type: {}\r\n{}'.format( + request_headers['content-type'].decode('utf-8'), + request_data)) + + # There should be two payloads: the configuration and the binary daya. 
+ configuration_data = request_content.get_payload(0).get_payload() + binary_data = request_content.get_payload(1).get_payload() + + assert json.loads(configuration_data) == self.EXPECTED_CONFIGURATION + assert binary_data.encode('utf-8') == file_obj.getvalue() + + def test__do_multipart_upload_wrong_size(self): + client = self._make_client() + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + + with pytest.raises(ValueError): + client._do_multipart_upload( + file_obj, + {}, + file_obj_len+1, + None) + + class _Connection(object): + USER_AGENT = 'testing 1.2.3' + def __init__(self, *responses): self._responses = responses self._requested = [] diff --git a/bigquery/tests/unit/test_table.py b/bigquery/tests/unit/test_table.py index 6e00bd73c9c6..9661a449c4fb 100644 --- a/bigquery/tests/unit/test_table.py +++ b/bigquery/tests/unit/test_table.py @@ -12,14 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import email -import io -import json import unittest import mock -from six.moves import http_client -import pytest from google.cloud.bigquery.dataset import DatasetReference @@ -1029,498 +1024,6 @@ def _row_data(row): self.assertEqual(req['path'], '/%s' % PATH) self.assertEqual(req['data'], SENT) - def test__populate_view_use_legacy_sql_resource_w_existing_view(self): - query = 'select * from foo' - resource = {'view': {'query': query}} - client = mock.Mock(spec=[u'_credentials', '_http']) - client._http = mock.sentinel.http - dataset = DatasetReference(self.PROJECT, self.DS_ID) - table = self._make_one(dataset.table(self.TABLE_NAME), client=client) - table.view_use_legacy_sql = True - - table._populate_view_use_legacy_sql_resource(resource) - - self.assertEqual( - resource['view']['useLegacySql'], table.view_use_legacy_sql) - self.assertEqual(resource['view']['query'], query) - - def test__get_transport(self): - client = mock.Mock(spec=[u'_credentials', '_http']) - client._http = mock.sentinel.http - dataset = DatasetReference(self.PROJECT, self.DS_ID) - table_ref = dataset.table(self.TABLE_NAME) - table = self._make_one(table_ref, client=client) - - transport = table._get_transport(client) - - self.assertIs(transport, mock.sentinel.http) - - @staticmethod - def _mock_requests_response(status_code, headers, content=b''): - return mock.Mock( - content=content, headers=headers, status_code=status_code, - spec=['content', 'headers', 'status_code']) - - def _mock_transport(self, status_code, headers, content=b''): - fake_transport = mock.Mock(spec=['request']) - fake_response = self._mock_requests_response( - status_code, headers, content=content) - fake_transport.request.return_value = fake_response - return fake_transport - - def _initiate_resumable_upload_helper(self, num_retries=None): - from google.resumable_media.requests import ResumableUpload - from google.cloud.bigquery.table import _DEFAULT_CHUNKSIZE - from google.cloud.bigquery.table import _GENERIC_CONTENT_TYPE - from google.cloud.bigquery.table import _get_upload_headers - from google.cloud.bigquery.table import _get_upload_metadata - - connection = _Connection() - client = _Client(self.PROJECT, connection=connection) - dataset = DatasetReference(self.PROJECT, self.DS_ID) - table_ref = dataset.table(self.TABLE_NAME) - table = self._make_one(table_ref, client=client) - - # Create mocks to be checked for doing transport. 
- resumable_url = 'http://test.invalid?upload_id=hey-you' - response_headers = {'location': resumable_url} - fake_transport = self._mock_transport( - http_client.OK, response_headers) - client._http = fake_transport - - # Create some mock arguments and call the method under test. - data = b'goodbye gudbi gootbee' - stream = io.BytesIO(data) - metadata = _get_upload_metadata( - 'CSV', table._schema, table.project, - table.dataset_id, table.table_id) - upload, transport = table._initiate_resumable_upload( - client, stream, metadata, num_retries) - - # Check the returned values. - self.assertIsInstance(upload, ResumableUpload) - upload_url = ( - 'https://www.googleapis.com/upload/bigquery/v2/projects/' + - self.PROJECT + - '/jobs?uploadType=resumable') - self.assertEqual(upload.upload_url, upload_url) - expected_headers = _get_upload_headers(connection.USER_AGENT) - self.assertEqual(upload._headers, expected_headers) - self.assertFalse(upload.finished) - self.assertEqual(upload._chunk_size, _DEFAULT_CHUNKSIZE) - self.assertIs(upload._stream, stream) - self.assertIsNone(upload._total_bytes) - self.assertEqual(upload._content_type, _GENERIC_CONTENT_TYPE) - self.assertEqual(upload.resumable_url, resumable_url) - - retry_strategy = upload._retry_strategy - self.assertEqual(retry_strategy.max_sleep, 64.0) - if num_retries is None: - self.assertEqual(retry_strategy.max_cumulative_retry, 600.0) - self.assertIsNone(retry_strategy.max_retries) - else: - self.assertIsNone(retry_strategy.max_cumulative_retry) - self.assertEqual(retry_strategy.max_retries, num_retries) - self.assertIs(transport, fake_transport) - # Make sure we never read from the stream. - self.assertEqual(stream.tell(), 0) - - # Check the mocks. - request_headers = expected_headers.copy() - request_headers['x-upload-content-type'] = _GENERIC_CONTENT_TYPE - fake_transport.request.assert_called_once_with( - 'POST', - upload_url, - data=json.dumps(metadata).encode('utf-8'), - headers=request_headers, - ) - - def test__initiate_resumable_upload(self): - self._initiate_resumable_upload_helper() - - def test__initiate_resumable_upload_with_retry(self): - self._initiate_resumable_upload_helper(num_retries=11) - - def _do_multipart_upload_success_helper( - self, get_boundary, num_retries=None): - from google.cloud.bigquery.table import _get_upload_headers - from google.cloud.bigquery.table import _get_upload_metadata - - connection = _Connection() - client = _Client(self.PROJECT, connection=connection) - dataset = DatasetReference(self.PROJECT, self.DS_ID) - table_ref = dataset.table(self.TABLE_NAME) - table = self._make_one(table_ref, client=client) - - # Create mocks to be checked for doing transport. - fake_transport = self._mock_transport(http_client.OK, {}) - client._http = fake_transport - - # Create some mock arguments. - data = b'Bzzzz-zap \x00\x01\xf4' - stream = io.BytesIO(data) - metadata = _get_upload_metadata( - 'CSV', table._schema, table.project, - table.dataset_id, table.table_id) - size = len(data) - response = table._do_multipart_upload( - client, stream, metadata, size, num_retries) - - # Check the mocks and the returned value. 
- self.assertIs(response, fake_transport.request.return_value) - self.assertEqual(stream.tell(), size) - get_boundary.assert_called_once_with() - - upload_url = ( - 'https://www.googleapis.com/upload/bigquery/v2/projects/' + - self.PROJECT + - '/jobs?uploadType=multipart') - payload = ( - b'--==0==\r\n' + - b'content-type: application/json; charset=UTF-8\r\n\r\n' + - json.dumps(metadata).encode('utf-8') + b'\r\n' + - b'--==0==\r\n' + - b'content-type: */*\r\n\r\n' + - data + b'\r\n' + - b'--==0==--') - headers = _get_upload_headers(connection.USER_AGENT) - headers['content-type'] = b'multipart/related; boundary="==0=="' - fake_transport.request.assert_called_once_with( - 'POST', - upload_url, - data=payload, - headers=headers, - ) - - @mock.patch(u'google.resumable_media._upload.get_boundary', - return_value=b'==0==') - def test__do_multipart_upload(self, get_boundary): - self._do_multipart_upload_success_helper(get_boundary) - - @mock.patch(u'google.resumable_media._upload.get_boundary', - return_value=b'==0==') - def test__do_multipart_upload_with_retry(self, get_boundary): - self._do_multipart_upload_success_helper(get_boundary, num_retries=8) - - -class TestTableUpload(object): - # NOTE: This is a "partner" to `TestTable` meant to test some of the - # "upload" portions of `Table`. It also uses `pytest`-style tests - # rather than `unittest`-style. - - @staticmethod - def _make_table(transport=None): - from google.cloud.bigquery import _http - from google.cloud.bigquery import client - from google.cloud.bigquery import dataset - from google.cloud.bigquery import table - - connection = mock.create_autospec(_http.Connection, instance=True) - client = mock.create_autospec(client.Client, instance=True) - client._connection = connection - client._credentials = mock.sentinel.credentials - client._http = transport - client.project = 'project_id' - - dataset_ref = dataset.DatasetReference('project_id', 'test_dataset') - table_ref = dataset_ref.table('test_table') - table = table.Table(table_ref, client=client) - - return table - - @staticmethod - def _make_response(status_code, content='', headers={}): - """Make a mock HTTP response.""" - import requests - response = requests.Response() - response.request = requests.Request( - 'POST', 'http://example.com').prepare() - response._content = content.encode('utf-8') - response.headers.update(headers) - response.status_code = status_code - return response - - @classmethod - def _make_do_upload_patch(cls, table, method, side_effect=None): - """Patches the low-level upload helpers.""" - if side_effect is None: - side_effect = [cls._make_response( - http_client.OK, - json.dumps({}), - {'Content-Type': 'application/json'})] - return mock.patch.object( - table, method, side_effect=side_effect, autospec=True) - - EXPECTED_CONFIGURATION = { - 'configuration': { - 'load': { - 'sourceFormat': 'CSV', - 'destinationTable': { - 'projectId': 'project_id', - 'datasetId': 'test_dataset', - 'tableId': 'test_table' - } - } - } - } - - @staticmethod - def _make_file_obj(): - return io.BytesIO(b'hello, is it me you\'re looking for?') - - # High-level tests - - def test_upload_from_file_resumable(self): - import google.cloud.bigquery.table - - table = self._make_table() - file_obj = self._make_file_obj() - - do_upload_patch = self._make_do_upload_patch( - table, '_do_resumable_upload') - with do_upload_patch as do_upload: - table.upload_from_file(file_obj, source_format='CSV') - - do_upload.assert_called_once_with( - table._client, - file_obj, - 
self.EXPECTED_CONFIGURATION, - google.cloud.bigquery.table._DEFAULT_NUM_RETRIES) - - def test_upload_file_resumable_metadata(self): - table = self._make_table() - file_obj = self._make_file_obj() - - config_args = { - 'source_format': 'CSV', - 'allow_jagged_rows': False, - 'allow_quoted_newlines': False, - 'create_disposition': 'CREATE_IF_NEEDED', - 'encoding': 'utf8', - 'field_delimiter': ',', - 'ignore_unknown_values': False, - 'max_bad_records': 0, - 'quote_character': '"', - 'skip_leading_rows': 1, - 'write_disposition': 'WRITE_APPEND', - 'job_name': 'oddjob', - 'null_marker': r'\N', - } - - expected_config = { - 'configuration': { - 'load': { - 'sourceFormat': config_args['source_format'], - 'destinationTable': { - 'projectId': table.project, - 'datasetId': table.dataset_id, - 'tableId': table.table_id, - }, - 'allowJaggedRows': config_args['allow_jagged_rows'], - 'allowQuotedNewlines': - config_args['allow_quoted_newlines'], - 'createDisposition': config_args['create_disposition'], - 'encoding': config_args['encoding'], - 'fieldDelimiter': config_args['field_delimiter'], - 'ignoreUnknownValues': - config_args['ignore_unknown_values'], - 'maxBadRecords': config_args['max_bad_records'], - 'quote': config_args['quote_character'], - 'skipLeadingRows': config_args['skip_leading_rows'], - 'writeDisposition': config_args['write_disposition'], - 'jobReference': {'jobId': config_args['job_name']}, - 'nullMarker': config_args['null_marker'], - }, - }, - } - - do_upload_patch = self._make_do_upload_patch( - table, '_do_resumable_upload') - with do_upload_patch as do_upload: - table.upload_from_file( - file_obj, **config_args) - - do_upload.assert_called_once_with( - table._client, - file_obj, - expected_config, - mock.ANY) - - def test_upload_from_file_multipart(self): - import google.cloud.bigquery.table - - table = self._make_table() - file_obj = self._make_file_obj() - file_obj_size = 10 - - do_upload_patch = self._make_do_upload_patch( - table, '_do_multipart_upload') - with do_upload_patch as do_upload: - table.upload_from_file( - file_obj, source_format='CSV', size=file_obj_size) - - do_upload.assert_called_once_with( - table._client, - file_obj, - self.EXPECTED_CONFIGURATION, - file_obj_size, - google.cloud.bigquery.table._DEFAULT_NUM_RETRIES) - - def test_upload_from_file_with_retries(self): - table = self._make_table() - file_obj = self._make_file_obj() - num_retries = 20 - - do_upload_patch = self._make_do_upload_patch( - table, '_do_resumable_upload') - with do_upload_patch as do_upload: - table.upload_from_file( - file_obj, source_format='CSV', num_retries=num_retries) - - do_upload.assert_called_once_with( - table._client, - file_obj, - self.EXPECTED_CONFIGURATION, - num_retries) - - def test_upload_from_file_with_rewind(self): - table = self._make_table() - file_obj = self._make_file_obj() - file_obj.seek(2) - - with self._make_do_upload_patch(table, '_do_resumable_upload'): - table.upload_from_file( - file_obj, source_format='CSV', rewind=True) - - assert file_obj.tell() == 0 - - def test_upload_from_file_failure(self): - from google.resumable_media import InvalidResponse - from google.cloud import exceptions - - table = self._make_table() - file_obj = self._make_file_obj() - - response = self._make_response( - content='Someone is already in this spot.', - status_code=http_client.CONFLICT) - - do_upload_patch = self._make_do_upload_patch( - table, '_do_resumable_upload', - side_effect=InvalidResponse(response)) - - with do_upload_patch, pytest.raises(exceptions.Conflict) as 
exc_info: - table.upload_from_file( - file_obj, source_format='CSV', rewind=True) - - assert response.text in exc_info.value.message - assert exc_info.value.errors == [] - - def test_upload_from_file_bad_mode(self): - table = self._make_table() - file_obj = mock.Mock(spec=['mode']) - file_obj.mode = 'x' - - with pytest.raises(ValueError): - table.upload_from_file( - file_obj, source_format='CSV',) - - # Low-level tests - - @classmethod - def _make_resumable_upload_responses(cls, size): - """Make a series of responses for a successful resumable upload.""" - from google import resumable_media - - resumable_url = 'http://test.invalid?upload_id=and-then-there-was-1' - initial_response = cls._make_response( - http_client.OK, '', {'location': resumable_url}) - data_response = cls._make_response( - resumable_media.PERMANENT_REDIRECT, - '', {'range': 'bytes=0-{:d}'.format(size - 1)}) - final_response = cls._make_response( - http_client.OK, - json.dumps({'size': size}), - {'Content-Type': 'application/json'}) - return [initial_response, data_response, final_response] - - @staticmethod - def _make_transport(responses=None): - import google.auth.transport.requests - - transport = mock.create_autospec( - google.auth.transport.requests.AuthorizedSession, instance=True) - transport.request.side_effect = responses - return transport - - def test__do_resumable_upload(self): - file_obj = self._make_file_obj() - file_obj_len = len(file_obj.getvalue()) - transport = self._make_transport( - self._make_resumable_upload_responses(file_obj_len)) - table = self._make_table(transport) - - result = table._do_resumable_upload( - table._client, - file_obj, - self.EXPECTED_CONFIGURATION, - None) - - content = result.content.decode('utf-8') - assert json.loads(content) == {'size': file_obj_len} - - # Verify that configuration data was passed in with the initial - # request. - transport.request.assert_any_call( - 'POST', - mock.ANY, - data=json.dumps(self.EXPECTED_CONFIGURATION).encode('utf-8'), - headers=mock.ANY) - - def test__do_multipart_upload(self): - transport = self._make_transport([self._make_response(http_client.OK)]) - table = self._make_table(transport) - file_obj = self._make_file_obj() - file_obj_len = len(file_obj.getvalue()) - - table._do_multipart_upload( - table._client, - file_obj, - self.EXPECTED_CONFIGURATION, - file_obj_len, - None) - - # Verify that configuration data was passed in with the initial - # request. - request_args = transport.request.mock_calls[0][2] - request_data = request_args['data'].decode('utf-8') - request_headers = request_args['headers'] - - request_content = email.message_from_string( - 'Content-Type: {}\r\n{}'.format( - request_headers['content-type'].decode('utf-8'), - request_data)) - - # There should be two payloads: the configuration and the binary daya. - configuration_data = request_content.get_payload(0).get_payload() - binary_data = request_content.get_payload(1).get_payload() - - assert json.loads(configuration_data) == self.EXPECTED_CONFIGURATION - assert binary_data.encode('utf-8') == file_obj.getvalue() - - def test__do_multipart_upload_wrong_size(self): - table = self._make_table() - file_obj = self._make_file_obj() - file_obj_len = len(file_obj.getvalue()) - - with pytest.raises(ValueError): - table._do_multipart_upload( - table._client, - file_obj, - {}, - file_obj_len+1, - None) - class Test_parse_schema_resource(unittest.TestCase, _SchemaBase):
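# --- Migration sketch (not part of the patch): the before/after pattern exercised ---
# --- by the system tests above; file, dataset and table names are placeholders.   ---
#
# Before: load options were keyword arguments on the removed Table method:
#     with open('rows.csv', 'rb') as fp:
#         job = table.upload_from_file(fp, source_format='CSV',
#                                      skip_leading_rows=1,
#                                      write_disposition='WRITE_EMPTY')
#
# After: the client owns the upload and the options move to LoadJobConfig:
from google.cloud import bigquery
from google.cloud.bigquery.dataset import DatasetReference

client = bigquery.Client(project='my-project')  # assumes default credentials
table_ref = DatasetReference('my-project', 'my_dataset').table('my_table')

config = bigquery.LoadJobConfig()
config.source_format = 'CSV'
config.skip_leading_rows = 1
config.write_disposition = 'WRITE_EMPTY'

with open('rows.csv', 'rb') as fp:  # must be opened in binary mode
    job = client.load_table_from_file(fp, table_ref, job_config=config)

job.result()  # already started; do not call job.begin()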