[AIRFLOW-XXX] Fix incorrect docstring parameter (#5729)
(cherry picked from commit 38d977d)
kaxil authored and potiuk committed Aug 6, 2019
1 parent 663e9f8 commit 397d0a6
Showing 1 changed file with 36 additions and 20 deletions.
airflow/jobs/scheduler_job.py (36 additions, 20 deletions)
@@ -60,20 +60,20 @@


 class DagFileProcessor(AbstractDagFileProcessor, LoggingMixin):
-    """Helps call SchedulerJob.process_file() in a separate process."""
+    """Helps call SchedulerJob.process_file() in a separate process.
+
+    :param file_path: a Python file containing Airflow DAG definitions
+    :type file_path: unicode
+    :param pickle_dags: whether to serialize the DAG objects to the DB
+    :type pickle_dags: bool
+    :param dag_id_white_list: If specified, only look at these DAG ID's
+    :type dag_id_white_list: list[unicode]
+    """

-    # Counter that increments everytime an instance of this class is created
+    # Counter that increments every time an instance of this class is created
     class_creation_counter = 0

     def __init__(self, file_path, pickle_dags, dag_id_white_list):
-        """
-        :param file_path: a Python file containing Airflow DAG definitions
-        :type file_path: unicode
-        :param pickle_dags: whether to serialize the DAG objects to the DB
-        :type pickle_dags: bool
-        :param dag_id_whitelist: If specified, only look at these DAG ID's
-        :type dag_id_whitelist: list[unicode]
-        """
         self._file_path = file_path

         # The process that was launched to process the given .
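Note: for orientation, the newly documented parameters line up with the constructor signature in the diff, so usage looks roughly like this sketch (the path and DAG ID are made-up illustrative values, and calling the class directly like this is purely for demonstration):

    from airflow.jobs.scheduler_job import DagFileProcessor

    # pickle_dags=False: do not serialize parsed DAGs to the DB;
    # dag_id_white_list restricts parsing to the listed DAG IDs.
    processor = DagFileProcessor(
        file_path='/usr/local/airflow/dags/example_dag.py',  # hypothetical path
        pickle_dags=False,
        dag_id_white_list=['example_dag'],                   # hypothetical DAG ID
    )
    processor.start()  # per the docstring, parsing happens in a child process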
@@ -298,6 +298,24 @@ class SchedulerJob(BaseJob):
     task and sees if the dependencies for the next schedules are met.
     If so, it creates appropriate TaskInstances and sends run commands to the
     executor. It does this for each task in each DAG and repeats.
+    :param dag_id: if specified, only schedule tasks with this DAG ID
+    :type dag_id: unicode
+    :param dag_ids: if specified, only schedule tasks with these DAG IDs
+    :type dag_ids: list[unicode]
+    :param subdir: directory containing Python files with Airflow DAG
+        definitions, or a specific path to a file
+    :type subdir: unicode
+    :param num_runs: The number of times to try to schedule each DAG file.
+        -1 for unlimited times.
+    :type num_runs: int
+    :param processor_poll_interval: The number of seconds to wait between
+        polls of running processors
+    :type processor_poll_interval: int
+    :param run_duration: how long to run (in seconds) before exiting
+    :type run_duration: int
+    :param do_pickle: once a DAG object is obtained by executing the Python
+        file, whether to serialize the DAG object to the DB
+    :type do_pickle: bool
     """

     __mapper_args__ = {
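Note: a minimal sketch of how the documented arguments would be passed, assuming the 1.10-era constructor this file defines (all values illustrative):

    from airflow.jobs.scheduler_job import SchedulerJob

    job = SchedulerJob(
        dag_id=None,                       # None: schedule all DAGs, not just one
        subdir='/usr/local/airflow/dags',  # hypothetical DAGs folder
        num_runs=-1,                       # -1: keep scheduling indefinitely
        processor_poll_interval=1.0,       # seconds between processor polls
        do_pickle=False,                   # don't serialize DAG objects to the DB
    )
    job.run()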
@@ -330,8 +348,6 @@ def __init__(
         :param processor_poll_interval: The number of seconds to wait between
             polls of running processors
         :type processor_poll_interval: int
-        :param run_duration: how long to run (in seconds) before exiting
-        :type run_duration: int
         :param do_pickle: once a DAG object is obtained by executing the Python
             file, whether to serialize the DAG object to the DB
         :type do_pickle: bool
@@ -406,7 +422,7 @@ def manage_slas(self, dag, session=None):
         Finding all tasks that have SLAs defined, and sending alert emails
         where needed. New SLA misses are also recorded in the database.

-        Where assuming that the scheduler runs often, so we only check for
+        We are assuming that the scheduler runs often, so we only check for
         tasks that should have succeeded in the past hour.
         """
         if not any([isinstance(ti.sla, timedelta) for ti in dag.tasks]):
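Note: the `isinstance(ti.sla, timedelta)` guard above lets `manage_slas()` return early for DAGs with no SLAs at all. An SLA is attached per task through the standard `sla` operator argument; a small sketch with made-up names:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG('sla_example', start_date=datetime(2019, 1, 1),
              schedule_interval='@daily')

    # Only tasks whose sla is a timedelta are picked up by manage_slas()
    strict_task = DummyOperator(task_id='must_finish_within_an_hour',
                                sla=timedelta(hours=1), dag=dag)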
@@ -462,7 +478,7 @@ def manage_slas(self, dag, session=None):
         slas = (
             session
             .query(SlaMiss)
-            .filter(SlaMiss.notification_sent == False, SlaMiss.dag_id == dag.dag_id)  # noqa: E712
+            .filter(SlaMiss.notification_sent == False, SlaMiss.dag_id == dag.dag_id)  # noqa pylint: disable=singleton-comparison
             .all()
         )

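Note: the `== False` comparison is intentional. SQLAlchemy overloads `==` on column objects to generate SQL, so the PEP 8-preferred `is False` would compare the column object itself in Python and silently break the filter; that is why both linters have to be suppressed. A lint-clean equivalent, assuming the same `session`, `SlaMiss`, and `dag` as above, would use `.is_()`:

    # .is_(False) renders as an IS comparison in SQL and triggers neither
    # flake8's E712 nor pylint's singleton-comparison warning.
    slas = (
        session
        .query(SlaMiss)
        .filter(SlaMiss.notification_sent.is_(False),
                SlaMiss.dag_id == dag.dag_id)
        .all()
    )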
@@ -601,7 +617,7 @@ def create_dag_run(self, dag, session=None):
                 session.query(func.max(DagRun.execution_date))
                 .filter_by(dag_id=dag.dag_id)
                 .filter(or_(
-                    DagRun.external_trigger == False,  # noqa: E712
+                    DagRun.external_trigger == False,  # noqa: E712 pylint: disable=singleton-comparison
                     # add % as a wildcard for the like query
                     DagRun.run_id.like(DagRun.ID_PREFIX + '%')
                 ))
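Note: the `LIKE` clause works because scheduler-created runs share a fixed run_id prefix, `DagRun.ID_PREFIX`, followed by the execution date; `%` then matches any suffix. A sketch of that naming assumption (the prefix value `'scheduled__'` is what Airflow 1.10 used; the timestamp is made up):

    from airflow.models import DagRun

    run_id = DagRun.ID_PREFIX + '2019-08-06T00:00:00'
    assert run_id.startswith('scheduled__')  # hence LIKE 'scheduled__%' matches it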
@@ -873,17 +889,17 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
                 DR,
                 and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
             )
-            .filter(or_(DR.run_id == None,  # noqa: E711
+            .filter(or_(DR.run_id == None,  # noqa: E711 pylint: disable=singleton-comparison
                 not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
             .outerjoin(DM, DM.dag_id == TI.dag_id)
-            .filter(or_(DM.dag_id == None,  # noqa: E711
+            .filter(or_(DM.dag_id == None,  # noqa: E711 pylint: disable=singleton-comparison
                 not_(DM.is_paused)))
         )

         # Additional filters on task instance state
         if None in states:
             ti_query = ti_query.filter(
-                or_(TI.state == None, TI.state.in_(states))  # noqa: E711
+                or_(TI.state == None, TI.state.in_(states))  # noqa: E711 pylint: disable=singleton-comparison
             )
         else:
             ti_query = ti_query.filter(TI.state.in_(states))
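Note: the `None in states` branch exists because SQL's `IN` predicate never matches NULL rows, so task instances with no state yet would be silently dropped by `TI.state.in_(states)` alone; the NULL check has to be OR-ed in explicitly. The same pattern in isolation, using `.is_(None)` to avoid the linter suppressions:

    from sqlalchemy import or_

    # IN (...) never matches NULL, so None-state rows need an IS NULL branch.
    if None in states:
        ti_query = ti_query.filter(or_(TI.state.is_(None), TI.state.in_(states)))
    else:
        ti_query = ti_query.filter(TI.state.in_(states))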
@@ -1002,7 +1018,7 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
             task_instance_str = "\n\t".join(
                 [repr(x) for x in executable_tis])
             self.log.info(
-                "Setting the follow tasks to queued state:\n\t%s", task_instance_str)
+                "Setting the following tasks to queued state:\n\t%s", task_instance_str)
             # so these dont expire on commit
             for ti in executable_tis:
                 copy_dag_id = ti.dag_id
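Note: the attribute copies after "so these dont expire on commit" work around SQLAlchemy's default expire-on-commit behavior: once `session.commit()` runs, the ORM instances are expired, and touching their attributes either re-queries the database or fails on detached objects. A sketch of the idea, assuming the `executable_tis` list and `session` from the surrounding code:

    # Read plain values while the ORM objects are still live; after commit,
    # ti.dag_id etc. would trigger a refresh (or raise if the instance detaches).
    queued_keys = [(ti.dag_id, ti.task_id, ti.execution_date)
                   for ti in executable_tis]
    session.commit()
    for dag_id, task_id, execution_date in queued_keys:
        print("queued %s.%s @ %s" % (dag_id, task_id, execution_date))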
@@ -1045,7 +1061,7 @@ def _change_state_for_executable_task_instances(self, task_instances,

         if None in acceptable_states:
             ti_query = ti_query.filter(
-                or_(TI.state == None, TI.state.in_(acceptable_states))  # noqa: E711
+                or_(TI.state == None, TI.state.in_(acceptable_states))  # noqa pylint: disable=singleton-comparison
             )
         else:
             ti_query = ti_query.filter(TI.state.in_(acceptable_states))
