[AIRFLOW-6535] add exception to fail without retry #7133

Merged
merged 17 commits into from
May 16, 2020

jstern
Contributor

@jstern jstern commented Jan 10, 2020

I would like to be able to configure certain tasks to retry, but also to have a way to bypass retries if I can detect a condition that is unlikely to change for the better.

For example, imagine I have a DAG with a large number of tasks that fetch data from an API. Sometimes the API returns a 200 and everything is fine; sometimes it returns a 500, which I know means the API is failing under load, so I want to retry later; and sometimes it returns a 400, indicating that I've configured my requests in a way that will never succeed. If the API returns a 400 for some or all of my tasks, I have to wait for all of them to get through all their retries before the run fails.

My proposal is to add another subclass of AirflowException called AirflowFailException, and to update the exception handling in TaskInstance.run_raw_task such that when this exception is seen, the resulting behavior is the same as if we entered handle_failure and is_eligible_to_retry returned false (except with adjusted logging so it does not look like we simply hit our retry limit).
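
As a rough illustration of the API scenario above, a task callable might choose between the two exceptions like this. This is only a sketch: `check_response` is a hypothetical helper, and the fallback classes are stand-ins so the snippet runs without Airflow installed.

```python
try:
    # Available in Airflow releases that include this change.
    from airflow.exceptions import AirflowException, AirflowFailException
except ImportError:
    # Stand-ins (assumption) so the sketch runs without Airflow installed.
    class AirflowException(Exception):
        """Ordinary task failure; retried while retries remain."""

    class AirflowFailException(AirflowException):
        """Task failure that should not be retried."""


def check_response(status_code):
    """Hypothetical helper: map an HTTP status code to a task outcome."""
    if 200 <= status_code < 300:
        return "ok"
    if 400 <= status_code < 500:
        # Misconfigured request: retrying cannot help, so fail right away.
        raise AirflowFailException("client error %d, not retrying" % status_code)
    # Anything else (e.g. a 500 under load): let the normal retry logic run.
    raise AirflowException("unexpected status %d, retry later" % status_code)
```

A task that calls `check_response(400)` would be expected to fail without using its remaining retries, while `check_response(500)` would go through the usual retry path.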


Issue link: AIRFLOW-6535

  • Description above provides context of the change
  • Commit message/PR title starts with [AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID*
  • Unit tests coverage for changes (not needed for documentation changes)
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

* For document-only changes commit message can start with [AIRFLOW-XXXX].


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

Member

@potiuk potiuk left a comment

I like it, just one small comment. It looks like a good way. However it will also need some description in the documentation and some tests.

@jstern
Contributor Author

jstern commented Jan 10, 2020

> I like it, just one small comment. It looks like a good way. However it will also need some description in the documentation and some tests.

Thanks for the quick review! Definitely will follow up with some tests shortly.

I was looking for a good place in the docs ... ideally I would have found an existing spot that discusses the semantics of AirflowSkipException and added something there, but the docs don't cover that either :/ Is there a particular spot within the documentation that would be best?

@jstern
Contributor Author

jstern commented Jan 10, 2020

One other question: am I perhaps overcomplicating this? It occurs to me that if a task encounters an AirflowSkipException, we don't bother checking on its retry status or adjusting our messaging based on that:

        except AirflowSkipException as e:
            # Recording SKIP
            # log only if exception has any arguments to prevent log flooding
            if e.args:
                self.log.info(e)
            self.refresh_from_db(lock_for_update=True)
            self.state = State.SKIPPED
            self.log.info(
                'Marking task as SKIPPED.'
                'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
                self.dag_id,
                self.task_id,
                self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
                    self,
                    'execution_date') and self.execution_date else '',
                self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
                    self,
                    'start_date') and self.start_date else '',
                self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
                    self,
                    'end_date') and self.end_date else '')

Maybe instead of messing with handle_failure I should just emulate that but with different messaging? Might be cleaner and easier to follow...

EDIT: but then I would still need to make sure failure email/callbacks happen ... so I guess I can either leave this structured the way I have it or include those in the except block.
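
To make the trade-off concrete, here is a toy model (not Airflow's actual code) of a failure handler with a forced-fail path. It assumes a hypothetical `retries_left` counter and `on_failure` callback, and shows the callback firing on both routes to FAILED while the log message differs.

```python
import logging

log = logging.getLogger("retry_sketch")

FAILED = "failed"
UP_FOR_RETRY = "up_for_retry"


def handle_failure(error, retries_left, force_fail=False, on_failure=None):
    """Toy model of choosing a task state after an error (names are hypothetical)."""
    if force_fail or retries_left <= 0:
        if force_fail:
            # Distinct message so it does not look like the retry limit was hit.
            log.info("Immediate failure requested. Marking task as FAILED.")
        else:
            log.info("No retries left. Marking task as FAILED.")
        # Failure notifications fire on either path to FAILED.
        if on_failure is not None:
            on_failure(error)
        return FAILED
    log.info("Marking task as UP_FOR_RETRY.")
    return UP_FOR_RETRY
```

Keeping one handler with a flag means the email and callback logic lives in a single place, which is the concern raised in the EDIT above.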

@potiuk
Member

potiuk commented Jan 10, 2020

> > I like it, just one small comment. It looks like a good way. However it will also need some description in the documentation and some tests.
>
> Thanks for the quick review! Definitely will follow up with some tests shortly.
>
> I was looking for a good place in the docs ... ideally I would have found an existing spot that discusses the semantics of AirflowSkipException and added something there, but the docs don't cover that either :/ Is there a particular spot within the documentation that would be best?

I think "Concepts" is best and adding chapter about Exceptions would be good.

@potiuk
Member

potiuk commented Jan 10, 2020

> Maybe instead of messing with handle_failure I should just emulate that but with different messaging? Might be cleaner and easier to follow...

It's ok. I think it's easy enough.

@jstern jstern changed the title [AIRFLOW-6535] first pass at exception to fail without retry [AIRFLOW-6535] add exception to fail without retry Jan 10, 2020
@codecov-io

codecov-io commented Jan 10, 2020

Codecov Report

Merging #7133 into master will decrease coverage by 54.47%.
The diff coverage is 7.5%.

@@             Coverage Diff             @@
##           master    #7133       +/-   ##
===========================================
- Coverage   86.92%   32.45%   -54.48%     
===========================================
  Files         915      914        -1     
  Lines       44152    44152               
===========================================
- Hits        38381    14329    -24052     
- Misses       5771    29823    +24052
Impacted Files Coverage Δ
airflow/models/dag.py 46.41% <0%> (-44.81%) ⬇️
airflow/exceptions.py 96.66% <100%> (-3.34%) ⬇️
airflow/models/taskinstance.py 29.1% <5.55%> (-65.88%) ⬇️
...low/contrib/operators/wasb_delete_blob_operator.py 0% <0%> (-100%) ⬇️
airflow/contrib/hooks/vertica_hook.py 0% <0%> (-100%) ⬇️
airflow/contrib/sensors/__init__.py 0% <0%> (-100%) ⬇️
airflow/hooks/mssql_hook.py 0% <0%> (-100%) ⬇️
...viders/docker/example_dags/example_docker_swarm.py 0% <0%> (-100%) ⬇️
airflow/hooks/webhdfs_hook.py 0% <0%> (-100%) ⬇️
airflow/contrib/sensors/emr_base_sensor.py 0% <0%> (-100%) ⬇️
... and 786 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 35eff0a...a887d1f. Read the comment docs.

@jstern jstern requested a review from potiuk January 24, 2020 14:37
@robinedwards
Contributor

Interesting I have a hack in one of my operators which allows the selection of specific exceptions to retry for.

@jstern
Contributor Author

jstern commented Jan 24, 2020

> Interesting I have a hack in one of my operators which allows the selection of specific exceptions to retry for.

I'd be curious to see how you do it ... maybe it's not really that hacky? :) I'd be happy to close this PR if there's already a sensible/easy way to accomplish this within what airflow already provides...

@robinedwards
Contributor

Member

@ashb ashb left a comment

Nice idea.

The one thing I don't think the tests cover is that actually raising this new exception causes the task to not be retried -- you've tested the change to handle_failure, but that is essentially the internal details, rather than the "interface" of the new feature.
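
A black-box test of the kind described here could look roughly like the following. It is a self-contained sketch with a toy runner and stand-in exception classes (all names hypothetical), not the tests that were added to the PR; it checks the observable behavior (number of attempts) and not the internals.

```python
class RetryableError(Exception):
    """Stand-in for an ordinary task error."""


class FailNow(RetryableError):
    """Stand-in for AirflowFailException."""


def run_with_retries(task, retries):
    """Toy runner: call `task` until it succeeds, fails for good, or runs out of tries."""
    attempts = 0
    while True:
        attempts += 1
        try:
            task()
            return "success", attempts
        except FailNow:
            # Fail immediately, regardless of remaining retries.
            return "failed", attempts
        except RetryableError:
            if attempts > retries:
                return "failed", attempts


def always_fail_now():
    raise FailNow("bad request")


def always_retryable():
    raise RetryableError("server busy")


def test_fail_now_skips_retries():
    assert run_with_retries(always_fail_now, retries=3) == ("failed", 1)


def test_ordinary_error_uses_all_tries():
    assert run_with_retries(always_retryable, retries=3) == ("failed", 4)
```

Note that `FailNow` must be caught before `RetryableError`, since it is a subclass; the same ordering concern applies to any handler that catches both AirflowFailException and AirflowException.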

airflow/models/taskinstance.py (outdated)
docs/conf.py
tests/models/test_taskinstance.py (outdated)
@jstern
Contributor Author

jstern commented Feb 4, 2020

> The one thing I don't think the tests cover is that actually raising this new exception causes the task to not be retried -- you've tested the change to handle_failure, but that is essentially the internal details, rather than the "interface" of the new feature.

I'll take some time to look deeper, but I didn't find any existing tests of the interface for handling the other exceptions, so I glommed onto the closest test I could find. If you know offhand of better example tests that I can extend, please point me to them :) Or I can try to take some time to add them.

@KevinYang21
Member

Hi @jstern , this is a very nice feature that we are also very interested in. Do we still plan to merge it? Anything we can do to help?

@jstern
Contributor Author

jstern commented Mar 27, 2020

Thanks @KevinYang21 - I definitely would like to see it get merged, but I'm stuck on a strange problem with the docs build throwing warnings due to the problem I described in #7133 (comment) above.

The short version of the issue is that my PR adds airflow.exceptions to the API docs, but this causes warnings in the doc build because AirflowException is already included (incorrectly) in the yandex provider API docs. I have not yet had the time or inspiration to figure out why that bug exists in master or how to correct it.

Another option that would let the build succeed would be to revert the change that adds the exceptions module to the api docs, but since I believe the docs in master are wrong I have been reluctant to take this step.

Perhaps I should file a JIRA ticket for the docs bug in master and hope someone who understands Sphinx and autoapi better than I do knows how to fix it?

@KevinYang21
Member

@jstern sry I was distracted, yes I think your plan is good. Saw that you got a PR fixing doc, will look into that one too. Thank you!

@tooptoop4
Contributor

gentle ping @jstern

@jstern
Contributor Author

jstern commented Apr 17, 2020

> gentle ping @jstern

hi @tooptoop4 ! just waiting on #8095 to hit master so the docs can build cleanly and then hopefully we'll be able to get this one ready to merge

@jstern jstern force-pushed the airflow-fail-exception branch 2 times, most recently from f661cbd to f19e950 Compare April 26, 2020 12:53
Try to reduce nesting and repetition of logic for different conditions.
Also try to tighten up the scope of the exception handling ... it looks
like the large block that catches an Exception and logs it as a failure
to send an email may have been swallowing some TypeErrors coming out
of trying to compose a log info message and calling strftime on
start_date and end_date when they're set to None; this is why I've added
lines in the test to set those values on the TaskInstance objects.
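
The strftime problem described in this commit message can be illustrated with a small guard. This is a sketch only: `safe_date` and `FakeTaskInstance` are hypothetical names, not necessarily what the PR uses.

```python
from datetime import datetime


def safe_date(obj, attr, fmt="%Y%m%dT%H%M%S"):
    """Hypothetical helper: format obj.<attr>, or return '' if it is missing or None."""
    value = getattr(obj, attr, None)
    return value.strftime(fmt) if value else ""


class FakeTaskInstance:
    """Minimal stand-in with the two date attributes mentioned above."""

    def __init__(self, start_date=None, end_date=None):
        self.start_date = start_date
        self.end_date = end_date


# end_date is None here; calling None.strftime(...) directly would raise.
ti = FakeTaskInstance(start_date=datetime(2020, 4, 26, 12, 53))
message = "start_date=%s, end_date=%s" % (
    safe_date(ti, "start_date"),
    safe_date(ti, "end_date"),
)
```

With a guard like this the log message can be composed even when a date is unset, so an error there is not silently swallowed by a broad `except Exception` block.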
@tooptoop4
Contributor

bump?

@jstern
Contributor Author

jstern commented May 16, 2020

@KevinYang21 if you have some time to look at this again ... nothing I'm aware of to hold it up other than committer approval.

cc @ashb @potiuk ... i know you guys are very busy but i'd be very grateful if you could find time to take another look at this :)

Member

@ashb ashb left a comment

one tiny doc change then I'm happy with this.

sorry you had to chase so many times!

docs/concepts.rst
@ashb ashb merged commit 707bb0c into apache:master May 16, 2020
@ashb ashb added this to the Airflow 1.10.11 milestone May 16, 2020
@KevinYang21
Member

🎉 I need to really spend some time on my email filter, keep missing such important PR @ :) Thank you @jstern

@marshall7m
Contributor

Hi there, just wondering what version of airflow the AirflowFailException is available on. Thanks!

@ashb
Member

ashb commented Jun 3, 2020

It's marked to be included in 1.10.11

kaxil pushed a commit that referenced this pull request Jun 22, 2020
)

* use preferred boolean check idiom

Co-Authored-By: Jarek Potiuk <[email protected]>

* add test coverage for AirflowFailException

* add docs for some exception usage patterns

* autoformatting

* remove extraneous newline, poke travis build

* clean up TaskInstance.handle_failure

Try to reduce nesting and repetition of logic for different conditions.
Also try to tighten up the scope of the exception handling ... it looks
like the large block that catches an Exception and logs it as a failure
to send an email may have been swallowing some TypeErrors coming out
of trying to compose a log info message and calling strftime on
start_date and end_date when they're set to None; this is why I've added
lines in the test to set those values on the TaskInstance objects.

* let sphinx generate docs for exceptions module

* keep session kwarg last in handle_failure

* explain allowed_top_level

* add black-box tests for retry/fail immediately cases

* don't lose safety measures in logging date attrs

* fix flake8 too few blank lines

* grammar nitpick

* add import to AirflowFailException example

Co-authored-by: Jarek Potiuk <[email protected]>
(cherry picked from commit 707bb0c)
potiuk pushed a commit that referenced this pull request Jun 29, 2020
(cherry picked from commit 707bb0c)
kaxil pushed a commit that referenced this pull request Jul 1, 2020
(cherry picked from commit 707bb0c)
cfei18 pushed a commit to cfei18/incubator-airflow that referenced this pull request Mar 5, 2021
(cherry picked from commit 707bb0c)
9 participants