From 707bb0c725fbc32929eea162993aa8fb9854fa9a Mon Sep 17 00:00:00 2001 From: Jonathan Stern Date: Sat, 16 May 2020 12:53:12 -0500 Subject: [PATCH] [AIRFLOW-6535] Add AirflowFailException to fail without any retry (#7133) * use preferred boolean check idiom Co-Authored-By: Jarek Potiuk * add test coverage for AirflowFailException * add docs for some exception usage patterns * autoformatting * remove extraneous newline, poke travis build * clean up TaskInstance.handle_failure Try to reduce nesting and repetition of logic for different conditions. Also try to tighten up the scope of the exception handling ... it looks like the large block that catches an Exception and logs it as a failure to send an email may have been swallowing some TypeErrors coming out of trying to compose a log info message and calling strftime on start_date and end_date when they're set to None; this is why I've added lines in the test to set those values on the TaskInstance objects. * let sphinx generate docs for exceptions module * keep session kwarg last in handle_failure * explain allowed_top_level * add black-box tests for retry/fail immediately cases * don't lose safety measures in logging date attrs * fix flake8 too few blank lines * grammar nitpick * add import to AirflowFailException example Co-authored-by: Jarek Potiuk --- airflow/exceptions.py | 4 ++ airflow/models/taskinstance.py | 112 +++++++++++++++--------------- docs/autoapi_templates/index.rst | 13 +++- docs/concepts.rst | 45 ++++++++++++ docs/conf.py | 6 +- tests/models/test_taskinstance.py | 58 +++++++++++++++- 6 files changed, 178 insertions(+), 60 deletions(-) diff --git a/airflow/exceptions.py b/airflow/exceptions.py index 6634d55190605a..f0559a825407e0 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -79,6 +79,10 @@ class AirflowSkipException(AirflowException): """Raise when the task should be skipped""" +class AirflowFailException(AirflowException): + """Raise when the task should be failed without retrying""" + + class AirflowDagCycleException(AirflowException): """Raise when there is a cycle in Dag definition""" diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 3260580bf48b63..2a897136b3779c 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -41,7 +41,8 @@ from airflow import settings from airflow.configuration import conf from airflow.exceptions import ( - AirflowException, AirflowRescheduleException, AirflowSkipException, AirflowTaskTimeout, + AirflowException, AirflowFailException, AirflowRescheduleException, AirflowSkipException, + AirflowTaskTimeout, ) from airflow.models.base import COLLATION_ARGS, ID_LEN, Base from airflow.models.log import Log @@ -1067,6 +1068,10 @@ def signal_handler(signum, frame): self.refresh_from_db() self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, context) return + except AirflowFailException as e: + self.refresh_from_db() + self.handle_failure(e, test_mode, context, force_fail=True) + raise except AirflowException as e: self.refresh_from_db() # for case when task is marked as success/failed externally @@ -1177,7 +1182,7 @@ def _handle_reschedule(self, actual_start_date, reschedule_exception, test_mode= self.log.info('Rescheduling task, marking task as UP_FOR_RESCHEDULE') @provide_session - def handle_failure(self, error, test_mode=None, context=None, session=None): + def handle_failure(self, error, test_mode=None, context=None, force_fail=False, session=None): if test_mode is None: test_mode = self.test_mode if context is None: @@ -1198,64 +1203,51 @@ def handle_failure(self, error, test_mode=None, context=None, session=None): if context is not None: context['exception'] = error - # Let's go deeper - try: - # Since this function is called only when the TaskInstance state is running, - # try_number contains the current try_number (not the next). We - # only mark task instance as FAILED if the next task instance - # try_number exceeds the max_tries. - if self.is_eligible_to_retry(): - self.state = State.UP_FOR_RETRY - self.log.info('Marking task as UP_FOR_RETRY') - if task.email_on_retry and task.email: - self.email_alert(error) + # Set state correctly and figure out how to log it, + # what callback to call if any, and how to decide whether to email + + # Since this function is called only when the TaskInstance state is running, + # try_number contains the current try_number (not the next). We + # only mark task instance as FAILED if the next task instance + # try_number exceeds the max_tries ... or if force_fail is truthy + + if force_fail or not self.is_eligible_to_retry(): + self.state = State.FAILED + if force_fail: + log_message = "Immediate failure requested. Marking task as FAILED." else: - self.state = State.FAILED - if task.retries: - self.log.info( - 'All retries failed; marking task as FAILED.' - 'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s', - self.dag_id, - self.task_id, - self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr( - self, - 'execution_date') and self.execution_date else '', - self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr( - self, - 'start_date') and self.start_date else '', - self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr( - self, - 'end_date') and self.end_date else '') - else: - self.log.info( - 'Marking task as FAILED.' - 'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s', - self.dag_id, - self.task_id, - self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr( - self, - 'execution_date') and self.execution_date else '', - self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr( - self, - 'start_date') and self.start_date else '', - self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr( - self, - 'end_date') and self.end_date else '') - if task.email_on_failure and task.email: - self.email_alert(error) - except Exception as e2: - self.log.error('Failed to send email to: %s', task.email) - self.log.exception(e2) + log_message = "Marking task as FAILED." + email_for_state = task.email_on_failure + callback = task.on_failure_callback + else: + self.state = State.UP_FOR_RETRY + log_message = "Marking task as UP_FOR_RETRY." + email_for_state = task.email_on_retry + callback = task.on_retry_callback + + self.log.info( + '%s dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s', + log_message, + self.dag_id, + self.task_id, + self._safe_date('execution_date', '%Y%m%dT%H%M%S'), + self._safe_date('start_date', '%Y%m%dT%H%M%S'), + self._safe_date('end_date', '%Y%m%dT%H%M%S') + ) + if email_for_state and task.email: + try: + self.email_alert(error) + except Exception as e2: + self.log.error('Failed to send email to: %s', task.email) + self.log.exception(e2) # Handling callbacks pessimistically - try: - if self.state == State.UP_FOR_RETRY and task.on_retry_callback: - task.on_retry_callback(context) - if self.state == State.FAILED and task.on_failure_callback: - task.on_failure_callback(context) - except Exception as e3: - self.log.error("Failed at executing callback") - self.log.exception(e3) + if callback: + try: + callback(context) + except Exception as e3: + self.log.error("Failed at executing callback") + self.log.exception(e3) if not test_mode: session.merge(self) @@ -1265,6 +1257,12 @@ def is_eligible_to_retry(self): """Is task instance is eligible for retry""" return self.task.retries and self.try_number <= self.max_tries + def _safe_date(self, date_attr, fmt): + result = getattr(self, date_attr, None) + if result is not None: + return result.strftime(fmt) + return '' + @provide_session def get_template_context(self, session=None) -> Dict[str, Any]: task = self.task diff --git a/docs/autoapi_templates/index.rst b/docs/autoapi_templates/index.rst index d3513b7133e3af..ed1a91048c53fe 100644 --- a/docs/autoapi_templates/index.rst +++ b/docs/autoapi_templates/index.rst @@ -357,10 +357,21 @@ persisted in the database. airflow/models/index +.. _pythonapi:exceptions: + +Exceptions +---------- + +.. toctree:: + :includehidden: + :glob: + :maxdepth: 1 + + airflow/exceptions/index Secrets Backends ---------------- -Airflow uses relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects. +Airflow relies on secrets backends to retrieve :class:`~airflow.models.connection.Connection` objects. All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`. .. toctree:: diff --git a/docs/concepts.rst b/docs/concepts.rst index 60ceec66100e70..c8d5942a9e3fa7 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -1216,6 +1216,51 @@ template string: See `Jinja documentation `_ to find all available options. +.. _exceptions: + +Exceptions +========== + +Airflow defines a number of exceptions; most of these are used internally, but a few +are relevant to authors of custom operators or python callables called from ``PythonOperator`` +tasks. Normally any exception raised from an ``execute`` method or python callable will either +cause a task instance to fail if it is not configured to retry or has reached its limit on +retry attempts, or to be marked as "up for retry". A few exceptions can be used when different +behavior is desired: + +* ``AirflowSkipException`` can be raised to set the state of the current task instance to "skipped" +* ``AirflowFailException`` can be raised to set the state of the current task to "failed" regardless + of whether there are any retry attempts remaining. + +This example illustrates some possibilities + +.. code:: python + + from airflow.exceptions import AirflowFailException, AirflowSkipException + + def fetch_data(): + try: + data = get_some_data(get_api_key()) + if not data: + # Set state to skipped and do not retry + # Downstream task behavior will be determined by trigger rules + raise AirflowSkipException("No data available.") + except Unauthorized: + # If we retry, our api key will still be bad, so don't waste time retrying! + # Set state to failed and move on + raise AirflowFailException("Our api key is bad!") + except TransientError: + print("Looks like there was a blip.") + # Raise the exception and let the task retry unless max attempts were reached + raise + handle(data) + + task = PythonOperator(task_id="fetch_data", python_callable=fetch_data, retries=10) + +.. seealso:: + - :ref:`List of Airflow exceptions ` + + Packaged DAGs ''''''''''''' While often you will specify DAGs in a single ``.py`` file it might sometimes diff --git a/docs/conf.py b/docs/conf.py index d69fd9185aa52c..95d2f5ec26d94d 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -211,9 +211,13 @@ ROOT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)) # Generate top-level + +# do not exclude these top-level modules from the doc build: +allowed_top_level = ("exceptions.py",) + for path in glob(f"{ROOT_DIR}/airflow/*"): name = os.path.basename(path) - if os.path.isfile(path): + if os.path.isfile(path) and not path.endswith(allowed_top_level): exclude_patterns.append(f"_api/airflow/{name.rpartition('.')[0]}") browsable_packages = ["operators", "hooks", "sensors", "providers", "executors", "models", "secrets"] if os.path.isdir(path) and name not in browsable_packages: diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 631769eb28ca64..52feaa5be54994 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -31,7 +31,7 @@ from sqlalchemy.orm.session import Session from airflow import models, settings -from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.exceptions import AirflowException, AirflowFailException, AirflowSkipException from airflow.models import ( DAG, DagRun, Pool, RenderedTaskInstanceFields, TaskFail, TaskInstance as TI, TaskReschedule, Variable, ) @@ -1514,6 +1514,62 @@ def test_handle_failure(self): context_arg_2 = mock_on_retry_2.call_args[0][0] assert context_arg_2 and "task_instance" in context_arg_2 + # test the scenario where normally we would retry but have been asked to fail + mock_on_failure_3 = mock.MagicMock() + mock_on_retry_3 = mock.MagicMock() + task3 = DummyOperator(task_id="test_handle_failure_on_force_fail", + on_failure_callback=mock_on_failure_3, + on_retry_callback=mock_on_retry_3, + retries=1, + dag=dag) + ti3 = TI(task=task3, execution_date=start_date) + ti3.state = State.FAILED + ti3.handle_failure("test force_fail handling", force_fail=True) + + context_arg_3 = mock_on_failure_3.call_args[0][0] + assert context_arg_3 and "task_instance" in context_arg_3 + mock_on_retry_3.assert_not_called() + + def test_does_not_retry_on_airflow_fail_exception(self): + def fail(): + raise AirflowFailException("hopeless") + + dag = models.DAG(dag_id='test_does_not_retry_on_airflow_fail_exception') + task = PythonOperator( + task_id='test_raise_airflow_fail_exception', + dag=dag, + python_callable=fail, + owner='airflow', + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0), + retries=1 + ) + ti = TI(task=task, execution_date=timezone.utcnow()) + try: + ti.run() + except AirflowFailException: + pass # expected + self.assertEqual(State.FAILED, ti.state) + + def test_retries_on_other_exceptions(self): + def fail(): + raise AirflowException("maybe this will pass?") + + dag = models.DAG(dag_id='test_retries_on_other_exceptions') + task = PythonOperator( + task_id='test_raise_other_exception', + dag=dag, + python_callable=fail, + owner='airflow', + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0), + retries=1 + ) + ti = TI(task=task, execution_date=timezone.utcnow()) + try: + ti.run() + except AirflowException: + pass # expected + self.assertEqual(State.UP_FOR_RETRY, ti.state) + def _env_var_check_callback(self): self.assertEqual('test_echo_env_variables', os.environ['AIRFLOW_CTX_DAG_ID']) self.assertEqual('hive_in_python_op', os.environ['AIRFLOW_CTX_TASK_ID'])