diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 623bb948dc8c31..64bc9008f5702e 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1315,7 +1315,9 @@ def _schedule_dag_run(
             self.log.error("Execution date is in future: %s", dag_run.execution_date)
             return callback
 
-        self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
+        if not self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session):
+            self.log.warning("The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id)
+            return callback
         # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
         schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
         if dag_run.state in State.finished:
@@ -1331,20 +1333,30 @@ def _schedule_dag_run(
 
         return callback_to_run
 
-    def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> None:
-        """Only run DagRun.verify integrity if Serialized DAG has changed since it is slow"""
+    def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> bool:
+        """
+        Only run DagRun.verify_integrity if the Serialized DAG has changed, since it is slow.
+
+        :param dag_run: DagRun whose stored ``dag_hash`` is compared with the latest serialized hash
+        :param session: session used for the hash lookup, the DAG refresh and the integrity check
+        :return: True if the hash is unchanged, or if the refreshed DAG was found and integrity was
+            verified; False if the refreshed DAG could not be found in the DagBag.
+        """
         latest_version = SerializedDagModel.get_latest_version_hash(dag_run.dag_id, session=session)
         if dag_run.dag_hash == latest_version:
             self.log.debug("DAG %s not changed structure, skipping dagrun.verify_integrity", dag_run.dag_id)
-            return
+            return True
 
         dag_run.dag_hash = latest_version
 
         # Refresh the DAG
         dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id, session=session)
+        if not dag_run.dag:
+            return False
 
         # Verify integrity also takes care of session.flush
         dag_run.verify_integrity(session=session)
+        return True
 
     def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackRequest | None = None) -> None:
         self._send_sla_callbacks_to_processor(dag)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index a1d7fb31e0712e..b0c8e016c83212 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2699,6 +2699,49 @@ def test_verify_integrity_if_dag_changed(self, dag_maker):
         session.rollback()
         session.close()
 
+    def test_verify_integrity_if_dag_disappeared(self, dag_maker, caplog):
+        """Scheduling is skipped with a warning when the DAG is gone before integrity is verified."""
+        # CleanUp
+        with create_session() as session:
+            session.query(SerializedDagModel).filter(
+                SerializedDagModel.dag_id == "test_verify_integrity_if_dag_disappeared"
+            ).delete(synchronize_session=False)
+
+        with dag_maker(dag_id="test_verify_integrity_if_dag_disappeared") as dag:
+            BashOperator(task_id="dummy", bash_command="echo hi")
+
+        session = settings.Session()
+        orm_dag = dag_maker.dag_model
+        assert orm_dag is not None
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.processor_agent = mock.MagicMock()
+        dag = self.scheduler_job.dagbag.get_dag("test_verify_integrity_if_dag_disappeared", session=session)
+        self.scheduler_job._create_dag_runs([orm_dag], session)
+        dag_id = dag.dag_id
+        drs = DagRun.find(dag_id=dag_id, session=session)
+        assert len(drs) == 1
+        dr = drs[0]
+
+        dag_version_1 = SerializedDagModel.get_latest_version_hash(dag_id, session=session)
+        assert dr.dag_hash == dag_version_1
+        assert self.scheduler_job.dagbag.dags == {"test_verify_integrity_if_dag_disappeared": dag}
+        assert len(self.scheduler_job.dagbag.dags.get("test_verify_integrity_if_dag_disappeared").tasks) == 1
+
+        SerializedDagModel.remove_dag(dag_id=dag_id)
+        dag = self.scheduler_job.dagbag.dags[dag_id]
+        self.scheduler_job.dagbag.dags = MagicMock()
+        self.scheduler_job.dagbag.dags.get.side_effect = [dag, None]
+        session.flush()
+        with caplog.at_level(logging.WARNING):
+            callback = self.scheduler_job._schedule_dag_run(dr, session)
+            assert "The DAG disappeared before verifying integrity" in caplog.text
+
+        assert callback is None
+
+        session.rollback()
+        session.close()
+
     @pytest.mark.need_serialized_dag
     def test_retry_still_in_executor(self, dag_maker):
         """