From fc396ad22a375eb520c609b4f2fdb159323d9d85 Mon Sep 17 00:00:00 2001 From: Wei Lee Date: Fri, 22 Mar 2024 18:40:31 +0800 Subject: [PATCH] fix(google): add return statement to yield within a while loop in triggers (#38394) --- airflow/providers/google/cloud/triggers/bigquery.py | 3 +-- airflow/providers/google/cloud/triggers/cloud_run.py | 1 + airflow/providers/google/cloud/triggers/cloud_sql.py | 1 + airflow/providers/google/cloud/triggers/kubernetes_engine.py | 1 - airflow/providers/google/cloud/triggers/mlengine.py | 2 ++ airflow/providers/google/cloud/triggers/pubsub.py | 5 ++--- 6 files changed, 7 insertions(+), 6 deletions(-) diff --git a/airflow/providers/google/cloud/triggers/bigquery.py b/airflow/providers/google/cloud/triggers/bigquery.py index 302316e4ae581f..eafa4825bec2e4 100644 --- a/airflow/providers/google/cloud/triggers/bigquery.py +++ b/airflow/providers/google/cloud/triggers/bigquery.py @@ -160,7 +160,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] "records": None, } ) - return else: # Extract only first record from the query results first_record = records.pop(0) @@ -171,7 +170,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] "records": first_record, } ) - return + return elif job_status["status"] == "error": yield TriggerEvent({"status": "error", "message": job_status["message"]}) return diff --git a/airflow/providers/google/cloud/triggers/cloud_run.py b/airflow/providers/google/cloud/triggers/cloud_run.py index 4e740aa048e40b..840d8effb0892d 100644 --- a/airflow/providers/google/cloud/triggers/cloud_run.py +++ b/airflow/providers/google/cloud/triggers/cloud_run.py @@ -120,6 +120,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: "job_name": self.job_name, } ) + return elif operation.error.message: raise AirflowException(f"Cloud Run Job error: {operation.error.message}") diff --git a/airflow/providers/google/cloud/triggers/cloud_sql.py b/airflow/providers/google/cloud/triggers/cloud_sql.py index 256b384c081c4d..62043ee0e6fdb6 100644 --- a/airflow/providers/google/cloud/triggers/cloud_sql.py +++ b/airflow/providers/google/cloud/triggers/cloud_sql.py @@ -80,6 +80,7 @@ async def run(self): } ) return + yield TriggerEvent( { "operation_name": operation["name"], diff --git a/airflow/providers/google/cloud/triggers/kubernetes_engine.py b/airflow/providers/google/cloud/triggers/kubernetes_engine.py index de5984db47ee4a..84c771691afb96 100644 --- a/airflow/providers/google/cloud/triggers/kubernetes_engine.py +++ b/airflow/providers/google/cloud/triggers/kubernetes_engine.py @@ -211,7 +211,6 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] self.log.info("Operation is still running.") self.log.info("Sleeping for %ss...", self.poll_interval) await asyncio.sleep(self.poll_interval) - else: yield TriggerEvent( { diff --git a/airflow/providers/google/cloud/triggers/mlengine.py b/airflow/providers/google/cloud/triggers/mlengine.py index 9e1e90df3c689f..87fb1f57353a78 100644 --- a/airflow/providers/google/cloud/triggers/mlengine.py +++ b/airflow/providers/google/cloud/triggers/mlengine.py @@ -103,12 +103,14 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] "message": "Job completed", } ) + return elif response_from_hook == "pending": self.log.info("Job is still running...") self.log.info("Sleeping for %s seconds.", self.poll_interval) await asyncio.sleep(self.poll_interval) else: yield TriggerEvent({"status": "error", "message": response_from_hook}) + return except Exception as e: self.log.exception("Exception occurred while checking for query completion") diff --git a/airflow/providers/google/cloud/triggers/pubsub.py b/airflow/providers/google/cloud/triggers/pubsub.py index cd513d299d51ef..6b95e283adddb4 100644 --- a/airflow/providers/google/cloud/triggers/pubsub.py +++ b/airflow/providers/google/cloud/triggers/pubsub.py @@ -102,9 +102,8 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] if pulled_messages: if self.ack_messages: await self.message_acknowledgement(pulled_messages) - yield TriggerEvent({"status": "success", "message": pulled_messages}) - else: - yield TriggerEvent({"status": "success", "message": pulled_messages}) + yield TriggerEvent({"status": "success", "message": pulled_messages}) + return else: pulled_messages = await self.hook.pull( project_id=self.project_id,