ShortCircuitOperator does not properly retry when python_callable exits unexpectedly #36809

Closed
2 tasks done
vlieven opened this issue Jan 16, 2024 · 3 comments · Fixed by #36986
Labels
area:core, kind:bug

Comments

@vlieven
Contributor

vlieven commented Jan 16, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.7.3

What happened?

If the python_callable used by a ShortCircuitOperator crashes instead of returning a boolean, the retry and downstream behavior is non-deterministic. One of the following happens:

  • the task properly retries, succeeds and downstream tasks are triggered
  • the task does not properly retry, downstream tasks are marked as upstream_failed
  • the task does retry, is not marked as success, but downstream tasks are triggered

What you think should happen instead?

In case the callable crashes, it should be treated as a failure, and retry logic should be properly executed until the task completes successfully.
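
A possible stopgap, sketched under assumptions rather than taken from Airflow itself: wrap the callable so that a SystemExit is re-raised as an ordinary exception. The helper name fail_on_exit is hypothetical, and the sketch assumes that ordinary exceptions go through the normal failure-and-retry path.

```python
import functools


def fail_on_exit(func):
    """Hypothetical helper: turn SystemExit into an ordinary exception."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except SystemExit as exc:
            # Re-raise as a regular Exception subclass so that the caller's
            # usual error handling (assumed to include retries) applies.
            raise RuntimeError(f"callable exited with code {exc.code!r}") from exc

    return wrapper


@fail_on_exit
def flaky(try_number):
    # Mirrors the reproduction: exit on odd tries, succeed on even ones.
    if try_number % 2 == 1:
        raise SystemExit(-1)
    return True
```

In a DAG this would presumably be applied as python_callable=fail_on_exit(python_callable); that usage has not been verified against Airflow here.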

How to reproduce

As the behavior is not deterministic, I've created 10 parallel flows to demonstrate the issue. A crash is simulated by exit(-1).

This DAG file should demonstrate the broken behavior:

from airflow import DAG
from airflow.models.taskinstance import TaskInstance
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import ShortCircuitOperator
from datetime import datetime, timedelta


default_args = {
    "owner": "Test",
    "depends_on_past": False,
    "start_date": datetime.now() - timedelta(days=2),
    "retries": 1,
    "retry_delay": timedelta(seconds=5),
}


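def python_callable(ti: TaskInstance):
    # Simulate a crash on odd-numbered tries (including the first attempt);
    # even-numbered tries return True so downstream tasks should run.
    if ti.try_number % 2 == 1:
        exit(-1)
    else:
        return True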


with DAG(
    "test_shortcircuit",
    default_args=default_args,
    schedule_interval="@daily",
) as test_dag:
    for i in range(10):
        ShortCircuitOperator(
            task_id=f"short-{i}", python_callable=python_callable,
        ) >> EmptyOperator(
            task_id=f"empty-{i}"
        )

The resulting grid view is the following:

[Screenshot: grid view of the test_shortcircuit DAG run]

Operating System

Amazon Linux 2

Versions of Apache Airflow Providers

This is using only the built-in operators

Deployment

Other Docker-based deployment

Deployment details

Running on a Kubernetes cluster

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

vlieven added the area:core, kind:bug, and needs-triage labels on Jan 16, 2024
@Taragolis
Contributor

I checked on main, and this bug exists there as well.
However, it does not only affect ShortCircuitOperator; I also reproduced it with regular TaskFlow operators:

from __future__ import annotations

from typing import TYPE_CHECKING

from airflow import DAG
from airflow.decorators import task
from airflow.utils.task_group import TaskGroup

if TYPE_CHECKING:
    from airflow.models import TaskInstance


with DAG(dag_id="issue_36809", schedule=None, tags=["issue", "36809", "zombie?"]):
    @task(retries=1, retry_delay=5)
    def flakey_failure(ti: TaskInstance = None) -> int:
        if (try_number := ti.try_number) == 1:
            raise SystemExit(-1)
        return try_number

    @task
    def my_task(i: int):
        print(f"Upstream try number: {i}")

    for ix in range(1, 10 + 1):
        with TaskGroup(group_id=f"group_{ix:02d}"):
            my_task(flakey_failure())

This seems to be a problem with zombie detection/resolution. In addition, I found an interesting bug: on the next DagRun, when a task fails, it might also (with high probability) be marked as failed even if it previously ran successfully.


Taragolis removed the needs-triage label on Jan 17, 2024
@avkirilishin
Contributor

@vlieven, there is an issue with using exit() to simulate a crash. I hope it will be fixed in #36986, but I'm not certain if this fix will resolve your original problem. Could you please check it?
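
To illustrate the point about exit(): in plain Python, exit() and sys.exit() raise SystemExit instead of killing the process outright, and SystemExit derives from BaseException, not Exception. The sketch below is standalone Python and does not use Airflow; the function names are made up for the illustration.

```python
import sys


def simulated_crash():
    # sys.exit() (like the builtin exit()) raises SystemExit rather than
    # terminating the process immediately.
    sys.exit(-1)


def run(callable_):
    """Run a callable and report how it ended."""
    try:
        callable_()
        return "returned normally"
    except Exception:
        # Ordinary errors land here; SystemExit does not.
        return "caught by 'except Exception'"
    except SystemExit as exc:
        return f"escaped 'except Exception': SystemExit({exc.code})"


outcome = run(simulated_crash)
print(outcome)
```

Any error handling written around `except Exception` therefore never sees this kind of exit, which makes it a different situation from both a regular task failure and a process that is killed from outside.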

@vlieven
Contributor Author

vlieven commented Jan 26, 2024

Hi @avkirilishin, thanks for your work on the PR, it looks like this will definitely improve the behavior!
