From 3b5af9507ef7a4291ac6723ed1359ac4b71b46c2 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 3 May 2022 12:12:57 +0100 Subject: [PATCH 1/3] Add cascade to `dag_tag` to `dag` foreignkey Bulk delete does not work if the cascade behaviour of a foreignkey is set on python side(relationship configuration). To allow bulk delete of dags we need to setup cascade deletion in the DB. The warning on query.delete at https://docs.sqlalchemy.org/en/14/orm/session_basics.html#selecting-a-synchronization-strategy stated that: The operations do not offer in-Python cascading of relationships - it is assumed that ON UPDATE CASCADE and/or ON DELETE CASCADE is configured for any foreign key references which require it, otherwise the database may emit an integrity violation if foreign key references are being enforced. Another alternative is avoiding bulk delete of dags but I prefer we support bulk deletes. This will break offline sql generation for mssql(already broken before now :) ). Also, since there's only one foreign key in `dag_tag` table, I assume that the foreign key would be named `dag_tag_ibfk_1` in `mysql`. This avoided having to query the db for the name. The foreignkey is explicitly named now, would be easy for future upgrades --- airflow/migrations/utils.py | 3 +- ...7fdf6_add_cascade_to_dag_tag_foreignkey.py | 84 +++++++++++++++++++ airflow/models/dag.py | 8 +- docs/apache-airflow/migrations-ref.rst | 4 +- 4 files changed, 95 insertions(+), 4 deletions(-) create mode 100644 airflow/migrations/versions/0109_3c94c427fdf6_add_cascade_to_dag_tag_foreignkey.py diff --git a/airflow/migrations/utils.py b/airflow/migrations/utils.py index f1f3ea0442d03..5737fa950774d 100644 --- a/airflow/migrations/utils.py +++ b/airflow/migrations/utils.py @@ -35,7 +35,8 @@ def get_mssql_table_constraints(conn, table_name): FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS tc JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS ccu ON ccu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME WHERE tc.TABLE_NAME = '{table_name}' AND - (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE') + (tc.CONSTRAINT_TYPE = 'PRIMARY KEY' or UPPER(tc.CONSTRAINT_TYPE) = 'UNIQUE' + or UPPER(tc.CONSTRAINT_TYPE) = 'FOREIGN KEY') """ result = conn.execute(query).fetchall() constraint_dict = defaultdict(lambda: defaultdict(list)) diff --git a/airflow/migrations/versions/0109_3c94c427fdf6_add_cascade_to_dag_tag_foreignkey.py b/airflow/migrations/versions/0109_3c94c427fdf6_add_cascade_to_dag_tag_foreignkey.py new file mode 100644 index 0000000000000..55d9e9754e532 --- /dev/null +++ b/airflow/migrations/versions/0109_3c94c427fdf6_add_cascade_to_dag_tag_foreignkey.py @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add cascade to dag_tag foreign key + +Revision ID: 3c94c427fdf6 +Revises: 1de7bc13c950 +Create Date: 2022-05-03 09:47:41.957710 + +""" + +from alembic import op + +from airflow.migrations.utils import get_mssql_table_constraints + +# revision identifiers, used by Alembic. +revision = '3c94c427fdf6' +down_revision = '1de7bc13c950' +branch_labels = None +depends_on = None +airflow_version = '2.3.2' + + +def upgrade(): + """Apply Add cascade to dag_tag foreignkey""" + conn = op.get_bind() + if conn.dialect.name == 'sqlite': + naming_convention = { + "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", + } + with op.batch_alter_table( + 'dag_tag', naming_convention=naming_convention, recreate='always' + ) as batch_op: + batch_op.drop_constraint('fk_dag_tag_dag_id_dag', type_='foreignkey') + batch_op.create_foreign_key( + "dag_tag_dag_id_fkey", 'dag', ['dag_id'], ['dag_id'], ondelete='CASCADE' + ) + else: + with op.batch_alter_table('dag_tag') as batch_op: + if conn.dialect.name == 'mssql': + constraints = get_mssql_table_constraints(conn, 'dag_tag') + Fk, _ = constraints['FOREIGN KEY'].popitem() + batch_op.drop_constraint(Fk, type_='foreignkey') + if conn.dialect.name == 'postgresql': + batch_op.drop_constraint('dag_tag_dag_id_fkey', type_='foreignkey') + if conn.dialect.name == 'mysql': + batch_op.drop_constraint('dag_tag_ibfk_1', type_='foreignkey') + + batch_op.create_foreign_key( + "dag_tag_dag_id_fkey", 'dag', ['dag_id'], ['dag_id'], ondelete='CASCADE' + ) + + +def downgrade(): + """Unapply Add cascade to dag_tag foreignkey""" + conn = op.get_bind() + if conn.dialect.name == 'sqlite': + with op.batch_alter_table('dag_tag') as batch_op: + batch_op.drop_constraint('dag_tag_dag_id_fkey', type_='foreignkey') + batch_op.create_foreign_key("fk_dag_tag_dag_id_dag", 'dag', ['dag_id'], ['dag_id']) + else: + with op.batch_alter_table('dag_tag') as batch_op: + batch_op.drop_constraint('dag_tag_dag_id_fkey', type_='foreignkey') + batch_op.create_foreign_key( + None, + 'dag', + ['dag_id'], + ['dag_id'], + ) diff --git a/airflow/models/dag.py b/airflow/models/dag.py index f48883ec6bc98..f3684bb414fb1 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2638,7 +2638,11 @@ class DagTag(Base): __tablename__ = "dag_tag" name = Column(String(100), primary_key=True) - dag_id = Column(String(ID_LEN), ForeignKey('dag.dag_id'), primary_key=True) + dag_id = Column( + String(ID_LEN), + ForeignKey('dag.dag_id', name='dag_tag_dag_id_fkey', ondelete='CASCADE'), + primary_key=True, + ) def __repr__(self): return self.name @@ -2689,7 +2693,7 @@ class DagModel(Base): timetable_description = Column(String(1000), nullable=True) # Tags for view filter - tags = relationship('DagTag', cascade='all,delete-orphan', backref=backref('dag')) + tags = relationship('DagTag', cascade='all, delete, delete-orphan', backref=backref('dag')) max_active_tasks = Column(Integer, nullable=False) max_active_runs = Column(Integer, nullable=True) diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 1f81e8cbb2acc..ae765b99dd80f 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -27,7 +27,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=================================+===================+===================+==============================================================+ -| ``1de7bc13c950`` (head) | ``b1b348e02d07`` | ``2.3.1`` | Add index for ``event`` column in ``log`` table. | +| ``3c94c427fdf6`` (head) | ``1de7bc13c950`` | ``2.3.2`` | Add cascade to dag_tag foreign key | ++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ +| ``1de7bc13c950`` | ``b1b348e02d07`` | ``2.3.1`` | Add index for ``event`` column in ``log`` table. | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``b1b348e02d07`` | ``75d5ed6c2b43`` | ``2.3.0`` | Update dag.default_view to grid | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ From a4f5a0ae46c19b79276022c99f6d30cde627afa8 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 18 May 2022 11:33:07 +0100 Subject: [PATCH 2/3] update filename --- ...eignkey.py => 0110_2_3_1_add_cascade_to_dag_tag_foreignkey.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow/migrations/versions/{0109_3c94c427fdf6_add_cascade_to_dag_tag_foreignkey.py => 0110_2_3_1_add_cascade_to_dag_tag_foreignkey.py} (100%) diff --git a/airflow/migrations/versions/0109_3c94c427fdf6_add_cascade_to_dag_tag_foreignkey.py b/airflow/migrations/versions/0110_2_3_1_add_cascade_to_dag_tag_foreignkey.py similarity index 100% rename from airflow/migrations/versions/0109_3c94c427fdf6_add_cascade_to_dag_tag_foreignkey.py rename to airflow/migrations/versions/0110_2_3_1_add_cascade_to_dag_tag_foreignkey.py From ad286174a818e119275758b1e1edfbe884704ccc Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 26 May 2022 07:49:20 +0100 Subject: [PATCH 3/3] fixup! update filename --- ...eignkey.py => 0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename airflow/migrations/versions/{0110_2_3_1_add_cascade_to_dag_tag_foreignkey.py => 0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py} (100%) diff --git a/airflow/migrations/versions/0110_2_3_1_add_cascade_to_dag_tag_foreignkey.py b/airflow/migrations/versions/0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py similarity index 100% rename from airflow/migrations/versions/0110_2_3_1_add_cascade_to_dag_tag_foreignkey.py rename to airflow/migrations/versions/0110_2_3_2_add_cascade_to_dag_tag_foreignkey.py