Skip to content

Commit

Permalink
[AIRFLOW-5640] Document and test email parameters of BaseOperator (#…
Browse files Browse the repository at this point in the history
…6315)

* Refactored get_email_address_list to have a better
  separation between string handling and other iterables.
* Explicitly casting get_email_address_list argument
  to a list in case the argument was an iterable. This
  enables direct support for tuples, sets or the like.
* Fixed type annotation of email parameter of
  BaseOperator to show that iterables are directly
  supported.
* Added docstring entries for email, email_on_retry,
  email_on_failure and queue in BaseOperator.

(cherry picked from commit 9ec562f)
  • Loading branch information
SaturnFromTitan authored and kaxil committed Dec 17, 2019
1 parent e93bd30 commit 12603b2
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 11 deletions.
16 changes: 14 additions & 2 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

from abc import ABCMeta, abstractmethod
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, FrozenSet, Iterable, List, Optional, Set, Type
from typing import Any, Callable, Dict, FrozenSet, Iterable, List, Optional, Set, Type, Union


from cached_property import cached_property
Expand Down Expand Up @@ -81,6 +81,16 @@ class derived from this one results in the creation of a task object,
:type task_id: str
:param owner: the owner of the task, using the unix username is recommended
:type owner: str
:param email: the 'to' email address(es) used in email alerts. This can be a
single email or multiple ones. Multiple addresses can be specified as a
comma or semi-colon separated string or by passing a list of strings.
:type email: str or list[str]
:param email_on_retry: Indicates whether email alerts should be sent when a
task is retried
:type email_on_retry: bool
:param email_on_failure: Indicates whether email alerts should be sent when
a task failed
:type email_on_failure: bool
:param retries: the number of retries that should be performed before
failing the task
:type retries: int
Expand Down Expand Up @@ -156,6 +166,8 @@ class derived from this one results in the creation of a task object,
DAGS. Options can be set as string or using the constants defined in
the static class ``airflow.utils.WeightRule``
:type weight_rule: str
:param queue: specifies which task queue to use
:type queue: str
:param pool: the slot pool this task should run in, slot pools are a
way to limit concurrency for certain tasks
:type pool: str
Expand Down Expand Up @@ -274,7 +286,7 @@ def __init__(
self,
task_id, # type: str
owner=conf.get('operators', 'DEFAULT_OWNER'), # type: str
email=None, # type: Optional[str]
email=None, # type: Optional[Union[str, Iterable[str]]]
email_on_retry=True, # type: bool
email_on_failure=True, # type: bool
retries=conf.getint('core', 'default_task_retries', fallback=0), # type: int
Expand Down
32 changes: 23 additions & 9 deletions airflow/utils/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
from __future__ import unicode_literals

from past.builtins import basestring
try:
from collections.abc import Iterable as CollectionIterable
except ImportError:
from collections import Iterable as CollectionIterable

import importlib
import os
Expand All @@ -32,6 +36,7 @@
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.utils import formatdate
from typing import Iterable, List, Union

from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException
Expand Down Expand Up @@ -128,13 +133,22 @@ def send_MIME_email(e_from, e_to, mime_msg, dryrun=False):
s.quit()


def get_email_address_list(addresses):  # type: (Union[str, Iterable[str]]) -> List[str]
    """
    Normalise the ``addresses`` argument into a list of address strings.

    :param addresses: either a single string (holding one address, or several
        addresses separated by ``,`` or ``;``) or an iterable of address
        strings such as a list, tuple or set.
    :return: the addresses as a list of strings.
    :raises TypeError: if ``addresses`` is neither a string nor an iterable,
        or if the iterable contains an item that is not a string.
    """
    # Strings are iterable as well, so they must be recognised first.
    if isinstance(addresses, basestring):
        return _get_email_list_from_str(addresses)

    if isinstance(addresses, CollectionIterable):
        # Materialise exactly once: validating a one-shot iterator (e.g. a
        # generator) in place would exhaust it and leave nothing to return.
        address_list = list(addresses)
        if not all(isinstance(item, basestring) for item in address_list):
            raise TypeError("The items in your iterable must be strings.")
        return address_list

    received_type = type(addresses).__name__
    raise TypeError("Unexpected argument type: Received '{}'.".format(received_type))


def _get_email_list_from_str(addresses): # type: (str) -> List[str]
delimiters = [",", ";"]
for delimiter in delimiters:
if delimiter in addresses:
return [address.strip() for address in addresses.split(delimiter)]
return [addresses]
24 changes: 24 additions & 0 deletions tests/utils/test_email.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@

class EmailTest(unittest.TestCase):

def test_get_email_address_single_email(self):
    """A string holding a single address comes back as a one-element list."""
    single_address = '[email protected]'
    expected = [single_address]

    self.assertEqual(get_email_address_list(single_address), expected)

def test_get_email_address_comma_sep_string(self):
emails_string = '[email protected], [email protected]'

Expand All @@ -42,3 +48,21 @@ def test_get_email_address_list(self):

self.assertEqual(
get_email_address_list(emails_list), EMAILS)

def test_get_email_address_tuple(self):
    """A tuple of addresses is converted into the expected list."""
    as_tuple = ('[email protected]', '[email protected]')
    result = get_email_address_list(as_tuple)

    self.assertEqual(result, EMAILS)

def test_get_email_address_invalid_type(self):
    """An argument that is neither a string nor an iterable raises TypeError."""
    not_an_address = 1

    with self.assertRaises(TypeError):
        get_email_address_list(not_an_address)

def test_get_email_address_invalid_type_in_iterable(self):
    """A list containing a non-string item raises TypeError."""
    mixed_items = ['[email protected]', 2]

    with self.assertRaises(TypeError):
        get_email_address_list(mixed_items)

0 comments on commit 12603b2

Please sign in to comment.