From 4077d04eb7d8046aa65390f0448fcff565369169 Mon Sep 17 00:00:00 2001
From: Jonathan Amsterdam
Date: Sat, 23 Sep 2017 06:39:10 -0400
Subject: [PATCH] bigquery: modify CopyJob

Update CopyJob and CopyJobConfig to conform to the new design for jobs.
---
 bigquery/google/cloud/bigquery/__init__.py |   2 +
 bigquery/google/cloud/bigquery/client.py   |  56 +++++--
 bigquery/google/cloud/bigquery/job.py      | 168 +++++++++++++--------
 bigquery/tests/system.py                   |  32 ++++
 bigquery/tests/unit/test_client.py         |  40 ++++-
 bigquery/tests/unit/test_job.py            |  63 ++++----
 6 files changed, 255 insertions(+), 106 deletions(-)

diff --git a/bigquery/google/cloud/bigquery/__init__.py b/bigquery/google/cloud/bigquery/__init__.py
index 333854035376e..ec92e7c401287 100644
--- a/bigquery/google/cloud/bigquery/__init__.py
+++ b/bigquery/google/cloud/bigquery/__init__.py
@@ -32,6 +32,7 @@
 from google.cloud.bigquery.client import Client
 from google.cloud.bigquery.dataset import AccessEntry
 from google.cloud.bigquery.dataset import Dataset
+from google.cloud.bigquery.job import CopyJobConfig
 from google.cloud.bigquery.job import ExtractJobConfig
 from google.cloud.bigquery.schema import SchemaField
 from google.cloud.bigquery.table import Table
@@ -42,6 +43,7 @@
     'ArrayQueryParameter',
     'Client',
     'Dataset',
+    'CopyJobConfig',
     'ExtractJobConfig',
     'ScalarQueryParameter',
     'SchemaField',
diff --git a/bigquery/google/cloud/bigquery/client.py b/bigquery/google/cloud/bigquery/client.py
index bbeac294680d3..1c45d51f92da5 100644
--- a/bigquery/google/cloud/bigquery/client.py
+++ b/bigquery/google/cloud/bigquery/client.py
@@ -23,7 +23,7 @@
 from google.cloud.bigquery._http import Connection
 from google.cloud.bigquery.dataset import Dataset
 from google.cloud.bigquery.dataset import DatasetReference
-from google.cloud.bigquery.table import Table
+from google.cloud.bigquery.table import Table, TableReference
 from google.cloud.bigquery.job import CopyJob
 from google.cloud.bigquery.job import ExtractJob
 from google.cloud.bigquery.job import LoadJob
@@ -492,25 +492,45 @@ def load_table_from_storage(self, job_id, destination, *source_uris):
         """
         return LoadJob(job_id, destination, source_uris, client=self)
 
-    def copy_table(self, job_id, destination, *sources):
-        """Construct a job for copying one or more tables into another table.
+    def copy_table(self, sources, destination, **kwargs):
+        """Start a job for copying one or more tables into another table.
 
         See
         https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy
 
-        :type job_id: str
-        :param job_id: Name of the job.
+        :type sources: One of:
+                       :class:`~google.cloud.bigquery.table.TableReference`
+                       sequence of
+                       :class:`~google.cloud.bigquery.table.TableReference`
+        :param sources: Table or tables to be copied.
 
-        :type destination: :class:`google.cloud.bigquery.table.Table`
+
+        :type destination: :class:`google.cloud.bigquery.table.TableReference`
         :param destination: Table into which data is to be copied.
 
-        :type sources: sequence of :class:`google.cloud.bigquery.table.Table`
-        :param sources: tables to be copied.
+        :type kwargs: dict
+        :param kwargs: Additional keyword arguments.
+
+        :Keyword Arguments:
+            * *job_config*
+              (:class:`google.cloud.bigquery.job.CopyJobConfig`) --
+              (Optional) Extra configuration options for the copy job.
+            * *job_id* (``str``) --
+              (Optional) The ID of the job. A job ID is generated
+              automatically when none is supplied.
 
         :rtype: :class:`google.cloud.bigquery.job.CopyJob`
         :returns: a new ``CopyJob`` instance
         """
-        return CopyJob(job_id, destination, sources, client=self)
+        job_config = kwargs.get('job_config')
+        job_id = _make_job_id(kwargs.get('job_id'))
+
+        if isinstance(sources, TableReference):
+            sources = [sources]
+        job = CopyJob(job_id, sources, destination, client=self,
+                      job_config=job_config)
+        job.begin()
+        return job
 
     def extract_table(self, source, *destination_uris, **kwargs):
         """Start a job to extract a table into Cloud Storage files.
@@ -541,9 +561,7 @@ def extract_table(self, source, *destination_uris, **kwargs):
         :returns: a new ``ExtractJob`` instance
         """
         job_config = kwargs.get('job_config')
-        job_id = kwargs.get('job_id')
-        if job_id is None:
-            job_id = str(uuid.uuid4())
+        job_id = _make_job_id(kwargs.get('job_id'))
 
         job = ExtractJob(
             job_id, source, list(destination_uris), client=self,
@@ -667,3 +685,17 @@ def _item_to_table(iterator, resource):
     :returns: The next table in the page.
     """
     return Table.from_api_repr(resource, iterator.client)
+
+
+def _make_job_id(job_id):
+    """Construct an ID for a new job.
+
+    :type job_id: str or ``NoneType``
+    :param job_id: the user-provided job ID
+
+    :rtype: str
+    :returns: A job ID
+    """
+    if job_id is None:
+        return str(uuid.uuid4())
+    return job_id
diff --git a/bigquery/google/cloud/bigquery/job.py b/bigquery/google/cloud/bigquery/job.py
index a79fc8e53d206..98012336480f4 100644
--- a/bigquery/google/cloud/bigquery/job.py
+++ b/bigquery/google/cloud/bigquery/job.py
@@ -126,7 +126,7 @@ class Compression(_EnumApiResourceProperty):
     NONE = 'NONE'
 
 
-class CreateDisposition(_EnumProperty):
+class CreateDisposition(_EnumApiResourceProperty):
     """Pseudo-enum for ``create_disposition`` properties."""
     CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
     CREATE_NEVER = 'CREATE_NEVER'
@@ -159,7 +159,7 @@ class SourceFormat(_EnumProperty):
     AVRO = 'AVRO'
 
 
-class WriteDisposition(_EnumProperty):
+class WriteDisposition(_EnumApiResourceProperty):
     """Pseudo-enum for ``write_disposition`` properties."""
     WRITE_APPEND = 'WRITE_APPEND'
     WRITE_TRUNCATE = 'WRITE_TRUNCATE'
@@ -688,7 +688,8 @@ def output_rows(self):
     https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.autodetect
     """
 
-    create_disposition = CreateDisposition('create_disposition')
+    create_disposition = CreateDisposition('create_disposition',
+                                           'createDisposition')
     """See
     https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.createDisposition
     """
@@ -733,7 +734,8 @@ def output_rows(self):
     https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.sourceFormat
     """
 
-    write_disposition = WriteDisposition('write_disposition')
+    write_disposition = WriteDisposition('write_disposition',
+                                         'writeDisposition')
     """See
     https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.writeDisposition
     """
@@ -853,56 +855,98 @@ def from_api_repr(cls, resource, client):
         return job
 
 
-class _CopyConfiguration(object):
-    """User-settable configuration options for copy jobs.
+class CopyJobConfig(object):
+    """Configuration options for copy jobs.
 
-    Values which are ``None`` -> server defaults.
+    All properties in this class are optional. Values which are ``None`` ->
+    server defaults.
""" - _create_disposition = None - _write_disposition = None + + def __init__(self): + self._properties = {} + + create_disposition = CreateDisposition('create_disposition', + 'createDisposition') + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.createDisposition + """ + + write_disposition = WriteDisposition('write_disposition', + 'writeDisposition') + """See + https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.writeDisposition + """ + + def to_api_repr(self): + """Build an API representation of the copy job config. + + :rtype: dict + :returns: A dictionary in the format used by the BigQuery API. + """ + return copy.deepcopy(self._properties) + + @classmethod + def from_api_repr(cls, resource): + """Factory: construct a job configuration given its API representation + + :type resource: dict + :param resource: + An extract job configuration in the same representation as is + returned from the API. + + :rtype: :class:`google.cloud.bigquery.job.ExtractJobConfig` + :returns: Configuration parsed from ``resource``. + """ + config = cls() + config._properties = copy.deepcopy(resource) + return config class CopyJob(_AsyncJob): - """Asynchronous job: copy data into a table from other tables. + """Asynchr_onous job: copy data into a table from other tables. :type job_id: str :param job_id: the job's ID, within the project belonging to ``client``. - :type destination: :class:`google.cloud.bigquery.table.Table` - :param destination: Table into which data is to be loaded. - - :type sources: list of :class:`google.cloud.bigquery.table.Table` + :type sources: list of :class:`google.cloud.bigquery.table.TableReference` :param sources: Table into which data is to be loaded. + :type destination: :class:`google.cloud.bigquery.table.TableReference` + :param destination: Table into which data is to be loaded. + :type client: :class:`google.cloud.bigquery.client.Client` :param client: A client which holds credentials and project configuration for the dataset (which requires a project). - """ + :type job_config: :class:`~google.cloud.bigquery.job.CopyJobConfig` + :param job_config: + (Optional) Extra configuration options for the copy job. + """ _JOB_TYPE = 'copy' - def __init__(self, job_id, destination, sources, client): + def __init__(self, job_id, sources, destination, client, job_config=None): super(CopyJob, self).__init__(job_id, client) + + if job_config is None: + job_config = CopyJobConfig() + self.destination = destination self.sources = sources - self._configuration = _CopyConfiguration() - - create_disposition = CreateDisposition('create_disposition') - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.createDisposition - """ + self._configuration = job_config - write_disposition = WriteDisposition('write_disposition') - """See - https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.copy.writeDisposition - """ + @property + def create_disposition(self): + """See + :class:`~google.cloud.bigquery.job.CopyJobConfig.create_disposition`. 
+        """
+        return self._configuration.create_disposition
 
-    def _populate_config_resource(self, configuration):
-        """Helper for _build_resource: copy config properties to resource"""
-        if self.create_disposition is not None:
-            configuration['createDisposition'] = self.create_disposition
-        if self.write_disposition is not None:
-            configuration['writeDisposition'] = self.write_disposition
+    @property
+    def write_disposition(self):
+        """See
+        :class:`~google.cloud.bigquery.job.CopyJobConfig.write_disposition`.
+        """
+        return self._configuration.write_disposition
 
     def _build_resource(self):
        """Generate a resource for :meth:`begin`."""
@@ -913,31 +957,27 @@ def _build_resource(self):
             'tableId': table.table_id,
         } for table in self.sources]
 
-        resource = {
+        configuration = self._configuration.to_api_repr()
+        configuration['sourceTables'] = source_refs
+        configuration['destinationTable'] = {
+            'projectId': self.destination.project,
+            'datasetId': self.destination.dataset_id,
+            'tableId': self.destination.table_id,
+        }
+
+        return {
             'jobReference': {
                 'projectId': self.project,
                 'jobId': self.job_id,
             },
             'configuration': {
-                self._JOB_TYPE: {
-                    'sourceTables': source_refs,
-                    'destinationTable': {
-                        'projectId': self.destination.project,
-                        'datasetId': self.destination.dataset_id,
-                        'tableId': self.destination.table_id,
-                    },
-                },
+                self._JOB_TYPE: configuration,
             },
         }
-        configuration = resource['configuration'][self._JOB_TYPE]
-        self._populate_config_resource(configuration)
-
-        return resource
 
     def _copy_configuration_properties(self, configuration):
         """Helper: assign subclass configuration properties in cleaned."""
-        self.create_disposition = configuration.get('createDisposition')
-        self.write_disposition = configuration.get('writeDisposition')
+        self._configuration._properties = copy.deepcopy(configuration)
 
     @classmethod
     def from_api_repr(cls, resource, client):
@@ -958,27 +998,23 @@ def from_api_repr(cls, resource, client):
         :rtype: :class:`google.cloud.bigquery.job.CopyJob`
         :returns: Job parsed from ``resource``.
         """
-        job_id, config = cls._get_resource_config(resource)
-        dest_config = config['destinationTable']
-        ds_ref = DatasetReference(dest_config['projectId'],
-                                  dest_config['datasetId'],)
-        dataset = Dataset(ds_ref)
-        table_ref = TableReference(dataset, dest_config['tableId'])
-        destination = Table(table_ref, client=client)
+        job_id, config_resource = cls._get_resource_config(resource)
+        config = CopyJobConfig.from_api_repr(config_resource)
+        destination = TableReference.from_api_repr(
+            config_resource['destinationTable'])
         sources = []
-        source_configs = config.get('sourceTables')
+        source_configs = config_resource.get('sourceTables')
         if source_configs is None:
-            single = config.get('sourceTable')
+            single = config_resource.get('sourceTable')
             if single is None:
                 raise KeyError(
                     "Resource missing 'sourceTables' / 'sourceTable'")
             source_configs = [single]
         for source_config in source_configs:
-            ds_ref = DatasetReference(source_config['projectId'],
-                                      source_config['datasetId'])
-            table_ref = ds_ref.table(source_config['tableId'])
-            sources.append(Table(table_ref, client=client))
-        job = cls(job_id, destination, sources, client=client)
+            table_ref = TableReference.from_api_repr(source_config)
+            sources.append(table_ref)
+        job = cls(
+            job_id, sources, destination, client=client, job_config=config)
         job._set_properties(resource)
         return job
 
@@ -1017,7 +1053,7 @@ def __init__(self):
     """
 
     def to_api_repr(self):
-        """Build an API representation of the extact job config.
+        """Build an API representation of the extract job config.
 
         :rtype: dict
         :returns: A dictionary in the format used by the BigQuery API.
@@ -1243,7 +1279,8 @@ def __init__(self, job_id, query, client,
     https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.allowLargeResults
     """
 
-    create_disposition = CreateDisposition('create_disposition')
+    create_disposition = CreateDisposition('create_disposition',
+                                           'createDisposition')
     """See
     https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.createDisposition
     """
@@ -1289,7 +1326,8 @@ def __init__(self, job_id, query, client,
     reference/rest/v2/jobs#configuration.dryRun
     """
 
-    write_disposition = WriteDisposition('write_disposition')
+    write_disposition = WriteDisposition('write_disposition',
+                                         'writeDisposition')
     """See
     https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.writeDisposition
     """
diff --git a/bigquery/tests/system.py b/bigquery/tests/system.py
index c20de0561b71b..f130ea3383d4d 100644
--- a/bigquery/tests/system.py
+++ b/bigquery/tests/system.py
@@ -642,6 +642,38 @@ def test_extract_table_w_job_config(self):
         got = destination.download_as_string().decode('utf-8')
         self.assertIn('"Bharney Rhubble"', got)
 
+    def test_copy_table(self):
+        dataset = self.temp_dataset(_make_dataset_id('copy_table'))
+        schema = (
+            bigquery.SchemaField('full_name', 'STRING', mode='REQUIRED'),
+            bigquery.SchemaField('age', 'INTEGER', mode='REQUIRED'),
+        )
+        source_ref = dataset.table('source_table')
+        source_arg = Table(source_ref, schema=schema, client=Config.CLIENT)
+        source_table = retry_403(Config.CLIENT.create_table)(source_arg)
+        self.to_delete.insert(0, source_table)
+        rows = [
+            ('Phred Phlyntstone', 32),
+            ('Bharney Rhubble', 33),
+            ('Wylma Phlyntstone', 29),
+            ('Bhettye Rhubble', 27),
+        ]
+        errors = source_table.insert_data(rows)
+        self.assertEqual(len(errors), 0)
+
+        destination_ref = dataset.table('destination_table')
+        job_config = bigquery.CopyJobConfig()
+        job = Config.CLIENT.copy_table(
+            source_ref, destination_ref, job_config=job_config)
+        job.result()
+
+        destination_table = Config.CLIENT.get_table(destination_ref)
+        self.to_delete.insert(0, destination_table)
+        got_rows = self._fetch_single_page(destination_table)
+        by_age = operator.itemgetter(1)
+        self.assertEqual(sorted(got_rows, key=by_age),
+                         sorted(rows, key=by_age))
+
     def test_job_cancel(self):
         DATASET_ID = _make_dataset_id('job_cancel')
         JOB_NAME = 'fetch_' + DATASET_ID
diff --git a/bigquery/tests/unit/test_client.py b/bigquery/tests/unit/test_client.py
index 9cfa61234fcc5..955a08f5d19ce 100644
--- a/bigquery/tests/unit/test_client.py
+++ b/bigquery/tests/unit/test_client.py
@@ -1113,19 +1113,57 @@ def test_copy_table(self):
         DATASET = 'dataset_name'
         SOURCE = 'source_table'
         DESTINATION = 'destination_table'
+        RESOURCE = {
+            'jobReference': {
+                'projectId': PROJECT,
+                'jobId': JOB,
+            },
+            'configuration': {
+                'copy': {
+                    'sourceTable': {
+                        'projectId': PROJECT,
+                        'datasetId': DATASET,
+                        'tableId': SOURCE,
+                    },
+                    'destinationTable': {
+                        'projectId': PROJECT,
+                        'datasetId': DATASET,
+                        'tableId': DESTINATION,
+                    },
+                },
+            },
+        }
         creds = _make_credentials()
         http = object()
         client = self._make_one(project=PROJECT, credentials=creds,
                                 _http=http)
+        conn = client._connection = _Connection(RESOURCE)
         dataset = client.dataset(DATASET)
         source = dataset.table(SOURCE)
         destination = dataset.table(DESTINATION)
-        job = client.copy_table(JOB, destination, source)
+
+        job = client.copy_table(source, destination, job_id=JOB)
+
+        # Check that copy_table actually starts the job.
+        self.assertEqual(len(conn._requested), 1)
+        req = conn._requested[0]
+        self.assertEqual(req['method'], 'POST')
+        self.assertEqual(req['path'], '/projects/PROJECT/jobs')
+
         self.assertIsInstance(job, CopyJob)
         self.assertIs(job._client, client)
         self.assertEqual(job.job_id, JOB)
         self.assertEqual(list(job.sources), [source])
         self.assertIs(job.destination, destination)
 
+        conn = client._connection = _Connection(RESOURCE)
+        source2 = dataset.table(SOURCE + '2')
+        job = client.copy_table([source, source2], destination, job_id=JOB)
+        self.assertIsInstance(job, CopyJob)
+        self.assertIs(job._client, client)
+        self.assertEqual(job.job_id, JOB)
+        self.assertEqual(list(job.sources), [source, source2])
+        self.assertIs(job.destination, destination)
+
     def test_extract_table(self):
         from google.cloud.bigquery.job import ExtractJob
 
diff --git a/bigquery/tests/unit/test_job.py b/bigquery/tests/unit/test_job.py
index ca348704127cc..d0a654c0c15dd 100644
--- a/bigquery/tests/unit/test_job.py
+++ b/bigquery/tests/unit/test_job.py
@@ -17,7 +17,7 @@
 from six.moves import http_client
 import unittest
 
-from google.cloud.bigquery.job import ExtractJobConfig
+from google.cloud.bigquery.job import ExtractJobConfig, CopyJobConfig
 from google.cloud.bigquery.dataset import DatasetReference
 
 
@@ -83,9 +83,12 @@ def test_missing_reason(self):
 
 
 class _Base(object):
+    from google.cloud.bigquery.dataset import DatasetReference
+
     PROJECT = 'project'
     SOURCE1 = 'http://example.com/source1.csv'
     DS_ID = 'datset_id'
+    DS_REF = DatasetReference(PROJECT, DS_ID)
     TABLE_ID = 'table_id'
     JOB_NAME = 'job_name'
 
@@ -104,6 +107,11 @@ def _setUpConstants(self):
         self.RESOURCE_URL = 'http://example.com/path/to/resource'
         self.USER_EMAIL = 'phred@example.com'
 
+    def _table_ref(self, table_id):
+        from google.cloud.bigquery.table import TableReference
+
+        return TableReference(self.DS_REF, table_id)
+
     def _makeResource(self, started=False, ended=False):
         self._setUpConstants()
         resource = {
@@ -895,9 +903,9 @@ def _verifyResourceProperties(self, job, resource):
 
     def test_ctor(self):
         client = _Client(self.PROJECT)
-        source = _Table(self.SOURCE_TABLE)
-        destination = _Table(self.DESTINATION_TABLE)
-        job = self._make_one(self.JOB_NAME, destination, [source], client)
+        source = self._table_ref(self.SOURCE_TABLE)
+        destination = self._table_ref(self.DESTINATION_TABLE)
+        job = self._make_one(self.JOB_NAME, [source], destination, client)
         self.assertIs(job.destination, destination)
         self.assertEqual(job.sources, [source])
         self.assertIs(job._client, client)
@@ -1035,9 +1043,9 @@ def test_begin_w_bound_client(self):
         del RESOURCE['user_email']
         conn = _Connection(RESOURCE)
         client = _Client(project=self.PROJECT, connection=conn)
-        source = _Table(self.SOURCE_TABLE)
-        destination = _Table(self.DESTINATION_TABLE)
-        job = self._make_one(self.JOB_NAME, destination, [source], client)
+        source = self._table_ref(self.SOURCE_TABLE)
+        destination = self._table_ref(self.DESTINATION_TABLE)
+        job = self._make_one(self.JOB_NAME, [source], destination, client)
 
         job.begin()
 
@@ -1090,13 +1098,13 @@ def test_begin_w_alternate_client(self):
         client1 = _Client(project=self.PROJECT, connection=conn1)
         conn2 = _Connection(RESOURCE)
         client2 = _Client(project=self.PROJECT, connection=conn2)
-        source = _Table(self.SOURCE_TABLE)
-        destination = _Table(self.DESTINATION_TABLE)
-        job = self._make_one(self.JOB_NAME, destination, [source], client1)
-
-        job.create_disposition = 'CREATE_NEVER'
-        job.write_disposition = 'WRITE_TRUNCATE'
-
+        source = self._table_ref(self.SOURCE_TABLE)
+        destination = self._table_ref(self.DESTINATION_TABLE)
+        config = CopyJobConfig()
+        config.create_disposition = 'CREATE_NEVER'
+        config.write_disposition = 'WRITE_TRUNCATE'
+        job = self._make_one(self.JOB_NAME, [source], destination, client1,
+                             config)
         job.begin(client=client2)
 
         self.assertEqual(len(conn1._requested), 0)
@@ -1120,9 +1128,10 @@ def test_exists_miss_w_bound_client(self):
         PATH = '/projects/%s/jobs/%s' % (self.PROJECT, self.JOB_NAME)
         conn = _Connection()
         client = _Client(project=self.PROJECT, connection=conn)
-        source = _Table(self.SOURCE_TABLE)
-        destination = _Table(self.DESTINATION_TABLE)
-        job = self._make_one(self.JOB_NAME, destination, [source], client)
+
+        source = self._table_ref(self.SOURCE_TABLE)
+        destination = self._table_ref(self.DESTINATION_TABLE)
+        job = self._make_one(self.JOB_NAME, [source], destination, client)
 
         self.assertFalse(job.exists())
 
@@ -1138,9 +1147,9 @@ def test_exists_hit_w_alternate_client(self):
         client1 = _Client(project=self.PROJECT, connection=conn1)
         conn2 = _Connection({})
         client2 = _Client(project=self.PROJECT, connection=conn2)
-        source = _Table(self.SOURCE_TABLE)
-        destination = _Table(self.DESTINATION_TABLE)
-        job = self._make_one(self.JOB_NAME, destination, [source], client1)
+        source = self._table_ref(self.SOURCE_TABLE)
+        destination = self._table_ref(self.DESTINATION_TABLE)
+        job = self._make_one(self.JOB_NAME, [source], destination, client1)
 
         self.assertTrue(job.exists(client=client2))
 
@@ -1156,9 +1165,9 @@ def test_reload_w_bound_client(self):
         RESOURCE = self._makeResource()
         conn = _Connection(RESOURCE)
         client = _Client(project=self.PROJECT, connection=conn)
-        source = _Table(self.SOURCE_TABLE)
-        destination = _Table(self.DESTINATION_TABLE)
-        job = self._make_one(self.JOB_NAME, destination, [source], client)
+        source = self._table_ref(self.SOURCE_TABLE)
+        destination = self._table_ref(self.DESTINATION_TABLE)
+        job = self._make_one(self.JOB_NAME, [source], destination, client)
 
         job.reload()
 
@@ -1175,9 +1184,9 @@ def test_reload_w_alternate_client(self):
         client1 = _Client(project=self.PROJECT, connection=conn1)
         conn2 = _Connection(RESOURCE)
         client2 = _Client(project=self.PROJECT, connection=conn2)
-        source = _Table(self.SOURCE_TABLE)
-        destination = _Table(self.DESTINATION_TABLE)
-        job = self._make_one(self.JOB_NAME, destination, [source], client1)
+        source = self._table_ref(self.SOURCE_TABLE)
+        destination = self._table_ref(self.DESTINATION_TABLE)
+        job = self._make_one(self.JOB_NAME, [source], destination, client1)
 
         job.reload(client=client2)
 
@@ -2709,8 +2718,6 @@ def __init__(self, table_id=None):
 
     @property
     def table_id(self):
-        if self._table_id is not None:
-            return self._table_id
         return TestLoadJob.TABLE_ID
 
     @property
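
Usage note: a minimal sketch of the copy_table API as it looks after this
patch, distilled from the system test above. The dataset and table names are
placeholders, and the client construction assumes default credentials.

    from google.cloud import bigquery

    client = bigquery.Client()
    dataset = client.dataset('my_dataset')            # DatasetReference
    source = dataset.table('source_table')            # TableReference
    destination = dataset.table('destination_table')

    # Optional extra configuration for the copy job.
    config = bigquery.CopyJobConfig()
    config.write_disposition = 'WRITE_TRUNCATE'

    # copy_table() now begins the job immediately; a job ID is generated
    # when one is not supplied. Pass a list of TableReferences to copy
    # several tables into one destination.
    job = client.copy_table(source, destination, job_config=config)
    job.result()  # blocks until the copy job completes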