[AIRFLOW-5391] Do not run skipped tasks when they are cleared #7276

Merged (2 commits) on Feb 21, 2020

Conversation

yuqian90
Contributor

@yuqian90 yuqian90 commented Jan 28, 2020

This PR fixes the following issue:

If a task is skipped by BranchPythonOperator, BaseBranchOperator or ShortCircuitOperator and the user then clears the skipped task, it'll execute.

After this PR:
The NotPreviouslySkippedDep rule will first evaluate if a task has a direct SkipMixin parent that has decided to skip it. This is done by examining the XCom data stored by SkipMixin.skip() or SkipMixin.skip_all_except().
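The check described above can be sketched without Airflow itself. This is a toy model under stated assumptions, not the PR's implementation: the XCom store is a plain dict, and the key names (`SKIP_KEY`, `SKIPPED`, `FOLLOWED`) and helper functions are hypothetical names chosen for illustration. It assumes skip() records the tasks that were skipped, while skip_all_except() records the branches that were followed.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical key names, chosen for this sketch only.
SKIP_KEY = "skipmixin_key"
SKIPPED = "skipped"
FOLLOWED = "followed"

# Toy XCom store: (task_id, key) -> recorded decision.
XComStore = Dict[Tuple[str, str], dict]


def record_skip(xcom: XComStore, parent_id: str, skipped_ids: List[str]) -> None:
    """Assumed shape of what skip() stores: the tasks the parent skipped."""
    xcom[(parent_id, SKIP_KEY)] = {SKIPPED: list(skipped_ids)}


def record_skip_all_except(xcom: XComStore, parent_id: str, followed_ids: List[str]) -> None:
    """Assumed shape of what skip_all_except() stores: the branches followed."""
    xcom[(parent_id, SKIP_KEY)] = {FOLLOWED: list(followed_ids)}


def previously_skipped(xcom: XComStore, task_id: str, skipmixin_parents: List[str]) -> Optional[str]:
    """Return the direct parent that decided to skip task_id, or None."""
    for parent_id in skipmixin_parents:
        decision = xcom.get((parent_id, SKIP_KEY))
        if decision is None:
            continue  # this parent recorded no decision
        if SKIPPED in decision and task_id in decision[SKIPPED]:
            return parent_id
        if FOLLOWED in decision and task_id not in decision[FOLLOWED]:
            return parent_id
    return None


xcom: XComStore = {}
record_skip(xcom, "condition_is_False", ["false_1"])
record_skip_all_except(xcom, "branching", ["branch_a"])

print(previously_skipped(xcom, "false_1", ["condition_is_False"]))
print(previously_skipped(xcom, "branch_b", ["branching"]))
print(previously_skipped(xcom, "branch_a", ["branching"]))
```

Only direct parents are consulted, which matches the description above: a task is treated as previously skipped when one of its own SkipMixin parents recorded that decision.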

The implementation is inspired by the author of this blog.


Issue link: AIRFLOW-5391

Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
  • Unit tests coverage for changes (not needed for documentation changes)
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

* For document-only changes commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@tooptoop4
Contributor

Hmm, I use BranchPythonOperator and the clear command a lot, and I never see the skipped tasks just run on clear. Do you use none_failed?

@yuqian90
Contributor Author

@tooptoop4 the problem happens when you clear the skipped task (not when you clear the BranchPythonOperator). There's an example in the linked JIRA.

@tooptoop4
Contributor

I clear the entire DAG and don't face the problem.

@yuqian90
Contributor Author

@tooptoop4 yes, if you always clear the entire DAG, the BranchPythonOperator itself gets cleared too, so it re-runs and skips the task again. That is why you won't face this problem.
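The difference between the two clearing patterns can be illustrated with a toy scheduler that models the behaviour before this PR. Everything here is hypothetical (task ids, `scheduler_pass`, `clear`); it is a simplified sketch of the reported problem, not Airflow's scheduler.

```python
from typing import Dict, Iterable, Optional

BRANCHES = ["branch_a", "branch_b"]  # hypothetical downstream task ids


def scheduler_pass(states: Dict[str, Optional[str]], chosen: str) -> None:
    """Toy pre-fix scheduler: runs every task whose state is None."""
    if states["branching"] is None:
        # The branching task runs and skips every branch it did not choose.
        states["branching"] = "success"
        for task_id in BRANCHES:
            if task_id != chosen:
                states[task_id] = "skipped"
    for task_id in BRANCHES:
        if states[task_id] is None:
            # Nothing here remembers an earlier skip decision.
            states[task_id] = "success"


def clear(states: Dict[str, Optional[str]], task_ids: Iterable[str]) -> None:
    """Clearing a task resets its state so the scheduler picks it up again."""
    for task_id in task_ids:
        states[task_id] = None


def fresh_run() -> Dict[str, Optional[str]]:
    states: Dict[str, Optional[str]] = {"branching": None, "branch_a": None, "branch_b": None}
    scheduler_pass(states, chosen="branch_a")
    return states


# Case 1: clear only the skipped task; it runs on the next pass.
only_skipped = fresh_run()
clear(only_skipped, ["branch_b"])
scheduler_pass(only_skipped, chosen="branch_a")

# Case 2: clear the entire DAG; the branching task re-runs and skips it again.
whole_dag = fresh_run()
clear(whole_dag, ["branching", "branch_a", "branch_b"])
scheduler_pass(whole_dag, chosen="branch_a")

print(only_skipped["branch_b"], whole_dag["branch_b"])
```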

@yuqian90 yuqian90 closed this Jan 29, 2020
@yuqian90 yuqian90 reopened this Jan 29, 2020
@tooptoop4
Contributor

makes sense!

@codecov-io

codecov-io commented Feb 2, 2020

Codecov Report

Merging #7276 into master will increase coverage by 53.43%.
The diff coverage is 100%.


@@             Coverage Diff             @@
##           master    #7276       +/-   ##
===========================================
+ Coverage   32.95%   86.39%   +53.43%     
===========================================
  Files         878      878               
  Lines       41219    41288       +69     
===========================================
+ Hits        13584    35669    +22085     
+ Misses      27635     5619    -22016

Impacted Files                                            Coverage Δ
airflow/ti_deps/deps/trigger_rule_dep.py                  91.02% <ø> (+74.77%) ⬆️
airflow/models/baseoperator.py                            96.52% <100%> (+38.94%) ⬆️
airflow/ti_deps/dep_context.py                            100% <100%> (+29.03%) ⬆️
airflow/models/skipmixin.py                               98.18% <100%> (+73.79%) ⬆️
airflow/ti_deps/deps/not_previously_skipped_dep.py        100% <100%> (ø)
airflow/kubernetes/volume_mount.py                        44.44% <0%> (-55.56%) ⬇️
airflow/kubernetes/volume.py                              52.94% <0%> (-47.06%) ⬇️
airflow/kubernetes/pod_launcher.py                        47.18% <0%> (-39.44%) ⬇️
...viders/cncf/kubernetes/operators/kubernetes_pod.py     69.38% <0%> (-25.52%) ⬇️
airflow/kubernetes/refresh_config.py                      50.98% <0%> (-23.53%) ⬇️
... and 767 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update d83ce26...39020d8.

@yuqian90
Contributor Author

yuqian90 commented Feb 2, 2020

  • Added tests
  • Extended the same support to BranchPythonOperator, ShortCircuitOperator and BaseBranchOperator. Clearing downstream tasks of all these three operators will now do the right thing.
  • That said, I still think BaseBranchOperator is a bit redundant since we already have BranchPythonOperator. I'm proposing to deprecate BaseBranchOperator in another PR.

The PR is now ready for review. Reviewers @kaxil @ashb @feluelle please take a look.

@yuqian90 yuqian90 force-pushed the branch_python_operator branch 3 times, most recently from a7ebc52 to 9e094d1 Compare February 7, 2020 11:03
@yuqian90 yuqian90 changed the title [AIRFLOW-5391] Make BaseOperator respect branching of parent BranchPythonOperator [AIRFLOW-5391] Add a new dependency rule to evaluate branching result Feb 7, 2020
@yuqian90
Contributor Author

yuqian90 commented Feb 7, 2020

@ashb @kaxil @feluelle @tooptoop4 Hoping to get a review here.

I have updated the PR a bit. The logic is still the same, but the code that does the branching evaluation has been moved into a new dependency rule inside ti_deps/deps/branch_dep.py.

When BranchDep is evaluated, it sets the state of a task to "SKIPPED" if it has a parent that decided to skip it.

For example, this comes from "example_short_circuit_operator":
image

For a DAG that looks like this, condition_is_False decided to skip false_1. Before this PR, the problem is if someone clears false_1, it'll execute. This is very counter-intuitive for users:
image
image

After this PR, when the scheduler evaluates the dependency rules for false_1, BranchDep will skip false_1 because it knows that the parent task condition_is_False had already decided to skip false_1. In other words, this makes the "skipped" status "sticky".
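A minimal sketch of the "sticky" skip, assuming the parent's decision is stored under a dedicated XCom key. The function `evaluate`, the key name `SKIP_KEY`, and the dict-based stores are hypothetical; the `ignore_task_deps` flag mirrors the "Ignore Task Deps" override mentioned in the commit message, and how Airflow wires that override internally is not shown here.

```python
from typing import Dict, List, Optional, Tuple

SKIP_KEY = "skipmixin_key"  # hypothetical key name for this sketch


def evaluate(
    task_id: str,
    states: Dict[str, Optional[str]],
    xcom: Dict[Tuple[str, str], dict],
    parents: Dict[str, List[str]],
    ignore_task_deps: bool = False,
) -> str:
    """Toy dependency evaluation for a task whose state was cleared."""
    if not ignore_task_deps:
        for parent_id in parents.get(task_id, []):
            decision = xcom.get((parent_id, SKIP_KEY), {})
            if task_id in decision.get("skipped", []):
                # The parent already decided to skip this task: skip it again.
                states[task_id] = "skipped"
                return "skipped"
    # No recorded skip decision (or deps ignored): the task runs.
    states[task_id] = "success"
    return "success"


parents = {"false_1": ["condition_is_False"]}
xcom = {("condition_is_False", SKIP_KEY): {"skipped": ["false_1"]}}

# false_1 has just been cleared, so it has no state.
states: Dict[str, Optional[str]] = {"condition_is_False": "success", "false_1": None}
after_clear = evaluate("false_1", states, xcom, parents)

# The same task, re-run with task deps ignored.
states["false_1"] = None
with_override = evaluate("false_1", states, xcom, parents, ignore_task_deps=True)

print(after_clear, with_override)
```

Because the decision lives with the parent rather than in the cleared task's own state, clearing false_1 alone does not erase it.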

So after this PR, the DAG will look like this after someone clears false_1. This is much more intuitive:

image

@yuqian90 yuqian90 force-pushed the branch_python_operator branch 3 times, most recently from 68383ae to ca16153 Compare February 8, 2020 09:39
@yuqian90 yuqian90 changed the title [AIRFLOW-5391] Add a new dependency rule to evaluate branching result [AIRFLOW-5391] Do not run skipped tasks when they are cleared Feb 8, 2020
Member

@feluelle feluelle left a comment

Overall LGTM. Great test coverage 👍

It would be nice if you could complete the docstrings with types and also add the types to the function signatures.

Review threads:
airflow/models/skipmixin.py (outdated)
airflow/models/skipmixin.py (outdated)
tests/operators/test_python.py (outdated)
@yuqian90
Contributor Author

Hi @feluelle, I've adopted all your suggestions (adding docs and raising ValueError), and the PR has been updated. Please take another look. Thank you!

Member

@ashb ashb left a comment

Okay, why you are using XCom makes more sense now, but I still think it should use a non-default key (as SkipMixin could be used elsewhere, and we don't want to overwrite a possible XCom from the task).
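The concern about the default key can be illustrated with a toy XCom store. "return_value" is the key Airflow uses by default for a task's returned value; the dedicated key name `SKIP_KEY` and the helper `xcom_push` below are hypothetical and stand in for whatever the final implementation uses.

```python
from typing import Any, Dict, Tuple

RETURN_KEY = "return_value"  # Airflow's default XCom key
SKIP_KEY = "skipmixin_key"   # hypothetical dedicated key for this sketch

XComStore = Dict[Tuple[str, str], Any]


def xcom_push(store: XComStore, task_id: str, key: str, value: Any) -> None:
    """Toy push: one value per (task_id, key), later pushes overwrite earlier ones."""
    store[(task_id, key)] = value


# Sharing the default key: the skip decision overwrites the task's own value.
shared: XComStore = {}
xcom_push(shared, "branching", RETURN_KEY, "branch_a")
xcom_push(shared, "branching", RETURN_KEY, {"followed": ["branch_a"]})

# Using a dedicated key: both values survive side by side.
separate: XComStore = {}
xcom_push(separate, "branching", RETURN_KEY, "branch_a")
xcom_push(separate, "branching", SKIP_KEY, {"followed": ["branch_a"]})

print(shared[("branching", RETURN_KEY)])
print(separate[("branching", RETURN_KEY)], separate[("branching", SKIP_KEY)])
```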

Review threads:
airflow/ti_deps/deps/branch_dep.py (outdated)
airflow/models/skipmixin.py (outdated)
airflow/operators/branch_operator.py (outdated)
airflow/ti_deps/deps/branch_dep.py (outdated)
airflow/ti_deps/deps/branch_dep.py (outdated)
tests/operators/test_python.py
airflow/models/baseoperator.py (outdated)
galuszkak pushed a commit to FlyrInc/apache-airflow that referenced this pull request Mar 5, 2020
…che#7276)

If a task is skipped by BranchPythonOperator, BaseBranchOperator or ShortCircuitOperator and the user then clears the skipped task later, it'll execute. This is probably not the right behaviour.

This commit changes that so it will be skipped again. This can be ignored by running the task again with "Ignore Task Deps" override.
@ashb ashb added this to the Airflow 2.0.0 milestone Jul 2, 2020
yuqian90 added a commit to yuqian90/airflow that referenced this pull request Jul 22, 2020
…che#7276)

(cherry picked from commit 1cdab56)
kaxil pushed a commit that referenced this pull request Jul 22, 2020
(cherry picked from commit 1cdab56)
potiuk pushed a commit that referenced this pull request Jul 22, 2020
(cherry picked from commit 1cdab56)
kaxil pushed a commit that referenced this pull request Aug 11, 2020
(cherry picked from commit 1cdab56)
cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Mar 5, 2021
(cherry picked from commit 1cdab56)
Labels
area:Scheduler including HA (high availability) scheduler