Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify RTIF.delete_old_records() #26667

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 27 additions & 41 deletions airflow/models/renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING

import sqlalchemy_jsonfield
from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, and_, not_, text, tuple_
from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, text
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import Session, relationship

Expand All @@ -32,6 +33,10 @@
from airflow.settings import json
from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import tuple_not_in_condition

if TYPE_CHECKING:
from sqlalchemy.sql import FromClause


class RenderedTaskInstanceFields(Base):
Expand Down Expand Up @@ -183,9 +188,9 @@ def delete_old_records(
cls,
task_id: str,
dag_id: str,
num_to_keep=conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
session: Session = None,
):
num_to_keep: int = conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
session: Session = NEW_SESSION,
) -> None:
"""
Keep only Last X (num_to_keep) number of records for a task by deleting others.

Expand All @@ -211,49 +216,30 @@ def delete_old_records(
.limit(num_to_keep)
)

if session.bind.dialect.name in ["postgresql", "sqlite"]:
# Fetch Top X records given dag_id & task_id ordered by Execution Date
subq1 = tis_to_keep_query.subquery()
excluded = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id)
session.query(cls).filter(
cls.dag_id == dag_id,
cls.task_id == task_id,
tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(excluded),
).delete(synchronize_session=False)
elif session.bind.dialect.name in ["mysql"]:
cls._remove_old_rendered_ti_fields_mysql(dag_id, session, task_id, tis_to_keep_query)
else:
# Fetch Top X records given dag_id & task_id ordered by Execution Date
tis_to_keep = tis_to_keep_query.all()

filter_tis = [
not_(
and_(
cls.dag_id == ti.dag_id,
cls.task_id == ti.task_id,
cls.run_id == ti.run_id,
)
)
for ti in tis_to_keep
]

session.query(cls).filter(and_(*filter_tis)).delete(synchronize_session=False)

cls._do_delete_old_records(
dag_id=dag_id,
task_id=task_id,
ti_clause=tis_to_keep_query.subquery(),
session=session,
)
session.flush()

@classmethod
@retry_db_transaction
def _remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id, tis_to_keep_query):
# Fetch Top X records given dag_id & task_id ordered by Execution Date
subq1 = tis_to_keep_query.subquery('subq1')
# Second Subquery
# Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)
# Limitation: This version of MySQL does not yet support
# LIMIT & IN/ALL/ANY/SOME subquery
subq2 = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id).subquery('subq2')
def _do_delete_old_records(
cls,
*,
task_id: str,
dag_id: str,
ti_clause: FromClause,
session: Session,
) -> None:
# This query might deadlock occasionally and it should be retried if fails (see decorator)
session.query(cls).filter(
cls.dag_id == dag_id,
cls.task_id == task_id,
tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(subq2),
tuple_not_in_condition(
(cls.dag_id, cls.task_id, cls.run_id),
session.query(ti_clause.c.dag_id, ti_clause.c.task_id, ti_clause.c.run_id),
),
).delete(synchronize_session=False)
20 changes: 19 additions & 1 deletion airflow/utils/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import pendulum
from dateutil import relativedelta
from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, tuple_
from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
from sqlalchemy.dialects import mssql, mysql
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import Session
Expand Down Expand Up @@ -422,3 +422,21 @@ def tuple_in_condition(
if not clauses:
return false()
return or_(*clauses)


def tuple_not_in_condition(
columns: tuple[ColumnElement, ...],
collection: Iterable[Any],
) -> ColumnOperators:
"""Generates a tuple-not-in-collection operator to use in ``.filter()``.
This is similar to ``tuple_in_condition`` except generating ``NOT IN``.
:meta private:
"""
if settings.engine.dialect.name != "mssql":
return tuple_(*columns).not_in(collection)
clauses = [or_(*(c != v for c, v in zip(columns, values))) for values in collection]
if not clauses:
return true()
return and_(*clauses)