diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 1012a31323df9..188bf2c6153be 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -540,8 +540,10 @@ def manage_slas(self, dag, session=None):
{blocking_task_list}\n{bug}
""".format(task_list=task_list, blocking_task_list=blocking_task_list,
bug=asciiart.bug)
+
+ tasks_missed_sla = [dag.get_task(sla.task_id) for sla in slas]
emails = set()
- for task in dag.tasks:
+ for task in tasks_missed_sla:
if task.email:
if isinstance(task.email, basestring):
emails |= set(get_email_address_list(task.email))
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index ee2ee4a1ab02c..ad0f4b1438076 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1993,6 +1993,47 @@ def test_scheduler_sla_miss_callback_exception(self):
'Could not call sla_miss_callback for DAG %s',
'test_sla_miss')
+ @mock.patch('airflow.jobs.scheduler_job.send_email')
+ def test_scheduler_only_collect_emails_from_sla_missed_tasks(self, mock_send_email):
+ session = settings.Session()
+
+ test_start_date = days_ago(2)
+ dag = DAG(dag_id='test_sla_miss',
+ default_args={'start_date': test_start_date,
+ 'sla': datetime.timedelta(days=1)})
+
+ email1 = 'test1@test.com'
+ task = DummyOperator(task_id='sla_missed',
+ dag=dag,
+ owner='airflow',
+ email=email1,
+ sla=datetime.timedelta(hours=1))
+
+ session.merge(models.TaskInstance(task=task,
+ execution_date=test_start_date,
+ state='Success'))
+
+ email2 = 'test2@test.com'
+ DummyOperator(task_id='sla_not_missed',
+ dag=dag,
+ owner='airflow',
+ email=email2)
+
+ session.merge(SlaMiss(task_id='sla_missed',
+ dag_id='test_sla_miss',
+ execution_date=test_start_date))
+
+ scheduler = SchedulerJob(dag_id='test_sla_miss',
+ num_runs=1)
+
+ scheduler.manage_slas(dag=dag, session=session)
+
+ self.assertTrue(1, len(mock_send_email.call_args_list))
+
+ send_email_to = mock_send_email.call_args_list[0][0][0]
+ self.assertIn(email1, send_email_to)
+ self.assertNotIn(email2, send_email_to)
+
@mock.patch("airflow.utils.email.send_email")
def test_scheduler_sla_miss_email_exception(self, mock_send_email):
"""