
Commit

[AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7133)



* use preferred boolean check idiom

Co-Authored-By: Jarek Potiuk <[email protected]>

* add test coverage for AirflowFailException

* add docs for some exception usage patterns

* autoformatting

* remove extraneous newline, poke travis build

* clean up TaskInstance.handle_failure

Try to reduce nesting and repetition of logic for different conditions.
Also try to tighten up the scope of the exception handling: it looks
like the large block that catches an Exception and logs it as a failure
to send an email may have been swallowing some TypeErrors coming out
of trying to compose a log info message and calling strftime on
start_date and end_date when they're set to None; this is why I've added
lines in the test to set those values on the TaskInstance objects.

* let sphinx generate docs for exceptions module

* keep session kwarg last in handle_failure

* explain allowed_top_level

* add black-box tests for retry/fail immediately cases

* don't lose safety measures in logging date attrs

* fix flake8 too few blank lines

* grammar nitpick

* add import to AirflowFailException example

Co-authored-by: Jarek Potiuk <[email protected]>
jstern and potiuk authored May 16, 2020
1 parent f6d5917 commit 707bb0c
Showing 6 changed files with 178 additions and 60 deletions.
4 changes: 4 additions & 0 deletions airflow/exceptions.py
@@ -79,6 +79,10 @@ class AirflowSkipException(AirflowException):
"""Raise when the task should be skipped"""


class AirflowFailException(AirflowException):
"""Raise when the task should be failed without retrying"""


class AirflowDagCycleException(AirflowException):
"""Raise when there is a cycle in Dag definition"""

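Illustrative only, not part of the diff: a minimal sketch of how task code might raise the new exception so the task instance goes straight to FAILED instead of UP_FOR_RETRY. The validate_api_key callable and the empty api_key value are hypothetical stand-ins; the docs example added in docs/concepts.rst below shows the same pattern wired to a PythonOperator.

    from airflow.exceptions import AirflowFailException

    def validate_api_key():
        # Hypothetical check: a missing credential will not fix itself, so retrying is pointless.
        api_key = ""  # stand-in for a value read from a connection or an environment variable
        if not api_key:
            # Unlike a plain AirflowException, this marks the task instance FAILED immediately,
            # even when retries are configured on the task.
            raise AirflowFailException("API key is missing; retrying will not help.")
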
112 changes: 55 additions & 57 deletions airflow/models/taskinstance.py
@@ -41,7 +41,8 @@
from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import (
AirflowException, AirflowRescheduleException, AirflowSkipException, AirflowTaskTimeout,
AirflowException, AirflowFailException, AirflowRescheduleException, AirflowSkipException,
AirflowTaskTimeout,
)
from airflow.models.base import COLLATION_ARGS, ID_LEN, Base
from airflow.models.log import Log
@@ -1067,6 +1068,10 @@ def signal_handler(signum, frame):
self.refresh_from_db()
self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, context)
return
except AirflowFailException as e:
self.refresh_from_db()
self.handle_failure(e, test_mode, context, force_fail=True)
raise
except AirflowException as e:
self.refresh_from_db()
# for case when task is marked as success/failed externally
@@ -1177,7 +1182,7 @@ def _handle_reschedule(self, actual_start_date, reschedule_exception, test_mode=
self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE')

@provide_session
def handle_failure(self, error, test_mode=None, context=None, session=None):
def handle_failure(self, error, test_mode=None, context=None, force_fail=False, session=None):
if test_mode is None:
test_mode = self.test_mode
if context is None:
@@ -1198,64 +1203,51 @@ def handle_failure(self, error, test_mode=None, context=None, session=None):
if context is not None:
context['exception'] = error

# Let's go deeper
try:
# Since this function is called only when the TaskInstance state is running,
# try_number contains the current try_number (not the next). We
# only mark task instance as FAILED if the next task instance
# try_number exceeds the max_tries.
if self.is_eligible_to_retry():
self.state = State.UP_FOR_RETRY
self.log.info('Marking task as UP_FOR_RETRY')
if task.email_on_retry and task.email:
self.email_alert(error)
# Set state correctly and figure out how to log it,
# what callback to call if any, and how to decide whether to email

# Since this function is called only when the TaskInstance state is running,
# try_number contains the current try_number (not the next). We
# only mark task instance as FAILED if the next task instance
# try_number exceeds the max_tries ... or if force_fail is truthy

if force_fail or not self.is_eligible_to_retry():
self.state = State.FAILED
if force_fail:
log_message = "Immediate failure requested. Marking task as FAILED."
else:
self.state = State.FAILED
if task.retries:
self.log.info(
'All retries failed; marking task as FAILED.'
'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
self.dag_id,
self.task_id,
self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'execution_date') and self.execution_date else '',
self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'start_date') and self.start_date else '',
self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'end_date') and self.end_date else '')
else:
self.log.info(
'Marking task as FAILED.'
'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
self.dag_id,
self.task_id,
self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'execution_date') and self.execution_date else '',
self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'start_date') and self.start_date else '',
self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'end_date') and self.end_date else '')
if task.email_on_failure and task.email:
self.email_alert(error)
except Exception as e2:
self.log.error('Failed to send email to: %s', task.email)
self.log.exception(e2)
log_message = "Marking task as FAILED."
email_for_state = task.email_on_failure
callback = task.on_failure_callback
else:
self.state = State.UP_FOR_RETRY
log_message = "Marking task as UP_FOR_RETRY."
email_for_state = task.email_on_retry
callback = task.on_retry_callback

self.log.info(
'%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
log_message,
self.dag_id,
self.task_id,
self._safe_date('execution_date', '%Y%m%dT%H%M%S'),
self._safe_date('start_date', '%Y%m%dT%H%M%S'),
self._safe_date('end_date', '%Y%m%dT%H%M%S')
)
if email_for_state and task.email:
try:
self.email_alert(error)
except Exception as e2:
self.log.error('Failed to send email to: %s', task.email)
self.log.exception(e2)

# Handling callbacks pessimistically
try:
if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
task.on_retry_callback(context)
if self.state == State.FAILED and task.on_failure_callback:
task.on_failure_callback(context)
except Exception as e3:
self.log.error("Failed at executing callback")
self.log.exception(e3)
if callback:
try:
callback(context)
except Exception as e3:
self.log.error("Failed at executing callback")
self.log.exception(e3)

if not test_mode:
session.merge(self)
@@ -1265,6 +1257,12 @@ def is_eligible_to_retry(self):
"""Is task instance is eligible for retry"""
return self.task.retries and self.try_number <= self.max_tries

def _safe_date(self, date_attr, fmt):
result = getattr(self, date_attr, None)
if result is not None:
return result.strftime(fmt)
return ''

@provide_session
def get_template_context(self, session=None) -> Dict[str, Any]:
task = self.task
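
Illustrative only, not part of the diff: a standalone sketch of the defensive pattern behind the new _safe_date helper, which formats a date attribute only when it is present and not None and otherwise returns an empty string, so composing the failure log line can never raise. TinyTaskInstance is a toy stand-in for TaskInstance.

    from datetime import datetime

    class TinyTaskInstance:
        """Toy stand-in for TaskInstance, only to demonstrate the _safe_date pattern."""

        def __init__(self, start_date=None, end_date=None):
            self.start_date = start_date
            self.end_date = end_date

        def _safe_date(self, date_attr, fmt):
            # Same shape as the helper added in this commit: format the attribute if it
            # is set and not None, otherwise fall back to an empty string.
            result = getattr(self, date_attr, None)
            if result is not None:
                return result.strftime(fmt)
            return ''

    ti = TinyTaskInstance(start_date=datetime(2020, 5, 16, 12, 0, 0))
    print(ti._safe_date('start_date', '%Y%m%dT%H%M%S'))      # 20200516T120000
    print(ti._safe_date('end_date', '%Y%m%dT%H%M%S'))        # empty string: end_date is None
    print(ti._safe_date('execution_date', '%Y%m%dT%H%M%S'))  # empty string: attribute not set
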
13 changes: 12 additions & 1 deletion docs/autoapi_templates/index.rst
@@ -357,10 +357,21 @@ persisted in the database.

airflow/models/index

.. _pythonapi:exceptions:

Exceptions
----------

.. toctree::
:includehidden:
:glob:
:maxdepth: 1

airflow/exceptions/index

Secrets Backends
----------------
Airflow uses relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects.
Airflow relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects.
All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`.

.. toctree::
45 changes: 45 additions & 0 deletions docs/concepts.rst
@@ -1216,6 +1216,51 @@ template string:
See `Jinja documentation <https://jinja.palletsprojects.com/en/master/api/#jinja2.Environment>`_
to find all available options.

.. _exceptions:

Exceptions
==========

Airflow defines a number of exceptions; most of these are used internally, but a few
are relevant to authors of custom operators or Python callables called from ``PythonOperator``
tasks. Normally, any exception raised from an ``execute`` method or Python callable will mark
the task instance as "up for retry", or fail it if it is not configured to retry or has
exhausted its retry attempts. A few exceptions can be used when different behavior is desired:

* ``AirflowSkipException`` can be raised to set the state of the current task instance to "skipped".
* ``AirflowFailException`` can be raised to set the state of the current task to "failed" regardless
  of whether there are any retry attempts remaining.

This example illustrates some possibilities:

.. code:: python

    from airflow.exceptions import AirflowFailException, AirflowSkipException

    def fetch_data():
        try:
            data = get_some_data(get_api_key())
            if not data:
                # Set state to skipped and do not retry
                # Downstream task behavior will be determined by trigger rules
                raise AirflowSkipException("No data available.")
        except Unauthorized:
            # If we retry, our api key will still be bad, so don't waste time retrying!
            # Set state to failed and move on
            raise AirflowFailException("Our api key is bad!")
        except TransientError:
            print("Looks like there was a blip.")
            # Raise the exception and let the task retry unless max attempts were reached
            raise
        handle(data)

    task = PythonOperator(task_id="fetch_data", python_callable=fetch_data, retries=10)

.. seealso::
    - :ref:`List of Airflow exceptions <pythonapi:exceptions>`


Packaged DAGs
'''''''''''''
While often you will specify DAGs in a single ``.py`` file it might sometimes
6 changes: 5 additions & 1 deletion docs/conf.py
@@ -211,9 +211,13 @@
ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))

# Generate top-level

# do not exclude these top-level modules from the doc build:
allowed_top_level = ("exceptions.py",)

for path in glob(f"{ROOT_DIR}/airflow/*"):
name = os.path.basename(path)
if os.path.isfile(path):
if os.path.isfile(path) and not path.endswith(allowed_top_level):
exclude_patterns.append(f"_api/airflow/{name.rpartition('.')[0]}")
browsable_packages = ["operators", "hooks", "sensors", "providers", "executors", "models", "secrets"]
if os.path.isdir(path) and name not in browsable_packages:
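
Illustrative only, not part of the diff: str.endswith accepts a tuple of suffixes and returns True if any of them matches, which is what lets allowed_top_level keep exceptions.py in the doc build while other top-level modules stay excluded. The paths below are hypothetical stand-ins for the glob results used in conf.py.

    # Hypothetical stand-ins for glob(f"{ROOT_DIR}/airflow/*") results.
    allowed_top_level = ("exceptions.py",)
    paths = ["/repo/airflow/exceptions.py", "/repo/airflow/settings.py"]

    for path in paths:
        # str.endswith with a tuple matches any listed suffix, so exceptions.py stays
        # out of exclude_patterns while settings.py would still be excluded.
        if path.endswith(allowed_top_level):
            print(f"{path}: documented")
        else:
            print(f"{path}: excluded from the doc build")
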
58 changes: 57 additions & 1 deletion tests/models/test_taskinstance.py
@@ -31,7 +31,7 @@
from sqlalchemy.orm.session import Session

from airflow import models, settings
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.exceptions import AirflowException, AirflowFailException, AirflowSkipException
from airflow.models import (
DAG, DagRun, Pool, RenderedTaskInstanceFields, TaskFail, TaskInstance as TI, TaskReschedule, Variable,
)
@@ -1514,6 +1514,62 @@ def test_handle_failure(self):
context_arg_2 = mock_on_retry_2.call_args[0][0]
assert context_arg_2 and "task_instance" in context_arg_2

# test the scenario where normally we would retry but have been asked to fail
mock_on_failure_3 = mock.MagicMock()
mock_on_retry_3 = mock.MagicMock()
task3 = DummyOperator(task_id="test_handle_failure_on_force_fail",
on_failure_callback=mock_on_failure_3,
on_retry_callback=mock_on_retry_3,
retries=1,
dag=dag)
ti3 = TI(task=task3, execution_date=start_date)
ti3.state = State.FAILED
ti3.handle_failure("test force_fail handling", force_fail=True)

context_arg_3 = mock_on_failure_3.call_args[0][0]
assert context_arg_3 and "task_instance" in context_arg_3
mock_on_retry_3.assert_not_called()

def test_does_not_retry_on_airflow_fail_exception(self):
def fail():
raise AirflowFailException("hopeless")

dag = models.DAG(dag_id='test_does_not_retry_on_airflow_fail_exception')
task = PythonOperator(
task_id='test_raise_airflow_fail_exception',
dag=dag,
python_callable=fail,
owner='airflow',
start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
retries=1
)
ti = TI(task=task, execution_date=timezone.utcnow())
try:
ti.run()
except AirflowFailException:
pass # expected
self.assertEqual(State.FAILED, ti.state)

def test_retries_on_other_exceptions(self):
def fail():
raise AirflowException("maybe this will pass?")

dag = models.DAG(dag_id='test_retries_on_other_exceptions')
task = PythonOperator(
task_id='test_raise_other_exception',
dag=dag,
python_callable=fail,
owner='airflow',
start_date=timezone.datetime(2016, 2, 1, 0, 0, 0),
retries=1
)
ti = TI(task=task, execution_date=timezone.utcnow())
try:
ti.run()
except AirflowException:
pass # expected
self.assertEqual(State.UP_FOR_RETRY, ti.state)

def _env_var_check_callback(self):
self.assertEqual('test_echo_env_variables', os.environ['AIRFLOW_CTX_DAG_ID'])
self.assertEqual('hive_in_python_op', os.environ['AIRFLOW_CTX_TASK_ID'])
