From 5ebc27d17c639241e86c7b0eeaf51585a286cc12 Mon Sep 17 00:00:00 2001 From: Chao-Han Tsai Date: Tue, 22 Oct 2019 11:37:22 -0700 Subject: [PATCH] [AIRFLOW-5714] Collect SLA miss emails only from tasks missed SLA (#6384) Currently when a task in the DAG missed the SLA, Airflow would traverse through all the tasks in the DAG and collect all the task-level emails. Then Airflow would send an SLA miss email to all those collected emails, which can add unnecessary noise to task owners that does not contribute to the SLA miss. Thus, changing the code to only collect emails from the tasks that missed the SLA. (cherry picked from commit bc5341223429efe777af7b492ad09b96c7c77b17) --- airflow/jobs/scheduler_job.py | 4 +++- tests/jobs/test_scheduler_job.py | 41 ++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 1012a31323df95..188bf2c6153be9 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -540,8 +540,10 @@ def manage_slas(self, dag, session=None):
{blocking_task_list}\n{bug}
""".format(task_list=task_list, blocking_task_list=blocking_task_list, bug=asciiart.bug) + + tasks_missed_sla = [dag.get_task(sla.task_id) for sla in slas] emails = set() - for task in dag.tasks: + for task in tasks_missed_sla: if task.email: if isinstance(task.email, basestring): emails |= set(get_email_address_list(task.email)) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index ee2ee4a1ab02c2..ad0f4b14380764 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1993,6 +1993,47 @@ def test_scheduler_sla_miss_callback_exception(self): 'Could not call sla_miss_callback for DAG %s', 'test_sla_miss') + @mock.patch('airflow.jobs.scheduler_job.send_email') + def test_scheduler_only_collect_emails_from_sla_missed_tasks(self, mock_send_email): + session = settings.Session() + + test_start_date = days_ago(2) + dag = DAG(dag_id='test_sla_miss', + default_args={'start_date': test_start_date, + 'sla': datetime.timedelta(days=1)}) + + email1 = 'test1@test.com' + task = DummyOperator(task_id='sla_missed', + dag=dag, + owner='airflow', + email=email1, + sla=datetime.timedelta(hours=1)) + + session.merge(models.TaskInstance(task=task, + execution_date=test_start_date, + state='Success')) + + email2 = 'test2@test.com' + DummyOperator(task_id='sla_not_missed', + dag=dag, + owner='airflow', + email=email2) + + session.merge(SlaMiss(task_id='sla_missed', + dag_id='test_sla_miss', + execution_date=test_start_date)) + + scheduler = SchedulerJob(dag_id='test_sla_miss', + num_runs=1) + + scheduler.manage_slas(dag=dag, session=session) + + self.assertTrue(1, len(mock_send_email.call_args_list)) + + send_email_to = mock_send_email.call_args_list[0][0][0] + self.assertIn(email1, send_email_to) + self.assertNotIn(email2, send_email_to) + @mock.patch("airflow.utils.email.send_email") def test_scheduler_sla_miss_email_exception(self, mock_send_email): """