From 4f6d53eaa70eec6bcb3ceeb7a3ff30a821d78dd6 Mon Sep 17 00:00:00 2001
From: Jarek Potiuk
Date: Tue, 25 Aug 2020 00:16:05 +0200
Subject: [PATCH] Make models/taskinstance.py pylint compatible (#10499)

---
 airflow/models/skipmixin.py             |  3 +-
 airflow/models/taskinstance.py          | 75 +++++++++++--------------
 airflow/sensors/base_sensor_operator.py | 11 ++++
 scripts/ci/pylint_todo.txt              |  1 -
 4 files changed, 47 insertions(+), 43 deletions(-)

diff --git a/airflow/models/skipmixin.py b/airflow/models/skipmixin.py
index b3c5530a109327..0ad2eb882cd904 100644
--- a/airflow/models/skipmixin.py
+++ b/airflow/models/skipmixin.py
@@ -36,6 +36,7 @@ class SkipMixin(LoggingMixin):
     """
     A Mixin to skip Tasks Instances
     """
+
     def _set_state_to_skipped(self, dag_run, execution_date, tasks, session):
         """
         Used internally to set state of task instances to skipped from the same dag run.
@@ -93,7 +94,7 @@ def skip(
         # SkipMixin may not necessarily have a task_id attribute. Only store to XCom if one is available.
         try:
-            task_id = self.task_id
+            task_id = self.task_id  # noqa
         except AttributeError:
             task_id = None

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index d07f91875de624..e0c465e9ed19de 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -135,8 +135,8 @@ def clear_task_instances(tis,
         ).delete()

     if job_ids:
-        from airflow.jobs.base_job import BaseJob as BJ
-        for job in session.query(BJ).filter(BJ.id.in_(job_ids)).all():
+        from airflow.jobs.base_job import BaseJob
+        for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all():  # noqa
             job.state = State.SHUTDOWN

     if activate_dag_runs and tis:
@@ -442,16 +442,15 @@ def generate_command(dag_id: str,  # pylint: disable=too-many-arguments
     def log_filepath(self):
         """Filepath for TaskInstance"""
         iso = self.execution_date.isoformat()
-        log = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
-        return ("{log}/{dag_id}/{task_id}/{iso}.log".format(
-            log=log, dag_id=self.dag_id, task_id=self.task_id, iso=iso))
+        the_log = os.path.expanduser(conf.get('logging', 'BASE_LOG_FOLDER'))
+        return f"{the_log}/{self.dag_id}/{self.task_id}/{iso}.log"

     @property
     def log_url(self):
         """Log URL for TaskInstance"""
         iso = quote(self.execution_date.isoformat())
         base_url = conf.get('webserver', 'BASE_URL')
-        return base_url + (
+        return base_url + (  # noqa
             "/log?"
             "execution_date={iso}"
             "&task_id={task_id}"
@@ -463,7 +462,7 @@ def mark_success_url(self):
         """URL to mark TI success"""
         iso = quote(self.execution_date.isoformat())
         base_url = conf.get('webserver', 'BASE_URL')
-        return base_url + (
+        return base_url + (  # noqa
             "/success"
             "?task_id={task_id}"
             "&dag_id={dag_id}"
@@ -537,7 +536,7 @@ def refresh_from_db(self, session=None, lock_for_update=False) -> None:
             self.state = ti.state
             # Get the raw value of try_number column, don't read through the
             # accessor here otherwise it will be incremented by one already.
-            self.try_number = ti._try_number  # pylint: disable=protected-access
+            self.try_number = ti._try_number  # noqa pylint: disable=protected-access
             self.max_tries = ti.max_tries
             self.hostname = ti.hostname
             self.unixname = ti.unixname
@@ -834,7 +833,7 @@ def get_failed_dep_statuses(
             yield dep_status

     def __repr__(self):
-        return (
+        return (  # noqa
             "<TaskInstance: {ti.dag_id}.{ti.task_id} {ti.execution_date} [{ti.state}]>"
         ).format(ti=self)

@@ -848,10 +847,10 @@ def next_retry_datetime(self):
         if self.task.retry_exponential_backoff:
             # If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus,
             # we must round up prior to converting to an int, otherwise a divide by zero error
-            # will occurr in the modded_hash calculation.
+            # will occur in the modded_hash calculation.
             min_backoff = int(math.ceil(delay.total_seconds() * (2 ** (self.try_number - 2))))
             # deterministic per task instance
-            ti_hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id,
+            ti_hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id,  # noqa
                                                             self.task_id,
                                                             self.execution_date,
                                                             self.try_number)
@@ -1074,7 +1073,7 @@ def _run_raw_task(
         try:
             if not mark_success:
                 context = self.get_template_context()
-                self._prepare_and_execute_task_with_callbacks(context, session, task)
+                self._prepare_and_execute_task_with_callbacks(context, task)
             self.refresh_from_db(lock_for_update=True)
             self.state = State.SUCCESS
         except AirflowSkipException as e:
@@ -1095,7 +1094,7 @@ def _run_raw_task(
             )
         except AirflowRescheduleException as reschedule_exception:
             self.refresh_from_db()
-            self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, context)
+            self._handle_reschedule(actual_start_date, reschedule_exception, test_mode)
             return
         except AirflowFailException as e:
             self.refresh_from_db()
@@ -1135,23 +1134,16 @@ def _run_raw_task(
             session.merge(self)
         session.commit()

-    def _prepare_and_execute_task_with_callbacks(self, context, session, task):
+    def _prepare_and_execute_task_with_callbacks(
+            self,
+            context,
+            task):
         """
         Prepare Task for Execution
         """
-        from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
-        from airflow.sensors.base_sensor_operator import BaseSensorOperator
+        from airflow.models.renderedtifields import RenderedTaskInstanceFields

         task_copy = task.prepare_for_execution()
-        # Sensors in `poke` mode can block execution of DAGs when running
-        # with single process executor, thus we change the mode to`reschedule`
-        # to allow parallel task being scheduled and executed
-        if (
-            isinstance(task_copy, BaseSensorOperator) and
-            conf.get('core', 'executor') == "DebugExecutor"
-        ):
-            self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
-            task_copy.mode = 'reschedule'
         self.task = task_copy

         def signal_handler(signum, frame):  # pylint: disable=unused-argument
@@ -1167,8 +1159,8 @@ def signal_handler(signum, frame):  # pylint: disable=unused-argument
         self.render_templates(context=context)
         if STORE_SERIALIZED_DAGS:
-            RTIF.write(RTIF(ti=self, render_templates=False))
-            RTIF.delete_old_records(self.task_id, self.dag_id)
+            RenderedTaskInstanceFields.write(RenderedTaskInstanceFields(ti=self, render_templates=False))
+            RenderedTaskInstanceFields.delete_old_records(self.task_id, self.dag_id)

         # Export context to make it available for operators to use.
         airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
@@ -1283,7 +1275,6 @@ def _handle_reschedule(self,
                            actual_start_date,
                            reschedule_exception,
                            test_mode=False,
-                           context=None,  # pylint: disable=unused-argument
                            session=None):
         # Don't record reschedule request in test mode
         if test_mode:
@@ -1482,7 +1473,8 @@ def __repr__(self):
         @staticmethod
         def get(
             item: str,
-            default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # pylint: disable=W0212
+            # pylint: disable=protected-access
+            default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # noqa
         ):
             """Get Airflow Variable value"""
             return Variable.get(item, default_var=default_var)
@@ -1509,7 +1501,8 @@ def __repr__(self):
         @staticmethod
         def get(
             item: str,
-            default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # pylint: disable=W0212
+            # pylint: disable=protected-access
+            default_var: Any = Variable._Variable__NO_DEFAULT_SENTINEL,  # noqa
         ):
             """Get Airflow Variable after deserializing JSON value"""
             return Variable.get(item, default_var=default_var, deserialize_json=True)
@@ -1561,9 +1554,9 @@ def get_rendered_template_fields(self):
         """
         from airflow.models.renderedtifields import RenderedTaskInstanceFields
         if STORE_SERIALIZED_DAGS:
-            rtif = RenderedTaskInstanceFields.get_templated_fields(self)
-            if rtif:
-                for field_name, rendered_value in rtif.items():
+            rendered_task_instance_fields = RenderedTaskInstanceFields.get_templated_fields(self)
+            if rendered_task_instance_fields:
+                for field_name, rendered_value in rendered_task_instance_fields.items():
                     setattr(self.task, field_name, rendered_value)
         else:
             try:
@@ -1628,6 +1621,7 @@ def render(key, content):
         subject = render('subject_template', default_subject)
         html_content = render('html_content_template', default_html_content)
+        # noinspection PyBroadException
         try:
             send_email(self.task.email, subject, html_content)
         except Exception:  # pylint: disable=broad-except
@@ -1662,7 +1656,7 @@ def xcom_push(
         :type key: str
         :param value: A value for the XCom. The value is pickled and stored
             in the database.
-        :type value: any pickleable object
+        :type value: any picklable object
         :param execution_date: if provided, the XCom will not be visible until
             this date. This can be used, for example, to send a message to a
             task on a future date without it being immediately visible.
@@ -1762,19 +1756,18 @@ def filter_for_tis(
         tis: Iterable[Union["TaskInstance", TaskInstanceKey]]
     ) -> Optional[BooleanClauseList]:
         """Returns SQLAlchemy filter to query selected task instances"""
-        TI = TaskInstance
         if not tis:
             return None
         if all(isinstance(t, TaskInstanceKey) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == tik.dag_id,
-                                    TI.task_id == tik.task_id,
-                                    TI.execution_date == tik.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == tik.dag_id,
+                                    TaskInstance.task_id == tik.task_id,
+                                    TaskInstance.execution_date == tik.execution_date)
                                for tik in tis])
             return or_(*filter_for_tis)
         if all(isinstance(t, TaskInstance) for t in tis):
-            filter_for_tis = ([and_(TI.dag_id == ti.dag_id,
-                                    TI.task_id == ti.task_id,
-                                    TI.execution_date == ti.execution_date)
+            filter_for_tis = ([and_(TaskInstance.dag_id == ti.dag_id,
+                                    TaskInstance.task_id == ti.task_id,
+                                    TaskInstance.execution_date == ti.execution_date)
                                for ti in tis])
             return or_(*filter_for_tis)

diff --git a/airflow/sensors/base_sensor_operator.py b/airflow/sensors/base_sensor_operator.py
index a1e9c98a8c35ca..c4bfd4335eb4ec 100644
--- a/airflow/sensors/base_sensor_operator.py
+++ b/airflow/sensors/base_sensor_operator.py
@@ -22,6 +22,7 @@
 from time import sleep
 from typing import Any, Dict, Iterable

+from airflow.configuration import conf
 from airflow.exceptions import (
     AirflowException, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException,
 )
@@ -159,6 +160,16 @@ def _get_next_poke_interval(self, started_at, try_number):
         else:
             return self.poke_interval

+    def prepare_for_execution(self) -> BaseOperator:
+        task = super().prepare_for_execution()
+        # Sensors in `poke` mode can block execution of DAGs when running
+        # with single process executor, thus we change the mode to `reschedule`
+        # to allow parallel tasks to be scheduled and executed
+        if conf.get('core', 'executor') == "DebugExecutor":
+            self.log.warning("DebugExecutor changes sensor mode to 'reschedule'.")
+            task.mode = 'reschedule'
+        return task
+
     @property
     def reschedule(self):
         """Define mode rescheduled sensors."""
diff --git a/scripts/ci/pylint_todo.txt b/scripts/ci/pylint_todo.txt
index 4824756d13cc82..17c6103c6a8988 100644
--- a/scripts/ci/pylint_todo.txt
+++ b/scripts/ci/pylint_todo.txt
@@ -1,5 +1,4 @@
 ./airflow/configuration.py
 ./airflow/models/dag.py
 ./airflow/models/dagrun.py
-./airflow/models/taskinstance.py
 ./airflow/www/utils.py
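---
Illustrative sketch, not part of the patch: after this change, the DebugExecutor check that
used to live in TaskInstance._prepare_and_execute_task_with_callbacks is applied whenever any
sensor is prepared for execution. Assuming a hypothetical DummySensor subclass and
`executor = DebugExecutor` set in the `[core]` section of airflow.cfg:

    from airflow.sensors.base_sensor_operator import BaseSensorOperator

    class DummySensor(BaseSensorOperator):
        """Hypothetical sensor used only for illustration."""

        def poke(self, context):
            return True

    sensor = DummySensor(task_id='wait_for_nothing', mode='poke')
    task_copy = sensor.prepare_for_execution()
    # Under the DebugExecutor, prepare_for_execution() returns a copy whose
    # mode has been switched to 'reschedule'; under any other executor the
    # original 'poke' mode is kept.
    print(task_copy.mode)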