Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-7022] Simplify DagFileProcessor.process_file method #7674

Merged
merged 8 commits into from
Mar 12, 2020
31 changes: 23 additions & 8 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,8 +820,6 @@ def process_file(
:rtype: Tuple[List[SimpleDag], int]
"""
self.log.info("Processing file %s for tasks to queue", file_path)
# As DAGs are parsed from this file, they will be converted into SimpleDags
simple_dags = []

try:
dagbag = models.DagBag(file_path, include_examples=False)
Expand Down Expand Up @@ -849,12 +847,7 @@ def process_file(

active_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]

# Pickle the DAGs (if necessary) and put them into a SimpleDag
for dag in active_dags:
pickle_id = None
if pickle_dags:
pickle_id = dag.pickle(session).id
simple_dags.append(SimpleDag(dag, pickle_id=pickle_id))
simple_dags = self._prepare_simple_dags(active_dags, pickle_dags, session)

dags = self._find_dags_to_process(active_dags)

Expand Down Expand Up @@ -905,6 +898,28 @@ def process_file(

return simple_dags, len(dagbag.import_errors)

@provide_session
def _prepare_simple_dags(self, dags: List[DAG], pickle_dags: bool, session) -> List[SimpleDag]:
"""
Convert DAGS to SimpleDags. If necessary, it also Pickle the DAGs

:param dags: List of DAGs
:param pickle_dags: whether serialize the DAGs found in the file and
save them to the db
:type pickle_dags: bool
:return: List of SimpleDag
:rtype: List[airflow.utils.dag_processing.SimpleDag]
"""

simple_dags = []
# Pickle the DAGs (if necessary) and put them into a SimpleDag
for dag in dags:
pickle_id = None
if pickle_dags:
pickle_id = dag.pickle(session).id
mik-laj marked this conversation as resolved.
Show resolved Hide resolved
simple_dags.append(SimpleDag(dag, pickle_id=pickle_id))
return simple_dags


class SchedulerJob(BaseJob):
"""
Expand Down