Skip to content

Commit

Permalink
Rename 'max_failure_runs' to 'max_consecutive_failed_dag_runs'
Browse files Browse the repository at this point in the history
The 'max_failure_runs' parameter across multiple files was renamed to 'max_consecutive_failed_dag_runs' to better represent its functionality. This change improves code readability, and reflects that this parameter is used to limit the number of consecutive failed DAG runs, after which the DAG will be disabled.
  • Loading branch information
pateash committed Jan 27, 2024
1 parent 48585b9 commit d9d29c2
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 28 deletions.
2 changes: 1 addition & 1 deletion airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class Meta:
tags = fields.List(fields.Nested(DagTagSchema), dump_only=True)
max_active_tasks = auto_field(dump_only=True)
max_active_runs = auto_field(dump_only=True)
max_failure_runs = auto_field(dump_only=True)
max_consecutive_failed_dag_runs = auto_field(dump_only=True)
has_task_concurrency_limits = auto_field(dump_only=True)
has_import_errors = auto_field(dump_only=True)
next_dagrun = auto_field(dump_only=True)
Expand Down
2 changes: 1 addition & 1 deletion airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def _get_dagbag_dag_details(dag: DAG) -> dict:
"tags": dag.tags,
"max_active_tasks": dag.max_active_tasks,
"max_active_runs": dag.max_active_runs,
"max_failure_runs": dag.max_failure_runs,
"max_consecutive_failed_dag_runs": dag.max_consecutive_failed_dag_runs,
"has_task_concurrency_limits": any(
t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None for t in dag.tasks
),
Expand Down
6 changes: 3 additions & 3 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,11 @@ core:
type: string
example: ~
default: "16"
max_failure_runs_per_dag:
max_consecutive_failed_dag_runs_per_dag:
description: |
The maximum number of consecutive DAG failures before DAG is turned off. The scheduler will disable the DAG

Check failure on line 121 in airflow/config_templates/config.yml

View workflow job for this annotation

GitHub Actions / Static checks

121:111 [line-length] line too long (115 > 110 characters)
if it reaches the limit. This is configurable at the DAG level with ``max_failure_runs``,
which is defaulted as ``max_failure_runs_per_dag``.
if it reaches the limit. This is configurable at the DAG level with ``max_consecutive_failed_dag_runs``,

Check failure on line 122 in airflow/config_templates/config.yml

View workflow job for this annotation

GitHub Actions / Static checks

122:111 [line-length] line too long (112 > 110 characters)
which is defaulted as ``max_consecutive_failed_dag_runs_per_dag``.
version_added: ~
type: string
example: ~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.

"""Adding Adding max_failure_runs column to dag_model table
"""Adding Adding max_consecutive_failed_dag_runs column to dag_model table
Revision ID: 8e1c784a4fc7
Revises: 88344c1d9134
Expand All @@ -36,18 +36,18 @@


def upgrade():
"""Apply Adding Adding max_failure_runs column to dag_model table"""
"""Apply Adding max_consecutive_failed_dag_runs column to dag_model table"""
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('dag', schema=None) as batch_op:
batch_op.add_column(sa.Column('max_failure_runs', sa.Integer(), nullable=True))
batch_op.add_column(sa.Column('max_consecutive_failed_dag_runs', sa.Integer(), nullable=True))

# ### end Alembic commands ###


def downgrade():
"""Unapply Adding Adding max_failure_runs column to dag_model table"""
"""Unapply Adding Adding max_consecutive_failed_dag_runs column to dag_model table"""
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('dag', schema=None) as batch_op:
batch_op.drop_column('max_failure_runs')
batch_op.drop_column('max_consecutive_failed_dag_runs')

# ### end Alembic commands ###
24 changes: 12 additions & 12 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ class DAG(LoggingMixin):
:param max_active_runs: maximum number of active DAG runs, beyond this
number of DAG runs in a running state, the scheduler won't create
new active DAG runs
:param max_failure_runs: maximum number of consecutive failed DAG runs, beyond this
:param max_consecutive_failed_dag_runs: maximum number of consecutive failed DAG runs, beyond this
the scheduler will disable the DAG
:param dagrun_timeout: specify how long a DagRun should be up before
timing out / failing, so that new DagRuns can be created.
Expand Down Expand Up @@ -454,7 +454,7 @@ def __init__(
concurrency: int | None = None,
max_active_tasks: int = airflow_conf.getint("core", "max_active_tasks_per_dag"),
max_active_runs: int = airflow_conf.getint("core", "max_active_runs_per_dag"),
max_failure_runs: int = airflow_conf.getint("core", "max_failure_runs_per_dag"),
max_consecutive_failed_dag_runs: int = airflow_conf.getint("core", "max_consecutive_failed_dag_runs_per_dag"),
dagrun_timeout: timedelta | None = None,
sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(),
Expand Down Expand Up @@ -613,17 +613,17 @@ def __init__(
self.last_loaded: datetime = timezone.utcnow()
self.safe_dag_id = dag_id.replace(".", "__dot__")
self.max_active_runs = max_active_runs
self.max_failure_runs = max_failure_runs
self.max_consecutive_failed_dag_runs = max_consecutive_failed_dag_runs
if self.timetable.active_runs_limit is not None:
if self.timetable.active_runs_limit < self.max_active_runs:
raise AirflowException(
f"Invalid max_active_runs: {type(self.timetable)} "
f"requires max_active_runs <= {self.timetable.active_runs_limit}"
)
if self.max_failure_runs is not None and self.max_failure_runs < 0:
if self.max_consecutive_failed_dag_runs is not None and self.max_consecutive_failed_dag_runs < 0:
raise AirflowException(
f"Invalid max_failure_runs: {str(self.max_failure_runs)}"
f"requires max_failure_runs >= 0"
f"Invalid max_consecutive_failed_dag_runs: {str(self.max_consecutive_failed_dag_runs)}"
f"requires max_consecutive_failed_dag_runs >= 0"
)
self.dagrun_timeout = dagrun_timeout
self.sla_miss_callback = sla_miss_callback
Expand Down Expand Up @@ -3103,7 +3103,7 @@ def bulk_write_to_db(
orm_dag.description = dag.description
orm_dag.max_active_tasks = dag.max_active_tasks
orm_dag.max_active_runs = dag.max_active_runs
orm_dag.max_failure_runs = dag.max_failure_runs
orm_dag.max_consecutive_failed_dag_runs = dag.max_consecutive_failed_dag_runs
orm_dag.has_task_concurrency_limits = any(
t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None
for t in dag.tasks
Expand Down Expand Up @@ -3568,7 +3568,7 @@ class DagModel(Base):

max_active_tasks = Column(Integer, nullable=False)
max_active_runs = Column(Integer, nullable=True)
max_failure_runs = Column(Integer, nullable=True)
max_consecutive_failed_dag_runs = Column(Integer, nullable=True)

has_task_concurrency_limits = Column(Boolean, nullable=False)
has_import_errors = Column(Boolean(), default=False, server_default="0")
Expand Down Expand Up @@ -3620,8 +3620,8 @@ def __init__(self, concurrency=None, **kwargs):
if self.max_active_runs is None:
self.max_active_runs = airflow_conf.getint("core", "max_active_runs_per_dag")

if self.max_failure_runs is None:
self.max_failure_runs = airflow_conf.getint("core", "max_failure_runs_per_dag")
if self.max_consecutive_failed_dag_runs is None:
self.max_consecutive_failed_dag_runs = airflow_conf.getint("core", "max_consecutive_failed_dag_runs_per_dag")


if self.has_task_concurrency_limits is None:
Expand Down Expand Up @@ -3901,7 +3901,7 @@ def dag(
concurrency: int | None = None,
max_active_tasks: int = airflow_conf.getint("core", "max_active_tasks_per_dag"),
max_active_runs: int = airflow_conf.getint("core", "max_active_runs_per_dag"),
max_failure_runs: int = airflow_conf.getint("core", "max_failure_runs_per_dag"),
max_consecutive_failed_dag_runs: int = airflow_conf.getint("core", "max_consecutive_failed_dag_runs_per_dag"),
dagrun_timeout: timedelta | None = None,
sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(),
Expand Down Expand Up @@ -3956,7 +3956,7 @@ def factory(*args, **kwargs):
concurrency=concurrency,
max_active_tasks=max_active_tasks,
max_active_runs=max_active_runs,
max_failure_runs=max_failure_runs,
max_consecutive_failed_dag_runs=max_consecutive_failed_dag_runs,
dagrun_timeout=dagrun_timeout,
sla_miss_callback=sla_miss_callback,
default_view=default_view,
Expand Down
8 changes: 4 additions & 4 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,12 +755,12 @@ def recalculate(self) -> _UnfinishedStates:
msg="task_failure",
)

# checking if the max_failure_runs has been provided and last consecutivate failures are more
# checking if the max_consecutive_failed_dag_runs has been provided and last consecutivate failures are more
# than this number if so we have to mark this dag as off
if bool(dag.max_failure_runs):
if bool(dag.max_consecutive_failed_dag_runs):
self.log.info("Checking consecutive failed dags for %s, limit is %s", self.dag_id,
dag.max_failure_runs)
self._check_last_N_dagruns_failed(dag.dag_id, dag.max_failure_runs, session)
dag.max_consecutive_failed_dag_runs)
self._check_last_N_dagruns_failed(dag.dag_id, dag.max_consecutive_failed_dag_runs, session)

# if all leaves succeeded and no unfinished tasks, the run succeeded
elif not unfinished.tis and all(x.state in State.success_states for x in tis_for_dagrun_state):
Expand Down
2 changes: 1 addition & 1 deletion airflow/serialization/pydantic/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class DagModelPydantic(BaseModelPydantic):

max_active_tasks: int
max_active_runs: Optional[int]
max_failure_runs: Optional[int]
max_consecutive_failed_dag_runs: Optional[int]

has_task_concurrency_limits: bool
has_import_errors: Optional[bool] = False
Expand Down
2 changes: 1 addition & 1 deletion airflow/serialization/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
"_concurrency": { "type" : "number"},
"_max_active_tasks": { "type" : "number"},
"max_active_runs": { "type" : "number"},
"max_failure_runs": { "type" : "number"},
"max_consecutive_failed_dag_runs": { "type" : "number"},
"default_args": { "$ref": "#/definitions/dict" },
"start_date": { "$ref": "#/definitions/datetime" },
"end_date": { "$ref": "#/definitions/datetime" },
Expand Down

0 comments on commit d9d29c2

Please sign in to comment.