diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py index c835ead04564..54dc947b2f04 100644 --- a/airflow/providers/google/cloud/hooks/bigquery.py +++ b/airflow/providers/google/cloud/hooks/bigquery.py @@ -1418,9 +1418,10 @@ def cancel_job( :param project_id: Google Cloud Project where the job is running :param location: location the job is running """ + project_id = project_id or self.project_id location = location or self.location - if self.poll_job_complete(job_id=job_id): + if self.poll_job_complete(job_id=job_id, project_id=project_id, location=location): self.log.info("No running BigQuery jobs to cancel.") return @@ -1434,17 +1435,18 @@ def cancel_job( job_complete = False while polling_attempts < max_polling_attempts and not job_complete: polling_attempts += 1 - job_complete = self.poll_job_complete(job_id) + job_complete = self.poll_job_complete(job_id=job_id, project_id=project_id, location=location) if job_complete: self.log.info("Job successfully canceled: %s, %s", project_id, job_id) elif polling_attempts == max_polling_attempts: self.log.info( - "Stopping polling due to timeout. Job with id %s " + "Stopping polling due to timeout. Job %s, %s " "has not completed cancel and may or may not finish.", + project_id, job_id, ) else: - self.log.info("Waiting for canceled job with id %s to finish.", job_id) + self.log.info("Waiting for canceled job %s, %s to finish.", project_id, job_id) time.sleep(5) @GoogleBaseHook.fallback_to_default_project_id diff --git a/tests/providers/google/cloud/hooks/test_bigquery.py b/tests/providers/google/cloud/hooks/test_bigquery.py index 15f815b6a693..5dadc914da15 100644 --- a/tests/providers/google/cloud/hooks/test_bigquery.py +++ b/tests/providers/google/cloud/hooks/test_bigquery.py @@ -203,7 +203,10 @@ def test_cancel_queries(self, mock_client, mock_poll_job_complete): self.hook.running_job_id = running_job_id self.hook.cancel_query() - calls = [mock.call(job_id=running_job_id), mock.call(running_job_id)] + calls = [ + mock.call(job_id=running_job_id, project_id=PROJECT_ID, location=None), + mock.call(job_id=running_job_id, project_id=PROJECT_ID, location=None), + ] mock_poll_job_complete.assert_has_calls(calls) mock_client.assert_called_once_with(project_id=PROJECT_ID, location=None) mock_client.return_value.cancel_job.assert_called_once_with(job_id=running_job_id) @@ -599,7 +602,7 @@ def test_cancel_query_jobs_to_cancel( self.hook.running_job_id = JOB_ID self.hook.cancel_query() - poll_job_complete.assert_called_once_with(job_id=JOB_ID) + poll_job_complete.assert_called_once_with(job_id=JOB_ID, project_id=PROJECT_ID, location=None) mock_logger_info.has_call(mock.call("No running BigQuery jobs to cancel.")) @mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")