
Deferrable operator tasks do not call on_kill method when fail or restarted #36090

Open
nirben82 opened this issue Dec 6, 2023 · 42 comments
Labels
area:async-operators AIP-40: Deferrable ("Async") Operators area:core kind:bug This is a clearly a bug

Comments

@nirben82

nirben82 commented Dec 6, 2023

### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

When a task that uses a deferrable operator is marked as failed or cleared, the on_kill method is not called and the remote job is never stopped.

Task log output:

```
*** Found local files:
***   * /usr/local/google/home/nirben/airflow/logs/dag_id=dag_test/run_id=manual__2023-12-06T13:22:54.698489+00:00/task_id=bigquery_task/attempt=30.log
***   * /usr/local/google/home/nirben/airflow/logs/dag_id=dag_test/run_id=manual__2023-12-06T13:22:54.698489+00:00/task_id=bigquery_task/attempt=30.log.trigger.13880.log
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dag_test.bigquery_task manual__2023-12-06T13:22:54.698489+00:00 [queued]>
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dag_test.bigquery_task manual__2023-12-06T13:22:54.698489+00:00 [queued]>
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1308} INFO - Starting attempt 30 of 30
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1327} INFO - Executing <Task(BigQueryInsertJobOperator): bigquery_task> on 2023-12-06 13:22:54.698489+00:00
[2023-12-06, 14:14:11 UTC] {standard_task_runner.py:57} INFO - Started process 629728 to run task
[2023-12-06, 14:14:11 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'dag_test', 'bigquery_task', 'manual__2023-12-06T13:22:54.698489+00:00', '--job-id', '13896', '--raw', '--subdir', 'DAGS_FOLDER/dag_test.py', '--cfg-path', '/tmp/tmpxnqe4ysz']
[2023-12-06, 14:14:11 UTC] {standard_task_runner.py:85} INFO - Job 13896: Subtask bigquery_task
[2023-12-06, 14:14:11 UTC] {task_command.py:410} INFO - Running <TaskInstance: dag_test.bigquery_task manual__2023-12-06T13:22:54.698489+00:00 [running]> on host nirben-ws1.tlv.corp.google.com
[2023-12-06, 14:14:11 UTC] {taskinstance.py:1545} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='some-owner' AIRFLOW_CTX_DAG_ID='dag_test' AIRFLOW_CTX_TASK_ID='bigquery_task' AIRFLOW_CTX_EXECUTION_DATE='2023-12-06T13:22:54.698489+00:00' AIRFLOW_CTX_TRY_NUMBER='30' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-12-06T13:22:54.698489+00:00'
[2023-12-06, 14:14:11 UTC] {base.py:73} INFO - Using connection ID 'some-connection' for task execution.
[2023-12-06, 14:14:11 UTC] {bigquery.py:2799} INFO - Executing: {'query': {'query': 'SELECT * FROM table', 'useLegacySql': False, 'priority': 'batch', 'allowLargeResults': True, 'destinationTable': {'datasetId': 'datasetId', 'projectId': 'projectId', 'tableId': 'some_table_test'}, 'flattenResults': False, 'writeDisposition': 'WRITE_TRUNCATE', 'createDisposition': 'CREATE_IF_NEEDED'}}
[2023-12-06, 14:14:11 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2023-12-06, 14:14:11 UTC] {logging_mixin.py:150} WARNING - /usr/local/google/home/nirben/venv/waze-data/lib/python3.8/site-packages/google/auth/_default.py:78 UserWarning: Your application has authenticated using end user credentials from Google Cloud SDK without a quota project. You might receive a "quota exceeded" or "API not enabled" error. See the following page for troubleshooting: https://cloud.google.com/docs/authentication/adc-troubleshooting/user-creds.
[2023-12-06, 14:14:12 UTC] {bigquery.py:1596} INFO - Inserting job airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_5bf2e5098a664fd1d54ec9b9b75d077b
[2023-12-06, 14:14:13 UTC] {bigquery.py:51} INFO - Using the connection  some-connection .
[2023-12-06, 14:14:13 UTC] {taskinstance.py:1415} INFO - Pausing task as DEFERRED. dag_id=dag_test, task_id=bigquery_task, execution_date=20231206T132254, start_date=20231206T141411
[2023-12-06, 14:14:13 UTC] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-12-06, 14:14:14 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_pipelines' for task execution.
[2023-12-06, 14:14:15 UTC] {bigquery.py:93} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
[2023-12-06, 14:14:19 UTC] {bigquery.py:93} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
[2023-12-06, 14:14:21 UTC] {triggerer_job_runner.py:625} ERROR - Trigger cancelled; message=
```

### What you think should happen instead

I think the trigger should be stopped and the task instance should follow the same [behavior](https://github.com/apache/airflow/blob/d2514b408cb98f792289a5d032aaf85fe605350d/airflow/models/taskinstance.py#L2452) as any non-deferrable task.

### How to reproduce

1. Invoke a run of the DAG below and, after the task is in the `deferred` state, mark it as `failed` or clear it.

    The task log ends with the below text and the job in BQ does not stop.
    ```
    [2023-12-06, 14:14:14 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_pipelines' for task execution.
    [2023-12-06, 14:14:15 UTC] {bigquery.py:93} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
    [2023-12-06, 14:14:19 UTC] {bigquery.py:93} INFO - Bigquery job status is running. Sleeping for 4.0 seconds.
    [2023-12-06, 14:14:21 UTC] {triggerer_job_runner.py:625} ERROR - Trigger cancelled; message=
    ```

2. Change to `deferrable=False`, invoke a run of the DAG and, after the task is in the `running` state and the job has started in BQ, mark it as `failed` or clear it.

    The task log ends with the below text and the job in BQ stops.
    ```
    [2023-12-06, 14:13:06 UTC] {bigquery.py:1596} INFO - Inserting job airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
    [2023-12-06, 14:13:36 UTC] {local_task_job_runner.py:291} WARNING - State of this instance has been externally set to failed. Terminating instance.
    [2023-12-06, 14:13:36 UTC] {process_utils.py:131} INFO - Sending Signals.SIGTERM to group 629540. PIDs of all processes in the group: [629540]
    [2023-12-06, 14:13:36 UTC] {process_utils.py:86} INFO - Sending the signal Signals.SIGTERM to group 629540
    [2023-12-06, 14:13:36 UTC] {taskinstance.py:1517} ERROR - Received SIGTERM. Terminating subprocesses.
    [2023-12-06, 14:13:36 UTC] {bigquery.py:1487} INFO - Attempting to cancel job : project-id, airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
    [2023-12-06, 14:13:37 UTC] {bigquery.py:1508} INFO - Waiting for canceled job project-id, airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985 to finish.
    [2023-12-06, 14:13:43 UTC] {bigquery.py:1499} INFO - Job successfully canceled: project-id, airflow_dag_test_bigquery_task_2023_12_06T13_22_54_698489_00_00_9238ba6363d9b8cca1e4a1fd26b7e985
    [2023-12-06, 14:13:43 UTC] {process_utils.py:79} INFO - Process psutil.Process(pid=629540, status='terminated', exitcode=0, started='16:13:05') (629540) terminated with exit code 0
    ```

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

default_args = {
    'owner': 'owner',
    'start_date': datetime(2023, 12, 5),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=15)
}

dag = DAG('dag_test', default_args=default_args, catchup=False)

query_that_runs_for_a_few_minutes = """
SELECT * FROM some-large-table
"""

client_driver_events_task = BigQueryInsertJobOperator(
    task_id='bigquery_task',
    gcp_conn_id='google_cloud_pipelines',
    dag=dag,
    configuration={
        'query': {
            'query': query_that_runs_for_a_few_minutes.strip(),
            'useLegacySql': False,
            'priority': 'batch',
            'allowLargeResults': True,
            'destinationTable': {
                'datasetId': 'datasetId',
                'projectId': 'projectId',
                'tableId': 'tableId'
            },
            'flattenResults': False,
            'writeDisposition': 'WRITE_TRUNCATE',
            'createDisposition': 'CREATE_IF_NEEDED'
        }
    },
    deferrable=True,
)
```


### Operating System

Debian GNU

### Versions of Apache Airflow Providers

apache-airflow==2.6.2
apache-airflow-providers-google==10.9.0

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
@nirben82 nirben82 added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Dec 6, 2023

boring-cyborg bot commented Dec 6, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@nirben82 nirben82 changed the title Deferrable operator tasks don't call on_kill when fail or restarted Deferrable operator tasks do not call on_kill method when fail or restarted Dec 6, 2023
@tirkarthi
Contributor

tirkarthi commented Dec 6, 2023

Related: #19929. It seems this was reported earlier but got closed as a stale issue. We would also find this feature useful to cancel remote jobs tracked by the triggerer on clear/fail.

@potiuk potiuk removed the needs-triage label for new issues that we didn't triage yet label Dec 11, 2023
@potiuk
Member

potiuk commented Dec 11, 2023

Yes. Probably it should be changed. The problem is that the task would first have to be brought back from the deferred state to run its on_kill method, and that would require someone to implement the logic for it.

@nirben82
Author

Thanks @potiuk.

  1. Do you know if someone is currently working on this?
  2. Could you please provide some more specifics on what needs to be implemented, so we can understand if our team can prioritize adding the logic?
  3. Is it also possible to backport this to the previous minor version 2.6? This is mainly so we can stay aligned with Cloud Composer, which is currently on 2.6.3.

@potiuk
Member

potiuk commented Dec 13, 2023

1. Not that I am aware of. But you are asking a random maintainer who might not know everything. In OSS everything is in the open, so if it is not mentioned here, there is no other place where someone working on it would be recorded.

  2. It's really part of the task to design it. This one will likely require an Airflow Improvement Proposal, or at the very least a discussion on the devlist outlining the proposal. In short, you will need to add whole new internal processing to make sure the task gets instantiated in a worker, while it is deferred, in order to execute on_kill in the worker. Currently on_kill is run when a task exits the running state - so when it is already handled by the worker. A deferred task is handled by the triggerer by running triggers, not real "task" code - just the triggers the task deferred to. So what is needed is to decide who, and how, will schedule a task that should only execute the on_kill method in the worker when the task is set to a killed status while it is currently deferred to the triggerer. This has not been discussed nor designed yet, so whoever takes it on must start by making a design and proposal for how to implement it, lead the discussion on the devlist, and get approval on the proposal. So you really need someone who will not just execute precise instructions, but someone who can propose and lead the discussion on the whole design.

  3. No. This is against our versioning policies https://airflow.apache.org/docs/apache-airflow/stable/release-process.html - SemVer forbids adding features in patch releases (and this is a new feature). Also, we only release new things in exactly one currently active branch. As of now we are voting on the release candidate for 2.8.0 and the current main is 2.9.0dev0, so the first release this feature could be available in is 2.9.0 - but it would require starting to prepare the design and the proposal as soon as possible.

@kaxil kaxil added this to the Airflow 2.10.0 milestone Mar 7, 2024
@RNHTTR RNHTTR added the area:async-operators AIP-40: Deferrable ("Async") Operators label Apr 6, 2024
@pustoshilov-d
Contributor

We're working with a Slurm deferrable operator and waiting for this feature so much 🙏

@potiuk
Member

potiuk commented Apr 11, 2024

We're working with a Slurm deferrable operator and waiting for this feature so much 🙏

Just a kind reminder: if you and your company badly want a feature, the most certain way to get it is to spend engineering effort and contribute it back. Other than that, the only thing to do is to 🙏 that someone will do it. Paying someone to contribute such a feature also works.

This is how open-source projects work.

@Lee-W
Member

Lee-W commented Apr 12, 2024

Assigning it to @sunank200 as he has already created a PR, to avoid others working on it.

@tirkarthi
Contributor

I guess this issue needs a more general solution in the core triggerer, and the linked PR is specific to a particular operator. One of the solutions we are trying to implement is to have something like an on_cancel executed on CancelledError, but a defined interface to implement for cancellation, like the run method, would help users.

cc: @andrewgodwin

@tirkarthi
Contributor

I just found out that there is already a cleanup method as part of the interface. The problem is that cleanup is also called during triggerer restarts (as part of deployment), where we do not want to clean up by deleting remote jobs, since the trigger would then start tracking invalid jobs. So I thought of the change below, where a marker attribute is set only when the trigger is cancelled as part of the to_cancel loop, and my custom cleanup function is called in that case. This helps with executing cleanup when tasks are cleared or marked success/failure. I tried using a contextvar, but for some reason the update from triggerer_job_runner is not propagated to the trigger. In Python 3.9 the msg parameter was added to Task.cancel, with which custom messages can be propagated as part of CancelledError in Python 3.11 and above. This doesn't fully address the issue, but I thought I would add my analysis in case someone finds the patch/approach useful.

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/triggers/base/index.html#airflow.triggers.base.BaseTrigger.cleanup
https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel

```python
from airflow.triggers.base import BaseTrigger


class CustomTrigger(BaseTrigger):

    async def cleanup(self):
        """
        cleanup is called when the trigger is cancelled by the triggerer, which happens both as part of the
        to_cancel loop and when the triggerer exits/restarts. We only want to run our cleanup in the former case,
        because the triggerer can be restarted during a deployment or marked unhealthy, and deleting jobs tracked
        upstream in that situation would leave the new triggerer tracking invalid jobs.

        If the trigger is cancelled from the to_cancel loop (e.g. a cleared task instance), the trigger is no
        longer present in the database; the runner sets _cancelled_from_job_runner to True, and only then is
        the custom cleanup executed.
        """
        cancelled_from_runner = getattr(self, "_cancelled_from_job_runner", False)

        if cancelled_from_runner:
            self.custom_cleanup()  # user-defined method that e.g. cancels the remote job

        await super().cleanup()
```
```diff
diff --git a/airflow/jobs/triggerer_job_runner.py b/airflow/jobs/triggerer_job_runner.py
index bb151b32cc..e80a910008 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -497,6 +497,7 @@ class TriggerRunner(threading.Thread, LoggingMixin):
                     "name": f"{ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} "
                     f"(ID {trigger_id})",
                     "events": 0,
+                    "trigger": trigger_instance
                 }
             else:
                 self.log.warning("Trigger %s had insertion attempted twice", trigger_id)
@@ -512,6 +513,11 @@ class TriggerRunner(threading.Thread, LoggingMixin):
             trigger_id = self.to_cancel.popleft()
             if trigger_id in self.triggers:
                 # We only delete if it did not exit already
+                # These are tasks cancelled by the triggerer since they are not found in the database,
+                # e.g. a task instance cleared from the UI. _cancelled_from_job_runner is set so that
+                # our cleanup is executed only as needed and not during triggerer process shutdown,
+                # when cancel is called (which calls cleanup) but this attribute is not present.
+                self.triggers[trigger_id]["trigger"]._cancelled_from_job_runner = True
                 self.triggers[trigger_id]["task"].cancel()
             await asyncio.sleep(0)
```

@dh-racheldean

dh-racheldean commented Apr 15, 2024

If I were feeling super-hacky in my own fork, I'd be tweaking models.Trigger.clean_unused() and adapting submit_failure to do the hilarious re-schedule. In theory, just those two would work - it's a similar theory to the check_trigger_timeouts fn that the scheduler uses to kill the tasks + triggers...
Edit: the primary goal with this method is that it doesn't care about the trigger implementation and mimics an existing/proven use-path - just from the other direction. Please note, this is the result of a 30-minute explore, so excuse any massive oversights in the concept ;)

@thesuperzapper
Contributor

thesuperzapper commented May 5, 2024

WARNING

The proposed solution of capturing asyncio.CancelledError in a try/except is NOT safe!
The following PRs have implemented this:

These PRs will result in the external job being canceled if the triggerer itself is restarted (or crashes), not just when users set the state of a deferred task to "success", "failed", or "clear".

Also note, Airflow will be unaware that the external job has been canceled, and will reschedule the deferred operator on another triggerer instance (which could cause all kinds of strange behaviour).

It makes more sense to find a way for Airflow itself to run BaseOperator.on_kill(), even if the operator is deferred while it is killed (either manually, or by failure).

However, as I am sure vendors will want their deferred operators to work correctly (when users set deferred tasks to "clear", "success" or "failed") here is a possible workaround (which needs testing).

Possible Workaround

We can still capture asyncio.CancelledError, but ONLY cancel the external job if the TaskInstance is NOT in a running or deferred state. That is, if airflow still thinks the job is running or deferred, we probably should not kill the external job.

EDIT: the original solution did not handle the case that the task was "cleared" causing the task to be rescheduled, and be deferred again. Fixing this requires us to distinguish if the TaskInstance for the run is the same one that created the deferred operator which is being canceled. Luckily we can do this by comparing the job_id. I have updated the example below with this fix.

Here is a basic triggerer which pretends to run an external job. It shows if it has "canceled" the job, by writing to /tmp/testing/on_kill_deferred/{dag_id}/{task_id}/log_trigger.txt.

```python
import asyncio
import os
from datetime import timedelta
from typing import Any, AsyncIterator, Dict, Optional

import pendulum
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.models.taskinstance import TaskInstance
from airflow.settings import Session
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone
from airflow.utils.context import Context
from airflow.utils.dates import days_ago
from airflow.utils.session import provide_session
from airflow.utils.state import TaskInstanceState
from pendulum.datetime import datetime


# define a trigger that sleeps until a given datetime, and then sends an event
# and "handles" `asyncio.CancelledError` by writing to a file
class DateTimeTriggerWithCancel(BaseTrigger):
    def __init__(
        self,
        dag_id: str,
        task_id: str,
        run_id: str,
        map_index: int,
        job_id: int,
        statement_name: str,
        moment: datetime.datetime,
    ):
        super().__init__()
        self.dag_id = dag_id
        self.task_id = task_id
        self.run_id = run_id
        self.map_index = map_index
        self.job_id = job_id
        self.statement_name = statement_name

        # set and validate the moment
        if not isinstance(moment, datetime.datetime):
            raise TypeError(
                f"Expected 'datetime.datetime' type for moment. Got '{type(moment)}'"
            )
        elif moment.tzinfo is None:
            raise ValueError("You cannot pass naive datetime")
        else:
            self.moment: pendulum.DateTime = timezone.convert_to_utc(moment)

    def serialize(self) -> tuple[str, dict[str, Any]]:
        #
        # TODO: for Airflow 2.6.0+ you can get the `TaskInstance` from `self.task_instance` which removes
        #       the need to store `dag_id`, `task_id`, `run_id`, `map_index`, and `job_id` in the trigger
        #       serialization. However, you STILL NEED TO QUERY for the latest TaskInstance state.
        #
        return (
            "test_on_kill_deferred.DateTimeTriggerWithCancel",
            {
                "dag_id": self.dag_id,
                "task_id": self.task_id,
                "run_id": self.run_id,
                "map_index": self.map_index,
                "job_id": self.job_id,
                "statement_name": self.statement_name,
                "moment": self.moment,
            },
        )

    @provide_session
    def get_task_instance(self, session: Session) -> TaskInstance:
        query = session.query(TaskInstance).filter(
            TaskInstance.dag_id == self.dag_id,
            TaskInstance.task_id == self.task_id,
            TaskInstance.run_id == self.run_id,
            TaskInstance.map_index == self.map_index,
        )
        task_instance = query.one_or_none()
        if task_instance is None:
            raise AirflowException(
                "TaskInstance with dag_id: %s, task_id: %s, run_id: %s and map_index: %s is not found",
                self.dag_id,
                self.task_id,
                self.run_id,
                self.map_index,
            )
        return task_instance

    def safe_to_cancel(self) -> bool:
        """
        Whether it is safe to cancel the external job which is being executed by this trigger.

        This is to avoid the case that `asyncio.CancelledError` is called because the trigger
        itself is stopped. Because in those cases, we should NOT cancel the external job.
        """
        # Database query is needed to get the latest state of the task instance.
        task_instance = self.get_task_instance()

        # If the current job_id is different from when the trigger was created,
        # then we should cancel the external job we are waiting on because the task has been
        # cleared and a new job has been created.
        if int(task_instance.job_id) != int(self.job_id):
            return True

        # If the task is not in a deferred state, then something else has happened to the task
        # since we were deferred (e.g. a manual state change), so we should cancel the external
        # job we are waiting on.
        return task_instance.state != TaskInstanceState.DEFERRED

    async def run(self) -> AsyncIterator[TriggerEvent]:
        self.log.info("trigger starting")
        try:
            # Sleep a second at a time
            while self.moment > pendulum.instance(timezone.utcnow()):
                self.log.info("sleeping 1 second...")
                await asyncio.sleep(1)

            # Send our single event and then we're done
            self.log.info("yielding event with payload %r", self.moment)
            yield TriggerEvent(
                {
                    "statement_name": self.statement_name,
                    "status": "success",
                    "moment": self.moment,
                }
            )

        except asyncio.CancelledError:
            self.log.info("asyncio.CancelledError was called")
            if self.statement_name:
                if self.safe_to_cancel():
                    self.log.warning("Cancelling query: %s", self.statement_name)

                    # Cancel the query (mock by writing to a file)
                    output_folder = (
                        f"/tmp/testing/on_kill_deferred/{self.dag_id}/{self.task_id}"
                    )
                    os.makedirs(output_folder, exist_ok=True)
                    with open(f"{output_folder}/log_trigger.txt", "a") as f:
                        f.write(
                            f"asyncio.CancelledError was called: {self.statement_name}\n"
                        )
                    yield TriggerEvent({"status": "cancelled"})
                else:
                    self.log.warning("Triggerer probably stopped, not cancelling query")
            else:
                self.log.error("self.statement_name is None")
        except Exception as e:
            self.log.exception("Exception occurred while checking for query completion")
            yield TriggerEvent({"status": "error", "message": str(e)})


# an operator that sleeps for a given number of seconds using a deferred trigger
class TestDeferredOperator(BaseOperator):
    statement_name: Optional[str]
    wait_seconds: int
    moment: Optional[datetime.datetime]

    def __init__(self, wait_seconds: int = 120, **kwargs):
        super().__init__(**kwargs)
        self.wait_seconds = wait_seconds
        self.statement_name = None
        self.moment = None

    def execute(self, context: Context) -> None:
        self.statement_name = (
            f"airflow"
            f"::{self.dag.dag_id}"
            f"::{self.task_id}"
            f"::{pendulum.now(timezone.utc).isoformat()}"
        )
        self.moment = pendulum.instance(timezone.utcnow()).add(
            seconds=self.wait_seconds
        )
        self.defer(
            trigger=DateTimeTriggerWithCancel(
                dag_id=self.dag.dag_id,
                task_id=self.task_id,
                run_id=context["run_id"],
                map_index=context["task_instance"].map_index,
                job_id=context["task_instance"].job_id,
                statement_name=self.statement_name,
                moment=self.moment,
            ),
            method_name="execute_complete",
        )

    def execute_complete(
        self,
        context: Context,
        event: Optional[Dict[str, Any]] = None,
    ) -> None:
        if event is None:
            raise AirflowException("Trigger event is None")

        if event["status"] == "error":
            msg = f"context: {context}, error message: {event['message']}"
            raise AirflowException(msg)

        if event["status"] == "cancelled":
            self.log.info(f"external job was cancelled: {self.statement_name}")
            return

        self.log.info("%s completed successfully.", self.task_id)

    def on_kill(self):
        output_folder = (
            f"/tmp/testing/on_kill_deferred/{self.dag.dag_id}/{self.task_id}"
        )
        os.makedirs(output_folder, exist_ok=True)
        with open(f"{output_folder}/log_operator.txt", "a") as f:
            f.write(f"on_kill was called: {self.statement_name}\n")


with DAG(
    dag_id="test_on_kill_deferred",
    schedule_interval="0 0 * * *",
    start_date=days_ago(1),
    dagrun_timeout=timedelta(minutes=60),
) as dag:

    # task 1
    task_1 = TestDeferredOperator(
        task_id="task_1",
        wait_seconds=60,
    )
```

@thesuperzapper
Contributor

For those watching, I have done a little more testing with the workaround proposed in #36090 (comment), and at least in my tests, it works correctly when execution_timeout or defer(timeout=xxxx) is reached.

That is, self.safe_to_cancel() returns True, and the external job is canceled (because the TaskInstance reaches the failed state before the deferred operator is terminated). But I would be surprised if there were not some race condition involving timeouts, so more testing is needed.

However, my preference is still finding a way to make on_kill run even when the task is deferred, even if only so that we don't have to tell everyone to update their providers!

@sunank200
Collaborator

sunank200 commented May 6, 2024

Same here. I have created the PR for the BigQuery InsertJobOperator based on the suggestions. Please review: #39442. I have tested it and it works correctly. cc: @thesuperzapper

Currently, I have used the workaround. I will work on the general solution across the core triggerer after these PRs are fixed.

@sunank200
Collaborator

Please re-review: #39446

@sunank200
Collaborator

Please re-review: #39447

@thesuperzapper
Contributor

@sunank200 I see you have removed the check for a RUNNING state, and now only check if it's not in a DEFERRED state, for example:

return task_instance.state != TaskInstanceState.DEFERRED

This seems reasonable, and might actually be safer, because it ensures that if the TaskInstance has started RUNNING again (perhaps try number 2+ of the same task), external jobs triggered by older tries are killed.

@thesuperzapper
Contributor

@sunank200 also note that using the self.task_instance of the BaseTrigger will require at least Airflow 2.6.0; it was added by PR #29482.

Upstream this should be fine, as all providers currently pin apache-airflow>=2.6.0:

But for those wanting to backport this fix into Airflow 2.5.3 (and older), they will need to use something like what I have done in #36090 (comment) and add the dag_id, task_id, and run_id to the serialize() method of the trigger.
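For reference, here is a rough sketch of what the simplified 2.6.0+ variant could look like (the "TODO" in my earlier example). It assumes `BaseTrigger.task_instance` exposes the `dag_id`, `task_id`, `run_id` and `map_index` of the deferring task instance, so nothing extra has to go through `serialize()`; the class name and structure below are just illustrative, not an existing Airflow API:

```python
from airflow.exceptions import AirflowException
from airflow.models.taskinstance import TaskInstance
from airflow.triggers.base import BaseTrigger
from airflow.utils.session import provide_session
from airflow.utils.state import TaskInstanceState


class CancelAwareTrigger(BaseTrigger):
    """Hypothetical shared base: concrete triggers still implement serialize() and run()."""

    @provide_session
    def get_task_instance(self, session=None) -> TaskInstance:
        # Re-read the TaskInstance from the metadata DB; self.task_instance (Airflow 2.6.0+)
        # only carries the values captured at the time the task deferred.
        ti = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == self.task_instance.dag_id,
                TaskInstance.task_id == self.task_instance.task_id,
                TaskInstance.run_id == self.task_instance.run_id,
                TaskInstance.map_index == self.task_instance.map_index,
            )
            .one_or_none()
        )
        if ti is None:
            raise AirflowException(f"TaskInstance not found for {self.task_instance}")
        return ti

    def safe_to_cancel(self) -> bool:
        # Only cancel the external job when the task is no longer deferred, i.e. the
        # asyncio.CancelledError came from a user action rather than a triggerer shutdown.
        return self.get_task_instance().state != TaskInstanceState.DEFERRED
```

A concrete trigger would inherit from this and call `self.safe_to_cancel()` inside its `except asyncio.CancelledError:` block, exactly as in the reference example above.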

@thesuperzapper
Contributor

@sunank200 also we will need to update all the other providers that set an on_kill and can be deferred (unless you are already doing this).

This search might help us find them:

@thesuperzapper
Contributor

@sunank200 @akaul my original solution proposed in #36090 (comment) had a critical part missing, which means that the solution you implemented (in the BigQuery and DataProc operators) needs to be updated, along with the unmerged update to the Databricks operator (PR #39373).

The problem was that we would not correctly cancel the external job if the task was CLEARED, rather than being set explicitly to SUCCESS or FAILED. This is because if the task is cleared, the new job will likely end up DEFERRED before the asyncio.CancelledError is even thrown.

I found a solution, which is to update the safe_to_cancel() method to also return True when the job_id of the current TaskInstance has changed since we were deferred (which only happens when the task is rescheduled because it was cleared).

For example, here is the updated safe_to_cancel() definition I am using:

```python
    def safe_to_cancel(self) -> bool:
        """
        Whether it is safe to cancel the external job which is being executed by this trigger.

        This is to avoid the case that `asyncio.CancelledError` is called because the trigger
        itself is stopped. Because in those cases, we should NOT cancel the external job.
        """
        # Database query is needed to get the latest state of the task instance.
        task_instance = self.get_task_instance()

        # If the current job_id is different from when the trigger was created,
        # then we should cancel the external job we are waiting on because the task has been
        # cleared and a new job has been created.
        if int(task_instance.job_id) != int(self.job_id):
            return True

        # If the task is not in a deferred state, then something else has happened to the task
        # since we were deferred (e.g. a manual state change), so we should cancel the external
        # job we are waiting on.
        return task_instance.state != TaskInstanceState.DEFERRED
```

So people can do more testing, I have updated my reference example in #36090 (comment)

NOTE: my reference is designed to work on all versions of Airflow with deferrable operators (e.g. 2.4.0+), but it can be simplified if we require 2.6.0+ like you have in the upstream providers; see the "TODO" for more context.


@potiuk we might want to consider implementing my workaround as a default method on the BaseTrigger and documenting it, or just explaining the workaround in the official docs about triggerers, because it's pretty critical that users know that on_kill is NOT called by triggers when deferred tasks are manually set to success/failure/clear, as this will result in external jobs not being stopped when users expect.

We also need to update all other operators that currently define on_kill and support being deferred to use the workaround; here is a search that helps us find them:

@sunank200
Collaborator

@sunank200 @akaul my original solution proposed in #36090 (comment) had a critical part missing, which means that the solution you implemented (in the BigQuery and DataProc operators) needs to be updated, along with the unmerged update to the Databricks operator (PR #39373).

The problem was that we would not correctly cancel the external job if the task was CLEARED, rather than being set explicitly to SUCCESS or FAILED. This is because if the task is cleared, the new job will likely end up DEFERRED before the asyncio.CancelledError is even thrown.

@thesuperzapper I tried clearing the task for DataprocCreateClusterOperator and it deleted the provisioning cluster for me.

@thesuperzapper
Contributor

@thesuperzapper I tried clearing the task for DataprocCreateClusterOperator and it deleted the provisioning cluster for me.

@sunank200 it's a race condition: it will only fail to cancel the external job if, after clearing, the task becomes deferred again before the exception handler on the triggerer runs.

It's more important for jobs which are very quick to start (like submitting a database query to the Redshift Data API), but you should definitely fix it in your operator too.

@potiuk
Member

potiuk commented Jun 14, 2024

Probably worth implementing it at the "airflow" level as you mentioned. Are you willing to submit a PR on that @thesuperzapper? Did I understand well?

@thesuperzapper
Contributor

Probably worth implementing it at the "airflow" level as you mentioned. Are you willing to submit a PR on that @thesuperzapper? Did I understand well?

@potiuk I can definitely help with reviewing PRs that implement the workaround above in existing deferred operators that use on_kill, because all of them need to be updated.

In terms of implementing some kind of solution at the airflow level, I'm happy to discuss possible solutions, but since we have no idea what the implementation would be yet, I can't commit to anything.

I guess there are two approaches to a generic solution:

  1. Try and get on_kill to run for deferred operators, probably by rescheduling it or effectively just running it in the exception handler of the trigger like I'm doing above. (The problem is the on_kill method is on the operator, not the triggerer, so it's not available after being deferred)
  2. Just document the above workaround in the deferred operator docs, because it does work, and won't require us to set a new minimum airflow version for some providers. (Note, the only problem is in the extremely rare case that the trigger crashes at the exact moment the exception handler is running, which might mean the external job doesn't get killed, but there are already many similar race conditions in airflow)

@potiuk
Member

potiuk commented Jun 14, 2024

As usual with OSS - we need someone to take the lead on that. So yeah - I see you would love to discuss the solution, but it needs someone to take the lead and create a PR for it.

@eladkal
Contributor

eladkal commented Jun 14, 2024

@sunank200 reported that he is working on a generic solution.
@sunank200 is that still the case?

@sunank200
Collaborator

Apologies for the late reply, but I have been working on this PR: #40084. I will be able to address this issue after the above PR is completed.

@phanikumv
Contributor

@sunank200 do you know if we are going to make it in time for 2.10? If not, let us know so that we can remove it from the milestone

@sunank200
Collaborator

Given that I am working on another priority task, it seems difficult to make it for 2.10. I will be working on this after #40084.

@thesuperzapper
Contributor

@sunank200 which approach are we planning to take on this issue?

  1. Implement the workaround I described in #36090 (comment) for each deferred operator that needs to clean up external resources (that is, where the non-deferred version uses on_kill)
  2. Attempting a general fix.

I think we should do both, starting with the workaround, as we should be able to get that in for 2.10.1.

It's very important we do something soon, because users don't expect this behavior, and it can be quite detrimental to your data pipelines if external jobs/resources keep running when you manually mark tasks as failed/success/clear.

@thesuperzapper
Contributor

@kaxil @ashb I think this issue is critical to fix before Airflow 3.

Without a fix, deferrable operators that trigger external jobs are not safe to use. Users expect that setting a deferred task to "success", "failed", "cleared" will cancel the external job, which is not currently the case.

We need to go through all the providers which contain deferrable operators and have on_kill defined, and ensure they have at least implemented the workaround described in:

@phanikumv
Contributor

@sunank200 is occupied with other developments for Airflow 3. Would you or anyone from the community be up for spending some engineering effort on this, @thesuperzapper?

@kaxil kaxil removed this from the Airflow 2.11.0 milestone Sep 18, 2024
@kaxil
Member

kaxil commented Sep 18, 2024

@msumit is also interested in this afaik. Would be great if someone in the community is willing to take this on.

@msumit
Contributor

msumit commented Sep 19, 2024

TLDR: I coded the fix for the same problem in a more generic way, which should solve this for all operators. I'll try to raise a diff by the EOW.

@thesuperzapper
Contributor

TLDR: I coded the fix for the same problem in a more generic way, which should solve this for all operators. I'll try to raise a diff by the EOW.

@msumit whatever your solution is, make sure that it meets the following requirements:

  • setting a deferred task to failed/succeeded cancels the external job
  • clearing a deferred task cancels the external job (but not the newly scheduled one, if the next try starts and is deferred quickly)
  • shutting down the trigger should not cancel the external job of any deferred tasks running on it

@msumit
Contributor

msumit commented Sep 23, 2024

Sorry for the bummer, folks - I don't think the solution I was thinking of is generic enough, but here are 2 different approaches I could think of:

  • There is a cleanup function in BaseTrigger, which I think has not been implemented by any other Trigger class. We can invoke that function when calling cancel of the async task, but yeah that function needs to be implemented in every Trigger class, very much like what @sunank200 did for a couple of classes.

  • We can send back the control to normal executors & call the on_kill method of the operators. This would work for most of the cases, but there are a couple of issues here as well

    • Many operators' "on_kill" functions aren't deferral-aware, which means they don't know they are being called directly in this way, and they may rely on information coming out of the execute function.
    • As we have to pass the task back to the executors, the user would see strange task state changes, e.g. from success/failure to scheduled, then running, and then to the final state. Not sure if it's worth the effort to introduce some transient states like marking_success or being_cancelled, etc.

In short, we need an owner here who could either find a generic way or take the lead in making all trigger classes cancel aware.

@thesuperzapper
Contributor

@msumit we already have a generic workaround based on my code in: #36090 (comment)

It's not perfect: it doesn't handle the case where the triggerer is force-killed (power loss) at the exact same time that the user manually changes the state of the task, but that's a very rare situation (and a similar gap applies to non-deferred on_kill during worker failures).

It relies on catching the asyncio.CancelledError exception and canceling the external job from within the triggerer itself, but only if the current state of the task (read from the DB) is not deferred, to avoid canceling when the triggerer is shut down normally.

The maintainers of the providers really need to implement this workaround for all deferrable operators with on_kill; otherwise, we need a warning in the docs of each deferrable operator. Users do not expect this behavior and accidentally leave their external jobs running.


Airflow 3 Proposal:

My proposal for a unified clean-up solution is to introduce a new TaskInstance state called pending cleanup.

When an operator defines a "cleanup handler", the task would enter this state when it terminates for any reason, including success, failure, and clearing.

The "cleanup handler" is guaranteed to run at least once, and would be passed the following information to help it decide what to do:

  1. Which state the task will go into after cleanup (success, failure, clear, up-for-retry)
  2. How the state changed (if it was manually set, failed naturally, or was cleared)
  3. The parameters of the task
  4. Any return xcom values (if the task succeeded)
  5. Runtime metadata set by the operator author using a new api (important if the task triggers an external job and gets an ID back, which could be required to cancel the external job)

The question then becomes where to run the cleanup handler code. I think it makes sense to run it in the existing triggerers, as we can reuse the existing task serialization behavior, and know that it will run at least once.
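To make this concrete, here is a purely illustrative sketch of what such a cleanup handler could look like from an operator author's point of view. None of the names below (`on_cleanup`, `set_cleanup_metadata`, the state/source strings, the helper functions) exist in Airflow today; they are placeholders to visualize the proposal, not a design:

```python
from airflow.models.baseoperator import BaseOperator


def submit_external_job(statement: str) -> str:
    """Stand-in for an external API call that starts a remote job and returns its ID."""
    raise NotImplementedError


def cancel_external_job(job_id: str) -> None:
    """Stand-in for an external API call that cancels the remote job."""
    raise NotImplementedError


class ExternalJobOperator(BaseOperator):
    def execute(self, context):
        job_id = submit_external_job("SELECT ...")
        # Hypothetical API: persist runtime metadata so the cleanup handler can see it later,
        # even after the worker process that ran execute() is gone.
        context["ti"].set_cleanup_metadata({"job_id": job_id})
        # ... defer to a trigger that polls the external job ...

    # Hypothetical hook: runs at least once while the task instance sits in the proposed
    # "pending cleanup" state, no matter how the task terminated.
    def on_cleanup(self, context, final_state, state_source, cleanup_metadata):
        # final_state: the state the TI will land in after cleanup (success, failed, up_for_retry, ...)
        # state_source: how it got there (manually set, failed naturally, cleared, timed out)
        if state_source in ("manually_set", "cleared") or final_state == "failed":
            cancel_external_job(cleanup_metadata["job_id"])
```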
