Skip to content

Commit

Permalink
Fix BigQueryCursor execute method if the location is missing
Browse files Browse the repository at this point in the history
  • Loading branch information
e-galan authored and eladkal committed May 18, 2024
1 parent 77a6b4f commit 2cd6917
Showing 1 changed file with 11 additions and 10 deletions.
21 changes: 11 additions & 10 deletions airflow/providers/google/cloud/hooks/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1588,16 +1588,16 @@ def get_job(
job_id: str,
project_id: str = PROVIDE_PROJECT_ID,
location: str | None = None,
) -> CopyJob | QueryJob | LoadJob | ExtractJob | UnknownJob:
) -> BigQueryJob | UnknownJob:
"""Retrieve a BigQuery job.
.. seealso:: https://cloud.google.com/bigquery/docs/reference/v2/jobs
:param job_id: The ID of the job. The ID must contain only letters (a-z, A-Z),
numbers (0-9), underscores (_), or dashes (-). The maximum length is 1,024
characters.
:param project_id: Google Cloud Project where the job is running
:param location: location the job is running
:param project_id: Google Cloud Project where the job is running.
:param location: Location where the job is running.
"""
client = self.get_client(project_id=project_id, location=location)
job = client.get_job(job_id=job_id, project=project_id, location=location)
Expand Down Expand Up @@ -2849,15 +2849,16 @@ def rowcount(self) -> int:
return -1

def execute(self, operation: str, parameters: dict | None = None) -> None:
"""Execute a BigQuery query, and return the job ID.
"""Execute a BigQuery query, and update the BigQueryCursor description.
:param operation: The query to execute.
:param parameters: Parameters to substitute into the query.
"""
sql = _bind_parameters(operation, parameters) if parameters else operation
self.flush_results()
self.job_id = self._run_query(sql)

job = self._run_query(sql)
self.job_id = job.job_id
self.location = self.location or job.location
query_results = self._get_query_result()
if "schema" in query_results:
self.description = _format_schema_for_description(query_results["schema"])
Expand Down Expand Up @@ -2997,15 +2998,15 @@ def _run_query(
self,
sql,
location: str | None = None,
) -> str:
"""Run job query."""
) -> BigQueryJob:
"""Run a job query and return the job instance."""
if not self.project_id:
raise ValueError("The project_id should be set")

configuration = self._prepare_query_configuration(sql)
job = self.hook.insert_job(configuration=configuration, project_id=self.project_id, location=location)

return job.job_id
return job

def _prepare_query_configuration(
self,
Expand Down Expand Up @@ -3357,7 +3358,7 @@ async def get_job_instance(

async def _get_job(
self, job_id: str | None, project_id: str = PROVIDE_PROJECT_ID, location: str | None = None
) -> CopyJob | QueryJob | LoadJob | ExtractJob | UnknownJob:
) -> BigQueryJob | UnknownJob:
"""
Get BigQuery job by its ID, project ID and location.
Expand Down

0 comments on commit 2cd6917

Please sign in to comment.