diff --git a/airflow/migrations/versions/952da73b5eff_add_dag_code_table.py b/airflow/migrations/versions/952da73b5eff_add_dag_code_table.py index 87792c46952c2b..81d6ec9fdf8339 100644 --- a/airflow/migrations/versions/952da73b5eff_add_dag_code_table.py +++ b/airflow/migrations/versions/952da73b5eff_add_dag_code_table.py @@ -29,7 +29,6 @@ # revision identifiers, used by Alembic. from airflow.models.dagcode import DagCode -from airflow.models.serialized_dag import SerializedDagModel revision = '952da73b5eff' down_revision = '852ae6c715af' @@ -38,6 +37,18 @@ def upgrade(): + from sqlalchemy.ext.declarative import declarative_base + + Base = declarative_base() + + class SerializedDagModel(Base): + __tablename__ = 'serialized_dag' + + # There are other columns here, but these are the only ones we need for the SELECT/UPDATE we are doing + dag_id = sa.Column(sa.String(250), primary_key=True) + fileloc = sa.Column(sa.String(2000), nullable=False) + fileloc_hash = sa.Column(sa.BigInteger, nullable=False) + """Apply add source code table""" op.create_table('dag_code', # pylint: disable=no-member sa.Column('fileloc_hash', sa.BigInteger(), @@ -48,11 +59,13 @@ def upgrade(): conn = op.get_bind() if conn.dialect.name not in ('sqlite'): - op.drop_index('idx_fileloc_hash', 'serialized_dag') + if conn.dialect.name == "mssql": + op.drop_index('idx_fileloc_hash', 'serialized_dag') + op.alter_column(table_name='serialized_dag', column_name='fileloc_hash', type_=sa.BigInteger(), nullable=False) - op.create_index( # pylint: disable=no-member - 'idx_fileloc_hash', 'serialized_dag', ['fileloc_hash']) + if conn.dialect.name == "mssql": + op.create_index('idx_fileloc_hash', 'serialized_dag', ['fileloc_hash']) sessionmaker = sa.orm.sessionmaker() session = sessionmaker(bind=conn)