Adding ability to automatically set DAG to off after X times it failed sequentially #36935

Merged
merged 42 commits into from
Mar 15, 2024
2bfca29
Add max_failure_runs field to DagModel
pateash Jan 18, 2024
f6d568e
Implement max_failure_runs option for DAGs
pateash Jan 18, 2024
1c5170a
Add method to check last N DAG runs' failure status
pateash Jan 21, 2024
f4a542c
Add 'max_failure_runs' to various airflow files
pateash Jan 21, 2024
73885c3
Refactor 'check_last_N_dagruns_failed' method and add 'max_failure_ru…
pateash Jan 21, 2024
21cda40
Add auto-pausing of DAG on exceeding 'max_failure_runs'
pateash Jan 21, 2024
6bad270
Rename 'max_failure_runs' to 'max_consecutive_failed_dag_runs'
pateash Jan 27, 2024
38f8f57
Refactor method and variable names, enhance log messages in dagrun.py
pateash Jan 27, 2024
2838705
Corrected downgrade method's docstring in migration file
pateash Feb 4, 2024
4652b41
Refactor variable names and update descriptions in Airflow models
pateash Feb 4, 2024
6df361d
Refined logic for pausing DAG upon failures
pateash Feb 5, 2024
67d1022
Add 'max_consecutive_failed_dag_runs' to DAG schema
pateash Feb 5, 2024
dd8e7eb
Add default configuration for `max_consecutive_failed_dag_runs`
pateash Feb 5, 2024
6a8db74
Update comment in `dagrun.py`
pateash Feb 5, 2024
c1a2707
Add tracing to test_dag and new DAG pause test case
pateash Feb 12, 2024
6264ec2
Refactor test_dag and add new test for DAG pause behavior
pateash Feb 13, 2024
ff1d833
Refactor test_dag and add new test for DAG pause behavior
pateash Feb 13, 2024
e84b902
Remove debug code from test_dag.py
pateash Feb 13, 2024
8aaafa3
Add max_consecutive_failed_dag_runs to test data
pateash Feb 13, 2024
a42ac73
Refactor code for readability and standards compliance
pateash Feb 13, 2024
4a338db
Update version in migration script filename and contents
pateash Feb 13, 2024
d41a8d9
tests: fixed
pateash Feb 22, 2024
c43d53a
fixed: pre-commits
pateash Feb 22, 2024
0ada114
fixed: migrations added
pateash Mar 5, 2024
45d444f
fixed: migrations added
pateash Mar 5, 2024
397b09e
fixed: migrations added
pateash Mar 11, 2024
ffceb9b
fixed: migrations added
pateash Mar 11, 2024
93ba3e1
fixes: pr comments
pateash Mar 11, 2024
d975e6f
fixes: pr comments
pateash Mar 11, 2024
8b9992b
Update airflow/config_templates/config.yml
pateash Mar 14, 2024
0fbaef2
fixes: experimental added
pateash Mar 14, 2024
5bdcb9d
fixes: reverting unrelated changes
pateash Mar 14, 2024
aa26787
fixes: pr comments
pateash Mar 14, 2024
c90016a
fixes: pr comments
pateash Mar 14, 2024
2314d4e
fixes: pr comments
pateash Mar 14, 2024
f805612
fixes: pr comments
pateash Mar 14, 2024
53aaffe
fixes: docs added
pateash Mar 14, 2024
5d962c3
fixed: pre-commits
pateash Mar 14, 2024
0a6f687
fixed: pre-commits
pateash Mar 14, 2024
db10bdf
Update airflow/config_templates/config.yml
eladkal Mar 14, 2024
2c0c8ba
Update airflow/models/dag.py
eladkal Mar 14, 2024
bc56ee4
fixed: scalars giving wrong state
pateash Mar 15, 2024
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/dag_schema.py
@@ -69,6 +69,7 @@ class Meta:
tags = fields.List(fields.Nested(DagTagSchema), dump_only=True)
max_active_tasks = auto_field(dump_only=True)
max_active_runs = auto_field(dump_only=True)
max_consecutive_failed_dag_runs = auto_field(dump_only=True)
has_task_concurrency_limits = auto_field(dump_only=True)
has_import_errors = auto_field(dump_only=True)
next_dagrun = auto_field(dump_only=True)
1 change: 1 addition & 0 deletions airflow/cli/commands/dag_command.py
@@ -309,6 +309,7 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
"tags": dag.tags,
"max_active_tasks": dag.max_active_tasks,
"max_active_runs": dag.max_active_runs,
"max_consecutive_failed_dag_runs": dag.max_consecutive_failed_dag_runs,
"has_task_concurrency_limits": any(
t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None for t in dag.tasks
),
11 changes: 11 additions & 0 deletions airflow/config_templates/config.yml
@@ -116,6 +116,17 @@ core:
type: string
example: ~
default: "16"
max_consecutive_failed_dag_runs_per_dag:
description: |
(experimental) The maximum number of consecutive failed DAG runs before the DAG is automatically paused.
This can also be set per DAG with ``max_consecutive_failed_dag_runs``,
which defaults to ``max_consecutive_failed_dag_runs_per_dag``.
If not specified, the value is treated as 0,
meaning that DAGs are never auto-paused by default.
version_added: 2.9.0
type: string
example: ~
default: "0"
mp_start_method:
description: |
The name of the method used in order to start Python processes via the multiprocessing module.
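As a rough illustration of how the new option might be set, the sketch below parses an `airflow.cfg`-style fragment with Python's standard `configparser` and lets an environment variable take precedence, following Airflow's `AIRFLOW__{SECTION}__{KEY}` naming convention. The helper `read_limit` and the sample value `3` are hypothetical; this is not Airflow's own config loader.

```python
import configparser

# Environment variable name derived from the [core] option added in this PR.
ENV_KEY = "AIRFLOW__CORE__MAX_CONSECUTIVE_FAILED_DAG_RUNS_PER_DAG"

# Hypothetical airflow.cfg fragment; 3 is an arbitrary example value.
SAMPLE_CFG = """
[core]
max_consecutive_failed_dag_runs_per_dag = 3
"""


def read_limit(cfg_text: str, environ: dict) -> int:
    """Hypothetical helper: the env var wins over the file, and the default is 0."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    raw = environ.get(ENV_KEY)
    if raw is None:
        raw = parser.get(
            "core", "max_consecutive_failed_dag_runs_per_dag", fallback="0"
        )
    return int(raw)
```

With the sample fragment, `read_limit(SAMPLE_CFG, {})` yields the file value, while passing `{ENV_KEY: "5"}` yields the override. An empty config falls back to `0`, which the description above defines as "never auto-pause".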
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Adding max_consecutive_failed_dag_runs column to dag_model table

Revision ID: 8e1c784a4fc7
Revises: ab34f260b71c
Create Date: 2024-01-18 15:02:24.587206

"""

import sqlalchemy as sa
from alembic import op


# revision identifiers, used by Alembic.
revision = '8e1c784a4fc7'
down_revision = 'ab34f260b71c'
branch_labels = None
depends_on = None
airflow_version = '2.9.0'


def upgrade():
    """Apply Adding max_consecutive_failed_dag_runs column to dag_model table"""
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('dag', schema=None) as batch_op:
        batch_op.add_column(sa.Column('max_consecutive_failed_dag_runs', sa.Integer()))

    # ### end Alembic commands ###


def downgrade():
    """Unapply Adding max_consecutive_failed_dag_runs column to dag_model table"""
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table('dag', schema=None) as batch_op:
        batch_op.drop_column('max_consecutive_failed_dag_runs')

    # ### end Alembic commands ###
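
To make the effect of `upgrade()` concrete, here is a minimal sketch using the standard-library `sqlite3` module in place of Alembic. The two-column `dag` table is a simplified assumption, not the real Airflow schema; the point is only what adding an integer column without a default does to rows that already exist.

```python
import sqlite3

# In-memory database with a simplified, assumed stand-in for the `dag` table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER)")
conn.execute("INSERT INTO dag (dag_id, is_paused) VALUES ('example_dag', 0)")

# Roughly what upgrade() asks the database to do: add a plain INTEGER column.
conn.execute("ALTER TABLE dag ADD COLUMN max_consecutive_failed_dag_runs INTEGER")

# Inspect the resulting columns and the value stored for the pre-existing row.
columns = [row[1] for row in conn.execute("PRAGMA table_info(dag)")]
existing_value = conn.execute(
    "SELECT max_consecutive_failed_dag_runs FROM dag WHERE dag_id = 'example_dag'"
).fetchone()[0]
```

In this sketch the pre-existing row ends up with `NULL` in the new column, since no default is given. That may be worth keeping in mind when reading the `DagModel` change further down, where the ORM column is declared with `nullable=False`.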
26 changes: 26 additions & 0 deletions airflow/models/dag.py
@@ -364,6 +364,8 @@ class DAG(LoggingMixin):
:param max_active_runs: maximum number of active DAG runs, beyond this
number of DAG runs in a running state, the scheduler won't create
new active DAG runs
:param max_consecutive_failed_dag_runs: (experimental) maximum number of consecutive failed DAG runs,
beyond this the scheduler will disable the DAG
:param dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created.
:param sla_miss_callback: specify a function or list of functions to call when reporting SLA
@@ -456,6 +458,9 @@ def __init__(
concurrency: int | None = None,
max_active_tasks: int = airflow_conf.getint("core", "max_active_tasks_per_dag"),
max_active_runs: int = airflow_conf.getint("core", "max_active_runs_per_dag"),
max_consecutive_failed_dag_runs: int = airflow_conf.getint(
"core", "max_consecutive_failed_dag_runs_per_dag"
),
dagrun_timeout: timedelta | None = None,
sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(),
@@ -617,6 +622,16 @@ def __init__(
self.last_loaded: datetime = timezone.utcnow()
self.safe_dag_id = dag_id.replace(".", "__dot__")
self.max_active_runs = max_active_runs
self.max_consecutive_failed_dag_runs = max_consecutive_failed_dag_runs
if self.max_consecutive_failed_dag_runs == 0:
self.max_consecutive_failed_dag_runs = airflow_conf.getint(
"core", "max_consecutive_failed_dag_runs_per_dag"
)
if self.max_consecutive_failed_dag_runs < 0:
raise AirflowException(
f"Invalid max_consecutive_failed_dag_runs: {self.max_consecutive_failed_dag_runs}. "
"Requires max_consecutive_failed_dag_runs >= 0"
)
if self.timetable.active_runs_limit is not None:
if self.timetable.active_runs_limit < self.max_active_runs:
raise AirflowException(
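
The fallback and validation in the hunk above can be read as a small pure function. The sketch below is a hypothetical stand-in (the name `resolve_max_consecutive_failed_dag_runs` is not in the PR, and `ValueError` replaces `AirflowException` so it runs without Airflow installed).

```python
def resolve_max_consecutive_failed_dag_runs(value: int, configured_default: int) -> int:
    """Hypothetical stand-in for the check in DAG.__init__ shown above."""
    if value == 0:
        # 0 is treated as "not set on the DAG", so the [core] default applies.
        value = configured_default
    if value < 0:
        # The PR raises AirflowException here; ValueError keeps the sketch stdlib-only.
        raise ValueError(
            f"Invalid max_consecutive_failed_dag_runs: {value}. "
            "Requires max_consecutive_failed_dag_runs >= 0"
        )
    return value
```

One consequence of this shape: passing `0` on a DAG does not switch auto-pausing off when the global default is non-zero, because `0` is replaced by the configured value before it is used.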
@@ -3123,6 +3138,7 @@ def bulk_write_to_db(
orm_dag.description = dag.description
orm_dag.max_active_tasks = dag.max_active_tasks
orm_dag.max_active_runs = dag.max_active_runs
orm_dag.max_consecutive_failed_dag_runs = dag.max_consecutive_failed_dag_runs
orm_dag.has_task_concurrency_limits = any(
t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None
for t in dag.tasks
@@ -3594,6 +3610,7 @@ class DagModel(Base):

max_active_tasks = Column(Integer, nullable=False)
max_active_runs = Column(Integer, nullable=True)
max_consecutive_failed_dag_runs = Column(Integer, nullable=False)

has_task_concurrency_limits = Column(Boolean, nullable=False)
has_import_errors = Column(Boolean(), default=False, server_default="0")
@@ -3645,6 +3662,11 @@ def __init__(self, concurrency=None, **kwargs):
if self.max_active_runs is None:
self.max_active_runs = airflow_conf.getint("core", "max_active_runs_per_dag")

if self.max_consecutive_failed_dag_runs is None:
self.max_consecutive_failed_dag_runs = airflow_conf.getint(
"core", "max_consecutive_failed_dag_runs_per_dag"
)

if self.has_task_concurrency_limits is None:
# Be safe -- this will be updated later once the DAG is parsed
self.has_task_concurrency_limits = True
@@ -3942,6 +3964,9 @@ def dag(
concurrency: int | None = None,
max_active_tasks: int = airflow_conf.getint("core", "max_active_tasks_per_dag"),
max_active_runs: int = airflow_conf.getint("core", "max_active_runs_per_dag"),
max_consecutive_failed_dag_runs: int = airflow_conf.getint(
"core", "max_consecutive_failed_dag_runs_per_dag"
),
dagrun_timeout: timedelta | None = None,
sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(),
@@ -3996,6 +4021,7 @@ def factory(*args, **kwargs):
concurrency=concurrency,
max_active_tasks=max_active_tasks,
max_active_runs=max_active_runs,
max_consecutive_failed_dag_runs=max_consecutive_failed_dag_runs,
dagrun_timeout=dagrun_timeout,
sla_miss_callback=sla_miss_callback,
default_view=default_view,
49 changes: 49 additions & 0 deletions airflow/models/dagrun.py
@@ -561,6 +561,45 @@ def fetch_task_instances(
tis = tis.where(TI.task_id.in_(task_ids))
return session.scalars(tis).all()

@internal_api_call
def _check_last_n_dagruns_failed(self, dag_id, max_consecutive_failed_dag_runs, session):
    """Check if the last N DAG runs failed and pause the DAG if they all did."""
    dag_runs = (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id)
        .order_by(DagRun.execution_date.desc())
        .limit(max_consecutive_failed_dag_runs)
        .all()
    )
    # Mark the DAG as paused, if needed
    to_be_paused = len(dag_runs) >= max_consecutive_failed_dag_runs and all(
        dag_run.state == DagRunState.FAILED for dag_run in dag_runs
    )

    if to_be_paused:
        from airflow.models.dag import DagModel

        self.log.info(
            "Pausing DAG %s because last %s DAG runs failed.",
            self.dag_id,
            max_consecutive_failed_dag_runs,
        )
        filter_query = [
            DagModel.dag_id == self.dag_id,
            DagModel.root_dag_id == self.dag_id,  # for sub-dags
        ]
        session.execute(
            update(DagModel)
            .where(or_(*filter_query))
            .values(is_paused=True)
            .execution_options(synchronize_session="fetch")
        )
    else:
        self.log.debug(
            "Limit of consecutive failed DAG runs is not reached, DAG %s will not be paused.",
            self.dag_id,
        )

@provide_session
def get_task_instances(
self,
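
Stripped of the database access, the decision made by `_check_last_n_dagruns_failed` comes down to a check over the most recent run states. The following is a minimal sketch under that reading; `should_pause` is a hypothetical name, and plain strings stand in for `DagRunState` values.

```python
from typing import Sequence


def should_pause(states_newest_first: Sequence[str], limit: int) -> bool:
    """Hypothetical helper: True when the newest `limit` runs exist and all failed."""
    if limit <= 0:
        # Mirrors the caller's guard: a limit of 0 means auto-pausing is disabled.
        return False
    recent = states_newest_first[:limit]
    # Fewer than `limit` runs in history never triggers a pause.
    return len(recent) >= limit and all(state == "failed" for state in recent)
```

The length check matters for young DAGs: two failures out of two runs do not pause a DAG whose limit is 3, and a single success among the newest `limit` runs resets the streak.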
@@ -787,6 +826,16 @@ def recalculate(self) -> _UnfinishedStates:
msg="task_failure",
)

# If max_consecutive_failed_dag_runs is set (non-zero), pause the DAG
# once the most recent runs have all failed
if dag.max_consecutive_failed_dag_runs > 0:
self.log.debug(
"Checking consecutive failed DAG runs for DAG %s, limit is %s",
self.dag_id,
dag.max_consecutive_failed_dag_runs,
)
self._check_last_n_dagruns_failed(dag.dag_id, dag.max_consecutive_failed_dag_runs, session)

# if all leaves succeeded and no unfinished tasks, the run succeeded
elif not unfinished.tis and all(x.state in State.success_states for x in tis_for_dagrun_state):
self.log.info("Marking run %s successful", self)
1 change: 1 addition & 0 deletions airflow/serialization/pydantic/dag.py
@@ -131,6 +131,7 @@ class DagModelPydantic(BaseModelPydantic):

max_active_tasks: int
max_active_runs: Optional[int]
max_consecutive_failed_dag_runs: Optional[int]

has_task_concurrency_limits: bool
has_import_errors: Optional[bool] = False
1 change: 1 addition & 0 deletions airflow/serialization/schema.json
@@ -174,6 +174,7 @@
"_concurrency": { "type" : "number"},
"_max_active_tasks": { "type" : "number"},
"max_active_runs": { "type" : "number"},
"max_consecutive_failed_dag_runs": { "type" : "number"},
"default_args": { "$ref": "#/definitions/dict" },
"start_date": { "$ref": "#/definitions/datetime" },
"end_date": { "$ref": "#/definitions/datetime" },
12 changes: 12 additions & 0 deletions docs/apache-airflow/core-concepts/dags.rst
@@ -911,3 +911,15 @@ it in three steps:
* pause the DAG
* delete the historical metadata from the database, via UI or API
* delete the DAG file from the ``DAGS_FOLDER`` and wait until it becomes inactive

DAG Auto-pausing (Experimental)
-------------------------------
DAGs can also be configured to pause automatically.
An Airflow configuration option disables a DAG automatically
if it fails ``N`` times consecutively.

- :ref:`config:core__max_consecutive_failed_dag_runs_per_dag`

This setting can also be overridden per DAG with a DAG argument:

- ``max_consecutive_failed_dag_runs``: Overrides :ref:`config:core__max_consecutive_failed_dag_runs_per_dag`.
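
For illustration, a per-DAG override might look like the sketch below. The DAG id, start date, and limit of `3` are hypothetical, and the `airflow` import is deferred into `build_dag` only so that the snippet loads on a machine without Airflow installed.

```python
from datetime import datetime

# Keyword arguments for a hypothetical DAG that should auto-pause
# after three consecutive failed runs.
DAG_KWARGS = {
    "dag_id": "example_auto_pause",  # hypothetical name
    "start_date": datetime(2024, 1, 1),
    "schedule": None,
    "max_consecutive_failed_dag_runs": 3,  # overrides the [core] default for this DAG
}


def build_dag():
    """Create the DAG; Airflow is imported lazily so the sketch loads without it."""
    from airflow import DAG

    return DAG(**DAG_KWARGS)
```

In a real DAG file the result would be assigned at module level (for example `dag = build_dag()`) so that the DAG is discovered when the file is parsed.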
7 changes: 6 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
@@ -39,7 +39,12 @@ Here's the list of all the Database Migrations that are executed via when you ru
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=================================+===================+===================+==============================================================+
| ``d75389605139`` (head) | ``1fd565369930`` | ``2.9.0`` | Add run_id to (Audit) log table and increase event name |
| ``8e1c784a4fc7`` (head) | ``ab34f260b71c`` | ``2.9.0`` | Adding max_consecutive_failed_dag_runs column to dag_model |
| | | | table |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``ab34f260b71c`` | ``d75389605139`` | ``2.9.0`` | add dataset_expression in DagModel |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``d75389605139`` | ``1fd565369930`` | ``2.9.0`` | Add run_id to (Audit) log table and increase event name |
| | | | length |
+---------------------------------+-------------------+-------------------+--------------------------------------------------------------+
| ``1fd565369930`` | ``88344c1d9134`` | ``2.9.0`` | Add rendered_map_index to TaskInstance. |