Make models/taskinstance.py pylint compatible (#10499)

potiuk authored Aug 24, 2020
1 parent 3696c34 commit 4f6d53e

Showing 4 changed files with 47 additions and 43 deletions.
3 changes: 2 additions & 1 deletion airflow/models/skipmixin.py
@@ -36,6 +36,7 @@
 
 class SkipMixin(LoggingMixin):
     """ A Mixin to skip Tasks Instances """
+
     def _set_state_to_skipped(self, dag_run, execution_date, tasks, session):
         """
         Used internally to set state of task instances to skipped from the same dag run.
@@ -93,7 +94,7 @@ def skip(
 
         # SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
         try:
-            task_id = self.task_id
+            task_id = self.task_id  # noqa
         except AttributeError:
             task_id = None
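A quick aside on the two suppression markers that recur throughout this commit: `# noqa` tells flake8 to skip its checks on that line, while `# pylint: disable=<message>` silences a named pylint check. A minimal, self-contained illustration (all names are hypothetical):

import os, sys  # noqa  - flake8 would flag this multiple-import style; noqa skips all checks on the line


class Job:
    def __init__(self):
        self._state = "running"  # "protected" by naming convention only


job = Job()
print(job._state)  # pylint: disable=protected-access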
75 changes: 34 additions & 41 deletions airflow/models/taskinstance.py
@@ -135,8 +135,8 @@ def clear_task_instances(tis,
     ).delete()
 
     if job_ids:
-        from airflow.jobs.base_job import BaseJob as BJ
-        for job in session.query(BJ).filter(BJ.id.in_(job_ids)).all():
+        from airflow.jobs.base_job import BaseJob
+        for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all():  # noqa
             job.state = State.SHUTDOWN
 
     if activate_dag_runs and tis:
@@ -442,16 +442,15 @@ def generate_command(dag_id: str,  # pylint: disable=too-many-arguments
     def log_filepath(self):
         """Filepath for TaskInstance"""
         iso = self.execution_date.isoformat()
-        log = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
-        return ("{log}/{dag_id}/{task_id}/{iso}.log".format(
-            log=log, dag_id=self.dag_id, task_id=self.task_id, iso=iso))
+        the_log = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
+        return f"{the_log}/{self.dag_id}/{self.task_id}/{iso}.log"
 
     @property
     def log_url(self):
         """Log URL for TaskInstance"""
         iso = quote(self.execution_date.isoformat())
         base_url = conf.get('webserver', 'BASE_URL')
-        return base_url + (
+        return base_url + (  # noqa
             "/log?"
             "execution_date={iso}"
             "&task_id={task_id}"
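The log_filepath change above swaps str.format for an f-string; a small self-contained check of the equivalence (all values are hypothetical stand-ins for the real config and task attributes):

the_log = '/home/user/airflow/logs'  # stand-in for BASE_LOG_FOLDER
dag_id, task_id = 'example_dag', 'example_task'
iso = '2020-08-24T00:00:00+00:00'  # stand-in for execution_date.isoformat()

old_style = "{log}/{dag_id}/{task_id}/{iso}.log".format(
    log=the_log, dag_id=dag_id, task_id=task_id, iso=iso)
new_style = f"{the_log}/{dag_id}/{task_id}/{iso}.log"
assert old_style == new_style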
@@ -463,7 +462,7 @@ def mark_success_url(self):
         """URL to mark TI success"""
         iso = quote(self.execution_date.isoformat())
         base_url = conf.get('webserver', 'BASE_URL')
-        return base_url + (
+        return base_url + (  # noqa
             "/success"
             "?task_id={task_id}"
             "&dag_id={dag_id}"
@@ -537,7 +536,7 @@ def refresh_from_db(self, session=None, lock_for_update=False) -> None:
             self.state = ti.state
             # Get the raw value of try_number column, don't read through the
             # accessor here otherwise it will be incremented by one already.
-            self.try_number = ti._try_number  # pylint: disable=protected-access
+            self.try_number = ti._try_number  # noqa pylint: disable=protected-access
             self.max_tries = ti.max_tries
             self.hostname = ti.hostname
             self.unixname = ti.unixname
@@ -834,7 +833,7 @@ def get_failed_dep_statuses(
             yield dep_status
 
     def __repr__(self):
-        return (
+        return (  # noqa
             "<TaskInstance: {ti.dag_id}.{ti.task_id} "
             "{ti.execution_date} [{ti.state}]>"
         ).format(ti=self)
@@ -848,10 +847,10 @@ def next_retry_datetime(self):
         if self.task.retry_exponential_backoff:
             # If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus,
             # we must round up prior to converting to an int, otherwise a divide by zero error
-            # will occurr in the modded_hash calculation.
+            # will occur in the modded_hash calculation.
             min_backoff = int(math.ceil(delay.total_seconds() * (2 ** (self.try_number - 2))))
             # deterministic per task instance
-            ti_hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id,
+            ti_hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id,  # noqa
                                                             self.task_id,
                                                             self.execution_date,
                                                             self.try_number)
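The corrected comment above describes a real divide-by-zero hazard: the later modded_hash calculation takes ti_hash modulo min_backoff, so min_backoff must never truncate to 0. A minimal sketch of the guard, with hypothetical example values:

import math

delay_seconds = 0.5  # hypothetical short retry_delay
try_number = 2       # first retry

# int(0.5 * 2 ** 0) would be 0, and `ti_hash % min_backoff` would raise
# ZeroDivisionError; math.ceil before int() guarantees at least 1
min_backoff = int(math.ceil(delay_seconds * (2 ** (try_number - 2))))
assert min_backoff >= 1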
@@ -1074,7 +1073,7 @@ def _run_raw_task(
         try:
             if not mark_success:
                 context = self.get_template_context()
-                self._prepare_and_execute_task_with_callbacks(context, session, task)
+                self._prepare_and_execute_task_with_callbacks(context, task)
             self.refresh_from_db(lock_for_update=True)
             self.state = State.SUCCESS
         except AirflowSkipException as e:
@@ -1095,7 +1094,7 @@
             )
         except AirflowRescheduleException as reschedule_exception:
             self.refresh_from_db()
-            self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, context)
+            self._handle_reschedule(actual_start_date, reschedule_exception, test_mode)
             return
         except AirflowFailException as e:
             self.refresh_from_db()
@@ -1135,23 +1134,16 @@
             session.merge(self)
             session.commit()
 
-    def _prepare_and_execute_task_with_callbacks(self, context, session, task):
+    def _prepare_and_execute_task_with_callbacks(
+            self,
+            context,
+            task):
         """
         Prepare Task for Execution
         """
-        from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
-        from airflow.sensors.base_sensor_operator import BaseSensorOperator
+        from airflow.models.renderedtifields import RenderedTaskInstanceFields
 
         task_copy = task.prepare_for_execution()
-        # Sensors in `poke` mode can block execution of DAGs when running
-        # with single process executor, thus we change the mode to`reschedule`
-        # to allow parallel task being scheduled and executed
-        if (
-            isinstance(task_copy, BaseSensorOperator) and
-            conf.get('core', 'executor') == "DebugExecutor"
-        ):
-            self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
-            task_copy.mode = 'reschedule'
         self.task = task_copy
 
         def signal_handler(signum, frame):  # pylint: disable=unused-argument
@@ -1167,8 +1159,8 @@ def signal_handler(signum, frame):  # pylint: disable=unused-argument

         self.render_templates(context=context)
         if STORE_SERIALIZED_DAGS:
-            RTIF.write(RTIF(ti=self, render_templates=False))
-            RTIF.delete_old_records(self.task_id, self.dag_id)
+            RenderedTaskInstanceFields.write(RenderedTaskInstanceFields(ti=self, render_templates=False))
+            RenderedTaskInstanceFields.delete_old_records(self.task_id, self.dag_id)
 
         # Export context to make it available for operators to use.
         airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
@@ -1283,7 +1275,6 @@ def _handle_reschedule(self,
                            actual_start_date,
                            reschedule_exception,
                            test_mode=False,
-                           context=None,  # pylint: disable=unused-argument
                            session=None):
         # Don't record reschedule request in test mode
         if test_mode:
@@ -1482,7 +1473,8 @@ def __repr__(self):
     @staticmethod
     def get(
         item: str,
-        default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # pylint: disable=W0212
+        # pylint: disable=protected-access
+        default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # noqa
     ):
         """Get Airflow Variable value"""
         return Variable.get(item, default_var=default_var)
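For context on the warning being silenced here: a double-underscore class attribute is name-mangled by Python, so code outside the class can only reach it through the mangled _ClassName__attr form, which pylint reports as protected access (W0212). A minimal self-contained sketch:

class Variable:
    __NO_DEFAULT_SENTINEL = object()  # simplified stand-in for Airflow's sentinel


# outside the class, the attribute is only visible under its mangled name
sentinel = Variable._Variable__NO_DEFAULT_SENTINEL  # pylint: disable=protected-access
print(sentinel is getattr(Variable, '_Variable__NO_DEFAULT_SENTINEL'))  # True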
@@ -1509,7 +1501,8 @@ def __repr__(self):
     @staticmethod
     def get(
         item: str,
-        default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # pylint: disable=W0212
+        # pylint: disable=protected-access
+        default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # noqa
     ):
         """Get Airflow Variable after deserializing JSON value"""
         return Variable.get(item, default_var=default_var, deserialize_json=True)
@@ -1561,9 +1554,9 @@ def get_rendered_template_fields(self):
"""
from airflow.models.renderedtifields import RenderedTaskInstanceFields
if STORE_SERIALIZED_DAGS:
rtif = RenderedTaskInstanceFields.get_templated_fields(self)
if rtif:
for field_name, rendered_value in rtif.items():
rendered_task_instance_fields = RenderedTaskInstanceFields.get_templated_fields(self)
if rendered_task_instance_fields:
for field_name, rendered_value in rendered_task_instance_fields.items():
setattr(self.task, field_name, rendered_value)
else:
try:
Expand Down Expand Up @@ -1628,6 +1621,7 @@ def render(key, content):

         subject = render('subject_template', default_subject)
         html_content = render('html_content_template', default_html_content)
+        # noinspection PyBroadException
         try:
             send_email(self.task.email, subject, html_content)
         except Exception:  # pylint: disable=broad-except
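As an aside, the `# noinspection PyBroadException` line added above is a PyCharm/IntelliJ inspection suppression, the IDE-side counterpart of pylint's broad-except disable. A minimal illustration of the pairing:

# noinspection PyBroadException
try:
    result = 1 / 0
except Exception:  # pylint: disable=broad-except
    result = None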
@@ -1662,7 +1656,7 @@ def xcom_push(
         :type key: str
         :param value: A value for the XCom. The value is pickled and stored
             in the database.
-        :type value: any pickleable object
+        :type value: any picklable object
         :param execution_date: if provided, the XCom will not be visible until
             this date. This can be used, for example, to send a message to a
             task on a future date without it being immediately visible.
@@ -1762,19 +1756,18 @@ def filter_for_tis(
         tis: Iterable[Union["TaskInstance", TaskInstanceKey]]
     ) -> Optional[BooleanClauseList]:
         """Returns SQLAlchemy filter to query selected task instances"""
-        TI = TaskInstance
         if not tis:
             return None
         if all(isinstance(t, TaskInstanceKey) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == tik.dag_id,
-                                    TI.task_id == tik.task_id,
-                                    TI.execution_date == tik.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == tik.dag_id,
+                                    TaskInstance.task_id == tik.task_id,
+                                    TaskInstance.execution_date == tik.execution_date)
                                for tik in tis])
             return or_(*filter_for_tis)
         if all(isinstance(t, TaskInstance) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == ti.dag_id,
-                                    TI.task_id == ti.task_id,
-                                    TI.execution_date == ti.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == ti.dag_id,
+                                    TaskInstance.task_id == ti.task_id,
+                                    TaskInstance.execution_date == ti.execution_date)
                                for ti in tis])
             return or_(*filter_for_tis)
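A hedged usage sketch for the helper above: the returned clause (or None when tis is empty) plugs straight into a SQLAlchemy query. The open session and the tis list are assumed to exist:

# assuming `session` is an open SQLAlchemy session and `tis` is a list of
# TaskInstance or TaskInstanceKey objects
clause = TaskInstance.filter_for_tis(tis)
if clause is not None:
    matching = session.query(TaskInstance).filter(clause).all()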

11 changes: 11 additions & 0 deletions airflow/sensors/base_sensor_operator.py
@@ -22,6 +22,7 @@
 from time import sleep
 from typing import Any, Dict, Iterable
 
+from airflow.configuration import conf
 from airflow.exceptions import (
     AirflowException, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException,
 )
@@ -159,6 +160,16 @@ def _get_next_poke_interval(self, started_at, try_number):
         else:
             return self.poke_interval
 
+    def prepare_for_execution(self) -> BaseOperator:
+        task = super().prepare_for_execution()
+        # Sensors in `poke` mode can block execution of DAGs when running
+        # with a single-process executor, thus we change the mode to `reschedule`
+        # to allow tasks to be scheduled and executed in parallel
+        if conf.get('core', 'executor') == "DebugExecutor":
+            self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
+            task.mode = 'reschedule'
+        return task

     @property
     def reschedule(self):
         """Define mode rescheduled sensors."""
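A hedged sketch of the hook added above: a toy sensor whose mode gets rewritten when the configured executor is the DebugExecutor (the sensor class and trigger path are hypothetical; the API follows this 2.0-era commit):

import os

from airflow.sensors.base_sensor_operator import BaseSensorOperator


class WaitForFileSensor(BaseSensorOperator):  # hypothetical example sensor
    def poke(self, context):
        return os.path.exists('/tmp/trigger')  # hypothetical trigger file


sensor = WaitForFileSensor(task_id='wait_for_file', mode='poke', poke_interval=5)
# with `executor = DebugExecutor` in the [core] config section,
# prepare_for_execution() hands back a task whose mode is 'reschedule'
task_copy = sensor.prepare_for_execution()
print(task_copy.mode)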
1 change: 0 additions & 1 deletion scripts/ci/pylint_todo.txt
@@ -1,5 +1,4 @@
 ./airflow/configuration.py
 ./airflow/models/dag.py
 ./airflow/models/dagrun.py
-./airflow/models/taskinstance.py
 ./airflow/www/utils.py
