Deferrable Operators leave task in strange state on kill #19929

Closed
eskarimov opened this issue Dec 1, 2021 · 3 comments
Labels
affected_version:2.2 Issues Reported for 2.2 area:async-operators AIP-40: Deferrable ("Async") Operators area:core kind:bug This is a clearly a bug Stale Bug Report

Comments

@eskarimov
Contributor

Apache Airflow version

2.2.2 (latest released)

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

./breeze start-airflow --python 3.7 --backend postgres --use-airflow-version 2.2.2

What happened

When a task is killed while it is deferred (that is, while its trigger is being run by the Triggerer process), no log is available for the task after the kill.
Also, the operator's on_kill method isn't called.
Later, if the same task is started again, it finishes immediately, as if it were resuming after being deferred.
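The sequence above can be modeled as a small state machine. The sketch below is hypothetical and does not use Airflow's real classes (`FakeTaskInstance`, `FakeDeferrableSensor`, `run`, `mark_failed` and `clear` are illustrative names). It assumes, as a hypothesis rather than a confirmed diagnosis, two things: that the task instance remembers which method to resume after deferral (Airflow 2.2's task instance has a `next_method` field for this) and that clearing does not reset it, and that `on_kill` is only invoked from a running worker process, which a deferred task does not have.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance row; not Airflow's TaskInstance."""
    state: str = "none"
    # Assumption: the method to call when the task resumes after deferral.
    next_method: Optional[str] = None
    events: List[str] = field(default_factory=list)


class FakeDeferrableSensor:
    """Hypothetical operator with an execute/resume split and an on_kill hook."""

    def __init__(self, ti: FakeTaskInstance):
        self.ti = ti

    def execute(self) -> None:
        self.ti.events.append("execute")
        # Deferring hands the wait to the triggerer and frees the worker process.
        self.ti.next_method = "execute_complete"
        self.ti.state = "deferred"

    def execute_complete(self) -> None:
        self.ti.events.append("execute_complete")
        self.ti.state = "success"

    def on_kill(self) -> None:
        self.ti.events.append("on_kill")


def run(op: FakeDeferrableSensor) -> None:
    """Start (or restart) the task, resuming at next_method if one is recorded."""
    method = op.ti.next_method or "execute"
    getattr(op, method)()


def mark_failed(op: FakeDeferrableSensor, running_process: bool) -> None:
    """Externally mark the task failed; on_kill only runs if a worker process exists."""
    if running_process:
        op.on_kill()
    op.ti.state = "failed"


def clear(ti: FakeTaskInstance, reset_next_method: bool) -> None:
    """Reset the state; in this model next_method survives unless explicitly reset."""
    ti.state = "none"
    if reset_next_method:
        ti.next_method = None


ti = FakeTaskInstance()
op = FakeDeferrableSensor(ti)
run(op)                                 # defers
mark_failed(op, running_process=False)  # killed while deferred: no worker, no on_kill
clear(ti, reset_next_method=False)
run(op)                                 # resumes straight into execute_complete
print(ti.events)  # ['execute', 'execute_complete']
print(ti.state)   # success
```

In this model, passing `reset_next_method=True` to `clear` makes the second `run` start from `execute` again. That is the behavior the report expected, but whether that is the right fix in Airflow itself is not established here.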

What you expected to happen

Task is killed:
[Screenshot: AIrflowKilledTimeDeltaAsync]

Log is empty:
[Screenshot: AirflowEmptyLog]

If I clear the task, the log appears and the task instance finishes right away:

Log file content
*** Reading local file: /root/airflow/logs/example_time_delta_async/time_delta_async/2021-12-01T11:11:29.762988+00:00/1.log
[2021-12-01, 12:11:31 CET] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [queued]>
[2021-12-01, 12:11:31 CET] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [queued]>
[2021-12-01, 12:11:31 CET] {taskinstance.py:1239} INFO - 
--------------------------------------------------------------------------------
[2021-12-01, 12:11:31 CET] {taskinstance.py:1240} INFO - Starting attempt 1 of 1
[2021-12-01, 12:11:31 CET] {taskinstance.py:1241} INFO - 
--------------------------------------------------------------------------------
[2021-12-01, 12:11:31 CET] {taskinstance.py:1260} INFO - Executing <Task(TimeDeltaSensorAsync): time_delta_async> on 2021-12-01 11:11:29.762988+00:00
[2021-12-01, 12:11:31 CET] {standard_task_runner.py:52} INFO - Started process 442 to run task
[2021-12-01, 12:11:31 CET] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'example_time_delta_async', 'time_delta_async', 'manual__2021-12-01T11:11:29.762988+00:00', '--job-id', '9', '--raw', '--subdir', 'DAGS_FOLDER/example_time_delta_async.py', '--cfg-path', '/tmp/tmpgdon4hbb', '--error-file', '/tmp/tmpqa1kftrc']
[2021-12-01, 12:11:31 CET] {standard_task_runner.py:77} INFO - Job 9: Subtask time_delta_async
[2021-12-01, 12:11:31 CET] {logging_mixin.py:109} INFO - Running <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [running]> on host e403e88ccb5c
[2021-12-01, 12:11:31 CET] {taskinstance.py:1427} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_time_delta_async
AIRFLOW_CTX_TASK_ID=time_delta_async
AIRFLOW_CTX_EXECUTION_DATE=2021-12-01T11:11:29.762988+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-01T11:11:29.762988+00:00
[2021-12-01, 12:11:31 CET] {taskinstance.py:1343} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_async, task_id=time_delta_async, execution_date=20211201T111129, start_date=20211201T111131
[2021-12-01, 12:11:31 CET] {local_task_job.py:154} INFO - Task exited with return code 0
[2021-12-01, 12:11:31 CET] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
[2021-12-01, 12:14:00 CET] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [queued]>
[2021-12-01, 12:14:00 CET] {taskinstance.py:1035} INFO - Dependencies all met for <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [queued]>
[2021-12-01, 12:14:00 CET] {taskinstance.py:1239} INFO - 
--------------------------------------------------------------------------------
[2021-12-01, 12:14:00 CET] {taskinstance.py:1240} INFO - Starting attempt 1 of 1
[2021-12-01, 12:14:00 CET] {taskinstance.py:1241} INFO - 
--------------------------------------------------------------------------------
[2021-12-01, 12:14:00 CET] {taskinstance.py:1260} INFO - Executing <Task(TimeDeltaSensorAsync): time_delta_async> on 2021-12-01 11:11:29.762988+00:00
[2021-12-01, 12:14:00 CET] {standard_task_runner.py:52} INFO - Started process 519 to run task
[2021-12-01, 12:14:00 CET] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'example_time_delta_async', 'time_delta_async', 'manual__2021-12-01T11:11:29.762988+00:00', '--job-id', '11', '--raw', '--subdir', 'DAGS_FOLDER/example_time_delta_async.py', '--cfg-path', '/tmp/tmpikbpc_re', '--error-file', '/tmp/tmpqzcatu80']
[2021-12-01, 12:14:00 CET] {standard_task_runner.py:77} INFO - Job 11: Subtask time_delta_async
[2021-12-01, 12:14:00 CET] {logging_mixin.py:109} INFO - Running <TaskInstance: example_time_delta_async.time_delta_async manual__2021-12-01T11:11:29.762988+00:00 [running]> on host e403e88ccb5c
[2021-12-01, 12:14:00 CET] {taskinstance.py:1427} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_time_delta_async
AIRFLOW_CTX_TASK_ID=time_delta_async
AIRFLOW_CTX_EXECUTION_DATE=2021-12-01T11:11:29.762988+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-01T11:11:29.762988+00:00
[2021-12-01, 12:14:00 CET] {taskinstance.py:1278} INFO - Marking task as SUCCESS. dag_id=example_time_delta_async, task_id=time_delta_async, execution_date=20211201T111129, start_date=20211201T111400, end_date=20211201T111400
[2021-12-01, 12:14:00 CET] {local_task_job.py:154} INFO - Task exited with return code 0
[2021-12-01, 12:14:00 CET] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
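Both executions in the log above are written to the same `1.log` file, and both report `Starting attempt 1 of 1`. The first run defers. The second run goes straight to SUCCESS without deferring. The following sketch makes that pattern explicit. It uses plain Python regular expressions over an abbreviated copy of the quoted lines. The patterns are written for this excerpt only and are not a general Airflow log parser.

```python
import re
from typing import List, Tuple

# Abbreviated copy of the log lines quoted above (only the lines this sketch parses).
LOG = """\
[2021-12-01, 12:11:31 CET] {taskinstance.py:1240} INFO - Starting attempt 1 of 1
[2021-12-01, 12:11:31 CET] {taskinstance.py:1343} INFO - Pausing task as DEFERRED. dag_id=example_time_delta_async, task_id=time_delta_async, execution_date=20211201T111129, start_date=20211201T111131
[2021-12-01, 12:14:00 CET] {taskinstance.py:1240} INFO - Starting attempt 1 of 1
[2021-12-01, 12:14:00 CET] {taskinstance.py:1278} INFO - Marking task as SUCCESS. dag_id=example_time_delta_async, task_id=time_delta_async, execution_date=20211201T111129, start_date=20211201T111400, end_date=20211201T111400
"""

# Timestamp in square brackets, then the "Starting attempt N of M" message.
ATTEMPT_RE = re.compile(r"^\[(?P<ts>[^\]]+)\] .*Starting attempt (?P<attempt>\d+) of \d+")
# Either the deferral message or a final state message such as SUCCESS.
OUTCOME_RE = re.compile(r"(Pausing task as DEFERRED|Marking task as [A-Z_]+)")


def summarize(log: str) -> List[Tuple[str, int, str]]:
    """Return (timestamp, attempt number, outcome) for each run recorded in the log."""
    runs: List[Tuple[str, int, str]] = []
    ts, attempt = "", 0
    for line in log.splitlines():
        m = ATTEMPT_RE.match(line)
        if m:
            ts, attempt = m.group("ts"), int(m.group("attempt"))
            continue
        o = OUTCOME_RE.search(line)
        if o:
            runs.append((ts, attempt, o.group(1)))
    return runs


for entry in summarize(LOG):
    print(entry)
# ('2021-12-01, 12:11:31 CET', 1, 'Pausing task as DEFERRED')
# ('2021-12-01, 12:14:00 CET', 1, 'Marking task as SUCCESS')
```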

How to reproduce

  • Create a DAG file with the following content:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync


with DAG(
    dag_id='example_time_delta_async',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    time_delta_async_sensor = TimeDeltaSensorAsync(task_id='time_delta_async',
                                                   delta=timedelta(seconds=60),
                                                   execution_timeout=timedelta(seconds=60),
                                                   )

if __name__ == "__main__":
    dag.cli()
  • Start the DAG
  • Mark task time_delta_async as Failed while it's being deferred

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@eskarimov eskarimov added area:core kind:bug This is a clearly a bug labels Dec 1, 2021
@potiuk potiuk added defferrable-operators area:async-operators AIP-40: Deferrable ("Async") Operators and removed deferrable-operators labels Jan 3, 2022
@Greetlist

Later on, if the same task is started again, it finishes immediately, like it was continued after being deferred.

I have the same problem in #19612; hope it helps. @eskarimov

When a task was killed while being executed by Triggerer process, there's no log available for this task after kill.
Also, on_kill function of Operator isn't called.

The Triggerer process's log may contain details about it. The task's try_number is decreased by TaskInstance._defer_task, so if this deferrable task is killed, its try_number is always 0 and you cannot see the log.
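The mechanism described in this comment can be sketched as a simplified model. It rests on two assumptions. First, log files are named `{try_number}.log` under a per-run directory, as in the path `/root/airflow/logs/example_time_delta_async/time_delta_async/2021-12-01T11:11:29.762988+00:00/1.log` quoted in the issue. Second, the log is looked up by the task instance's current try number. `FakeTI` and its methods are hypothetical and only approximate Airflow's real try-number bookkeeping.

```python
from dataclasses import dataclass


@dataclass
class FakeTI:
    """Hypothetical, simplified task instance; not Airflow's TaskInstance."""
    dag_id: str
    task_id: str
    run_ts: str
    try_number: int = 0

    def start(self) -> None:
        # Assumption: each real run increments the try number before logging starts.
        self.try_number += 1

    def defer(self) -> None:
        # Per the comment above, deferring decrements the try number so the
        # resumed run does not count as a new attempt.
        self.try_number -= 1

    def log_path(self, base: str = "/root/airflow/logs") -> str:
        # Same layout as the log file path quoted in the issue.
        return f"{base}/{self.dag_id}/{self.task_id}/{self.run_ts}/{self.try_number}.log"


ti = FakeTI("example_time_delta_async", "time_delta_async", "2021-12-01T11:11:29.762988+00:00")
ti.start()
written_to = ti.log_path()  # the first run writes to .../1.log
ti.defer()
# Killed while deferred: no further start() happens, so try_number stays at 0.
looked_up = ti.log_path()
print(written_to.rsplit("/", 1)[-1])  # 1.log
print(looked_up.rsplit("/", 1)[-1])   # 0.log
```

Under these assumptions, the log was written to `1.log` but is looked up as `0.log`, which would explain why the UI shows an empty log after the kill.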

@github-actions

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Kindly recheck the report against the latest Airflow version and let us know if the issue is still reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.

@github-actions

This issue has been closed because it has not received response from the issue author.
