diff --git a/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py new file mode 100644 index 00000000000000..ecb589df4d4ab8 --- /dev/null +++ b/airflow/migrations/versions/a66efa278eea_add_precision_to_execution_date_in_mysql.py @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add Precision to execution_date in RenderedTaskInstanceFields table + +Revision ID: a66efa278eea +Revises: 8f966b9c467a +Create Date: 2020-06-16 21:44:02.883132 + +""" + +from alembic import op +from sqlalchemy.dialects import mysql + +# revision identifiers, used by Alembic. +revision = 'a66efa278eea' +down_revision = '8f966b9c467a' +branch_labels = None +depends_on = None + +TABLE_NAME = 'rendered_task_instance_fields' +COLUMN_NAME = 'execution_date' + + +def upgrade(): + """Add Precision to execution_date in RenderedTaskInstanceFields table for MySQL""" + conn = op.get_bind() + if conn.dialect.name == "mysql": + op.alter_column( + table_name=TABLE_NAME, + column_name=COLUMN_NAME, + type_=mysql.TIMESTAMP(fsp=6), + nullable=False + ) + + +def downgrade(): + """Unapply Add Precision to execution_date in RenderedTaskInstanceFields table""" + conn = op.get_bind() + if conn.dialect.name == "mysql": + op.alter_column( + table_name=TABLE_NAME, + column_name=COLUMN_NAME, + type_=mysql.TIMESTAMP(), + nullable=False + ) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 9bce7d58b75c08..4534a077c71bb4 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -53,6 +53,7 @@ class TaskInstanceTest(unittest.TestCase): def setUp(self): db.clear_db_pools() + db.clear_rendered_ti_fields() with create_session() as session: test_pool = Pool(pool='test_pool', slots=1) session.add(test_pool) @@ -60,6 +61,7 @@ def setUp(self): def tearDown(self): db.clear_db_pools() + db.clear_rendered_ti_fields() with create_session() as session: session.query(TaskFail).delete() session.query(TaskReschedule).delete() @@ -463,58 +465,64 @@ def run_with_error(ti): run_with_error(ti) self.assertEqual(ti.state, State.FAILED) - def test_retry_handling(self): + @parameterized.expand([ + (False, None,), + (True, {'env': None, 'bash_command': 'echo test_retry_handling; exit 1'},), + ]) + def test_retry_handling(self, dag_serialization, expected_rendered_ti_fields): """ Test that task retries are handled properly """ - dag = models.DAG(dag_id='test_retry_handling') - task = BashOperator( - task_id='test_retry_handling_op', - bash_command='exit 1', - retries=1, - retry_delay=datetime.timedelta(seconds=0), - dag=dag, - owner='test_pool', - start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) - - def run_with_error(ti): - try: - ti.run() - except AirflowException: - pass - - ti = TI( - task=task, execution_date=timezone.utcnow()) - self.assertEqual(ti.try_number, 1) - - # first run -- up for retry - run_with_error(ti) - self.assertEqual(ti.state, State.UP_FOR_RETRY) - self.assertEqual(ti._try_number, 1) - self.assertEqual(ti.try_number, 2) - - # second run -- fail - run_with_error(ti) - self.assertEqual(ti.state, State.FAILED) - self.assertEqual(ti._try_number, 2) - self.assertEqual(ti.try_number, 3) - - # Clear the TI state since you can't run a task with a FAILED state without - # clearing it first - dag.clear() - - # third run -- up for retry - run_with_error(ti) - self.assertEqual(ti.state, State.UP_FOR_RETRY) - self.assertEqual(ti._try_number, 3) - self.assertEqual(ti.try_number, 4) - - # fourth run -- fail - run_with_error(ti) - ti.refresh_from_db() - self.assertEqual(ti.state, State.FAILED) - self.assertEqual(ti._try_number, 4) - self.assertEqual(ti.try_number, 5) + with patch("airflow.models.taskinstance.STORE_SERIALIZED_DAGS", dag_serialization): + dag = models.DAG(dag_id='test_retry_handling') + task = BashOperator( + task_id='test_retry_handling_op', + bash_command='echo {{dag.dag_id}}; exit 1', + retries=1, + retry_delay=datetime.timedelta(seconds=0), + dag=dag, + owner='test_pool', + start_date=timezone.datetime(2016, 2, 1, 0, 0, 0)) + + def run_with_error(ti): + try: + ti.run() + except AirflowException: + pass + + ti = TI( + task=task, execution_date=timezone.utcnow()) + self.assertEqual(ti.try_number, 1) + + # first run -- up for retry + run_with_error(ti) + self.assertEqual(ti.state, State.UP_FOR_RETRY) + self.assertEqual(ti._try_number, 1) + self.assertEqual(ti.try_number, 2) + + # second run -- fail + run_with_error(ti) + self.assertEqual(ti.state, State.FAILED) + self.assertEqual(ti._try_number, 2) + self.assertEqual(ti.try_number, 3) + + # Clear the TI state since you can't run a task with a FAILED state without + # clearing it first + dag.clear() + + # third run -- up for retry + run_with_error(ti) + self.assertEqual(ti.state, State.UP_FOR_RETRY) + self.assertEqual(ti._try_number, 3) + self.assertEqual(ti.try_number, 4) + + # fourth run -- fail + run_with_error(ti) + ti.refresh_from_db() + self.assertEqual(ti.state, State.FAILED) + self.assertEqual(ti._try_number, 4) + self.assertEqual(ti.try_number, 5) + self.assertEqual(RenderedTaskInstanceFields.get_templated_fields(ti), expected_rendered_ti_fields) def test_next_retry_datetime(self): delay = datetime.timedelta(seconds=30)