diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index b4f2714e3a6ae1..3440832275ef23 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -627,7 +627,6 @@ def _process_executor_events(self, session: Session = None) -> int:
             buffer_key = ti.key.with_try_number(try_number)
             state, info = event_buffer.pop(buffer_key)
 
-            # TODO: should we fail RUNNING as well, as we do in Backfills?
             if state == TaskInstanceState.QUEUED:
                 ti.external_executor_id = info
                 self.log.info("Setting external_id for %s to %s", ti, info)
@@ -663,7 +662,20 @@ def _process_executor_events(self, session: Session = None) -> int:
                 ti.pid,
             )
 
-            if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
+            # There are two scenarios why the same TI with the same try_number is queued
+            # after the executor is finished with it:
+            # 1) the TI was killed externally and had no time to mark itself failed
+            # - in this case we should mark it as failed here.
+            # 2) the TI has been requeued after getting deferred - in this case either our executor has it
+            # or the TI is queued by another job. Either way, we should not fail it.
+
+            # All of this could also happen if the state is "running",
+            # but that is handled by the zombie detection.
+
+            ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED
+            ti_requeued = ti.queued_by_job_id != self.id or self.executor.has_task(ti)
+
+            if ti_queued and not ti_requeued:
                 Stats.incr('scheduler.tasks.killed_externally')
                 msg = (
                     "Executor reports task instance %s finished (%s) although the "
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 1b11ae3b96d65b..de909ef6e69bda 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -381,6 +381,68 @@ def test_process_executor_event_missing_dag(self, mock_stats_incr, mock_task_cal
         ti1.refresh_from_db()
         assert ti1.state == State.FAILED
 
+    @mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
+    @mock.patch('airflow.jobs.scheduler_job.Stats.incr')
+    def test_process_executor_events_ti_requeued(self, mock_stats_incr, mock_task_callback, dag_maker):
+        dag_id = "test_process_executor_events_ti_requeued"
+        task_id_1 = 'dummy_task'
+
+        session = settings.Session()
+        with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
+            task1 = EmptyOperator(task_id=task_id_1)
+        ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
+
+        mock_stats_incr.reset_mock()
+
+        executor = MockExecutor(do_update=False)
+        task_callback = mock.MagicMock()
+        mock_task_callback.return_value = task_callback
+        self.scheduler_job = SchedulerJob(executor=executor)
+        self.scheduler_job.id = 1
+        self.scheduler_job.processor_agent = mock.MagicMock()
+
+        # ti is queued with another try number - do not fail it
+        ti1.state = State.QUEUED
+        ti1.queued_by_job_id = 1
+        ti1.try_number = 2
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None
+
+        self.scheduler_job._process_executor_events(session=session)
+        ti1.refresh_from_db(session=session)
+        assert ti1.state == State.QUEUED
+        self.scheduler_job.executor.callback_sink.send.assert_not_called()
+
+        # ti is queued by another scheduler - do not fail it
+        ti1.state = State.QUEUED
+        ti1.queued_by_job_id = 2
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key] = State.SUCCESS, None
+
+        self.scheduler_job._process_executor_events(session=session)
+        ti1.refresh_from_db(session=session)
+        assert ti1.state == State.QUEUED
+        self.scheduler_job.executor.callback_sink.send.assert_not_called()
+
+        # ti is queued by this scheduler but it is handed back to the executor - do not fail it
+        ti1.state = State.QUEUED
+        ti1.queued_by_job_id = 1
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key] = State.SUCCESS, None
+        executor.has_task = mock.MagicMock(return_value=True)
+
+        self.scheduler_job._process_executor_events(session=session)
+        ti1.refresh_from_db(session=session)
+        assert ti1.state == State.QUEUED
+        self.scheduler_job.executor.callback_sink.send.assert_not_called()
+        mock_stats_incr.assert_not_called()
+
     def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker):
         dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
         task_id_1 = 'dummy_task'
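To make the decision the scheduler change encodes easier to follow, here is a minimal standalone sketch of the `ti_queued and not ti_requeued` check, outside the diff. `FakeTaskInstance` and `should_fail_as_killed_externally` are hypothetical stand-ins invented for illustration, not the real Airflow classes or the actual implementation in `_process_executor_events`.

```python
# A minimal sketch of the new check, assuming simplified stand-ins for the real
# TaskInstance, executor and scheduler objects (all names here are hypothetical).
from dataclasses import dataclass


@dataclass
class FakeTaskInstance:
    try_number: int
    state: str
    queued_by_job_id: int


def should_fail_as_killed_externally(
    ti: FakeTaskInstance,
    buffer_try_number: int,
    scheduler_job_id: int,
    executor_has_task: bool,
) -> bool:
    # Scenario 1: the same try is still queued by this scheduler and the executor
    # no longer owns it -> the task was likely killed externally, so fail it.
    ti_queued = ti.try_number == buffer_try_number and ti.state == "queued"
    # Scenario 2: requeued - another scheduler queued it, or our executor still has it
    # (e.g. it came back from a deferral) -> do not fail it.
    ti_requeued = ti.queued_by_job_id != scheduler_job_id or executor_has_task
    return ti_queued and not ti_requeued


# Requeued after a deferral, same scheduler, executor still has it: not failed.
ti = FakeTaskInstance(try_number=1, state="queued", queued_by_job_id=1)
assert not should_fail_as_killed_externally(ti, 1, scheduler_job_id=1, executor_has_task=True)

# Same try queued, nothing else owns it: treated as killed externally.
assert should_fail_as_killed_externally(ti, 1, scheduler_job_id=1, executor_has_task=False)
```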