Skip to content

Commit

Permalink
Simplify RTIF.delete_old_records()
Browse files Browse the repository at this point in the history
A lot of the code and comments are actually no longer relevant since we
removed the funky execution_date-based filtering in 2.3, so we can
simplify the implementation quite a bit now.
  • Loading branch information
uranusjr committed Sep 26, 2022
1 parent 465c564 commit 55619a0
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 42 deletions.
68 changes: 27 additions & 41 deletions airflow/models/renderedtifields.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING

import sqlalchemy_jsonfield
from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, and_, not_, text, tuple_
from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, text
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import Session, relationship

Expand All @@ -32,6 +33,10 @@
from airflow.settings import json
from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import tuple_not_in_condition

if TYPE_CHECKING:
from sqlalchemy.sql import FromClause


class RenderedTaskInstanceFields(Base):
Expand Down Expand Up @@ -183,9 +188,9 @@ def delete_old_records(
cls,
task_id: str,
dag_id: str,
num_to_keep=conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
session: Session = None,
):
num_to_keep: int = conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0),
session: Session = NEW_SESSION,
) -> None:
"""
Keep only Last X (num_to_keep) number of records for a task by deleting others.
Expand All @@ -211,49 +216,30 @@ def delete_old_records(
.limit(num_to_keep)
)

if session.bind.dialect.name in ["postgresql", "sqlite"]:
# Fetch Top X records given dag_id & task_id ordered by Execution Date
subq1 = tis_to_keep_query.subquery()
excluded = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id)
session.query(cls).filter(
cls.dag_id == dag_id,
cls.task_id == task_id,
tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(excluded),
).delete(synchronize_session=False)
elif session.bind.dialect.name in ["mysql"]:
cls._remove_old_rendered_ti_fields_mysql(dag_id, session, task_id, tis_to_keep_query)
else:
# Fetch Top X records given dag_id & task_id ordered by Execution Date
tis_to_keep = tis_to_keep_query.all()

filter_tis = [
not_(
and_(
cls.dag_id == ti.dag_id,
cls.task_id == ti.task_id,
cls.run_id == ti.run_id,
)
)
for ti in tis_to_keep
]

session.query(cls).filter(and_(*filter_tis)).delete(synchronize_session=False)

cls._do_delete_old_records(
dag_id=dag_id,
task_id=task_id,
ti_clause=tis_to_keep_query.subquery(),
session=session,
)
session.flush()

@classmethod
@retry_db_transaction
def _remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id, tis_to_keep_query):
# Fetch Top X records given dag_id & task_id ordered by Execution Date
subq1 = tis_to_keep_query.subquery('subq1')
# Second Subquery
# Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)
# Limitation: This version of MySQL does not yet support
# LIMIT & IN/ALL/ANY/SOME subquery
subq2 = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id).subquery('subq2')
def _do_delete_old_records(
cls,
*,
task_id: str,
dag_id: str,
ti_clause: FromClause,
session: Session,
) -> None:
# This query might deadlock occasionally and it should be retried if fails (see decorator)
session.query(cls).filter(
cls.dag_id == dag_id,
cls.task_id == task_id,
tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(subq2),
tuple_not_in_condition(
(cls.dag_id, cls.task_id, cls.run_id),
session.query(ti_clause.c.dag_id, ti_clause.c.task_id, ti_clause.c.run_id),
),
).delete(synchronize_session=False)
20 changes: 19 additions & 1 deletion airflow/utils/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import pendulum
from dateutil import relativedelta
from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, tuple_
from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_
from sqlalchemy.dialects import mssql, mysql
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import Session
Expand Down Expand Up @@ -422,3 +422,21 @@ def tuple_in_condition(
if not clauses:
return false()
return or_(*clauses)


def tuple_not_in_condition(
    columns: tuple[ColumnElement, ...],
    collection: Iterable[Any],
) -> ColumnOperators:
    """Build a "tuple NOT IN collection" condition suitable for ``.filter()``.

    This is the negated counterpart of ``tuple_in_condition``.

    On every dialect except ``mssql`` the result is a native
    ``tuple_(*columns).not_in(collection)`` expression. On ``mssql`` the
    collection is iterated in Python and expanded into an ``AND`` of
    per-row ``OR (column != value)`` clauses instead (presumably because
    that backend lacks tuple comparison; verify against the sibling
    ``tuple_in_condition``). An empty collection yields ``true()``,
    since nothing is excluded.

    :param columns: Columns that together form the tuple being tested.
    :param collection: Rows of values to exclude; each row is paired
        positionally with ``columns``.
    :return: A SQLAlchemy expression usable as a filter criterion.

    :meta private:
    """
    if settings.engine.dialect.name == "mssql":
        # Expand NOT IN by hand: a row differs from ``values`` as soon as any
        # single column differs, and it must differ from every excluded row.
        row_mismatches = []
        for values in collection:
            column_mismatches = [column != value for column, value in zip(columns, values)]
            row_mismatches.append(or_(*column_mismatches))
        if row_mismatches:
            return and_(*row_mismatches)
        return true()
    return tuple_(*columns).not_in(collection)

0 comments on commit 55619a0

Please sign in to comment.