
on_failure_callback on DAG level is not executed #16983

Closed
gh4n opened this issue Jul 14, 2021 · 18 comments
Labels
affected_version:2.1 Issues Reported for 2.1 kind:bug This is a clearly a bug

Comments


gh4n commented Jul 14, 2021

Apache Airflow version: 2.0.1 and 2.1.1

Kubernetes version (if you are using kubernetes) (use kubectl version): 1.20.4

Environment:

  • Cloud provider or hardware configuration: AWS EKS
  • OS (e.g. from /etc/os-release): Debian GNU/Linux
  • Kernel (e.g. uname -a): Linux 5.4.117-58.216.amzn2.x86_64

What happened:

An Airflow DAG failed and on_failure_callback was not triggered.
Logs were also not shown, which may be related to issue #13692.

In the worker pod logs I get the following error messages:

Failed to execute task local variable 'return_code' referenced before assignment.
Failed to execute task [Errno 2] No such file or directory: '/tmp/tmp7l296jgg'.
Task airflow.executors.celery_executor.execute_command[24d3f5c5-bf58-4aad-bf2a-c10b2781a2b2] raised unexpected:
AirflowException('Celery command failed on host:
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 87, in execute_command
    _execute_in_fork(command_to_exec)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 98, in _execute_in_fork
    raise AirflowException('Celery command failed on host: ' + get_hostname())
airflow.exceptions.AirflowException: Celery command failed on host: hotdoc-airflow-worker-0.hotdoc-airflow-worker.hotdoc-airflow-staging.svc.cluster.local

What you expected to happen:

I expected the callback function to be called and executed.
It sounds like the null-hostname issue contributed to this, but I'm not familiar enough with Airflow internals to say for sure. I dug through the source code, and it looks like some queries are made to list tasks and other metadata.

def handle_callback(self, dagrun, success=True, reason=None, session=None):

How to reproduce it:

Create a DAG with a task that always fails and an on_failure_callback set at the DAG level:

import sys
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def on_failure(ctx):
    print('hello world')
    print(ctx)

def always_fails():
    sys.exit(1)

dag = DAG(
    dag_id='always_fails',
    description='dag that always fails',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2021, 7, 12),
    on_failure_callback=on_failure,
)

task = PythonOperator(task_id='test-error-notifier', python_callable=always_fails, dag=dag)

Run the DAG and check whether on_failure_callback is called.

@gh4n gh4n added the kind:bug This is a clearly a bug label Jul 14, 2021

boring-cyborg bot commented Jul 14, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@ephraimbuddy
Contributor

Can you test this on Airflow 2.1.1?

@trucnguyenlam

@ephraimbuddy I can confirm that the issue exists on 2.1.1

@ashb ashb added the affected_version:2.1 Issues Reported for 2.1 label Jul 14, 2021

trucnguyenlam commented Jul 14, 2021

I think the root cause might be this line:

schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False)

Apparently the callback execution is disabled here.

@samgans
Contributor

samgans commented Jul 14, 2021

If you are not going to make a PR for this, I can try to investigate and fix it.

@trucnguyenlam

@samgans I might not have time to investigate this issue further, and it may be more complicated than it sounds: my comment above is only the surface. Underneath, it's more about message passing between the multiple processes of DagFileProcessorProcess. Please investigate further, thanks.

@uranusjr
Member

@trucnguyenlam Callback execution is disabled here, but that disabled callback is returned as the second value here (callback_to_run). That callback is then passed on for execution later, via _send_dag_callbacks_to_processor on the next line. So I don’t think that’s the cause.
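The hand-off uranusjr describes can be illustrated with a purely hypothetical sketch (none of these names are the real Airflow internals): update_state decides the run failed but, with execute_callbacks=False, hands the callback back instead of running it, and the caller then forwards it for execution elsewhere.

```python
# Hypothetical sketch of the callback hand-off pattern; illustrative names
# only, not Airflow's actual code.
from typing import Callable, Optional, Tuple


def update_state(execute_callbacks: bool = False) -> Tuple[list, Optional[Callable]]:
    """Pretend the DAG run just failed; hand back the callback unexecuted."""
    def failure_callback(ctx: dict) -> str:
        return f"callback ran for {ctx['dag_id']}"

    callback_to_run: Optional[Callable] = failure_callback
    if execute_callbacks and callback_to_run is not None:
        # Only in this branch would the callback run in-process.
        callback_to_run({"dag_id": "always_fails"})
        callback_to_run = None
    return [], callback_to_run  # (schedulable_tis, callback_to_run)


def send_dag_callbacks_to_processor(callback: Optional[Callable]) -> Optional[str]:
    """Stand-in for the DAG file processor that actually runs the callback."""
    if callback is None:
        return None
    return callback({"dag_id": "always_fails"})


# Disabled here, but returned and forwarded for execution later:
schedulable_tis, callback_to_run = update_state(execute_callbacks=False)
result = send_dag_callbacks_to_processor(callback_to_run)
```

The point of the pattern is that execute_callbacks=False does not drop the callback; it defers it to another process.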


trucnguyenlam commented Jul 15, 2021

@uranusjr yeah, I came to the same conclusion yesterday but didn't have time to go further. The callback is somehow lost on its way to the DagFileProcessorProcess afterwards.

@trucnguyenlam

@uranusjr how is it going with the investigation?

@samgans
Contributor

samgans commented Jul 16, 2021

Update:

I have been testing the interaction and found that passing the callable directly to the DAG's on_failure_callback parameter doesn't work, but configuring on_failure_callback via the default_args dictionary does get the callback called.

Continuing the review...

@ephraimbuddy
Contributor

The on_failure_callback is a task argument, not a DAG-level argument.
The correct DAG is:

import sys
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def on_failure(ctx):
    print('hello world')
    print(ctx)

def always_fails():
    sys.exit(1)

dag = DAG(
    dag_id='always_fails',
    description='dag that always fails',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2021, 7, 12),
)

task = PythonOperator(
    task_id='test-error-notifier',
    python_callable=always_fails,
    on_failure_callback=on_failure,
    dag=dag,
)

@trucnguyenlam

@ephraimbuddy I think that's misleading; it is a valid argument for a DAG object:

on_failure_callback: Optional[DagStateChangeCallback] = None,

@ephraimbuddy ephraimbuddy reopened this Jul 18, 2021
@ephraimbuddy
Contributor

Oops

@ephraimbuddy ephraimbuddy changed the title on_failure_callback is not executed on_failure_callback on DAG level is not executed Jul 18, 2021
@ephraimbuddy
Contributor

So this works fine too; however, the logs end up in:
$AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log
as stated here:

.. note: The logs end up in
``$AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log``

I tested with your DAG. Let me know what you think.
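One way to confirm whether the DAG-level callback actually ran is to search the scheduler log tree for the callback's output. A small self-contained sketch, using a throwaway directory in place of a real $AIRFLOW_HOME (the directory names below are illustrative):

```python
# Simulate the scheduler log layout described above
# ($AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log) and search
# it for the callback's printed output.
import tempfile
from pathlib import Path

airflow_home = Path(tempfile.mkdtemp())
log_dir = airflow_home / "logs" / "scheduler" / "latest" / "project"
log_dir.mkdir(parents=True)
(log_dir / "always_fails.py.log").write_text("hello world\n")

# Search every scheduler log file for the callback's output.
hits = [p for p in (airflow_home / "logs" / "scheduler").rglob("*.py.log")
        if "hello world" in p.read_text()]
```

Against a real deployment, the same rglob over $AIRFLOW_HOME/logs/scheduler should surface the DAG file's log containing the callback output.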

@trucnguyenlam

@ephraimbuddy thanks for providing more information. I will check the log location to see if the callback was executed. Could you also let me know if there is a way to log those in the main scheduler log (if applicable)?

@ephraimbuddy
Contributor

I'm not sure there's a way to get the log into the main scheduler log; however, you can ask on GitHub Discussions or Slack.

@trucnguyenlam

@ephraimbuddy I checked the log location; the callback does get handled, so it is technically working for me. Thanks.

@ephraimbuddy
Contributor

Cool. Let's close the issue then.
