on_failure_callback is not called when task is terminated externally #25297

Closed
hliu47 opened this issue Jul 26, 2022 · 22 comments · Fixed by #29743

Comments

hliu47 commented Jul 26, 2022

Apache Airflow version

2.2.5

What happened

on_failure_callback is not called when task is terminated externally.
A similar issue was reported in #14422 and fixed in #15172.
However, the code that fixed this was changed in a later PR, #16301, after which task_instance._run_finished_callback is no longer called when SIGTERM is received
(https://github.com/apache/airflow/pull/16301/files#diff-d80fa918cc75c4d6aa582d5e29eeb812ba21371d6977fde45a4749668b79a515L85).

What you think should happen instead

on_failure_callback should be called when a task fails, regardless of how it fails.

How to reproduce

DAG file:

import datetime

import pendulum

from airflow.models import DAG
from airflow.operators.bash import BashOperator  # modern import path; bash_operator is deprecated

DEFAULT_ARGS = {
    'email': ['[email protected]']
}

TZ = pendulum.timezone("America/Los_Angeles")

test_dag = DAG(
    dag_id='test_callback_in_manually_terminated_dag',
    schedule_interval='*/10 * * * *',
    default_args=DEFAULT_ARGS,
    catchup=False,
    start_date=datetime.datetime(2022, 7, 14, 0, 0, tzinfo=TZ),
)

with test_dag:
    BashOperator(
        task_id='manually_terminated_task',
        bash_command='echo start; sleep 60',  # long-running command so there is time to terminate it
        on_failure_callback=lambda context: print('This on_failure_callback should be called when the task fails.'),
    )

While the task instance is running, either force-quitting the scheduler or manually updating its state to None in the database will cause the task to receive SIGTERM and terminate. In either case, the failure callback is not called, which does not match the behavior of earlier versions of Airflow.
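
For reference, here is a minimal sketch of the second approach (clearing the running task instance's state via Airflow's ORM). The session helper and model are standard Airflow APIs, but treat this as illustrative rather than the exact steps used:

# Illustrative sketch: set the running task instance's state to None so the
# local task runner receives SIGTERM. Run it somewhere with access to the
# Airflow metadata database; the identifiers match the DAG above.
from airflow.models import TaskInstance
from airflow.utils.session import create_session
from airflow.utils.state import State

with create_session() as session:
    session.query(TaskInstance).filter(
        TaskInstance.dag_id == 'test_callback_in_manually_terminated_dag',
        TaskInstance.task_id == 'manually_terminated_task',
        TaskInstance.state == State.RUNNING,
    ).update({TaskInstance.state: None}, synchronize_session=False)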

The stack trace is attached below and on_failure_callback is not called.

[2022-07-15, 02:02:24 UTC] {process_utils.py:124} INFO - Sending Signals.SIGTERM to group 10571. PIDs of all processes in the group: [10573, 10575, 10571]
[2022-07-15, 02:02:24 UTC] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 10571
[2022-07-15, 02:02:24 UTC] {taskinstance.py:1431} ERROR - Received SIGTERM. Terminating subprocesses.
[2022-07-15, 02:02:24 UTC] {subprocess.py:99} INFO - Sending SIGTERM signal to process group
[2022-07-15, 02:02:24 UTC] {process_utils.py:70} INFO - Process psutil.Process(pid=10575, status='terminated', started='02:02:11') (10575) terminated with exit code None
[2022-07-15, 02:02:24 UTC] {taskinstance.py:1776} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.7/lib/python3.7/site-packages/airflow/operators/bash.py", line 182, in execute
    cwd=self.cwd,
  File "/opt/python3.7/lib/python3.7/site-packages/airflow/hooks/subprocess.py", line 87, in run_command
    for raw_line in iter(self.sub_process.stdout.readline, b''):
  File "/opt/python3.7/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1433, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2022-07-15, 02:02:24 UTC] {taskinstance.py:1289} INFO - Marking task as FAILED. dag_id=test_callback_in_manually_terminated_dag, task_id=manually_terminated_task, execution_date=20220715T015100, start_date=20220715T020211, end_date=20220715T020224
[2022-07-15, 02:02:24 UTC] {logging_mixin.py:109} WARNING - /opt/python3.7/lib/python3.7/site-packages/airflow/utils/email.py:108 PendingDeprecationWarning: Fetching SMTP credentials from configuration variables will be deprecated in a future release. Please set credentials using a connection instead.
[2022-07-15, 02:02:24 UTC] {configuration.py:381} WARNING - section/key [smtp/smtp_user] not found in config
[2022-07-15, 02:02:24 UTC] {email.py:214} INFO - Email alerting: attempt 1
[2022-07-15, 02:02:24 UTC] {configuration.py:381} WARNING - section/key [smtp/smtp_user] not found in config
[2022-07-15, 02:02:24 UTC] {email.py:214} INFO - Email alerting: attempt 1
[2022-07-15, 02:02:24 UTC] {taskinstance.py:1827} ERROR - Failed to send email to: ['[email protected]']
...
OSError: [Errno 101] Network is unreachable
[2022-07-15, 02:02:24 UTC] {standard_task_runner.py:98} ERROR - Failed to execute job 159 for task manually_terminated_task (Task received SIGTERM signal; 10571)
[2022-07-15, 02:02:24 UTC] {process_utils.py:70} INFO - Process psutil.Process(pid=10571, status='terminated', exitcode=1, started='02:02:11') (10571) terminated with exit code 1
[2022-07-15, 02:02:24 UTC] {process_utils.py:70} INFO - Process psutil.Process(pid=10573, status='terminated', started='02:02:11') (10573) terminated with exit code None

Operating System

CentOS Linux 7

Deployment

Other Docker-based deployment

Anything else

This is an issue in 2.2.5. However, I noticed that it appears to be fixed on the main branch by PR #21877, although that PR was not intended to fix this issue. Is there a timeline for getting that PR into a release? We are happy to test it out to see whether it fixes the issue once it's released.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

hliu47 added the area:core and kind:bug labels Jul 26, 2022
boring-cyborg bot commented Jul 26, 2022

Thanks for opening your first issue here! Be sure to follow the issue template!

potiuk added this to the Airflow 2.3.4 milestone Jul 28, 2022
potiuk (Member) commented Jul 28, 2022

Since you know what caused it, maybe you would like to provide a fix for it, @hliu47? It sounds like a nice contribution back to the free project, and you can become one of its more than 2,100 contributors.

hliu47 (Author) commented Jul 29, 2022

Since you know what caused it, maybe you would like to provide a fix for it, @hliu47? It sounds like a nice contribution back to the free project, and you can become one of its more than 2,100 contributors.

@potiuk Thank you so much for replying to this issue. I think PR #21877, which is already on the main branch, fixes this issue. Is there a timeline for getting that PR into a release?

potiuk (Member) commented Jul 29, 2022

Interesting - that should land in the 2.4.0 release. The initial goal is to get it out in mid-August, but we release when things are ready, so it depends on the testing phase.

hliu47 (Author) commented Aug 1, 2022

That's good to know, thanks!

ashb modified the milestones: Airflow 2.4.0, Airflow 2.4.1 Sep 8, 2022
ephraimbuddy (Contributor) commented

@hliu47 can you confirm whether this is fixed in 2.4.0?

ephraimbuddy modified the milestones: Airflow 2.4.3, Airflow 2.4.4 Nov 9, 2022
ephraimbuddy modified the milestones: Airflow 2.4.4, Airflow 2.5.0, Airflow 2.5.1 Nov 23, 2022
github-actions bot commented

This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions bot added the stale label Dec 25, 2022
github-actions bot commented Jan 1, 2023

This issue has been closed because it has not received a response from the issue author.

github-actions bot closed this as completed Jan 1, 2023
ed-sparkes commented

@potiuk Hello, do you know whether this has been resolved? There was some indication that it was destined for 2.4, but then the milestone was moved up to 2.5.1, and the issue was closed by github-actions. Many thanks.

potiuk (Member) commented Jan 9, 2023

Like anyone else here, @ed-sparkes, I can only see the same sources as you. Since it was closed without the fix being explicitly mentioned, the jury is still out: it may or may not have been solved. But if you would like to verify and reproduce it to see whether it has been solved, feel free, and then either comment here with evidence that it has not been solved (or, better, open a new issue with reproducible details).

What is the reason you want to know whether it's been fixed? Do you base some business decision on it? Is it causing issues for you? Or is it just curiosity, @ed-sparkes?

From what I see, we believed it had been fixed, but the original author did not confirm it, so closing it was a pretty good idea. Now, if you would like to help and get more certainty, you could take over where the author left off and perform the verification - that would be a great contribution and a way of giving back for the free software you get :)

seub commented Jan 30, 2023

In my experience, this bug is still present in Airflow 2.5.1. on_failure_callback works when the task fails "normally", but not when I mark it as failed manually.

Should this issue be reopened?

potiuk (Member) commented Feb 19, 2023

If you can provide a reproducible case with logs from the latest Airflow version, @seub, that would make a good new issue. I suspect the case is different from the one originally explained in this issue, and having a reproducible case with logs would be helpful.

seub commented Feb 21, 2023

@potiuk It's the same issue, and it's easy to reproduce: just interrupt a task manually (in the UI) and observe that on_failure_callback is not called. Please try and let me know if it's working for you or not.

potiuk (Member) commented Feb 21, 2023

@seub - please provide a reproducible case and logs if you would like to report it. It might be something specific to your case, and, to be brutally honest, it's much easier for the reporter of one issue to provide the evidence than for maintainers, who look at hundreds of issues and PRs a day, to try to reproduce every issue that someone claims is happening without showing their evidence.

Please be empathetic towards maintainers, who have a lot of things to handle. If you report "it does not work for me" on an issue that is already closed, you will nearly universally be asked to report the issue and provide new evidence in a new issue.

seub commented Feb 21, 2023

@potiuk This issue was automatically closed by a bot without being solved. I am confirming that it is in fact not solved as of 2.5.1; I see the exact same behavior. I'm only reporting it out of courtesy; it doesn't matter to me.

ephraimbuddy reopened this Feb 21, 2023
potiuk (Member) commented Feb 21, 2023

It was closed because we did not have enough evidence and could not reproduce it. And it's really hard to be sure that it is the same bug. It might look and behave similarly, yet be related to your particular deployment - Airflow has about 8 executors and can run in multiple deployments. What you observe in your deployment might have a completely different root cause and can be triggered by a number of factors. You are asking others to guess what environment you have, what your DAG looks like, and how to trigger the problem.

You are forgetting that the people here (including maintainers) usually try to help and investigate users' issues in their free time, and that you got the software absolutely for free. So what we are asking is that you spend a little time to help those people avoid spending hours and hours of their personal time investigating and guessing at something you can reproduce locally, and that you provide the necessary information to cut out many hours of trying to guess your setup.

We are just asking for a little of your time to fill in the information, so that you can help save hours for the people who spend their own personal time trying to help users like you. But maybe it is too much of an ask, I don't know. Hard to say; for sure, I would be sensitive and empathetic towards those other humans if I were asked politely to do it. But not everyone is.

github-actions bot removed the stale label Feb 22, 2023
ephraimbuddy (Contributor) commented

We can solve this by running the callback in the signal handler:

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index c74ff8b6b0..0c361d0d41 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1528,7 +1528,8 @@ class TaskInstance(Base, LoggingMixin):
                 return
             self.log.error("Received SIGTERM. Terminating subprocesses.")
             self.task.on_kill()
-            raise AirflowException("Task received SIGTERM signal")
+            if self.task.on_failure_callback:
+                self._run_finished_callback(self.task.on_failure_callback, context, "on_failure")
 
         signal.signal(signal.SIGTERM, signal_handler)

The raised exception is somehow lost, and I think the loss is similar to what is explained in this blog post: https://anonbadger.wordpress.com/2018/12/15/python-signal-handlers-and-exceptions/
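
For what it's worth, here is a small standalone illustration (not Airflow code) of how an exception raised in a signal handler surfaces in whatever code happens to be executing at that moment and can be swallowed there by a broad except, so the caller never sees it:

# Standalone, POSIX-only illustration; none of this is Airflow code.
import os
import signal
import time

class Terminated(Exception):
    pass

def handler(signum, frame):
    # Raised from the handler, the exception pops up wherever the main
    # process happens to be executing when the signal is delivered.
    raise Terminated('received SIGTERM')

signal.signal(signal.SIGTERM, handler)

def library_call_with_broad_except():
    try:
        time.sleep(5)  # the signal arrives while blocked here
    except Exception:
        pass  # the Terminated exception is silently swallowed here

if os.fork() == 0:          # child: send SIGTERM to the parent after 1s
    time.sleep(1)
    os.kill(os.getppid(), signal.SIGTERM)
    os._exit(0)

library_call_with_broad_except()
print('still running: the exception never reached the caller')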

ashb (Member) commented Feb 22, 2023

Generally, running any serious amount of code inside a signal handler should be avoided -- it can cause lots of odd, hard-to-diagnose behaviour. (The fact that we already call task.on_kill() from in there is probably not good either.) So it might fix it, but I'm worried about unpredictable side effects with bigger failure callbacks.

The way around that, in general terms (not sure it applies here), is to have the signal handler set a flag (self.terminate_requested = True, for example) and then have the "loop" (wherever it is) notice it and run the teardown code.

However, I'm not sure whether that approach applies here. I'm also not sure which process we are in here: is this the raw/actual task execution process, or the supervisor?
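
For illustration, a minimal sketch of the flag-based pattern described above; the class and method names are made up for the example and are not existing Airflow APIs:

import signal

class HypotheticalTaskRunner:
    """Sketch only: the handler records the request, ordinary code does the work."""

    def __init__(self):
        self.terminate_requested = False
        signal.signal(signal.SIGTERM, self._on_sigterm)

    def _on_sigterm(self, signum, frame):
        # Do no real work inside the handler; just set the flag.
        self.terminate_requested = True

    def run(self, do_one_step, on_failure):
        while not self.terminate_requested:
            if not do_one_step():
                return  # task finished normally
        # Teardown (e.g. the failure callback) runs in ordinary code,
        # outside the signal handler.
        on_failure()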

ephraimbuddy (Contributor) commented

However, I'm not sure whether that approach applies here. I'm also not sure which process we are in here: is this the raw/actual task execution process, or the supervisor?

It's the raw task execution process.

The way around that, in general terms (not sure it applies here), is to have the signal handler set a flag (self.terminate_requested = True, for example) and then have the "loop" (wherever it is) notice it and run the teardown code.

I will see if there's something I can do here.

ashb (Member) commented Feb 24, 2023

Might an option be to throw a (new/custom) exception in the signal handler, and then catch it in BaseOperator.execute and run the on_failure callback there?
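
A rough sketch of the shape of that idea, with the exception class and the wrapper made up for illustration (it wraps execute() from the outside rather than living inside BaseOperator):

class TaskTerminatedExternally(Exception):
    """Hypothetical exception raised by the SIGTERM handler."""

def run_with_failure_callback(task, context):
    # Hypothetical wrapper: catch the termination exception around execute()
    # and run the task's on_failure_callback before re-raising.
    try:
        return task.execute(context)
    except TaskTerminatedExternally:
        if task.on_failure_callback:
            task.on_failure_callback(context)
        raise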

ephraimbuddy (Contributor) commented

Might an option be to throw a (new/custom) exception in the signal handler, and then catch it in BaseOperator.execute and run the on_failure callback there?

It currently raises AirflowException, which is supposed to be caught by the _run_raw_task method, but that method doesn't catch it.

FFCMSouza commented

I'm getting the same error on Airflow 2.7.1.
I saw that the code change to call on_failure_callback in these cases is no longer present in version 2.7.1.

As you can see from the screenshots below, the function was never called; the logs are never printed.
Can anyone help me with this? Should I open a new issue?
[screenshots omitted]
