From 9df3fba1037ee2dd7c2b0d185eae56729551804c Mon Sep 17 00:00:00 2001
From: Ash Berlin-Taylor
Date: Tue, 7 Apr 2020 15:39:08 +0100
Subject: [PATCH] Improve add_dag_code_table migration

This migration had a few problems that could cause us trouble in the
future:

1. It was importing the live model definition, which could break in the
   future (if for instance it ever got a new column added, when this
   migration runs it would try to use that new definition, but the
   column won't exist yet).
2. It was selecting the (large) `data` column needlessly.
3. It was dropping and re-creating the index but that is only needed on
   MSSQL, not for MySQL or PostgreSQL.
---
 .../952da73b5eff_add_dag_code_table.py        | 21 +++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/airflow/migrations/versions/952da73b5eff_add_dag_code_table.py b/airflow/migrations/versions/952da73b5eff_add_dag_code_table.py
index 87792c46952c2..81d6ec9fdf833 100644
--- a/airflow/migrations/versions/952da73b5eff_add_dag_code_table.py
+++ b/airflow/migrations/versions/952da73b5eff_add_dag_code_table.py
@@ -29,7 +29,6 @@
 
 # revision identifiers, used by Alembic.
 from airflow.models.dagcode import DagCode
-from airflow.models.serialized_dag import SerializedDagModel
 
 revision = '952da73b5eff'
 down_revision = '852ae6c715af'
@@ -38,6 +37,18 @@
 
 
 def upgrade():
+    from sqlalchemy.ext.declarative import declarative_base
+
+    Base = declarative_base()
+
+    class SerializedDagModel(Base):
+        __tablename__ = 'serialized_dag'
+
+        # There are other columns here, but these are the only ones we need for the SELECT/UPDATE we are doing
+        dag_id = sa.Column(sa.String(250), primary_key=True)
+        fileloc = sa.Column(sa.String(2000), nullable=False)
+        fileloc_hash = sa.Column(sa.BigInteger, nullable=False)
+
     """Apply add source code table"""
     op.create_table('dag_code',  # pylint: disable=no-member
                     sa.Column('fileloc_hash', sa.BigInteger(),
@@ -48,11 +59,13 @@ def upgrade():
 
     conn = op.get_bind()
     if conn.dialect.name not in ('sqlite'):
-        op.drop_index('idx_fileloc_hash', 'serialized_dag')
+        if conn.dialect.name == "mssql":
+            op.drop_index('idx_fileloc_hash', 'serialized_dag')
+
         op.alter_column(table_name='serialized_dag', column_name='fileloc_hash',
                         type_=sa.BigInteger(), nullable=False)
-        op.create_index(  # pylint: disable=no-member
-            'idx_fileloc_hash', 'serialized_dag', ['fileloc_hash'])
+        if conn.dialect.name == "mssql":
+            op.create_index('idx_fileloc_hash', 'serialized_dag', ['fileloc_hash'])
 
     sessionmaker = sa.orm.sessionmaker()
     session = sessionmaker(bind=conn)