From 5d9b89c9c5d1a3be21eed7ec9bd4ca137ce86064 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Thu, 4 Feb 2021 10:08:17 +0000 Subject: [PATCH] Bugfix: Scheduler fails if task is removed at runtime (#14057) closes https://github.com/apache/airflow/issues/13464 GitOrigin-RevId: eb78a8b86c6e372bbf4bfacb7628b154c16aa16b --- airflow/models/dagrun.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index f7c9026805f..fe7b29cb62e 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -38,7 +38,7 @@ from airflow import settings from airflow.configuration import conf as airflow_conf -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, TaskNotFound from airflow.models.base import ID_LEN, Base from airflow.models.taskinstance import TaskInstance as TI from airflow.settings import task_instance_mutation_hook @@ -490,7 +490,14 @@ def task_instance_scheduling_decisions(self, session: Session = None) -> TISched tis = list(self.get_task_instances(session=session, state=State.task_states + (State.SHUTDOWN,))) self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis)) for ti in tis: - ti.task = self.get_dag().get_task(ti.task_id) + try: + ti.task = self.get_dag().get_task(ti.task_id) + except TaskNotFound: + self.log.warning( + "Failed to get task '%s' for dag '%s'. Marking it as removed.", ti, ti.dag_id + ) + ti.state = State.REMOVED + session.flush() unfinished_tasks = [t for t in tis if t.state in State.unfinished] finished_tasks = [t for t in tis if t.state in State.finished]