diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 31bf3546c2db4a..a08c48328026c6 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -60,20 +60,20 @@ class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
-    """Helps call SchedulerJob.process_file() in a separate process."""
+    """Helps call SchedulerJob.process_file() in a separate process.
+
+    :param file_path: a Python file containing Airflow DAG definitions
+    :type file_path: unicode
+    :param pickle_dags: whether to serialize the DAG objects to the DB
+    :type pickle_dags: bool
+    :param dag_id_white_list: If specified, only look at these DAG ID's
+    :type dag_id_white_list: list[unicode]
+    """
 
-    # Counter that increments everytime an instance of this class is created
+    # Counter that increments every time an instance of this class is created
     class_creation_counter = 0
 
     def __init__(self, file_path, pickle_dags, dag_id_white_list):
-        """
-        :param file_path: a Python file containing Airflow DAG definitions
-        :type file_path: unicode
-        :param pickle_dags: whether to serialize the DAG objects to the DB
-        :type pickle_dags: bool
-        :param dag_id_whitelist: If specified, only look at these DAG ID's
-        :type dag_id_whitelist: list[unicode]
-        """
         self._file_path = file_path
 
         # The process that was launched to process the given .
@@ -298,6 +298,24 @@ class SchedulerJob(BaseJob):
     task and sees if the dependencies for the next schedules are met.
     If so, it creates appropriate TaskInstances and sends run commands to the
     executor. It does this for each task in each DAG and repeats.
+    :param dag_id: if specified, only schedule tasks with this DAG ID
+    :type dag_id: unicode
+    :param dag_ids: if specified, only schedule tasks with these DAG IDs
+    :type dag_ids: list[unicode]
+    :param subdir: directory containing Python files with Airflow DAG
+        definitions, or a specific path to a file
+    :type subdir: unicode
+    :param num_runs: The number of times to try to schedule each DAG file.
+        -1 for unlimited times.
+    :type num_runs: int
+    :param processor_poll_interval: The number of seconds to wait between
+        polls of running processors
+    :type processor_poll_interval: int
+    :param run_duration: how long to run (in seconds) before exiting
+    :type run_duration: int
+    :param do_pickle: once a DAG object is obtained by executing the Python
+        file, whether to serialize the DAG object to the DB
+    :type do_pickle: bool
     """
 
     __mapper_args__ = {
@@ -330,8 +348,6 @@ def __init__(
         :param processor_poll_interval: The number of seconds to wait between
             polls of running processors
         :type processor_poll_interval: int
-        :param run_duration: how long to run (in seconds) before exiting
-        :type run_duration: int
         :param do_pickle: once a DAG object is obtained by executing the Python
            file, whether to serialize the DAG object to the DB
         :type do_pickle: bool
@@ -406,7 +422,7 @@ def manage_slas(self, dag, session=None):
         Finding all tasks that have SLAs defined, and sending alert emails
         where needed. New SLA misses are also recorded in the database.
 
-        Where assuming that the scheduler runs often, so we only check for
+        We are assuming that the scheduler runs often, so we only check for
         tasks that should have succeeded in the past hour.
""" if not any([isinstance(ti.sla, timedelta) for ti in dag.tasks]): @@ -462,7 +478,7 @@ def manage_slas(self, dag, session=None): slas = ( session .query(SlaMiss) - .filter(SlaMiss.notification_sent == False, SlaMiss.dag_id == dag.dag_id) # noqa: E712 + .filter(SlaMiss.notification_sent == False, SlaMiss.dag_id == dag.dag_id) # noqa pylint: disable=singleton-comparison .all() ) @@ -601,7 +617,7 @@ def create_dag_run(self, dag, session=None): session.query(func.max(DagRun.execution_date)) .filter_by(dag_id=dag.dag_id) .filter(or_( - DagRun.external_trigger == False, # noqa: E712 + DagRun.external_trigger == False, # noqa: E712 pylint: disable=singleton-comparison # add % as a wildcard for the like query DagRun.run_id.like(DagRun.ID_PREFIX + '%') )) @@ -873,17 +889,17 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) ) - .filter(or_(DR.run_id == None, # noqa: E711 + .filter(or_(DR.run_id == None, # noqa: E711 pylint: disable=singleton-comparison not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%')))) .outerjoin(DM, DM.dag_id == TI.dag_id) - .filter(or_(DM.dag_id == None, # noqa: E711 + .filter(or_(DM.dag_id == None, # noqa: E711 pylint: disable=singleton-comparison not_(DM.is_paused))) ) # Additional filters on task instance state if None in states: ti_query = ti_query.filter( - or_(TI.state == None, TI.state.in_(states)) # noqa: E711 + or_(TI.state == None, TI.state.in_(states)) # noqa: E711 pylint: disable=singleton-comparison ) else: ti_query = ti_query.filter(TI.state.in_(states)) @@ -1002,7 +1018,7 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): task_instance_str = "\n\t".join( [repr(x) for x in executable_tis]) self.log.info( - "Setting the follow tasks to queued state:\n\t%s", task_instance_str) + "Setting the following tasks to queued state:\n\t%s", task_instance_str) # so these dont expire on commit for ti in executable_tis: copy_dag_id = ti.dag_id @@ -1045,7 +1061,7 @@ def _change_state_for_executable_task_instances(self, task_instances, if None in acceptable_states: ti_query = ti_query.filter( - or_(TI.state == None, TI.state.in_(acceptable_states)) # noqa: E711 + or_(TI.state == None, TI.state.in_(acceptable_states)) # noqa pylint: disable=singleton-comparison ) else: ti_query = ti_query.filter(TI.state.in_(acceptable_states))