From 55619a01ebc08534a48bb8b26cb6ff0957efa9d7 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 26 Sep 2022 14:10:15 +0800 Subject: [PATCH] Simplify RTIF.delete_old_records() A lot of the codes and comments are actually not relevant since we've removed the funky execution_date based filtering in 2.3, and we can simply the implementation quite a bit now. --- airflow/models/renderedtifields.py | 68 ++++++++++++------------------ airflow/utils/sqlalchemy.py | 20 ++++++++- 2 files changed, 46 insertions(+), 42 deletions(-) diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py index 2de03fba674cb7..c0f30a2a1aa6b5 100644 --- a/airflow/models/renderedtifields.py +++ b/airflow/models/renderedtifields.py @@ -19,9 +19,10 @@ from __future__ import annotations import os +from typing import TYPE_CHECKING import sqlalchemy_jsonfield -from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, and_, not_, text, tuple_ +from sqlalchemy import Column, ForeignKeyConstraint, Integer, PrimaryKeyConstraint, text from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.orm import Session, relationship @@ -32,6 +33,10 @@ from airflow.settings import json from airflow.utils.retries import retry_db_transaction from airflow.utils.session import NEW_SESSION, provide_session +from airflow.utils.sqlalchemy import tuple_not_in_condition + +if TYPE_CHECKING: + from sqlalchemy.sql import FromClause class RenderedTaskInstanceFields(Base): @@ -183,9 +188,9 @@ def delete_old_records( cls, task_id: str, dag_id: str, - num_to_keep=conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0), - session: Session = None, - ): + num_to_keep: int = conf.getint("core", "max_num_rendered_ti_fields_per_task", fallback=0), + session: Session = NEW_SESSION, + ) -> None: """ Keep only Last X (num_to_keep) number of records for a task by deleting others. @@ -211,49 +216,30 @@ def delete_old_records( .limit(num_to_keep) ) - if session.bind.dialect.name in ["postgresql", "sqlite"]: - # Fetch Top X records given dag_id & task_id ordered by Execution Date - subq1 = tis_to_keep_query.subquery() - excluded = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id) - session.query(cls).filter( - cls.dag_id == dag_id, - cls.task_id == task_id, - tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(excluded), - ).delete(synchronize_session=False) - elif session.bind.dialect.name in ["mysql"]: - cls._remove_old_rendered_ti_fields_mysql(dag_id, session, task_id, tis_to_keep_query) - else: - # Fetch Top X records given dag_id & task_id ordered by Execution Date - tis_to_keep = tis_to_keep_query.all() - - filter_tis = [ - not_( - and_( - cls.dag_id == ti.dag_id, - cls.task_id == ti.task_id, - cls.run_id == ti.run_id, - ) - ) - for ti in tis_to_keep - ] - - session.query(cls).filter(and_(*filter_tis)).delete(synchronize_session=False) - + cls._do_delete_old_records( + dag_id=dag_id, + task_id=task_id, + ti_clause=tis_to_keep_query.subquery(), + session=session, + ) session.flush() @classmethod @retry_db_transaction - def _remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id, tis_to_keep_query): - # Fetch Top X records given dag_id & task_id ordered by Execution Date - subq1 = tis_to_keep_query.subquery('subq1') - # Second Subquery - # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525) - # Limitation: This version of MySQL does not yet support - # LIMIT & IN/ALL/ANY/SOME subquery - subq2 = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.run_id).subquery('subq2') + def _do_delete_old_records( + cls, + *, + task_id: str, + dag_id: str, + ti_clause: FromClause, + session: Session, + ) -> None: # This query might deadlock occasionally and it should be retried if fails (see decorator) session.query(cls).filter( cls.dag_id == dag_id, cls.task_id == task_id, - tuple_(cls.dag_id, cls.task_id, cls.run_id).notin_(subq2), + tuple_not_in_condition( + (cls.dag_id, cls.task_id, cls.run_id), + session.query(ti_clause.c.dag_id, ti_clause.c.task_id, ti_clause.c.run_id), + ), ).delete(synchronize_session=False) diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py index ef0d29ebf8a722..0531c29dbf410d 100644 --- a/airflow/utils/sqlalchemy.py +++ b/airflow/utils/sqlalchemy.py @@ -25,7 +25,7 @@ import pendulum from dateutil import relativedelta -from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, tuple_ +from sqlalchemy import TIMESTAMP, PickleType, and_, event, false, nullsfirst, or_, true, tuple_ from sqlalchemy.dialects import mssql, mysql from sqlalchemy.exc import OperationalError from sqlalchemy.orm.session import Session @@ -422,3 +422,21 @@ def tuple_in_condition( if not clauses: return false() return or_(*clauses) + + +def tuple_not_in_condition( + columns: tuple[ColumnElement, ...], + collection: Iterable[Any], +) -> ColumnOperators: + """Generates a tuple-not-in-collection operator to use in ``.filter()``. + + This is similar to ``tuple_in_condition`` except generating ``NOT IN``. + + :meta private: + """ + if settings.engine.dialect.name != "mssql": + return tuple_(*columns).not_in(collection) + clauses = [or_(*(c != v for c, v in zip(columns, values))) for values in collection] + if not clauses: + return true() + return and_(*clauses)