Skip to content

Commit

Permalink
Handle DAG disappearing mid-flight when dag verification happens
Browse files Browse the repository at this point in the history
When scheduler schedules a DAG and it disappears mid-flight by
DagFileProcessor, it might lead to scheduler crashing in the
verify_integrity method.

This PR simply skips scheduling the DAG in such case rather than
attempting to schedule it.

Fixes: #27622
  • Loading branch information
potiuk committed Nov 25, 2022
1 parent 542cfdc commit b23a7b6
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
17 changes: 13 additions & 4 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,9 @@ def _schedule_dag_run(
self.log.error("Execution date is in future: %s", dag_run.execution_date)
return callback

self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session)
if not self._verify_integrity_if_dag_changed(dag_run=dag_run, session=session):
self.log.warning("The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id)
return callback
# TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else?
schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)
if dag_run.state in State.finished:
Expand All @@ -1331,20 +1333,27 @@ def _schedule_dag_run(

return callback_to_run

def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> None:
"""Only run DagRun.verify integrity if Serialized DAG has changed since it is slow"""
def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> bool:
"""
Only run DagRun.verify integrity if Serialized DAG has changed since it is slow.
Return True if we determine that DAG still exists.
"""
latest_version = SerializedDagModel.get_latest_version_hash(dag_run.dag_id, session=session)
if dag_run.dag_hash == latest_version:
self.log.debug("DAG %s not changed structure, skipping dagrun.verify_integrity", dag_run.dag_id)
return
return True

dag_run.dag_hash = latest_version

# Refresh the DAG
dag_run.dag = self.dagbag.get_dag(dag_id=dag_run.dag_id, session=session)
if not dag_run.dag:
return False

# Verify integrity also takes care of session.flush
dag_run.verify_integrity(session=session)
return True

def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackRequest | None = None) -> None:
self._send_sla_callbacks_to_processor(dag)
Expand Down
44 changes: 44 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2699,6 +2699,50 @@ def test_verify_integrity_if_dag_changed(self, dag_maker):
session.rollback()
session.close()

def test_verify_integrity_if_dag_disappeared(self, dag_maker, caplog):
# CleanUp
with create_session() as session:
session.query(SerializedDagModel).filter(
SerializedDagModel.dag_id == "test_verify_integrity_if_dag_disappeared"
).delete(synchronize_session=False)

with dag_maker(dag_id="test_verify_integrity_if_dag_disappeared") as dag:
BashOperator(task_id="dummy", bash_command="echo hi")

self.scheduler_job = SchedulerJob(subdir=os.devnull)

session = settings.Session()
orm_dag = dag_maker.dag_model
assert orm_dag is not None

self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.processor_agent = mock.MagicMock()
dag = self.scheduler_job.dagbag.get_dag("test_verify_integrity_if_dag_disappeared", session=session)
self.scheduler_job._create_dag_runs([orm_dag], session)
dag_id = dag.dag_id
drs = DagRun.find(dag_id=dag_id, session=session)
assert len(drs) == 1
dr = drs[0]

dag_version_1 = SerializedDagModel.get_latest_version_hash(dag_id, session=session)
assert dr.dag_hash == dag_version_1
assert self.scheduler_job.dagbag.dags == {"test_verify_integrity_if_dag_disappeared": dag}
assert len(self.scheduler_job.dagbag.dags.get("test_verify_integrity_if_dag_disappeared").tasks) == 1

SerializedDagModel.remove_dag(dag_id=dag_id)
dag = self.scheduler_job.dagbag.dags[dag_id]
self.scheduler_job.dagbag.dags = MagicMock()
self.scheduler_job.dagbag.dags.get.side_effect = [dag, None]
session.flush()
with caplog.at_level(logging.WARNING):
callback = self.scheduler_job._schedule_dag_run(dr, session)
assert "The DAG disappeared before verifying integrity" in caplog.text

assert callback is None

session.rollback()
session.close()

@pytest.mark.need_serialized_dag
def test_retry_still_in_executor(self, dag_maker):
"""
Expand Down

0 comments on commit b23a7b6

Please sign in to comment.