Skip to content

Commit

Permalink
Adding ability to automatically set DAG to off after X times it faile…
Browse files Browse the repository at this point in the history
…d sequentially (#36935)


---------

Co-authored-by: Elad Kalif <[email protected]>
  • Loading branch information
pateash and eladkal authored Mar 15, 2024
1 parent 83060e1 commit 777a216
Show file tree
Hide file tree
Showing 13 changed files with 245 additions and 1 deletion.
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Meta:
tags = fields.List(fields.Nested(DagTagSchema), dump_only=True)
max_active_tasks = auto_field(dump_only=True)
max_active_runs = auto_field(dump_only=True)
max_consecutive_failed_dag_runs = auto_field(dump_only=True)
has_task_concurrency_limits = auto_field(dump_only=True)
has_import_errors = auto_field(dump_only=True)
next_dagrun = auto_field(dump_only=True)
Expand Down
1 change: 1 addition & 0 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
"tags": dag.tags,
"max_active_tasks": dag.max_active_tasks,
"max_active_runs": dag.max_active_runs,
"max_consecutive_failed_dag_runs": dag.max_consecutive_failed_dag_runs,
"has_task_concurrency_limits": any(
t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None for t in dag.tasks
),
Expand Down
11 changes: 11 additions & 0 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ core:
type: string
example: ~
default: "16"
max_consecutive_failed_dag_runs_per_dag:
description: |
(experimental) The maximum number of consecutive DAG failures before the DAG is automatically paused.
This is also configurable per DAG level with ``max_consecutive_failed_dag_runs``,
which defaults to ``max_consecutive_failed_dag_runs_per_dag``.
If not specified, the value is treated as 0,
meaning that DAGs are never automatically paused by default.
version_added: 2.9.0
type: string
example: ~
default: "0"
mp_start_method:
description: |
The name of the method used in order to start Python processes via the multiprocessing module.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Adding max_consecutive_failed_dag_runs column to dag_model table
Revision ID: 8e1c784a4fc7
Revises: 1fd565369930
Create Date: 2024-01-18 15:02:24.587206
"""

import sqlalchemy as sa
from alembic import op


# revision identifiers, used by Alembic.
revision = '8e1c784a4fc7'
down_revision = 'ab34f260b71c'
branch_labels = None
depends_on = None
airflow_version = '2.9.0'


def upgrade():
    """Add the ``max_consecutive_failed_dag_runs`` column to the ``dag`` table."""
    # Batch mode keeps the migration compatible with backends that have
    # limited ALTER TABLE support (notably SQLite).
    with op.batch_alter_table('dag', schema=None) as batch:
        batch.add_column(sa.Column('max_consecutive_failed_dag_runs', sa.Integer()))


def downgrade():
    """Remove the ``max_consecutive_failed_dag_runs`` column from the ``dag`` table."""
    # Mirror of upgrade(): drop the column in batch mode for SQLite compatibility.
    with op.batch_alter_table('dag', schema=None) as batch:
        batch.drop_column('max_consecutive_failed_dag_runs')
26 changes: 26 additions & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ class DAG(LoggingMixin):
:param max_active_runs: maximum number of active DAG runs, beyond this
number of DAG runs in a running state, the scheduler won't create
new active DAG runs
:param max_consecutive_failed_dag_runs: (experimental) maximum number of consecutive failed DAG runs,
beyond which the scheduler will automatically pause the DAG
:param dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created.
:param sla_miss_callback: specify a function or list of functions to call when reporting SLA
Expand Down Expand Up @@ -456,6 +458,9 @@ def __init__(
concurrency: int | None = None,
max_active_tasks: int = airflow_conf.getint("core", "max_active_tasks_per_dag"),
max_active_runs: int = airflow_conf.getint("core", "max_active_runs_per_dag"),
max_consecutive_failed_dag_runs: int = airflow_conf.getint(
"core", "max_consecutive_failed_dag_runs_per_dag"
),
dagrun_timeout: timedelta | None = None,
sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(),
Expand Down Expand Up @@ -617,6 +622,16 @@ def __init__(
self.last_loaded: datetime = timezone.utcnow()
self.safe_dag_id = dag_id.replace(".", "__dot__")
self.max_active_runs = max_active_runs
self.max_consecutive_failed_dag_runs = max_consecutive_failed_dag_runs
if self.max_consecutive_failed_dag_runs == 0:
self.max_consecutive_failed_dag_runs = airflow_conf.getint(
"core", "max_consecutive_failed_dag_runs_per_dag"
)
if self.max_consecutive_failed_dag_runs < 0:
raise AirflowException(
f"Invalid max_consecutive_failed_dag_runs: {self.max_consecutive_failed_dag_runs}."
f"Requires max_consecutive_failed_dag_runs >= 0"
)
if self.timetable.active_runs_limit is not None:
if self.timetable.active_runs_limit < self.max_active_runs:
raise AirflowException(
Expand Down Expand Up @@ -3123,6 +3138,7 @@ def bulk_write_to_db(
orm_dag.description = dag.description
orm_dag.max_active_tasks = dag.max_active_tasks
orm_dag.max_active_runs = dag.max_active_runs
orm_dag.max_consecutive_failed_dag_runs = dag.max_consecutive_failed_dag_runs
orm_dag.has_task_concurrency_limits = any(
t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None
for t in dag.tasks
Expand Down Expand Up @@ -3594,6 +3610,7 @@ class DagModel(Base):

max_active_tasks = Column(Integer, nullable=False)
max_active_runs = Column(Integer, nullable=True)
max_consecutive_failed_dag_runs = Column(Integer, nullable=False)

has_task_concurrency_limits = Column(Boolean, nullable=False)
has_import_errors = Column(Boolean(), default=False, server_default="0")
Expand Down Expand Up @@ -3645,6 +3662,11 @@ def __init__(self, concurrency=None, **kwargs):
if self.max_active_runs is None:
self.max_active_runs = airflow_conf.getint("core", "max_active_runs_per_dag")

if self.max_consecutive_failed_dag_runs is None:
self.max_consecutive_failed_dag_runs = airflow_conf.getint(
"core", "max_consecutive_failed_dag_runs_per_dag"
)

if self.has_task_concurrency_limits is None:
# Be safe -- this will be updated later once the DAG is parsed
self.has_task_concurrency_limits = True
Expand Down Expand Up @@ -3942,6 +3964,9 @@ def dag(
concurrency: int | None = None,
max_active_tasks: int = airflow_conf.getint("core", "max_active_tasks_per_dag"),
max_active_runs: int = airflow_conf.getint("core", "max_active_runs_per_dag"),
max_consecutive_failed_dag_runs: int = airflow_conf.getint(
"core", "max_consecutive_failed_dag_runs_per_dag"
),
dagrun_timeout: timedelta | None = None,
sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(),
Expand Down Expand Up @@ -3996,6 +4021,7 @@ def factory(*args, **kwargs):
concurrency=concurrency,
max_active_tasks=max_active_tasks,
max_active_runs=max_active_runs,
max_consecutive_failed_dag_runs=max_consecutive_failed_dag_runs,
dagrun_timeout=dagrun_timeout,
sla_miss_callback=sla_miss_callback,
default_view=default_view,
Expand Down
49 changes: 49 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,45 @@ def fetch_task_instances(
tis = tis.where(TI.task_id.in_(task_ids))
return session.scalars(tis).all()

@internal_api_call
def _check_last_n_dagruns_failed(self, dag_id, max_consecutive_failed_dag_runs, session):
    """Pause the DAG when its last ``max_consecutive_failed_dag_runs`` runs all failed.

    Fetches the most recent runs of *dag_id* and, if there are at least
    ``max_consecutive_failed_dag_runs`` of them and every one is in the FAILED
    state, sets ``is_paused=True`` on the corresponding ``DagModel`` row(s).

    :param dag_id: id of the DAG whose run history is inspected
    :param max_consecutive_failed_dag_runs: number of most recent runs that must
        all have failed before the DAG is paused (caller guarantees > 0)
    :param session: SQLAlchemy session used for both the query and the update
    """
    # Only the newest N runs matter; fewer than N runs means the threshold
    # cannot have been reached yet.
    dag_runs = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id)
        .order_by(DagRun.execution_date.desc())
        .limit(max_consecutive_failed_dag_runs)
        .all()
    )
    # Pause only when we have a full window of N runs and all of them failed.
    # (The original code used a bare triple-quoted string here, which is a
    # no-op expression statement, not a comment.)
    to_be_paused = len(dag_runs) >= max_consecutive_failed_dag_runs and all(
        dag_run.state == DagRunState.FAILED for dag_run in dag_runs
    )

    if to_be_paused:
        from airflow.models.dag import DagModel

        self.log.info(
            "Pausing DAG %s because last %s DAG runs failed.",
            self.dag_id,
            max_consecutive_failed_dag_runs,
        )
        # Match both the DAG itself and any sub-dags rooted at it.
        filter_query = [
            DagModel.dag_id == self.dag_id,
            DagModel.root_dag_id == self.dag_id,  # for sub-dags
        ]
        session.execute(
            update(DagModel)
            .where(or_(*filter_query))
            .values(is_paused=True)
            .execution_options(synchronize_session="fetch")
        )
    else:
        self.log.debug(
            "Limit of consecutive DAG failed dag runs is not reached, DAG %s will not be paused.",
            self.dag_id,
        )

@provide_session
def get_task_instances(
self,
Expand Down Expand Up @@ -787,6 +826,16 @@ def recalculate(self) -> _UnfinishedStates:
msg="task_failure",
)

# Check if the max_consecutive_failed_dag_runs has been provided and not 0
# and last consecutive failures are more
if dag.max_consecutive_failed_dag_runs > 0:
self.log.debug(
"Checking consecutive failed DAG runs for DAG %s, limit is %s",
self.dag_id,
dag.max_consecutive_failed_dag_runs,
)
self._check_last_n_dagruns_failed(dag.dag_id, dag.max_consecutive_failed_dag_runs, session)

# if all leaves succeeded and no unfinished tasks, the run succeeded
elif not unfinished.tis and all(x.state in State.success_states for x in tis_for_dagrun_state):
self.log.info("Marking run %s successful", self)
Expand Down
1 change: 1 addition & 0 deletions airflow/serialization/pydantic/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class DagModelPydantic(BaseModelPydantic):

max_active_tasks: int
max_active_runs: Optional[int]
max_consecutive_failed_dag_runs: Optional[int]

has_task_concurrency_limits: bool
has_import_errors: Optional[bool] = False
Expand Down
1 change: 1 addition & 0 deletions airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
"_concurrency": { "type" : "number"},
"_max_active_tasks": { "type" : "number"},
"max_active_runs": { "type" : "number"},
"max_consecutive_failed_dag_runs": { "type" : "number"},
"default_args": { "$ref": "#/definitions/dict" },
"start_date": { "$ref": "#/definitions/datetime" },
"end_date": { "$ref": "#/definitions/datetime" },
Expand Down
12 changes: 12 additions & 0 deletions docs/apache-airflow/core-concepts/dags.rst
Original file line number Diff line number Diff line change
Expand Up @@ -911,3 +911,15 @@ it in three steps:
* pause the DAG
* delete the historical metadata from the database, via UI or API
* delete the DAG file from the ``DAGS_FOLDER`` and wait until it becomes inactive

DAG Auto-pausing (Experimental)
-------------------------------
DAGs can also be configured to be auto-paused.
There is an Airflow configuration option that automatically pauses a DAG
if it fails ``N`` times consecutively.

- :ref:`config:core__max_consecutive_failed_dag_runs_per_dag`

You can also override this configuration with a per-DAG argument:

- ``max_consecutive_failed_dag_runs``: Overrides :ref:`config:core__max_consecutive_failed_dag_runs_per_dag`.
7 changes: 6 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=================================+===================+===================+==============================================================+
| ``d75389605139`` (head) | ``1fd565369930`` | ``2.9.0`` | Add run_id to (Audit) log table and increase event name |
| ``8e1c784a4fc7`` (head) | ``ab34f260b71c`` | ``2.9.0`` | Adding max_consecutive_failed_dag_runs column to dag_model |
| | | | table |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``ab34f260b71c`` | ``d75389605139`` | ``2.9.0`` | add dataset_expression in DagModel |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``d75389605139`` | ``1fd565369930`` | ``2.9.0`` | Add run_id to (Audit) log table and increase event name |
| | | | length |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``1fd565369930`` | ``88344c1d9134`` | ``2.9.0`` | Add rendered_map_index to TaskInstance. |
Expand Down
Loading

0 comments on commit 777a216

Please sign in to comment.