
Make models/taskinstance.py pylint compatible #10499

Merged
3 changes: 2 additions & 1 deletion airflow/models/skipmixin.py
@@ -36,6 +36,7 @@

 class SkipMixin(LoggingMixin):
     """ A Mixin to skip Tasks Instances """
+
     def _set_state_to_skipped(self, dag_run, execution_date, tasks, session):
         """
         Used internally to set state of task instances to skipped from the same dag run.
@@ -93,7 +94,7 @@ def skip(

         # SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
         try:
-            task_id = self.task_id
+            task_id = self.task_id  # noqa
         except AttributeError:
             task_id = None

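The try/except in the second hunk is the standard probe for an attribute the mixin cannot guarantee its host class defines. A standalone sketch of the pattern (class and method names hypothetical, not part of this PR):

class TaskIdAwareMixin:
    def _resolve_task_id(self):
        # Hosts that define task_id get it recorded; all others fall
        # back to None instead of raising AttributeError.
        try:
            return self.task_id
        except AttributeError:
            return None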
75 changes: 34 additions & 41 deletions airflow/models/taskinstance.py
@@ -135,8 +135,8 @@ def clear_task_instances(tis,
         ).delete()

     if job_ids:
-        from airflow.jobs.base_job import BaseJob as BJ
-        for job in session.query(BJ).filter(BJ.id.in_(job_ids)).all():
+        from airflow.jobs.base_job import BaseJob
+        for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all():  # noqa
             job.state = State.SHUTDOWN

     if activate_dag_runs and tis:
@@ -442,16 +442,15 @@ def generate_command(dag_id: str,  # pylint: disable=too-many-arguments
     def log_filepath(self):
         """Filepath for TaskInstance"""
         iso = self.execution_date.isoformat()
-        log = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
-        return ("{log}/{dag_id}/{task_id}/{iso}.log".format(
-            log=log, dag_id=self.dag_id, task_id=self.task_id, iso=iso))
+        the_log = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
+        return f"{the_log}/{self.dag_id}/{self.task_id}/{iso}.log"

     @property
     def log_url(self):
         """Log URL for TaskInstance"""
         iso = quote(self.execution_date.isoformat())
         base_url = conf.get('webserver', 'BASE_URL')
-        return base_url + (
+        return base_url + (  # noqa
             "/log?"
             "execution_date={iso}"
             "&task_id={task_id}"
@@ -463,7 +462,7 @@ def mark_success_url(self):
         """URL to mark TI success"""
         iso = quote(self.execution_date.isoformat())
         base_url = conf.get('webserver', 'BASE_URL')
-        return base_url + (
+        return base_url + (  # noqa
             "/success"
             "?task_id={task_id}"
             "&dag_id={dag_id}"
@@ -537,7 +536,7 @@ def refresh_from_db(self, session=None, lock_for_update=False) -> None:
             self.state = ti.state
             # Get the raw value of try_number column, don't read through the
             # accessor here otherwise it will be incremented by one already.
-            self.try_number = ti._try_number  # pylint: disable=protected-access
+            self.try_number = ti._try_number  # noqa pylint: disable=protected-access
             self.max_tries = ti.max_tries
             self.hostname = ti.hostname
             self.unixname = ti.unixname
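The comment in this hunk points at a real trap: TaskInstance.try_number is a property that adjusts the raw column depending on the task's state, so copying through it in refresh_from_db would shift the stored count. A toy model of the distinction (a sketch only; the real property works against Airflow's State enum):

class ToyTaskInstance:
    """Sketch of why refresh_from_db copies the raw _try_number column."""

    def __init__(self):
        self._try_number = 0   # raw database column
        self.state = None

    @property
    def try_number(self):
        # Outside a running state, the accessor reports the *next*
        # attempt, i.e. the raw value plus one; reading it to repopulate
        # the column would therefore increment it spuriously.
        if self.state == "running":
            return self._try_number
        return self._try_number + 1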
@@ -834,7 +833,7 @@ def get_failed_dep_statuses(
             yield dep_status

     def __repr__(self):
-        return (
+        return (  # noqa
             "<TaskInstance: {ti.dag_id}.{ti.task_id} "
             "{ti.execution_date} [{ti.state}]>"
         ).format(ti=self)
@@ -848,10 +847,10 @@ def next_retry_datetime(self):
         if self.task.retry_exponential_backoff:
             # If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus,
             # we must round up prior to converting to an int, otherwise a divide by zero error
-            # will occurr in the modded_hash calculation.
+            # will occur in the modded_hash calculation.
             min_backoff = int(math.ceil(delay.total_seconds() * (2 ** (self.try_number - 2))))
             # deterministic per task instance
-            ti_hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id,
+            ti_hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id,  # noqa
                                                             self.task_id,
                                                             self.execution_date,
                                                             self.try_number)
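The comment fix above sits next to the trickiest arithmetic in the file, so it is worth seeing end to end. A self-contained sketch of the computation (the flat function signature is invented for illustration; the real method reads these values from self and additionally caps the resulting delay):

import hashlib
import math
from datetime import timedelta

def backoff_seconds(retry_delay: timedelta, try_number: int,
                    dag_id: str, task_id: str, execution_date: str) -> int:
    # Round up before int(): a fractional result would otherwise truncate
    # to 0 and make the modulo below divide by zero.
    min_backoff = int(math.ceil(retry_delay.total_seconds() * (2 ** (try_number - 2))))
    # SHA-1 over the TI coordinates gives deterministic jitter: the same
    # task instance always computes the same delay.
    ti_hash = int(hashlib.sha1(
        f"{dag_id}#{task_id}#{execution_date}#{try_number}".encode("utf-8")
    ).hexdigest(), 16)
    return min_backoff + ti_hash % min_backoff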
@@ -1074,7 +1073,7 @@ def _run_raw_task(
         try:
             if not mark_success:
                 context = self.get_template_context()
-                self._prepare_and_execute_task_with_callbacks(context, session, task)
+                self._prepare_and_execute_task_with_callbacks(context, task)
             self.refresh_from_db(lock_for_update=True)
             self.state = State.SUCCESS
         except AirflowSkipException as e:
@@ -1095,7 +1094,7 @@ def _run_raw_task(
             )
         except AirflowRescheduleException as reschedule_exception:
             self.refresh_from_db()
-            self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, context)
+            self._handle_reschedule(actual_start_date, reschedule_exception, test_mode)
             return
         except AirflowFailException as e:
             self.refresh_from_db()
@@ -1135,23 +1134,16 @@ def _run_raw_task(
             session.merge(self)
             session.commit()

-    def _prepare_and_execute_task_with_callbacks(self, context, session, task):
+    def _prepare_and_execute_task_with_callbacks(
+            self,
+            context,
+            task):
         """
         Prepare Task for Execution
         """
-        from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
-        from airflow.sensors.base_sensor_operator import BaseSensorOperator
+        from airflow.models.renderedtifields import RenderedTaskInstanceFields

         task_copy = task.prepare_for_execution()
-        # Sensors in `poke` mode can block execution of DAGs when running
-        # with single process executor, thus we change the mode to`reschedule`
-        # to allow parallel task being scheduled and executed
-        if (
-            isinstance(task_copy, BaseSensorOperator) and
-            conf.get('core', 'executor') == "DebugExecutor"
-        ):
-            self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
-            task_copy.mode = 'reschedule'
         self.task = task_copy

         def signal_handler(signum, frame):  # pylint: disable=unused-argument
@@ -1167,8 +1159,8 @@ def signal_handler(signum, frame):  # pylint: disable=unused-argument

         self.render_templates(context=context)
         if STORE_SERIALIZED_DAGS:
-            RTIF.write(RTIF(ti=self, render_templates=False))
-            RTIF.delete_old_records(self.task_id, self.dag_id)
+            RenderedTaskInstanceFields.write(RenderedTaskInstanceFields(ti=self, render_templates=False))
+            RenderedTaskInstanceFields.delete_old_records(self.task_id, self.dag_id)

         # Export context to make it available for operators to use.
         airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
@@ -1283,7 +1275,6 @@ def _handle_reschedule(self,
                            actual_start_date,
                            reschedule_exception,
                            test_mode=False,
-                           context=None,  # pylint: disable=unused-argument
                            session=None):
         # Don't record reschedule request in test mode
         if test_mode:
@@ -1482,7 +1473,8 @@ def __repr__(self):
     @staticmethod
     def get(
         item: str,
-        default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # pylint: disable=W0212
+        # pylint: disable=protected-access
+        default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # noqa
     ):
         """Get Airflow Variable value"""
         return Variable.get(item, default_var=default_var)
@@ -1509,7 +1501,8 @@ def __repr__(self):
     @staticmethod
     def get(
         item: str,
-        default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # pylint: disable=W0212
+        # pylint: disable=protected-access
+        default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # noqa
     ):
         """Get Airflow Variable after deserializing JSON value"""
         return Variable.get(item, default_var=default_var, deserialize_json=True)
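Both accessors need the same waiver because `__NO_DEFAULT_SENTINEL` is class-private: outside its defining class, Python only exposes the mangled name, which pylint flags as protected access. A toy demonstration of the mangling (not Airflow's actual Variable class):

class Box:
    __SENTINEL = object()  # stored on the class as _Box__SENTINEL

    @classmethod
    def get(cls, key, default=__SENTINEL):
        # Inside the class body, __SENTINEL is mangled automatically.
        if default is cls.__SENTINEL:
            raise KeyError(key)
        return default

Box._Box__SENTINEL   # resolves: the mangled spelling works everywhere
# Box.__SENTINEL     # AttributeError outside the class body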
@@ -1561,9 +1554,9 @@ def get_rendered_template_fields(self):
         """
         from airflow.models.renderedtifields import RenderedTaskInstanceFields
         if STORE_SERIALIZED_DAGS:
-            rtif = RenderedTaskInstanceFields.get_templated_fields(self)
-            if rtif:
-                for field_name, rendered_value in rtif.items():
+            rendered_task_instance_fields = RenderedTaskInstanceFields.get_templated_fields(self)
+            if rendered_task_instance_fields:
+                for field_name, rendered_value in rendered_task_instance_fields.items():
                     setattr(self.task, field_name, rendered_value)
         else:
             try:
@@ -1628,6 +1621,7 @@ def render(key, content):

         subject = render('subject_template', default_subject)
         html_content = render('html_content_template', default_html_content)
+        # noinspection PyBroadException
         try:
             send_email(self.task.email, subject, html_content)
         except Exception:  # pylint: disable=broad-except
@@ -1662,7 +1656,7 @@ def xcom_push(
         :type key: str
         :param value: A value for the XCom. The value is pickled and stored
             in the database.
-        :type value: any pickleable object
+        :type value: any picklable object
         :param execution_date: if provided, the XCom will not be visible until
             this date. This can be used, for example, to send a message to a
             task on a future date without it being immediately visible.
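Alongside the docstring spelling fix, a short usage sketch of xcom_push (the operator name, key, and value are made up for illustration):

from airflow.models import BaseOperator

class ProduceReport(BaseOperator):
    """Hypothetical operator that publishes a picklable value via XCom."""

    def execute(self, context):
        # A plain string is safely picklable; downstream tasks can pull it
        # with xcom_pull(task_ids='produce_report', key='report_path').
        context['ti'].xcom_push(key='report_path', value='/tmp/report.csv')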
@@ -1762,19 +1756,18 @@ def filter_for_tis(
             tis: Iterable[Union["TaskInstance", TaskInstanceKey]]
     ) -> Optional[BooleanClauseList]:
         """Returns SQLAlchemy filter to query selected task instances"""
-        TI = TaskInstance
         if not tis:
             return None
         if all(isinstance(t, TaskInstanceKey) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == tik.dag_id,
-                                    TI.task_id == tik.task_id,
-                                    TI.execution_date == tik.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == tik.dag_id,
+                                    TaskInstance.task_id == tik.task_id,
+                                    TaskInstance.execution_date == tik.execution_date)
                                for tik in tis])
             return or_(*filter_for_tis)
         if all(isinstance(t, TaskInstance) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == ti.dag_id,
-                                    TI.task_id == ti.task_id,
-                                    TI.execution_date == ti.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == ti.dag_id,
+                                    TaskInstance.task_id == ti.task_id,
+                                    TaskInstance.execution_date == ti.execution_date)
                                for ti in tis])
             return or_(*filter_for_tis)

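The alias removal above is purely cosmetic; the method still collapses a batch of task instances into one OR-of-ANDs clause. For context, a hedged sketch of how the returned clause is typically consumed (the helper name and the session/tis arguments are assumptions):

from airflow.models.taskinstance import TaskInstance

def bulk_set_state(session, tis, state):
    """Hypothetical helper: update many task instances with one query."""
    tis_filter = TaskInstance.filter_for_tis(tis)
    if tis_filter is None:
        return  # empty input yields no filter
    for ti in session.query(TaskInstance).filter(tis_filter):
        ti.state = state
    session.commit()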
11 changes: 11 additions & 0 deletions airflow/sensors/base_sensor_operator.py
@@ -22,6 +22,7 @@
 from time import sleep
 from typing import Any, Dict, Iterable

+from airflow.configuration import conf
 from airflow.exceptions import (
     AirflowException, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException,
 )
@@ -159,6 +160,16 @@ def _get_next_poke_interval(self, started_at, try_number):
         else:
             return self.poke_interval

+    def prepare_for_execution(self) -> BaseOperator:
+        task = super().prepare_for_execution()
+        # Sensors in `poke` mode can block execution of DAGs when running
+        # with a single-process executor, so we change the mode to `reschedule`
+        # to allow parallel tasks to be scheduled and executed
+        if conf.get('core', 'executor') == "DebugExecutor":
+            self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
+            task.mode = 'reschedule'
+        return task
+
     @property
     def reschedule(self):
         """Define mode rescheduled sensors."""
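Net effect of the new override: the DebugExecutor special case now lives with the sensor rather than in TaskInstance, and it acts on the copy that prepare_for_execution returns. A hedged illustration (the FileSensor import path and the copy semantics are assumptions based on the call sites above):

# Assumes [core] executor = DebugExecutor in airflow.cfg.
from airflow.sensors.filesystem import FileSensor

sensor = FileSensor(task_id='wait_for_file', filepath='/tmp/ready', mode='poke')
task = sensor.prepare_for_execution()
assert task.mode == 'reschedule'   # the runnable copy was flipped
assert sensor.mode == 'poke'       # the DAG-level definition is untouched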
1 change: 0 additions & 1 deletion scripts/ci/pylint_todo.txt
@@ -1,6 +1,5 @@
 ./airflow/configuration.py
 ./airflow/models/dag.py
 ./airflow/models/dagrun.py
-./airflow/models/taskinstance.py
 ./airflow/www/utils.py
 ./airflow/www/views.py