[AIRFLOW-7022] Simplify DagFileProcessor.process_file method #7674

Merged
mik-laj merged 8 commits into apache:master on Mar 12, 2020

Conversation

mik-laj
Member

@mik-laj mik-laj commented Mar 9, 2020

NOTE FOR REVIEWERS: This PR contains several commits. Please review each commit in turn.

This method is too long, so I made the following changes:

708872191 Filter active DAGs only once
aa38e5afb Extract _prepare_simple_dags method
67f6f388d Extract _schedule_task_instances method
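The shape of this refactor can be sketched roughly as follows. This is a toy model, not the code from the PR: only the names `process_file`, `_prepare_simple_dags` and `_schedule_task_instances` come from the commit list above, while `Dag`, `FileProcessorSketch`, the simplified signatures and the helper bodies are hypothetical stand-ins.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Dag:
    """Hypothetical stand-in for a DAG object; only the id matters here."""
    dag_id: str


@dataclass
class FileProcessorSketch:
    """Toy model of the refactor; not the real DagFileProcessor."""
    paused_dag_ids: Set[str] = field(default_factory=set)
    scheduled: List[str] = field(default_factory=list)

    def process_file(self, dags: Dict[str, Dag]) -> List[str]:
        # Filter out paused DAGs once, then pass the same list to each helper
        # instead of repeating the check inside every step.
        unpaused_dags = [
            dag for dag_id, dag in dags.items()
            if dag_id not in self.paused_dag_ids
        ]
        simple_dags = self._prepare_simple_dags(unpaused_dags)
        self._schedule_task_instances(unpaused_dags)
        return simple_dags

    def _prepare_simple_dags(self, dags: List[Dag]) -> List[str]:
        # Placeholder body: the real helper's logic is not shown in this thread.
        return [dag.dag_id for dag in dags]

    def _schedule_task_instances(self, dags: List[Dag]) -> None:
        # Placeholder body: records which DAGs would have been scheduled.
        self.scheduled.extend(dag.dag_id for dag in dags)
```

With this layout the top-level method reads as a short sequence of named steps, and each helper can be tested on its own.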

Issue link: AIRFLOW-7022

Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
  • Unit tests coverage for changes (not needed for documentation changes)
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

* For document-only changes commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Mar 9, 2020
@mik-laj mik-laj changed the title from "Simplify DagFileProcessor.process_file method" to "[AIRFLOW-7022] Simplify DagFileProcessor.process_file method" Mar 9, 2020
Member

@turbaszek turbaszek left a comment

LGTM, small comments only 👌

airflow/jobs/scheduler_job.py (outdated, resolved)
airflow/jobs/scheduler_job.py (outdated, resolved)
airflow/jobs/scheduler_job.py (outdated, resolved)
airflow/jobs/scheduler_job.py (resolved)
@mik-laj
Member Author

mik-laj commented Mar 10, 2020

Travis is green again.

@ashb
Member

ashb commented Mar 11, 2020

Thanks for doing them as fixup commits @mik-laj -- I think we (committers) should get more in the habit of it, as it makes re-review easier

@@ -1057,6 +1057,31 @@ def test_process_file_should_failure_callback(self):
self.assertEqual("Callback fired", content)
os.remove(callback_file.name)

def test_should_parse_only_active_dags(self):
Member

Suggested change
- def test_should_parse_only_active_dags(self):
+ def test_should_parse_only_unpaused_dags(self):

Active is another concept (the is_active column).

Member

👍 from me after this change is made

Member Author

Updated.
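The distinction raised in this thread can be illustrated with a small sketch. Everything here is hypothetical except the `is_active` column named in the review comment: `DagRow` is a stand-in for a metadata row, and the `is_paused` field and the meaning given to each flag in the comments are assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class DagRow:
    """Hypothetical stand-in for a row in the DAG metadata table."""
    dag_id: str
    is_paused: bool  # assumption: set by a user to stop scheduling the DAG
    is_active: bool  # assumption: whether the DAG is still known to the system


def unpaused_dag_ids(rows: Iterable[DagRow]) -> List[str]:
    # What the renamed test is about: DAGs that are not paused.
    return [row.dag_id for row in rows if not row.is_paused]


def active_dag_ids(rows: Iterable[DagRow]) -> List[str]:
    # A different filter: it looks only at the is_active flag.
    return [row.dag_id for row in rows if row.is_active]


rows = [
    DagRow("a", is_paused=False, is_active=True),
    DagRow("b", is_paused=True, is_active=True),
    DagRow("c", is_paused=False, is_active=False),
]
```

Because the two filters can select different DAGs for the same rows, naming a variable or test "active" when it means "unpaused" is misleading.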

@@ -845,11 +845,11 @@ def process_file(

paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dagbag.dag_ids)

- active_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
+ unpaaused_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
Member

Suggested change
- unpaaused_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]
+ unpaused_dags = [dag for dag_id, dag in dagbag.dags.items() if dag_id not in paused_dag_ids]


- simple_dags = self._prepare_simple_dags(active_dags, pickle_dags, session)
+ simple_dags = self._prepare_simple_dags(unpaaused_dags, pickle_dags, session)
Member

Suggested change
- simple_dags = self._prepare_simple_dags(unpaaused_dags, pickle_dags, session)
+ simple_dags = self._prepare_simple_dags(unpaused_dags, pickle_dags, session)

Member Author

Fixed. Thanks.


- dags = self._find_dags_to_process(active_dags)
+ dags = self._find_dags_to_process(unpaaused_dags)
Member

Suggested change
- dags = self._find_dags_to_process(unpaaused_dags)
+ dags = self._find_dags_to_process(unpaused_dags)

Member Author

Fixed. Thanks.

@codecov-io

codecov-io commented Mar 11, 2020

Codecov Report

Merging #7674 into master will decrease coverage by 0.62%.
The diff coverage is 86.66%.

@@            Coverage Diff             @@
##           master    #7674      +/-   ##
==========================================
- Coverage   86.84%   86.21%   -0.63%     
==========================================
  Files         897      904       +7     
  Lines       42806    43740     +934     
==========================================
+ Hits        37173    37710     +537     
- Misses       5633     6030     +397     
Impacted Files                                         Coverage Δ
airflow/jobs/scheduler_job.py                          90.88% <86.66%> (+0.22%) ⬆️
...flow/providers/apache/cassandra/hooks/cassandra.py  21.51% <0.00%> (-72.16%) ⬇️
...w/providers/apache/hive/operators/mysql_to_hive.py  35.84% <0.00%> (-64.16%) ⬇️
airflow/kubernetes/volume_mount.py                     44.44% <0.00%> (-55.56%) ⬇️
airflow/providers/postgres/operators/postgres.py       50.00% <0.00%> (-50.00%) ⬇️
airflow/providers/redis/operators/redis_publish.py     50.00% <0.00%> (-50.00%) ⬇️
airflow/kubernetes/volume.py                           52.94% <0.00%> (-47.06%) ⬇️
airflow/providers/mongo/sensors/mongo.py               53.33% <0.00%> (-46.67%) ⬇️
airflow/kubernetes/pod_launcher.py                     47.18% <0.00%> (-45.08%) ⬇️
airflow/providers/mysql/operators/mysql.py             55.00% <0.00%> (-45.00%) ⬇️
... and 33 more

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 5be3b31...4ea2043. Read the comment docs.
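
The percentages in the coverage diff above can be reproduced from the hit and line counts it lists. The following is a minimal sketch of that arithmetic only; the function name `coverage_pct` is hypothetical, and nothing here is claimed about how Codecov itself computes or rounds its figures.

```python
def coverage_pct(hits: int, lines: int) -> float:
    """Return line coverage as a percentage of covered lines."""
    return 100.0 * hits / lines


# Counts taken from the coverage diff in the report above.
master = coverage_pct(hits=37173, lines=42806)
pr = coverage_pct(hits=37710, lines=43740)
delta = pr - master

print(f"master: {master:.2f}%  PR: {pr:.2f}%  change: {delta:+.2f}%")
```

Rounded to two decimals this gives 86.84%, 86.21% and -0.63%, matching the table. The unrounded difference is about 0.627, which may explain why the summary line reports 0.62% while the table shows 0.63%.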

@mik-laj mik-laj requested review from kaxil and ashb March 12, 2020 03:24
@mik-laj mik-laj merged commit 7f6e6b9 into apache:master Mar 12, 2020
Labels
area:Scheduler including HA (high availability) scheduler
5 participants