Do not fail requeued TIs #23846

Merged · 9 commits · Jun 28, 2022
16 changes: 14 additions & 2 deletions airflow/jobs/scheduler_job.py
@@ -628,7 +628,6 @@ def _process_executor_events(self, session: Session = None) -> int:
buffer_key = ti.key.with_try_number(try_number)
state, info = event_buffer.pop(buffer_key)

# TODO: should we fail RUNNING as well, as we do in Backfills?
if state == TaskInstanceState.QUEUED:
ti.external_executor_id = info
self.log.info("Setting external_id for %s to %s", ti, info)
@@ -664,7 +663,20 @@ def _process_executor_events(self, session: Session = None) -> int:
ti.pid,
)

if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
# There are two scenarios in which the same TI with the same try_number can be queued
# after the executor has finished with it:
# 1) the TI was killed externally and had no time to mark itself failed
# - in this case we should mark it as failed here.
# 2) the TI has been requeued after getting deferred - in this case either our executor has it
# or the TI is queued by another job. Either way, we should not fail it.

# All of this could also happen if the state is "running",
# but that is handled by the zombie detection.

ti_queued = ti.try_number == buffer_key.try_number and ti.state == TaskInstanceState.QUEUED
ti_requeued = ti.queued_by_job_id != self.id or self.executor.has_task(ti)

if ti_queued and not ti_requeued:
Stats.incr('scheduler.tasks.killed_externally')
msg = (
"Executor reports task instance %s finished (%s) although the "
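
A minimal, hedged sketch of the decision the new lines above implement, rewritten as a standalone helper for illustration only (the function name and explicit parameters are not in the PR; TaskInstanceState comes from airflow.utils.state):

from airflow.utils.state import TaskInstanceState

def should_fail_as_killed_externally(ti, buffer_key, scheduler_job_id, executor) -> bool:
    # The executor reported on the same try number that is still sitting in QUEUED.
    ti_queued = (
        ti.try_number == buffer_key.try_number
        and ti.state == TaskInstanceState.QUEUED
    )
    # The TI was handed back: either another scheduler job queued it,
    # or the given executor still knows about the task.
    ti_requeued = ti.queued_by_job_id != scheduler_job_id or executor.has_task(ti)
    # Only a TI that is queued for this try and was NOT requeued was killed externally.
    return ti_queued and not ti_requeued
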
62 changes: 62 additions & 0 deletions tests/jobs/test_scheduler_job.py
@@ -381,6 +381,68 @@ def test_process_executor_event_missing_dag(self, mock_stats_incr, mock_task_cal
ti1.refresh_from_db()
assert ti1.state == State.FAILED

@mock.patch('airflow.jobs.scheduler_job.TaskCallbackRequest')
@mock.patch('airflow.jobs.scheduler_job.Stats.incr')
def test_process_executor_events_ti_requeued(self, mock_stats_incr, mock_task_callback, dag_maker):
dag_id = "test_process_executor_events_ti_requeued"
task_id_1 = 'dummy_task'

session = settings.Session()
with dag_maker(dag_id=dag_id, fileloc='/test_path1/'):
task1 = EmptyOperator(task_id=task_id_1)
ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)

mock_stats_incr.reset_mock()

executor = MockExecutor(do_update=False)
task_callback = mock.MagicMock()
mock_task_callback.return_value = task_callback
self.scheduler_job = SchedulerJob(executor=executor)
self.scheduler_job.id = 1
self.scheduler_job.processor_agent = mock.MagicMock()

# ti is queued with another try number - do not fail it
ti1.state = State.QUEUED
ti1.queued_by_job_id = 1
ti1.try_number = 2
session.merge(ti1)
session.commit()

executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None

self.scheduler_job._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
self.scheduler_job.executor.callback_sink.send.assert_not_called()
Comment on lines +404 to +416 (Contributor Author): Rebased, as this should check try number handling.

# ti is queued by another scheduler - do not fail it
ti1.state = State.QUEUED
ti1.queued_by_job_id = 2
session.merge(ti1)
session.commit()

executor.event_buffer[ti1.key] = State.SUCCESS, None

self.scheduler_job._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
self.scheduler_job.executor.callback_sink.send.assert_not_called()

# ti is queued by this scheduler but it is handed back to the executor - do not fail it
ti1.state = State.QUEUED
ti1.queued_by_job_id = 1
session.merge(ti1)
session.commit()

executor.event_buffer[ti1.key] = State.SUCCESS, None
executor.has_task = mock.MagicMock(return_value=True)

self.scheduler_job._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
self.scheduler_job.executor.callback_sink.send.assert_not_called()
mock_stats_incr.assert_not_called()

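To summarize the scenarios the new test walks through, here is a hedged, plain-Python truth table of the guard (no Airflow objects; the tuple fields are stand-ins for "try numbers match", "TI is QUEUED", "queued by this scheduler", and "executor still has the task"):

# (try_match, is_queued, queued_by_this_job, executor_has_task) -> should fail as killed externally
cases = {
    (True, True, True, False): True,    # killed externally: fail it
    (True, True, True, True): False,    # handed back to our executor: keep it
    (True, True, False, False): False,  # queued by another scheduler: keep it
    (False, True, True, False): False,  # event is for an older try number: keep it
}

for (try_match, is_queued, ours, has_task), expected in cases.items():
    ti_queued = try_match and is_queued
    ti_requeued = (not ours) or has_task
    assert (ti_queued and not ti_requeued) == expected
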
def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker):
dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute'
task_id_1 = 'dummy_task'