Do not fail requeued TIs (#23846)
(cherry picked from commit 66ffe39)
tanelk authored and ephraimbuddy committed Jun 29, 2022
1 parent 0aae6bb commit 96a2e79
Showing 2 changed files with 76 additions and 2 deletions.
16 changes: 14 additions & 2 deletions airflow/jobs/scheduler_job.py
@@ -627,7 +627,6 @@ def _process_executor_events(self, session: Session = None) -> int:
buffer_key = ti.key.with_try_number(try_number)
state, info = event_buffer.pop(buffer_key)

# TODO: should we fail RUNNING as well, as we do in Backfills?
if state == TaskInstanceState.QUEUED:
ti.external_executor_id = info
self.log.info("Setting external_id for %s to %s", ti, info)
@@ -663,7 +662,20 @@ def _process_executor_events(self, session: Session = None) -> int:
ti.pid,
)

if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
# There are two scenarios in which the same TI with the same try_number can be queued
# after the executor has finished with it:
# 1) the TI was killed externally and had no time to mark itself failed
# - in this case we should mark it as failed here.
# 2) the TI has been requeued after getting deferred - in this case either our executor has it
# or the TI is queued by another job. Either way, we should not fail it.

# All of this could also happen if the state is "running",
# but that is handled by the zombie detection.

ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED
ti_requeued = ti.queued_by_job_id != self.id or self.executor.has_task(ti)

if ti_queued and not ti_requeued:
Stats.incr('scheduler.tasks.killed_externally')
msg = (
"Executor reports task instance %s finished (%s) although the "
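For readability, the new check can be read as a standalone predicate. The sketch below is illustrative only and is not part of the commit; the helper name and flattened argument list are hypothetical, while TaskInstanceState and executor.has_task come straight from the diff above.

from airflow.utils.state import TaskInstanceState


def _should_mark_killed_externally(ti, buffer_key, scheduler_job_id, executor) -> bool:
    # Hypothetical helper mirroring the inline check in SchedulerJob._process_executor_events.
    # The executor reported a terminal event, yet the TI is still QUEUED for the same try_number.
    ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED
    # The TI was requeued: either another job queued it, or our own executor has it again
    # (e.g. it deferred and came back).
    ti_requeued = ti.queued_by_job_id != scheduler_job_id or executor.has_task(ti)
    # Only treat the TI as killed externally when it is stuck in QUEUED
    # and nothing has legitimately requeued it.
    return ti_queued and not ti_requeued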
62 changes: 62 additions & 0 deletions tests/jobs/test_scheduler_job.py
@@ -381,6 +381,68 @@ def test_process_executor_event_missing_dag(self, mock_stats_incr, mock_task_cal
ti1.refresh_from_db()
assert ti1.state == State.FAILED

@mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
@mock.patch('airflow.jobs.scheduler_job.Stats.incr')
def test_process_executor_events_ti_requeued(self, mock_stats_incr, mock_task_callback, dag_maker):
dag_id = "test_process_executor_events_ti_requeued"
task_id_1 = 'dummy_task'

session = settings.Session()
with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
task1 = EmptyOperator(task_id=task_id_1)
ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)

mock_stats_incr.reset_mock()

executor = MockExecutor(do_update=False)
task_callback = mock.MagicMock()
mock_task_callback.return_value = task_callback
self.scheduler_job = SchedulerJob(executor=executor)
self.scheduler_job.id = 1
self.scheduler_job.processor_agent = mock.MagicMock()

# ti is queued with another try number - do not fail it
ti1.state = State.QUEUED
ti1.queued_by_job_id = 1
ti1.try_number = 2
session.merge(ti1)
session.commit()

executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None

self.scheduler_job._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
self.scheduler_job.executor.callback_sink.send.assert_not_called()

# ti is queued by another scheduler - do not fail it
ti1.state = State.QUEUED
ti1.queued_by_job_id = 2
session.merge(ti1)
session.commit()

executor.event_buffer[ti1.key] = State.SUCCESS, None

self.scheduler_job._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
self.scheduler_job.executor.callback_sink.send.assert_not_called()

# ti is queued by this scheduler but it is handed back to the executor - do not fail it
ti1.state = State.QUEUED
ti1.queued_by_job_id = 1
session.merge(ti1)
session.commit()

executor.event_buffer[ti1.key] = State.SUCCESS, None
executor.has_task = mock.MagicMock(return_value=True)

self.scheduler_job._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
self.scheduler_job.executor.callback_sink.send.assert_not_called()
mock_stats_incr.assert_not_called()

def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker):
dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
task_id_1 = 'dummy_task'
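In the requeue test above, executor.has_task is stubbed with a MagicMock returning True to simulate the executor still holding the task. For context, BaseExecutor.has_task is roughly the membership check sketched below; this is an approximation of the executor base class, not part of this commit.

def has_task(self, task_instance) -> bool:
    # Approximate sketch: the executor considers a task its own if the TI key
    # is still present in its queued or running collections.
    return task_instance.key in self.queued_tasks or task_instance.key in self.running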
