bigquery: add Client.load_table_from_file (#4136)
Move the method from Table to Client.
jba authored and tswast committed Oct 16, 2017
1 parent de2a3b6 commit 9876ad6
Showing 6 changed files with 726 additions and 937 deletions.
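For orientation, a hedged before-and-after sketch of what the move means for callers; the old Table-level method name (``upload_from_file``) and the sample dataset/table names are recalled or invented for illustration, not taken from this diff.

# Before this commit (pre-0.28 style, method name recalled from the old API):
#     table = dataset.table('my_table')
#     with open('data.csv', 'rb') as f:
#         job = table.upload_from_file(f, source_format='CSV')

# After this commit the client owns the operation and the table is passed in
# as a reference:
from google.cloud import bigquery

client = bigquery.Client()
table_ref = client.dataset('my_dataset').table('my_table')  # hypothetical names
with open('data.csv', 'rb') as f:
    job = client.load_table_from_file(f, table_ref)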
221 changes: 220 additions & 1 deletion bigquery/google/cloud/bigquery/client.py
@@ -17,11 +17,17 @@
from __future__ import absolute_import

import collections
import os
import uuid

import six

from google import resumable_media
from google.resumable_media.requests import MultipartUpload
from google.resumable_media.requests import ResumableUpload

from google.api.core import page_iterator
from google.cloud import exceptions
from google.cloud.client import ClientWithProject
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery.dataset import Dataset
@@ -37,6 +43,20 @@
from google.cloud.bigquery._helpers import _rows_page_start


_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB
_MAX_MULTIPART_SIZE = 5 * 1024 * 1024
_DEFAULT_NUM_RETRIES = 6
_BASE_UPLOAD_TEMPLATE = (
u'https://www.googleapis.com/upload/bigquery/v2/projects/'
u'{project}/jobs?uploadType=')
_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'multipart'
_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'resumable'
_GENERIC_CONTENT_TYPE = u'*/*'
_READ_LESS_THAN_SIZE = (
'Size {:d} was specified but the file-like object only had '
'{:d} bytes remaining.')


class Project(object):
"""Wrapper for resource describing a BigQuery project.
@@ -535,7 +555,7 @@ def load_table_from_storage(self, source_uris, destination,
:param destination: Table into which data is to be loaded.
:type job_id: str
:param job_id: Name of the job.
:param job_id: (Optional) Name of the job.
:type job_config: :class:`google.cloud.bigquery.job.LoadJobConfig`
:param job_config: (Optional) Extra configuration options for the job.
@@ -550,6 +570,171 @@ def load_table_from_storage(self, source_uris, destination,
job.begin()
return job

def load_table_from_file(self, file_obj, destination,
rewind=False,
size=None,
num_retries=_DEFAULT_NUM_RETRIES,
job_id=None, job_config=None):
"""Upload the contents of this table from a file-like object.
Like load_table_from_storage, this creates, starts and returns
a ``LoadJob``.
:type file_obj: file
:param file_obj: A file handle opened in binary mode for reading.
:type destination: :class:`google.cloud.bigquery.table.TableReference`
:param destination: Table into which data is to be loaded.
:type rewind: bool
:param rewind: If True, seek to the beginning of the file handle before
reading the file.
:type size: int
:param size: The number of bytes to read from the file handle.
If ``size`` is ``None`` or at least ``_MAX_MULTIPART_SIZE``
(5 MB), a resumable upload is used; otherwise, a multipart
upload is used.
:type num_retries: int
:param num_retries: Number of upload retries. Defaults to 6.
:type job_id: str
:param job_id: (Optional) Name of the job.
:type job_config: :class:`google.cloud.bigquery.job.LoadJobConfig`
:param job_config: (Optional) Extra configuration options for the job.
:rtype: :class:`~google.cloud.bigquery.job.LoadJob`
:returns: the job instance used to load the data (e.g., for
querying status). Note that the job is already started:
do not call ``job.begin()``.
:raises: :class:`ValueError` if ``size`` is not passed in and cannot
be determined, or if ``file_obj`` is detected to be opened
in text mode.
"""
job_id = _make_job_id(job_id)
job = LoadJob(job_id, None, destination, self, job_config)
job_resource = job._build_resource()
if rewind:
file_obj.seek(0, os.SEEK_SET)
_check_mode(file_obj)
try:
if size is None or size >= _MAX_MULTIPART_SIZE:
response = self._do_resumable_upload(
file_obj, job_resource, num_retries)
else:
response = self._do_multipart_upload(
file_obj, job_resource, size, num_retries)
except resumable_media.InvalidResponse as exc:
raise exceptions.from_http_response(exc.response)
return self.job_from_resource(response.json())
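
A fuller usage sketch of the new method, assuming a local CSV file, a hypothetical ``my_dataset.my_table`` destination, and that ``LoadJobConfig`` is importable from the package root; the returned job is already started, so the caller only waits on it.

from google.cloud import bigquery

client = bigquery.Client()
destination = client.dataset('my_dataset').table('my_table')  # hypothetical names

job_config = bigquery.LoadJobConfig()   # import path assumed
job_config.source_format = 'CSV'
job_config.skip_leading_rows = 1        # skip a header row

with open('data.csv', 'rb') as file_obj:   # binary mode, per _check_mode below
    job = client.load_table_from_file(
        file_obj, destination,
        rewind=True,            # seek back to the start before reading
        job_config=job_config)

# The job is already begun; do not call job.begin(). Block until it is done
# (result() is assumed to exist on async jobs in this release; polling
# job.state after job.reload() is the fallback).
job.result()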

def _do_resumable_upload(self, stream, metadata, num_retries):
"""Perform a resumable upload.
:type stream: IO[bytes]
:param stream: A bytes IO object open for reading.
:type metadata: dict
:param metadata: The metadata associated with the upload.
:type num_retries: int
:param num_retries: Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
:rtype: :class:`~requests.Response`
:returns: The "200 OK" response object returned after the final chunk
is uploaded.
"""
upload, transport = self._initiate_resumable_upload(
stream, metadata, num_retries)

while not upload.finished:
response = upload.transmit_next_chunk(transport)

return response

def _initiate_resumable_upload(self, stream, metadata, num_retries):
"""Initiate a resumable upload.
:type stream: IO[bytes]
:param stream: A bytes IO object open for reading.
:type metadata: dict
:param metadata: The metadata associated with the upload.
:type num_retries: int
:param num_retries: Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
:rtype: tuple
:returns:
Pair of
* The :class:`~google.resumable_media.requests.ResumableUpload`
that was created
* The ``transport`` used to initiate the upload.
"""
chunk_size = _DEFAULT_CHUNKSIZE
transport = self._http
headers = _get_upload_headers(self._connection.USER_AGENT)
upload_url = _RESUMABLE_URL_TEMPLATE.format(project=self.project)
upload = ResumableUpload(upload_url, chunk_size, headers=headers)

if num_retries is not None:
upload._retry_strategy = resumable_media.RetryStrategy(
max_retries=num_retries)

upload.initiate(
transport, stream, metadata, _GENERIC_CONTENT_TYPE,
stream_final=False)

return upload, transport
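
The two helpers above wrap the chunked flow from google-resumable-media; a standalone sketch of the same pattern may make the loop in _do_resumable_upload easier to follow. The project ID, metadata, stream contents, and use of an AuthorizedSession are placeholders and assumptions, not values from this diff.

import io

import google.auth
import google.auth.transport.requests
from google import resumable_media
from google.resumable_media.requests import ResumableUpload

upload_url = (
    'https://www.googleapis.com/upload/bigquery/v2/projects/'
    'my-project/jobs?uploadType=resumable')             # hypothetical project
metadata = {'configuration': {'load': {}}}               # abbreviated job resource
stream = io.BytesIO(b'col_a,col_b\n1,2\n')

credentials, _ = google.auth.default()
transport = google.auth.transport.requests.AuthorizedSession(credentials)

upload = ResumableUpload(upload_url, 1024 * 1024,        # 1 MB chunks
                         headers={'User-Agent': 'example-agent'})
upload._retry_strategy = resumable_media.RetryStrategy(max_retries=6)
upload.initiate(transport, stream, metadata, '*/*', stream_final=False)

while not upload.finished:                # each pass sends one chunk
    response = upload.transmit_next_chunk(transport)

# The final chunk's response body is the created job resource.
print(response.json()['jobReference'])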

def _do_multipart_upload(self, stream, metadata, size, num_retries):
"""Perform a multipart upload.
:type stream: IO[bytes]
:param stream: A bytes IO object open for reading.
:type metadata: dict
:param metadata: The metadata associated with the upload.
:type size: int
:param size: The number of bytes to be uploaded (which will be read
from ``stream``). If not provided, the upload will be
concluded once ``stream`` is exhausted (or :data:`None`).
:type num_retries: int
:param num_retries: Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
:rtype: :class:`~requests.Response`
:returns: The "200 OK" response object returned after the multipart
upload request.
:raises: :exc:`ValueError` if the ``stream`` has fewer than ``size``
bytes remaining.
"""
data = stream.read(size)
if len(data) < size:
msg = _READ_LESS_THAN_SIZE.format(size, len(data))
raise ValueError(msg)

headers = _get_upload_headers(self._connection.USER_AGENT)

upload_url = _MULTIPART_URL_TEMPLATE.format(project=self.project)
upload = MultipartUpload(upload_url, headers=headers)

if num_retries is not None:
upload._retry_strategy = resumable_media.RetryStrategy(
max_retries=num_retries)

response = upload.transmit(
self._http, data, metadata, _GENERIC_CONTENT_TYPE)

return response
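
The choice between the two helpers is driven entirely by the ``size`` the caller passes to load_table_from_file; here is a small hedged sketch of computing it from a seekable file so sub-5 MB uploads take the single-request multipart path (``client`` and ``destination`` as in the earlier sketch, ``data.csv`` hypothetical).

import os

def remaining_bytes(file_obj):
    """Best-effort count of unread bytes in a seekable binary file."""
    current = file_obj.tell()
    file_obj.seek(0, os.SEEK_END)
    end = file_obj.tell()
    file_obj.seek(current, os.SEEK_SET)
    return end - current

with open('data.csv', 'rb') as file_obj:
    size = remaining_bytes(file_obj)
    # size provided and < 5 MB  -> single multipart request
    # size omitted (None) or >= 5 MB -> chunked resumable upload
    job = client.load_table_from_file(file_obj, destination, size=size)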

def copy_table(self, sources, destination, job_id=None, job_config=None):
"""Start a job for copying one or more tables into another table.
@@ -832,3 +1017,37 @@ def _make_job_id(job_id):
if job_id is None:
return str(uuid.uuid4())
return job_id


def _check_mode(stream):
"""Check that a stream was opened in read-binary mode.
:type stream: IO[bytes]
:param stream: A bytes IO object open for reading.
:raises: :exc:`ValueError` if ``stream`` has a ``mode`` attribute that is
not among ``rb``, ``r+b`` or ``rb+``.
"""
mode = getattr(stream, 'mode', None)

if mode is not None and mode not in ('rb', 'r+b', 'rb+'):
raise ValueError(
"Cannot upload files opened in text mode: use "
"open(filename, mode='rb') or open(filename, mode='r+b')")


def _get_upload_headers(user_agent):
"""Get the headers for an upload request.
:type user_agent: str
:param user_agent: The user-agent for requests.
:rtype: dict
:returns: The headers to be used for the request.
"""
return {
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate',
'User-Agent': user_agent,
'content-type': 'application/json',
}
10 changes: 5 additions & 5 deletions bigquery/google/cloud/bigquery/job.py
@@ -684,11 +684,11 @@ class LoadJob(_AsyncJob):
:type job_id: str
:param job_id: the job's ID
:type source_uris: sequence of string
:type source_uris: sequence of string or ``NoneType``
:param source_uris:
URIs of one or more data files to be loaded. See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.sourceUris
for supported URI formats.
for supported URI formats. Pass None for jobs that load from a file.
:type destination: :class:`google.cloud.bigquery.table.TableReference`
:param destination: reference to table into which data is to be loaded.
@@ -856,7 +856,8 @@ def output_rows(self):
def _build_resource(self):
"""Generate a resource for :meth:`begin`."""
configuration = self._configuration.to_api_repr()
configuration['sourceUris'] = self.source_uris
if self.source_uris is not None:
configuration['sourceUris'] = self.source_uris
configuration['destinationTable'] = self.destination.to_api_repr()

return {
Expand Down Expand Up @@ -898,8 +899,7 @@ def from_api_repr(cls, resource, client):
ds_ref = DatasetReference(dest_config['projectId'],
dest_config['datasetId'],)
destination = TableReference(ds_ref, dest_config['tableId'])
# TODO(jba): sourceUris should not be absent if there are no LoadJobs
# for file uploads.
# sourceUris will be absent if this is a file upload.
source_uris = config_resource.get('sourceUris')
job = cls(job_id, source_uris, destination, client, config)
job._set_properties(resource)
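
The job.py changes boil down to one thing: a file-based load builds a job resource with no ``sourceUris`` key, because the bytes travel in the upload request body instead. A hedged sketch of the two configuration shapes (field names from the BigQuery REST API; project, dataset, table, and bucket names are placeholders):

# Load from Cloud Storage: sourceUris is present.
storage_load_config = {
    'load': {
        'sourceUris': ['gs://my-bucket/data.csv'],
        'destinationTable': {
            'projectId': 'my-project',
            'datasetId': 'my_dataset',
            'tableId': 'my_table',
        },
    },
}

# Load from a local file (this commit): sourceUris is simply omitted, which is
# why _build_resource now guards on ``self.source_uris is not None`` and
# from_api_repr tolerates the missing key.
file_load_config = {
    'load': {
        'destinationTable': {
            'projectId': 'my-project',
            'datasetId': 'my_dataset',
            'tableId': 'my_table',
        },
    },
}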