bigquery: add Client.load_table_from_file #4136

Merged: 1 commit (Oct 9, 2017)
221 changes: 220 additions & 1 deletion bigquery/google/cloud/bigquery/client.py
@@ -17,11 +17,17 @@
from __future__ import absolute_import

import collections
import os
import uuid

import six

from google import resumable_media
from google.resumable_media.requests import MultipartUpload
from google.resumable_media.requests import ResumableUpload

from google.api.core import page_iterator
from google.cloud import exceptions
from google.cloud.client import ClientWithProject
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery.dataset import Dataset
@@ -38,6 +44,20 @@
from google.cloud.bigquery._helpers import _rows_page_start


_DEFAULT_CHUNKSIZE = 1048576 # 1024 * 1024 B = 1 MB
_MAX_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MB
_DEFAULT_NUM_RETRIES = 6
_BASE_UPLOAD_TEMPLATE = (
u'https://www.googleapis.com/upload/bigquery/v2/projects/'
u'{project}/jobs?uploadType=')
_MULTIPART_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'multipart'
_RESUMABLE_URL_TEMPLATE = _BASE_UPLOAD_TEMPLATE + u'resumable'
_GENERIC_CONTENT_TYPE = u'*/*'
_READ_LESS_THAN_SIZE = (
'Size {:d} was specified but the file-like object only had '
'{:d} bytes remaining.')
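
For concreteness, the two templates above expand to the standard BigQuery upload endpoints (doctest-style sketch; 'my-project' is an illustrative project ID):

>>> _MULTIPART_URL_TEMPLATE.format(project='my-project')
'https://www.googleapis.com/upload/bigquery/v2/projects/my-project/jobs?uploadType=multipart'
>>> _RESUMABLE_URL_TEMPLATE.format(project='my-project')
'https://www.googleapis.com/upload/bigquery/v2/projects/my-project/jobs?uploadType=resumable'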


class Project(object):
"""Wrapper for resource describing a BigQuery project.

@@ -536,7 +556,7 @@ def load_table_from_storage(self, source_uris, destination,
:param destination: Table into which data is to be loaded.

:type job_id: str
:param job_id: Name of the job.
:param job_id: (Optional) Name of the job.

:type job_config: :class:`google.cloud.bigquery.job.LoadJobConfig`
:param job_config: (Optional) Extra configuration options for the job.
@@ -551,6 +571,171 @@
job.begin()
return job

def load_table_from_file(self, file_obj, destination,
rewind=False,
size=None,
num_retries=_DEFAULT_NUM_RETRIES,
job_id=None, job_config=None):
"""Upload the contents of this table from a file-like object.

Like load_table_from_storage, this creates, starts and returns
a ``LoadJob``.

:type file_obj: file
:param file_obj: A file handle opened in binary mode for reading.

:type destination: :class:`google.cloud.bigquery.table.TableReference`
:param destination: Table into which data is to be loaded.

:type rewind: bool
:param rewind: If True, seek to the beginning of the file handle before
reading the file.

:type size: int
:param size: The number of bytes to read from the file handle. If
``size`` is ``None`` or at least ``_MAX_MULTIPART_SIZE`` (5 MB),
a resumable upload is used; otherwise a multipart upload is used.

:type num_retries: int
:param num_retries: Number of upload retries. Defaults to 6.

:type job_id: str
:param job_id: (Optional) Name of the job.

:type job_config: :class:`google.cloud.bigquery.job.LoadJobConfig`
:param job_config: (Optional) Extra configuration options for the job.

:rtype: :class:`~google.cloud.bigquery.job.LoadJob`

:returns: the job instance used to load the data (e.g., for
querying status). Note that the job is already started:
do not call ``job.begin()``.
:raises: :class:`ValueError` if ``file_obj`` can be detected to be
opened in text mode, or if fewer than ``size`` bytes remain
in it.
"""
job_id = _make_job_id(job_id)
job = LoadJob(job_id, None, destination, self, job_config)
job_resource = job._build_resource()
if rewind:
file_obj.seek(0, os.SEEK_SET)
_check_mode(file_obj)
try:
if size is None or size >= _MAX_MULTIPART_SIZE:
response = self._do_resumable_upload(
file_obj, job_resource, num_retries)
else:
response = self._do_multipart_upload(
file_obj, job_resource, size, num_retries)
except resumable_media.InvalidResponse as exc:
raise exceptions.from_http_response(exc.response)
return self.job_from_resource(response.json())
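
A minimal usage sketch for the new method (a reviewer's note, not part of the diff; the dataset, table, and file names are illustrative, and it assumes ``Client.dataset()`` returns a ``DatasetReference`` as elsewhere in this refactor):

import time

from google.cloud import bigquery

client = bigquery.Client()
table_ref = client.dataset('my_dataset').table('my_table')

with open('data.csv', 'rb') as source_file:  # binary mode is required
    load_job = client.load_table_from_file(source_file, table_ref)

# The returned job is already started: do not call load_job.begin().
while load_job.state != 'DONE':  # poll via reload() until the load finishes
    time.sleep(1)
    load_job.reload()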

def _do_resumable_upload(self, stream, metadata, num_retries):
"""Perform a resumable upload.

:type stream: IO[bytes]
:param stream: A bytes IO object open for reading.

:type metadata: dict
:param metadata: The metadata associated with the upload.

:type num_retries: int
:param num_retries: Number of upload retries. (Deprecated: This
argument will be removed in a future release.)

:rtype: :class:`~requests.Response`
:returns: The "200 OK" response object returned after the final chunk
is uploaded.
"""
upload, transport = self._initiate_resumable_upload(
stream, metadata, num_retries)

while not upload.finished:
response = upload.transmit_next_chunk(transport)

return response

def _initiate_resumable_upload(self, stream, metadata, num_retries):
"""Initiate a resumable upload.

:type stream: IO[bytes]
:param stream: A bytes IO object open for reading.

:type metadata: dict
:param metadata: The metadata associated with the upload.

:type num_retries: int
:param num_retries: Number of upload retries. (Deprecated: This
argument will be removed in a future release.)

:rtype: tuple
:returns:
Pair of

* The :class:`~google.resumable_media.requests.ResumableUpload`
that was created
* The ``transport`` used to initiate the upload.
"""
chunk_size = _DEFAULT_CHUNKSIZE
transport = self._http
headers = _get_upload_headers(self._connection.USER_AGENT)
upload_url = _RESUMABLE_URL_TEMPLATE.format(project=self.project)
upload = ResumableUpload(upload_url, chunk_size, headers=headers)

if num_retries is not None:
upload._retry_strategy = resumable_media.RetryStrategy(
max_retries=num_retries)

upload.initiate(
transport, stream, metadata, _GENERIC_CONTENT_TYPE,
stream_final=False)

return upload, transport

def _do_multipart_upload(self, stream, metadata, size, num_retries):
"""Perform a multipart upload.

:type stream: IO[bytes]
:param stream: A bytes IO object open for reading.

:type metadata: dict
:param metadata: The metadata associated with the upload.

:type size: int
:param size: The number of bytes to be uploaded (which will be read
from ``stream``). Must not exceed the number of bytes
remaining in ``stream``.

:type num_retries: int
:param num_retries: Number of upload retries. (Deprecated: This
argument will be removed in a future release.)

:rtype: :class:`~requests.Response`
:returns: The "200 OK" response object returned after the multipart
upload request.
:raises: :exc:`ValueError` if the ``stream`` has fewer than ``size``
bytes remaining.
"""
data = stream.read(size)
if len(data) < size:
msg = _READ_LESS_THAN_SIZE.format(size, len(data))
raise ValueError(msg)

headers = _get_upload_headers(self._connection.USER_AGENT)

upload_url = _MULTIPART_URL_TEMPLATE.format(project=self.project)
upload = MultipartUpload(upload_url, headers=headers)

if num_retries is not None:
upload._retry_strategy = resumable_media.RetryStrategy(
max_retries=num_retries)

response = upload.transmit(
self._http, data, metadata, _GENERIC_CONTENT_TYPE)

return response
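
To make the ``size`` contract concrete, a standalone sketch of the short-read case that triggers the ``ValueError`` above (illustrative bytes):

import io

stream = io.BytesIO(b'only 10B..')  # 10 bytes available
data = stream.read(25)              # read() returns at most what remains
assert len(data) == 10              # fewer than the 25 bytes requested

# _do_multipart_upload would therefore raise:
#     ValueError: Size 25 was specified but the file-like object only
#     had 10 bytes remaining.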

def copy_table(self, sources, destination, job_id=None, job_config=None):
"""Start a job for copying one or more tables into another table.

@@ -843,3 +1028,37 @@ def _make_job_id(job_id):
if job_id is None:
return str(uuid.uuid4())
return job_id


def _check_mode(stream):
"""Check that a stream was opened in read-binary mode.

:type stream: IO[bytes]
:param stream: A bytes IO object open for reading.

:raises: :exc:`ValueError` if ``stream`` has a ``mode`` attribute and
it is not one of ``rb``, ``r+b`` or ``rb+``.
"""
mode = getattr(stream, 'mode', None)

if mode is not None and mode not in ('rb', 'r+b', 'rb+'):
raise ValueError(
"Cannot upload files opened in text mode: use "
"open(filename, mode='rb') or open(filename, mode='r+b')")


def _get_upload_headers(user_agent):
"""Get the headers for an upload request.

:type user_agent: str
:param user_agent: The user-agent for requests.

:rtype: dict
:returns: The headers to be used for the request.
"""
return {
'Accept': 'application/json',
'Accept-Encoding': 'gzip, deflate',
'User-Agent': user_agent,
'content-type': 'application/json',
}
10 changes: 5 additions & 5 deletions bigquery/google/cloud/bigquery/job.py
@@ -686,11 +686,11 @@ class LoadJob(_AsyncJob):
:type job_id: str
:param job_id: the job's ID

:type source_uris: sequence of string
:type source_uris: sequence of string or ``NoneType``
:param source_uris:
URIs of one or more data files to be loaded. See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.sourceUris
for supported URI formats.
for supported URI formats. Pass None for jobs that load from a file.

:type destination: :class:`google.cloud.bigquery.table.TableReference`
:param destination: reference to table into which data is to be loaded.
@@ -858,7 +858,8 @@ def output_rows(self):
def _build_resource(self):
"""Generate a resource for :meth:`begin`."""
configuration = self._configuration.to_api_repr()
configuration['sourceUris'] = self.source_uris
if self.source_uris is not None:
configuration['sourceUris'] = self.source_uris
configuration['destinationTable'] = self.destination.to_api_repr()

return {
@@ -900,8 +901,7 @@ def from_api_repr(cls, resource, client):
ds_ref = DatasetReference(dest_config['projectId'],
dest_config['datasetId'],)
destination = TableReference(ds_ref, dest_config['tableId'])
# TODO(jba): sourceUris should not be absent if there are no LoadJobs
# for file uploads.
# sourceUris will be absent if this is a file upload.
source_uris = config_resource.get('sourceUris')
job = cls(job_id, source_uris, destination, client, config)
job._set_properties(resource)
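
Reviewer's note on the ``_build_resource`` change above: for file uploads, ``sourceUris`` is now omitted from the job resource rather than set to ``None``. A sketch of the two request-body shapes (field values are illustrative; unrelated keys elided):

# Load from Cloud Storage: 'sourceUris' is present.
{
    'jobReference': {'projectId': 'my-project', 'jobId': 'job-1'},
    'configuration': {
        'load': {
            'sourceUris': ['gs://my-bucket/data.csv'],
            'destinationTable': {'projectId': 'my-project',
                                 'datasetId': 'my_dataset',
                                 'tableId': 'my_table'},
        },
    },
}

# Load from a local file: 'sourceUris' is absent entirely.
{
    'jobReference': {'projectId': 'my-project', 'jobId': 'job-2'},
    'configuration': {
        'load': {
            'destinationTable': {'projectId': 'my-project',
                                 'datasetId': 'my_dataset',
                                 'tableId': 'my_table'},
        },
    },
}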