Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task instance mutation hook #8852

Merged
merged 12 commits into from
Jun 13, 2020

Conversation

milton0825
Copy link
Contributor

@milton0825 milton0825 commented May 13, 2020

tl;dr: support task instance mutation before each execution.

Currently the only way to apply cluster-wide mutations to tasks is by defining policy in airflow_local_setting. However, policy is applied to task (not task_instance and the descriptions and docs are wrong) right after the DAG is loaded. And you will not be able to change the task after that.

Therefore a hook is added to support mutating the task instance before each execution (in case there is a retry). For example, you can re-route the task to execute in a different queue during retries.


Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@milton0825 milton0825 marked this pull request as ready for review May 16, 2020 06:42
@feng-tao
Copy link
Member

@ashb could you take a look? Lyft leverages this features to do py2/py3 migration and found it pretty useful. I haven't followed actively on recent airflow development and not sure if there is anything similar exist in current codebase /feature. Let us know.

@@ -564,6 +565,27 @@ def with_all_tasks_removed(dag):
flaky_ti.refresh_from_db()
self.assertEqual(State.NONE, flaky_ti.state)

@mock.patch('airflow.models.dagrun.task_instance_mutation_hook')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test for TI where it is UP_FOR_RETRY

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@milton0825 Can you add a test for TI where it is UP_FOR_RETRY

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Testing that probably won't add too much value as the logic is not based on the state of TI.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#8852 (comment) Based on our discussion in that comment if in future someone removes L468, no test would fail but it would break the behaviour that ti_mutation_hook won't work on ti that are up for retry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added


To define policy, add a ``airflow_local_settings`` module
to your PYTHONPATH that defines this ``policy`` function. It receives
a ``Task`` object and can alter it where needed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to remove these doc comments? This bit seems appropriate still.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved that to the sections below.

@milton0825 milton0825 merged commit bacb05d into apache:master Jun 13, 2020
@kaxil kaxil added this to the Airflow 1.10.11 milestone Jun 30, 2020
kaxil pushed a commit that referenced this pull request Jul 1, 2020
@kaxil kaxil added the type:new-feature Changelog: New Features label Jul 1, 2020
kaxil pushed a commit that referenced this pull request Jul 1, 2020
cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Mar 5, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type:new-feature Changelog: New Features
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants