diff --git a/airflow/providers/google/cloud/hooks/bigquery.py b/airflow/providers/google/cloud/hooks/bigquery.py index 0a279d95e5d332..dabc842dfd7536 100644 --- a/airflow/providers/google/cloud/hooks/bigquery.py +++ b/airflow/providers/google/cloud/hooks/bigquery.py @@ -1588,7 +1588,7 @@ def get_job( job_id: str, project_id: str = PROVIDE_PROJECT_ID, location: str | None = None, - ) -> CopyJob | QueryJob | LoadJob | ExtractJob | UnknownJob: + ) -> BigQueryJob | UnknownJob: """Retrieve a BigQuery job. .. seealso:: https://cloud.google.com/bigquery/docs/reference/v2/jobs @@ -1596,8 +1596,8 @@ def get_job( :param job_id: The ID of the job. The ID must contain only letters (a-z, A-Z), numbers (0-9), underscores (_), or dashes (-). The maximum length is 1,024 characters. - :param project_id: Google Cloud Project where the job is running - :param location: location the job is running + :param project_id: Google Cloud Project where the job is running. + :param location: Location where the job is running. """ client = self.get_client(project_id=project_id, location=location) job = client.get_job(job_id=job_id, project=project_id, location=location) @@ -2849,15 +2849,16 @@ def rowcount(self) -> int: return -1 def execute(self, operation: str, parameters: dict | None = None) -> None: - """Execute a BigQuery query, and return the job ID. + """Execute a BigQuery query, and update the BigQueryCursor description. :param operation: The query to execute. :param parameters: Parameters to substitute into the query. """ sql = _bind_parameters(operation, parameters) if parameters else operation self.flush_results() - self.job_id = self._run_query(sql) - + job = self._run_query(sql) + self.job_id = job.job_id + self.location = self.location or job.location query_results = self._get_query_result() if "schema" in query_results: self.description = _format_schema_for_description(query_results["schema"]) @@ -2997,15 +2998,15 @@ def _run_query( self, sql, location: str | None = None, - ) -> str: - """Run job query.""" + ) -> BigQueryJob: + """Run a job query and return the job instance.""" if not self.project_id: raise ValueError("The project_id should be set") configuration = self._prepare_query_configuration(sql) job = self.hook.insert_job(configuration=configuration, project_id=self.project_id, location=location) - return job.job_id + return job def _prepare_query_configuration( self, @@ -3357,7 +3358,7 @@ async def get_job_instance( async def _get_job( self, job_id: str | None, project_id: str = PROVIDE_PROJECT_ID, location: str | None = None - ) -> CopyJob | QueryJob | LoadJob | ExtractJob | UnknownJob: + ) -> BigQueryJob | UnknownJob: """ Get BigQuery job by its ID, project ID and location.