Skip to content

Commit

Permalink
Fix retries causing constraint violation on MySQL with DAG Serializat…
Browse files Browse the repository at this point in the history
…ion (apache#9336)

The issue was caused by the `execution_date` column of the `rendered_task_instance_fields` table lacking fractional-second precision on MySQL, which resulted in `_mysql_exceptions.IntegrityError`.

closes apache#9148

(cherry picked from commit 9e6b5ab)
  • Loading branch information
kaxil authored and Chris Fei committed Mar 5, 2021
1 parent 207d80d commit 4ccbb01
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add Precision to execution_date in RenderedTaskInstanceFields table
Revision ID: a66efa278eea
Revises: 8f966b9c467a
Create Date: 2020-06-16 21:44:02.883132
"""

from alembic import op
from sqlalchemy.dialects import mysql

# revision identifiers, used by Alembic.
revision = 'a66efa278eea'  # ID of this migration
down_revision = '8f966b9c467a'  # the revision this migration revises
branch_labels = None
depends_on = None

# Table and column whose type is altered by upgrade() and downgrade().
TABLE_NAME = 'rendered_task_instance_fields'
COLUMN_NAME = 'execution_date'


def upgrade():
    """Switch ``execution_date`` to ``TIMESTAMP(fsp=6)`` when running on MySQL.

    The column is kept NOT NULL. For any other dialect nothing is altered.
    """
    # Only MySQL is altered; bail out early for every other backend.
    if op.get_bind().dialect.name != "mysql":
        return

    op.alter_column(
        table_name=TABLE_NAME,
        column_name=COLUMN_NAME,
        type_=mysql.TIMESTAMP(fsp=6),
        nullable=False,
    )


def downgrade():
    """Revert ``execution_date`` to a ``TIMESTAMP`` declared without ``fsp`` on MySQL.

    The column is kept NOT NULL. For any other dialect nothing is altered.
    """
    bind = op.get_bind()
    running_on_mysql = bind.dialect.name == "mysql"
    if not running_on_mysql:
        return

    alter_kwargs = dict(
        table_name=TABLE_NAME,
        column_name=COLUMN_NAME,
        type_=mysql.TIMESTAMP(),
        nullable=False,
    )
    op.alter_column(**alter_kwargs)
106 changes: 57 additions & 49 deletions tests/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ class TaskInstanceTest(unittest.TestCase):

def setUp(self):
    """Clear pools and rendered TI fields, then add and commit ``test_pool``."""
    db.clear_db_pools()
    db.clear_rendered_ti_fields()
    with create_session() as session:
        # Add a one-slot pool named 'test_pool' and commit it.
        session.add(Pool(pool='test_pool', slots=1))
        session.commit()

def tearDown(self):
db.clear_db_pools()
db.clear_rendered_ti_fields()
with create_session() as session:
session.query(TaskFail).delete()
session.query(TaskReschedule).delete()
Expand Down Expand Up @@ -463,58 +465,64 @@ def run_with_error(ti):
run_with_error(ti)
self.assertEqual(ti.state, State.FAILED)

def test_retry_handling(self):
@parameterized.expand([
(False, None,),
(True, {'env': None, 'bash_command': 'echo test_retry_handling; exit 1'},),
])
def test_retry_handling(self, dag_serialization, expected_rendered_ti_fields):
"""
Test that task retries are handled properly
"""
dag = models.DAG(dag_id='test_retry_handling')
task = BashOperator(
task_id='test_retry_handling_op',
bash_command='exit 1',
retries=1,
retry_delay=datetime.timedelta(seconds=0),
dag=dag,
owner='test_pool',
start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))

def run_with_error(ti):
try:
ti.run()
except AirflowException:
pass

ti = TI(
task=task, execution_date=timezone.utcnow())
self.assertEqual(ti.try_number, 1)

# first run -- up for retry
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)
self.assertEqual(ti._try_number, 1)
self.assertEqual(ti.try_number, 2)

# second run -- fail
run_with_error(ti)
self.assertEqual(ti.state, State.FAILED)
self.assertEqual(ti._try_number, 2)
self.assertEqual(ti.try_number, 3)

# Clear the TI state since you can't run a task with a FAILED state without
# clearing it first
dag.clear()

# third run -- up for retry
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)
self.assertEqual(ti._try_number, 3)
self.assertEqual(ti.try_number, 4)

# fourth run -- fail
run_with_error(ti)
ti.refresh_from_db()
self.assertEqual(ti.state, State.FAILED)
self.assertEqual(ti._try_number, 4)
self.assertEqual(ti.try_number, 5)
with patch("airflow.models.taskinstance.STORE_SERIALIZED_DAGS", dag_serialization):
dag = models.DAG(dag_id='test_retry_handling')
task = BashOperator(
task_id='test_retry_handling_op',
bash_command='echo {{dag.dag_id}}; exit 1',
retries=1,
retry_delay=datetime.timedelta(seconds=0),
dag=dag,
owner='test_pool',
start_date=timezone.datetime(2016, 2, 1, 0, 0, 0))

def run_with_error(ti):
try:
ti.run()
except AirflowException:
pass

ti = TI(
task=task, execution_date=timezone.utcnow())
self.assertEqual(ti.try_number, 1)

# first run -- up for retry
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)
self.assertEqual(ti._try_number, 1)
self.assertEqual(ti.try_number, 2)

# second run -- fail
run_with_error(ti)
self.assertEqual(ti.state, State.FAILED)
self.assertEqual(ti._try_number, 2)
self.assertEqual(ti.try_number, 3)

# Clear the TI state since you can't run a task with a FAILED state without
# clearing it first
dag.clear()

# third run -- up for retry
run_with_error(ti)
self.assertEqual(ti.state, State.UP_FOR_RETRY)
self.assertEqual(ti._try_number, 3)
self.assertEqual(ti.try_number, 4)

# fourth run -- fail
run_with_error(ti)
ti.refresh_from_db()
self.assertEqual(ti.state, State.FAILED)
self.assertEqual(ti._try_number, 4)
self.assertEqual(ti.try_number, 5)
self.assertEqual(RenderedTaskInstanceFields.get_templated_fields(ti), expected_rendered_ti_fields)

def test_next_retry_datetime(self):
delay = datetime.timedelta(seconds=30)
Expand Down

0 comments on commit 4ccbb01

Please sign in to comment.