diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index f7c9026805f..fe7b29cb62e 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -38,7 +38,7 @@ from airflow import settings from airflow.configuration import conf as airflow_conf -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, TaskNotFound from airflow.models.base import ID_LEN, Base from airflow.models.taskinstance import TaskInstance as TI from airflow.settings import task_instance_mutation_hook @@ -490,7 +490,14 @@ def task_instance_scheduling_decisions(self, session: Session = None) -> TISched tis = list(self.get_task_instances(session=session, state=State.task_states + (State.SHUTDOWN,))) self.log.debug("number of tis tasks for %s: %s task(s)", self, len(tis)) for ti in tis: - ti.task = self.get_dag().get_task(ti.task_id) + try: + ti.task = self.get_dag().get_task(ti.task_id) + except TaskNotFound: + self.log.warning( + "Failed to get task '%s' for dag '%s'. Marking it as removed.", ti, ti.dag_id + ) + ti.state = State.REMOVED + session.flush() unfinished_tasks = [t for t in tis if t.state in State.unfinished] finished_tasks = [t for t in tis if t.state in State.finished]