Skip to content

Commit

Permalink
[AIRFLOW-3462] Move TaskReschedule out of models.py (apache#4618)
Browse files Browse the repository at this point in the history
  • Loading branch information
seelmann authored and wayne.morris committed Jul 29, 2019
1 parent e3ee9bb commit 76637ef
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 81 deletions.
2 changes: 1 addition & 1 deletion airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
from airflow.utils.state import State

Base = models.base.Base
ID_LEN = models.ID_LEN
ID_LEN = models.base.ID_LEN


class BaseJob(Base, LoggingMixin):
Expand Down
69 changes: 4 additions & 65 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from builtins import ImportError as BuiltinImportError, bytes, object, str
from future.standard_library import install_aliases

from airflow.models.base import Base
from airflow.models.base import Base, ID_LEN

try:
# Fix Python > 3.7 deprecation
Expand Down Expand Up @@ -63,9 +63,8 @@
from urllib.parse import quote

from sqlalchemy import (
Boolean, Column, DateTime, Float, ForeignKeyConstraint, Index,
Integer, LargeBinary, PickleType, String, Text, UniqueConstraint, and_, asc,
func, or_, true as sqltrue
Boolean, Column, DateTime, Float, Index, Integer, LargeBinary, PickleType, String,
Text, UniqueConstraint, and_, func, or_, true as sqltrue
)
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import reconstructor, synonym
Expand All @@ -85,6 +84,7 @@
from airflow.dag.base_dag import BaseDag, BaseDagBag
from airflow.lineage import apply_lineage, prepare_lineage
from airflow.models.dagpickle import DagPickle
from airflow.models.taskreschedule import TaskReschedule
from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
Expand All @@ -109,7 +109,6 @@

install_aliases()

ID_LEN = 250
XCOM_RETURN_KEY = 'return_value'

Stats = settings.Stats
Expand Down Expand Up @@ -1888,66 +1887,6 @@ def __init__(self, task, execution_date, start_date, end_date):
self.duration = None


class TaskReschedule(Base):
"""
TaskReschedule tracks rescheduled task instances.
"""

__tablename__ = "task_reschedule"

id = Column(Integer, primary_key=True)
task_id = Column(String(ID_LEN), nullable=False)
dag_id = Column(String(ID_LEN), nullable=False)
execution_date = Column(UtcDateTime, nullable=False)
try_number = Column(Integer, nullable=False)
start_date = Column(UtcDateTime, nullable=False)
end_date = Column(UtcDateTime, nullable=False)
duration = Column(Integer, nullable=False)
reschedule_date = Column(UtcDateTime, nullable=False)

__table_args__ = (
Index('idx_task_reschedule_dag_task_date', dag_id, task_id, execution_date,
unique=False),
ForeignKeyConstraint([task_id, dag_id, execution_date],
[TaskInstance.task_id, TaskInstance.dag_id,
TaskInstance.execution_date],
name='task_reschedule_dag_task_date_fkey')
)

def __init__(self, task, execution_date, try_number, start_date, end_date,
reschedule_date):
self.dag_id = task.dag_id
self.task_id = task.task_id
self.execution_date = execution_date
self.try_number = try_number
self.start_date = start_date
self.end_date = end_date
self.reschedule_date = reschedule_date
self.duration = (self.end_date - self.start_date).total_seconds()

@staticmethod
@provide_session
def find_for_task_instance(task_instance, session):
"""
Returns all task reschedules for the task instance and try number,
in ascending order.
:param task_instance: the task instance to find task reschedules for
:type task_instance: TaskInstance
"""
TR = TaskReschedule
return (
session
.query(TR)
.filter(TR.dag_id == task_instance.dag_id,
TR.task_id == task_instance.task_id,
TR.execution_date == task_instance.execution_date,
TR.try_number == task_instance.try_number)
.order_by(asc(TR.id))
.all()
)


class Log(Base):
"""
Used to actively log events to the database
Expand Down
2 changes: 2 additions & 0 deletions airflow/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,5 @@
Base = declarative_base()
else:
Base = declarative_base(metadata=MetaData(schema=SQL_ALCHEMY_SCHEMA))

ID_LEN = 250
3 changes: 2 additions & 1 deletion airflow/models/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
from sqlalchemy.orm import synonym

from airflow import LoggingMixin, AirflowException
from airflow.models import Base, ID_LEN, get_fernet
from airflow.models import get_fernet
from airflow.models.base import Base, ID_LEN


class Connection(Base, LoggingMixin):
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/slamiss.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from sqlalchemy import Boolean, Column, String, Index, Text

from airflow.models import Base, ID_LEN
from airflow.models.base import Base, ID_LEN
from airflow.utils.sqlalchemy import UtcDateTime


Expand Down
84 changes: 84 additions & 0 deletions airflow/models/taskreschedule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, String, asc

from airflow.models.base import Base, ID_LEN
from airflow.utils.db import provide_session
from airflow.utils.sqlalchemy import UtcDateTime


class TaskReschedule(Base):
"""
TaskReschedule tracks rescheduled task instances.
"""

__tablename__ = "task_reschedule"

id = Column(Integer, primary_key=True)
task_id = Column(String(ID_LEN), nullable=False)
dag_id = Column(String(ID_LEN), nullable=False)
execution_date = Column(UtcDateTime, nullable=False)
try_number = Column(Integer, nullable=False)
start_date = Column(UtcDateTime, nullable=False)
end_date = Column(UtcDateTime, nullable=False)
duration = Column(Integer, nullable=False)
reschedule_date = Column(UtcDateTime, nullable=False)

__table_args__ = (
Index('idx_task_reschedule_dag_task_date', dag_id, task_id, execution_date,
unique=False),
ForeignKeyConstraint([task_id, dag_id, execution_date],
['task_instance.task_id', 'task_instance.dag_id',
'task_instance.execution_date'],
name='task_reschedule_dag_task_date_fkey')
)

def __init__(self, task, execution_date, try_number, start_date, end_date,
reschedule_date):
self.dag_id = task.dag_id
self.task_id = task.task_id
self.execution_date = execution_date
self.try_number = try_number
self.start_date = start_date
self.end_date = end_date
self.reschedule_date = reschedule_date
self.duration = (self.end_date - self.start_date).total_seconds()

@staticmethod
@provide_session
def find_for_task_instance(task_instance, session):
"""
Returns all task reschedules for the task instance and try number,
in ascending order.
:param task_instance: the task instance to find task reschedules for
:type task_instance: TaskInstance
"""
TR = TaskReschedule
return (
session
.query(TR)
.filter(TR.dag_id == task_instance.dag_id,
TR.task_id == task_instance.task_id,
TR.execution_date == task_instance.execution_date,
TR.try_number == task_instance.try_number)
.order_by(asc(TR.id))
.all()
)
3 changes: 2 additions & 1 deletion airflow/sensors/base_sensor_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

from airflow.exceptions import AirflowException, AirflowSensorTimeout, \
AirflowSkipException, AirflowRescheduleException
from airflow.models import BaseOperator, SkipMixin, TaskReschedule
from airflow.models import BaseOperator, SkipMixin
from airflow.models.taskreschedule import TaskReschedule
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
Expand Down
3 changes: 1 addition & 2 deletions airflow/ti_deps/deps/ready_to_reschedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.

from airflow.models.taskreschedule import TaskReschedule
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils import timezone
from airflow.utils.db import provide_session
Expand Down Expand Up @@ -49,8 +50,6 @@ def _get_dep_statuses(self, ti, session, dep_context):
reason="The task instance is not in State_UP_FOR_RESCHEDULE or NONE state.")
return

# Lazy import to avoid circular dependency
from airflow.models import TaskReschedule
task_reschedules = TaskReschedule.find_for_task_instance(task_instance=ti)
if not task_reschedules:
yield self._passing_status(
Expand Down
6 changes: 3 additions & 3 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@
from airflow.models import KubeResourceVersion, KubeWorkerIdentifier
from airflow.models import SkipMixin
from airflow.models import State as ST
from airflow.models import TaskReschedule as TR
from airflow.models import XCom
from airflow.models import Variable
from airflow.models import clear_task_instances
from airflow.models.connection import Connection
from airflow.models.taskreschedule import TaskReschedule
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
Expand Down Expand Up @@ -1946,7 +1946,7 @@ class TaskInstanceTest(unittest.TestCase):
def tearDown(self):
with create_session() as session:
session.query(models.TaskFail).delete()
session.query(models.TaskReschedule).delete()
session.query(TaskReschedule).delete()
session.query(models.TaskInstance).delete()

def test_set_task_dates(self):
Expand Down Expand Up @@ -2367,7 +2367,7 @@ def run_ti_and_assert(run_date, expected_start_date, expected_end_date, expected
self.assertEqual(ti.start_date, expected_start_date)
self.assertEqual(ti.end_date, expected_end_date)
self.assertEqual(ti.duration, expected_duration)
trs = TR.find_for_task_instance(ti)
trs = TaskReschedule.find_for_task_instance(ti)
self.assertEqual(len(trs), expected_task_reschedule_count)

date1 = timezone.utcnow()
Expand Down
3 changes: 2 additions & 1 deletion tests/sensors/test_base_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
from airflow import DAG, configuration, settings
from airflow.exceptions import (AirflowSensorTimeout, AirflowException,
AirflowRescheduleException)
from airflow.models import DagRun, TaskInstance, TaskReschedule
from airflow.models import DagRun, TaskInstance
from airflow.models.taskreschedule import TaskReschedule
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
Expand Down
13 changes: 7 additions & 6 deletions tests/ti_deps/deps/test_ready_to_reschedule_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from datetime import timedelta
from mock import Mock, patch

from airflow.models import TaskInstance, DAG, TaskReschedule
from airflow.models import TaskInstance, DAG
from airflow.models.taskreschedule import TaskReschedule
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
from airflow.utils.state import State
Expand Down Expand Up @@ -52,20 +53,20 @@ def test_should_pass_if_not_in_none_state(self):
ti = self._get_task_instance(State.UP_FOR_RETRY)
self.assertTrue(ReadyToRescheduleDep().is_met(ti=ti))

@patch('airflow.models.TaskReschedule.find_for_task_instance', return_value=[])
@patch('airflow.models.taskreschedule.TaskReschedule.find_for_task_instance', return_value=[])
def test_should_pass_if_no_reschedule_record_exists(self, find_for_task_instance):
ti = self._get_task_instance(State.NONE)
self.assertTrue(ReadyToRescheduleDep().is_met(ti=ti))

@patch('airflow.models.TaskReschedule.find_for_task_instance')
@patch('airflow.models.taskreschedule.TaskReschedule.find_for_task_instance')
def test_should_pass_after_reschedule_date_one(self, find_for_task_instance):
find_for_task_instance.return_value = [
self._get_task_reschedule(utcnow() - timedelta(minutes=1)),
]
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
self.assertTrue(ReadyToRescheduleDep().is_met(ti=ti))

@patch('airflow.models.TaskReschedule.find_for_task_instance')
@patch('airflow.models.taskreschedule.TaskReschedule.find_for_task_instance')
def test_should_pass_after_reschedule_date_multiple(self, find_for_task_instance):
find_for_task_instance.return_value = [
self._get_task_reschedule(utcnow() - timedelta(minutes=21)),
Expand All @@ -75,15 +76,15 @@ def test_should_pass_after_reschedule_date_multiple(self, find_for_task_instance
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
self.assertTrue(ReadyToRescheduleDep().is_met(ti=ti))

@patch('airflow.models.TaskReschedule.find_for_task_instance')
@patch('airflow.models.taskreschedule.TaskReschedule.find_for_task_instance')
def test_should_fail_before_reschedule_date_one(self, find_for_task_instance):
find_for_task_instance.return_value = [
self._get_task_reschedule(utcnow() + timedelta(minutes=1)),
]
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
self.assertFalse(ReadyToRescheduleDep().is_met(ti=ti))

@patch('airflow.models.TaskReschedule.find_for_task_instance')
@patch('airflow.models.taskreschedule.TaskReschedule.find_for_task_instance')
def test_should_fail_before_reschedule_date_multiple(self, find_for_task_instance):
find_for_task_instance.return_value = [
self._get_task_reschedule(utcnow() - timedelta(minutes=19)),
Expand Down

0 comments on commit 76637ef

Please sign in to comment.