Skip to content

Commit

Permalink
[AIRFLOW-5714] Collect SLA miss emails only from tasks missed SLA (#6384
Browse files Browse the repository at this point in the history
)

Currently when a task in the DAG missed the SLA,
Airflow would traverse through all the tasks in the DAG
and collect all the task-level emails. Then Airflow would
send an SLA miss email to all those collected emails,
which can add unnecessary noise to task owners that
does not contribute to the SLA miss.

Thus, changing the code to only collect emails
from the tasks that missed the SLA.

(cherry picked from commit bc53412)
  • Loading branch information
milton0825 authored and kaxil committed Dec 17, 2019
1 parent 2379061 commit 5ebc27d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
4 changes: 3 additions & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,10 @@ def manage_slas(self, dag, session=None):
<pre><code>{blocking_task_list}\n{bug}<code></pre>
""".format(task_list=task_list, blocking_task_list=blocking_task_list,
bug=asciiart.bug)

tasks_missed_sla = [dag.get_task(sla.task_id) for sla in slas]
emails = set()
for task in dag.tasks:
for task in tasks_missed_sla:
if task.email:
if isinstance(task.email, basestring):
emails |= set(get_email_address_list(task.email))
Expand Down
41 changes: 41 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1993,6 +1993,47 @@ def test_scheduler_sla_miss_callback_exception(self):
'Could not call sla_miss_callback for DAG %s',
'test_sla_miss')

@mock.patch('airflow.jobs.scheduler_job.send_email')
def test_scheduler_only_collect_emails_from_sla_missed_tasks(self, mock_send_email):
session = settings.Session()

test_start_date = days_ago(2)
dag = DAG(dag_id='test_sla_miss',
default_args={'start_date': test_start_date,
'sla': datetime.timedelta(days=1)})

email1 = '[email protected]'
task = DummyOperator(task_id='sla_missed',
dag=dag,
owner='airflow',
email=email1,
sla=datetime.timedelta(hours=1))

session.merge(models.TaskInstance(task=task,
execution_date=test_start_date,
state='Success'))

email2 = '[email protected]'
DummyOperator(task_id='sla_not_missed',
dag=dag,
owner='airflow',
email=email2)

session.merge(SlaMiss(task_id='sla_missed',
dag_id='test_sla_miss',
execution_date=test_start_date))

scheduler = SchedulerJob(dag_id='test_sla_miss',
num_runs=1)

scheduler.manage_slas(dag=dag, session=session)

self.assertTrue(1, len(mock_send_email.call_args_list))

send_email_to = mock_send_email.call_args_list[0][0][0]
self.assertIn(email1, send_email_to)
self.assertNotIn(email2, send_email_to)

@mock.patch("airflow.utils.email.send_email")
def test_scheduler_sla_miss_email_exception(self, mock_send_email):
"""
Expand Down

0 comments on commit 5ebc27d

Please sign in to comment.