Skip to content

Commit

Permalink
Update migrations to ensure compatibility with Airflow 1.10.* (#9660)
Browse files Browse the repository at this point in the history
closes apache/airflow#9640

GitOrigin-RevId: eb403beea2d1035635b7bea24c49b6b964313e51
  • Loading branch information
kaxil authored and Cloud Composer Team committed Oct 4, 2022
1 parent 6a883a6 commit 06f15cc
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 80 deletions.
46 changes: 26 additions & 20 deletions airflow/migrations/versions/3c20cacc0044_add_dagrun_run_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
Add DagRun run_type
Revision ID: 3c20cacc0044
Revises: 952da73b5eff
Revises: b25a55525161
Create Date: 2020-04-08 13:35:25.671327
"""

import sqlalchemy as sa
from alembic import op
from sqlalchemy import Boolean, Column, Integer, PickleType, String
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.ext.declarative import declarative_base

from airflow.models.base import ID_LEN
Expand All @@ -38,7 +39,7 @@

# revision identifiers, used by Alembic.
revision = "3c20cacc0044"
down_revision = "952da73b5eff"
down_revision = "b25a55525161"
branch_labels = None
depends_on = None

Expand Down Expand Up @@ -68,28 +69,33 @@ def upgrade():
"""Apply Add DagRun run_type"""
run_type_col_type = sa.String(length=50)

# Add nullable column
with op.batch_alter_table("dag_run") as batch_op:
batch_op.add_column(sa.Column("run_type", run_type_col_type, nullable=True))
conn = op.get_bind()
inspector = Inspector.from_engine(conn)
dag_run_columns = [col.get('name') for col in inspector.get_columns("dag_run")]

# Generate run type for existing records
connection = op.get_bind()
sessionmaker = sa.orm.sessionmaker()
session = sessionmaker(bind=connection)
if "run_type" not in dag_run_columns:

for run_type in DagRunType:
session.query(DagRun).filter(DagRun.run_id.like(f"{run_type.value}__%")).update(
{DagRun.run_type: run_type.value}, synchronize_session=False
)
# Add nullable column
with op.batch_alter_table("dag_run") as batch_op:
batch_op.add_column(sa.Column("run_type", run_type_col_type, nullable=True))

# Generate run type for existing records
sessionmaker = sa.orm.sessionmaker()
session = sessionmaker(bind=conn)

session.query(DagRun).filter(DagRun.run_type.is_(None)).update(
{DagRun.run_type: DagRunType.MANUAL.value}, synchronize_session=False
)
session.commit()
for run_type in DagRunType:
session.query(DagRun).filter(DagRun.run_id.like(f"{run_type.value}__%")).update(
{DagRun.run_type: run_type.value}, synchronize_session=False
)

session.query(DagRun).filter(DagRun.run_type.is_(None)).update(
{DagRun.run_type: DagRunType.MANUAL.value}, synchronize_session=False
)
session.commit()

# Make run_type not nullable
with op.batch_alter_table("dag_run") as batch_op:
batch_op.alter_column("run_type", type_=run_type_col_type, nullable=False)
# Make run_type not nullable
with op.batch_alter_table("dag_run") as batch_op:
batch_op.alter_column("run_type", type_=run_type_col_type, nullable=False)


def downgrade():
Expand Down
39 changes: 0 additions & 39 deletions airflow/migrations/versions/4ebbffe0a39a_merge_heads.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"""Add RenderedTaskInstanceFields table
Revision ID: 852ae6c715af
Revises: b25a55525161
Revises: a4c2fd67d16b
Create Date: 2020-03-10 22:19:18.034961
"""
Expand All @@ -29,7 +29,7 @@

# revision identifiers, used by Alembic.
revision = '852ae6c715af'
down_revision = 'b25a55525161'
down_revision = 'a4c2fd67d16b'
branch_labels = None
depends_on = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"""task reschedule fk on cascade delete
Revision ID: 939bb1e647c8
Revises: 4ebbffe0a39a
Revises: dd4ecb8fbee3
Create Date: 2019-02-04 20:21:50.669751
"""
Expand All @@ -27,7 +27,7 @@

# revision identifiers, used by Alembic.
revision = '939bb1e647c8'
down_revision = '4ebbffe0a39a'
down_revision = 'dd4ecb8fbee3'
branch_labels = None
depends_on = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"""Add Precision to execution_date in RenderedTaskInstanceFields table
Revision ID: a66efa278eea
Revises: 8f966b9c467a
Revises: 952da73b5eff
Create Date: 2020-06-16 21:44:02.883132
"""
Expand All @@ -29,7 +29,7 @@

# revision identifiers, used by Alembic.
revision = 'a66efa278eea'
down_revision = '8f966b9c467a'
down_revision = '952da73b5eff'
branch_labels = None
depends_on = None

Expand Down
6 changes: 3 additions & 3 deletions airflow/migrations/versions/b0125267960b_merge_heads.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
# specific language governing permissions and limitations
# under the License.

"""merge multiple heads
"""Merge the four heads back together
Revision ID: 08364691d074
Revises: a56c9515abdc, 74effc47d867, b3b105409875, bbf4a7ad0465
Revises: a56c9515abdc, 004c1210f153, 74effc47d867, b3b105409875
Create Date: 2019-11-19 22:05:11.752222
"""

# revision identifiers, used by Alembic.
revision = '08364691d074'
down_revision = ('a56c9515abdc', '74effc47d867', 'b3b105409875', 'bbf4a7ad0465')
down_revision = ('a56c9515abdc', '004c1210f153', '74effc47d867', 'b3b105409875')
branch_labels = None
depends_on = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"""Increase length of pool name
Revision ID: b25a55525161
Revises: a4c2fd67d16b
Revises: bbf4a7ad0465
Create Date: 2020-03-09 08:48:14.534700
"""
Expand All @@ -29,7 +29,7 @@

# revision identifiers, used by Alembic.
revision = 'b25a55525161'
down_revision = 'a4c2fd67d16b'
down_revision = 'bbf4a7ad0465'
branch_labels = None
depends_on = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,33 @@
"""Remove id column from xcom
Revision ID: bbf4a7ad0465
Revises: 004c1210f153
Revises: cf5dc11e79ad
Create Date: 2019-10-29 13:53:09.445943
"""

from alembic import op
from sqlalchemy import Column, Integer
from sqlalchemy.engine.reflection import Inspector

# revision identifiers, used by Alembic.
revision = 'bbf4a7ad0465'
down_revision = '004c1210f153'
down_revision = 'cf5dc11e79ad'
branch_labels = None
depends_on = None


def upgrade():
"""Apply Remove id column from xcom"""
conn = op.get_bind()
inspector = Inspector.from_engine(conn)

with op.batch_alter_table('xcom') as bop:
bop.drop_column('id')
bop.drop_index('idx_xcom_dag_task_date')
bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key', 'execution_date'])
xcom_columns = [col.get('name') for col in inspector.get_columns("xcom")]
if "id" in xcom_columns:
bop.drop_column('id')
bop.drop_index('idx_xcom_dag_task_date')
bop.create_primary_key('pk_xcom', ['dag_id', 'task_id', 'key', 'execution_date'])


def downgrade():
Expand Down
14 changes: 9 additions & 5 deletions airflow/migrations/versions/cf5dc11e79ad_drop_user_and_chart.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"""drop_user_and_chart
Revision ID: cf5dc11e79ad
Revises: 41f5f12752f8
Revises: a66efa278eea
Create Date: 2019-01-24 15:30:35.834740
"""
Expand All @@ -29,7 +29,7 @@

# revision identifiers, used by Alembic.
revision = 'cf5dc11e79ad'
down_revision = '41f5f12752f8'
down_revision = 'a66efa278eea'
branch_labels = None
depends_on = None

Expand All @@ -43,12 +43,16 @@ def upgrade(): # noqa: D103

conn = op.get_bind()
inspector = Inspector.from_engine(conn)
tables = inspector.get_table_names()

if 'known_event' in inspector.get_table_names() != 'sqlite':
if 'known_event' in tables:
op.drop_constraint('known_event_user_id_fkey', 'known_event')

op.drop_table("chart")
op.drop_table("users")
if "chart" in tables:
op.drop_table("chart", )

if "users" in tables:
op.drop_table("users")


def downgrade(): # noqa: D103
Expand Down

0 comments on commit 06f15cc

Please sign in to comment.