Skip to content

Commit

Permalink
Persist start/end date and duration for DummyOperator Task Instance (a…
Browse files Browse the repository at this point in the history
…pache#8663)

Otherwise the behaviour in UI is incorrect
Addressing issue apache#8662

(cherry picked from commit d92e848)
  • Loading branch information
XD-DENG authored and Chris Fei committed Mar 5, 2021
1 parent 336bf2b commit 10d75d8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 0 deletions.
2 changes: 2 additions & 0 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1653,6 +1653,8 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
if isinstance(ti.task, DummyOperator) \
and not ti.task.on_success_callback:
ti.state = State.SUCCESS
ti.start_date = ti.end_date = timezone.utcnow()
ti.duration = 0

# Also save this task instance to the DB.
self.log.info("Creating / updating %s in ORM", ti)
Expand Down
20 changes: 20 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2895,6 +2895,16 @@ def test_should_mark_dummy_task_as_success(self):
('test_task_on_execute', 'scheduled'),
('test_task_on_success', 'scheduled'),
}, {(ti.task_id, ti.state) for ti in tis})
for state, start_date, end_date, duration in [(ti.state, ti.start_date, ti.end_date, ti.duration) for
ti in tis]:
if state == 'success':
self.assertIsNotNone(start_date)
self.assertIsNotNone(end_date)
self.assertEqual(0.0, duration)
else:
self.assertIsNone(start_date)
self.assertIsNone(end_date)
self.assertIsNone(duration)

scheduler_job.process_file(file_path=dag_file, zombies=[])
with create_session() as session:
Expand All @@ -2908,3 +2918,13 @@ def test_should_mark_dummy_task_as_success(self):
('test_task_on_execute', 'scheduled'),
('test_task_on_success', 'scheduled'),
}, {(ti.task_id, ti.state) for ti in tis})
for state, start_date, end_date, duration in [(ti.state, ti.start_date, ti.end_date, ti.duration) for
ti in tis]:
if state == 'success':
self.assertIsNotNone(start_date)
self.assertIsNotNone(end_date)
self.assertEqual(0.0, duration)
else:
self.assertIsNone(start_date)
self.assertIsNone(end_date)
self.assertIsNone(duration)

0 comments on commit 10d75d8

Please sign in to comment.