From 881e2da9287863e56666e76f4c21cecb68df6af5 Mon Sep 17 00:00:00 2001 From: raphaelauv Date: Fri, 3 Jun 2022 15:19:31 +0200 Subject: [PATCH] fix BigQueryInsertJobOperator --- .../google/cloud/operators/bigquery.py | 19 ++++---- .../google/cloud/operators/test_bigquery.py | 44 ++++++++++++++++++- 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 268bbc1cad8c2..a53b4bad0516a 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -2167,14 +2167,17 @@ def execute(self, context: Any): f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`" ) - table = job.to_api_repr()["configuration"]["query"]["destinationTable"] - BigQueryTableLink.persist( - context=context, - task_instance=self, - dataset_id=table["datasetId"], - project_id=table["projectId"], - table_id=table["tableId"], - ) + if "query" in job.to_api_repr()["configuration"]: + if "destinationTable" in job.to_api_repr()["configuration"]["query"]: + table = job.to_api_repr()["configuration"]["query"]["destinationTable"] + BigQueryTableLink.persist( + context=context, + task_instance=self, + dataset_id=table["datasetId"], + project_id=table["projectId"], + table_id=table["tableId"], + ) + self.job_id = job.job_id return job.job_id diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py index b5e42cce2f96a..42f8794e8af6b 100644 --- a/tests/providers/google/cloud/operators/test_bigquery.py +++ b/tests/providers/google/cloud/operators/test_bigquery.py @@ -788,7 +788,7 @@ def test_execute(self, mock_hook): class TestBigQueryInsertJobOperator: @mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5') @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook') - def test_execute_success(self, mock_hook, mock_md5): + def test_execute_query_success(self, mock_hook, mock_md5): job_id = "123456" hash_ = "hash" real_job_id = f"{job_id}_{hash_}" @@ -822,6 +822,48 @@ def test_execute_success(self, mock_hook, mock_md5): assert result == real_job_id + @mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5') + @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook') + def test_execute_copy_success(self, mock_hook, mock_md5): + job_id = "123456" + hash_ = "hash" + real_job_id = f"{job_id}_{hash_}" + mock_md5.return_value.hexdigest.return_value = hash_ + + configuration = { + "copy": { + "sourceTable": "aaa", + "destinationTable": "bbb", + } + } + mock_configuration = { + "configuration": configuration, + "jobReference": "a", + } + mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False) + + mock_hook.return_value.insert_job.return_value.to_api_repr.return_value = mock_configuration + + op = BigQueryInsertJobOperator( + task_id="copy_query_job", + configuration=configuration, + location=TEST_DATASET_LOCATION, + job_id=job_id, + project_id=TEST_GCP_PROJECT_ID, + ) + result = op.execute(context=MagicMock()) + + mock_hook.return_value.insert_job.assert_called_once_with( + configuration=configuration, + location=TEST_DATASET_LOCATION, + job_id=real_job_id, + project_id=TEST_GCP_PROJECT_ID, + retry=DEFAULT_RETRY, + timeout=None, + ) + + assert result == real_job_id + @mock.patch('airflow.providers.google.cloud.operators.bigquery.hashlib.md5') @mock.patch('airflow.providers.google.cloud.operators.bigquery.BigQueryHook') def test_on_kill(self, mock_hook, mock_md5):