Skip to content

Commit

Permalink
set project_id and location when canceling BigQuery job (#27521)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dalei Li authored Nov 7, 2022
1 parent a691ab5 commit 98a9c57
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
10 changes: 6 additions & 4 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1418,9 +1418,10 @@ def cancel_job(
:param project_id: Google Cloud Project where the job is running
:param location: location the job is running
"""
project_id = project_id or self.project_id
location = location or self.location

if self.poll_job_complete(job_id=job_id):
if self.poll_job_complete(job_id=job_id, project_id=project_id, location=location):
self.log.info("No running BigQuery jobs to cancel.")
return

Expand All @@ -1434,17 +1435,18 @@ def cancel_job(
job_complete = False
while polling_attempts < max_polling_attempts and not job_complete:
polling_attempts += 1
job_complete = self.poll_job_complete(job_id)
job_complete = self.poll_job_complete(job_id=job_id, project_id=project_id, location=location)
if job_complete:
self.log.info("Job successfully canceled: %s, %s", project_id, job_id)
elif polling_attempts == max_polling_attempts:
self.log.info(
"Stopping polling due to timeout. Job with id %s "
"Stopping polling due to timeout. Job %s, %s "
"has not completed cancel and may or may not finish.",
project_id,
job_id,
)
else:
self.log.info("Waiting for canceled job with id %s to finish.", job_id)
self.log.info("Waiting for canceled job %s, %s to finish.", project_id, job_id)
time.sleep(5)

@GoogleBaseHook.fallback_to_default_project_id
Expand Down
7 changes: 5 additions & 2 deletions tests/providers/google/cloud/hooks/test_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ def test_cancel_queries(self, mock_client, mock_poll_job_complete):
self.hook.running_job_id = running_job_id
self.hook.cancel_query()

calls = [mock.call(job_id=running_job_id), mock.call(running_job_id)]
calls = [
mock.call(job_id=running_job_id, project_id=PROJECT_ID, location=None),
mock.call(job_id=running_job_id, project_id=PROJECT_ID, location=None),
]
mock_poll_job_complete.assert_has_calls(calls)
mock_client.assert_called_once_with(project_id=PROJECT_ID, location=None)
mock_client.return_value.cancel_job.assert_called_once_with(job_id=running_job_id)
Expand Down Expand Up @@ -599,7 +602,7 @@ def test_cancel_query_jobs_to_cancel(

self.hook.running_job_id = JOB_ID
self.hook.cancel_query()
poll_job_complete.assert_called_once_with(job_id=JOB_ID)
poll_job_complete.assert_called_once_with(job_id=JOB_ID, project_id=PROJECT_ID, location=None)
mock_logger_info.has_call(mock.call("No running BigQuery jobs to cancel."))

@mock.patch("airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_client")
Expand Down

0 comments on commit 98a9c57

Please sign in to comment.