From 49c7fae767098483a5f3ce3397341fbf28e8ed3a Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Sat, 21 May 2022 17:06:29 +0300 Subject: [PATCH 1/4] Formatting --- airflow/jobs/scheduler_job.py | 16 +++++++++-- tests/jobs/test_scheduler_job.py | 48 ++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 22ba5decb4111..57d0a4954ac12 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -628,7 +628,6 @@ def _process_executor_events(self, session: Session = None) -> int: buffer_key = ti.key.with_try_number(try_number) state, info = event_buffer.pop(buffer_key) - # TODO: should we fail RUNNING as well, as we do in Backfills? if state == TaskInstanceState.QUEUED: ti.external_executor_id = info self.log.info("Setting external_id for %s to %s", ti, info) @@ -664,7 +663,20 @@ def _process_executor_events(self, session: Session = None) -> int: ti.pid, ) - if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED: + # There are two scenarios why the same TI with the same try_number is queued + # after executor is finished with it: + # 1) the TI was killed externally and it had no time to mark itself failed + # - in this case we should mark it as failed here. + # 2) the TI has been re-queued after getting deferred - in this case either our executor has it + # or the TI is queued by another job. Either ways we should not fail it. + + # All of this could also happen if the state is "running", + # but that is handled by the zombie detection. + + ti_queued = ti.try_number == buffer_key.try_number and ti.state == State.QUEUED + ti_requeued = ti.queued_by_job_id != self.id or self.executor.has_task(ti) + + if ti_queued and not ti_requeued: Stats.incr('scheduler.tasks.killed_externally') msg = ( "Executor reports task instance %s finished (%s) although the " diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 6e0135ade78c5..830ab2b826159 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -381,6 +381,54 @@ def test_process_executor_event_missing_dag(self, mock_stats_incr, mock_task_cal ti1.refresh_from_db() assert ti1.state == State.FAILED + @mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest') + @mock.patch('airflow.jobs.scheduler_job.Stats.incr') + def test_process_executor_events_ti_requeued(self, mock_stats_incr, mock_task_callback, dag_maker): + dag_id = "test_process_executor_events_ti_requeued" + task_id_1 = 'dummy_task' + + session = settings.Session() + with dag_maker(dag_id=dag_id, fileloc='/test_path1/'): + task1 = EmptyOperator(task_id=task_id_1) + ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id) + + mock_stats_incr.reset_mock() + + executor = MockExecutor(do_update=False) + task_callback = mock.MagicMock() + mock_task_callback.return_value = task_callback + self.scheduler_job = SchedulerJob(executor=executor) + self.scheduler_job.id = 1 + self.scheduler_job.processor_agent = mock.MagicMock() + + # ti is queued by another scheduler - do not fail it + ti1.state = State.QUEUED + ti1.queued_by_job_id = 2 + session.merge(ti1) + session.commit() + + executor.event_buffer[ti1.key] = State.SUCCESS, None + + self.scheduler_job._process_executor_events(session=session) + ti1.refresh_from_db(session=session) + assert ti1.state == State.QUEUED + self.scheduler_job.executor.callback_sink.send.assert_not_called() + + # ti is queued by this scheduler but it is handed back to the executor - do not fail it + ti1.state = State.QUEUED + ti1.queued_by_job_id = 1 + session.merge(ti1) + session.commit() + + executor.event_buffer[ti1.key] = State.SUCCESS, None + executor.has_task = mock.MagicMock(return_value=True) + + self.scheduler_job._process_executor_events(session=session) + ti1.refresh_from_db(session=session) + assert ti1.state == State.QUEUED + self.scheduler_job.executor.callback_sink.send.assert_not_called() + mock_stats_incr.assert_not_called() + def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker): dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute' task_id_1 = 'dummy_task' From d33fdfcdd375fe4ebefb98a6fb84436aaf10252d Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Sat, 21 May 2022 17:09:14 +0300 Subject: [PATCH 2/4] Formatting --- airflow/jobs/scheduler_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 57d0a4954ac12..ae087f5827672 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -667,7 +667,7 @@ def _process_executor_events(self, session: Session = None) -> int: # after executor is finished with it: # 1) the TI was killed externally and it had no time to mark itself failed # - in this case we should mark it as failed here. - # 2) the TI has been re-queued after getting deferred - in this case either our executor has it + # 2) the TI has been requeued after getting deferred - in this case either our executor has it # or the TI is queued by another job. Either ways we should not fail it. # All of this could also happen if the state is "running", From 8b21a509b8f6b2f441cf2470af948bf91601d98d Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Mon, 23 May 2022 05:42:01 +0300 Subject: [PATCH 3/4] Update airflow/jobs/scheduler_job.py Co-authored-by: Tzu-ping Chung --- airflow/jobs/scheduler_job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index ae087f5827672..9bcb0c9176f5b 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -673,7 +673,7 @@ def _process_executor_events(self, session: Session = None) -> int: # All of this could also happen if the state is "running", # but that is handled by the zombie detection. - ti_queued = ti.try_number == buffer_key.try_number and ti.state == State.QUEUED + ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED ti_requeued = ti.queued_by_job_id != self.id or self.executor.has_task(ti) if ti_queued and not ti_requeued: From 6c20124cbbf0aaaba20672afef7ee6fb22b8fbe0 Mon Sep 17 00:00:00 2001 From: Tanel Kiis Date: Tue, 7 Jun 2022 07:45:53 +0300 Subject: [PATCH 4/4] Test process executor events with different try_number --- tests/jobs/test_scheduler_job.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 830ab2b826159..db6df7dfeb911 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -401,6 +401,20 @@ def test_process_executor_events_ti_requeued(self, mock_stats_incr, mock_task_ca self.scheduler_job.id = 1 self.scheduler_job.processor_agent = mock.MagicMock() + # ti is queued with another try number - do not fail it + ti1.state = State.QUEUED + ti1.queued_by_job_id = 1 + ti1.try_number = 2 + session.merge(ti1) + session.commit() + + executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None + + self.scheduler_job._process_executor_events(session=session) + ti1.refresh_from_db(session=session) + assert ti1.state == State.QUEUED + self.scheduler_job.executor.callback_sink.send.assert_not_called() + # ti is queued by another scheduler - do not fail it ti1.state = State.QUEUED ti1.queued_by_job_id = 2