Common tasks in downstream of multiple branches always set to skipped due to 'Not Previously Skipped' dependency #10686

Closed
jonasmiederer opened this issue Sep 2, 2020 · 10 comments
Labels
area:core kind:bug This is a clearly a bug pending-response stale Stale PRs per the .github/workflows/stale.yml policy file

Comments

@jonasmiederer

Please note: This issue is very similar to #7885, but I think it is caused by the new functionality "Clearing tasks skipped by SkipMixin will skip them" introduced in 1.10.12.
The problem is the same, but the solution (setting the trigger rule of the join task to none_failed, as described in the docs) stopped working after updating to 1.10.12.

Apache Airflow version: 1.10.12

What happened:
Two branches join into the same task at one point downstream. This join task is marked as skipped because one of the two branches is skipped.
Setting the trigger rule of the join task to none_failed used to work (the task ran after the chosen branch finished), but after updating to 1.10.12 the task is always skipped.
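
For readers unfamiliar with trigger rules, here is a rough sketch of why none_failed is expected to let the join task run while the default all_success would not. It is a simplified model written for this thread, not Airflow's implementation: the function `rule_satisfied` and the state sets are hypothetical names, and only three rules are modelled.

```python
# Simplified model of trigger-rule evaluation; NOT Airflow's real code.
# `rule_satisfied`, FAILED_STATES and DONE_STATES are hypothetical names.
FAILED_STATES = {"failed", "upstream_failed"}
DONE_STATES = {"success", "failed", "upstream_failed", "skipped"}


def rule_satisfied(rule, upstream_states):
    """Return True if `rule` lets a task run given its upstream task states."""
    states = list(upstream_states)
    if rule == "all_success":
        return all(s == "success" for s in states)
    if rule == "none_failed":
        return not any(s in FAILED_STATES for s in states)
    if rule == "all_done":
        return all(s in DONE_STATES for s in states)
    raise ValueError(f"unsupported rule: {rule}")


# Upstream states of `join` when `branch_a` is chosen.
chosen_branch_a = {"branching": "success", "follow_branch_a": "success"}
# Upstream states of `join` when `join` itself is chosen:
# the `branch_a` path is skipped.
chosen_join = {"branching": "success", "follow_branch_a": "skipped"}

for rule in ("all_success", "none_failed", "all_done"):
    print(rule, rule_satisfied(rule, chosen_join.values()))
```

Under this model, none_failed and all_done both tolerate a skipped upstream task, so the trigger rule alone does not explain why `join` ends up skipped in 1.10.12.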

Screenshot 2020-09-02 at 09 52 25

If the join task was selected, everything is fine:
Screenshot 2020-09-02 at 09 33 13

What you expected to happen:
The join task should be executed even though it was not chosen by the branch operator, because it is a downstream task of the chosen branch.

What do you think went wrong?:
If I look into the task instance details, I can see that the task was skipped "because of previous XCom result from parent task branching", listed as the "Not Previously Skipped" dependency.
Screenshot 2020-09-02 at 09 46 24

I think this is related to the new functionality "Clearing tasks skipped by SkipMixin will skip them" introduced in 1.10.12, but I'm not sure whether this is a bug or whether I did something wrong.
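
To make the suspicion concrete, here is a minimal sketch of how such a check could end up skipping the join task. It is an assumption based only on the dependency message quoted above, not a copy of Airflow's code: the graph `DOWNSTREAM` mirrors the reproduction DAG below, and both check functions are hypothetical.

```python
# Hypothetical model of the reproduction DAG:
# task_id -> direct downstream task_ids.
DOWNSTREAM = {
    "run_this_first": ["branching"],
    "branching": ["branch_a", "join"],
    "branch_a": ["follow_branch_a"],
    "follow_branch_a": ["join"],
    "join": [],
}


def descendants(task_id):
    """Return all direct and indirect downstream task_ids of `task_id`."""
    seen = set()
    stack = list(DOWNSTREAM[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(DOWNSTREAM[current])
    return seen


def skipped_by_strict_check(task_id, followed):
    """Skip a child of the branching task unless it is literally in `followed`."""
    return task_id not in followed


def skipped_by_descendant_aware_check(task_id, followed):
    """Also treat tasks downstream of a followed branch as followed."""
    allowed = set(followed)
    for branch in followed:
        allowed |= descendants(branch)
    return task_id not in allowed


followed = ["branch_a"]  # what `branching` returned in the failing run
print(skipped_by_strict_check("join", followed))            # True
print(skipped_by_descendant_aware_check("join", followed))  # False
```

If the recorded result only contains the task ids returned by the callable, a strict membership test would skip `join` even though it sits downstream of the followed branch, which matches what I am seeing.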

How to reproduce it:

import random

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id='branch_test',
    default_args=args,
    schedule_interval="@daily",
    tags=['example']
)

run_this_first = DummyOperator(
    task_id='run_this_first',
    dag=dag,
)

options = ['branch_a']

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=lambda: random.choice(options),
    dag=dag,
)
run_this_first >> branching

join = DummyOperator(
    task_id='join',
    trigger_rule='none_failed',
    dag=dag,
)

for option in options:
    t = DummyOperator(
        task_id=option,
        dag=dag,
    )

    dummy_follow = DummyOperator(
        task_id='follow_' + option,
        dag=dag,
    )

    branching >> t >> dummy_follow >> join

options.append('join')
branching >> join

Has something changed in how downstream tasks are skipped, or is this a bug in the new release?

@jonasmiederer jonasmiederer added the kind:bug This is a clearly a bug label Sep 2, 2020
@boring-cyborg

boring-cyborg bot commented Sep 2, 2020

Thanks for opening your first issue here! Be sure to follow the issue template!

@dimon222
Contributor

dimon222 commented Sep 3, 2020

I see a similar change in behaviour with all_done. My join task has the trigger rule all_done, and since 1.10.12 it follows exactly the same scenario and is marked as skipped when it should actually execute.

This does sound like a bug to me. Does anyone know?

@kaxil
Member

kaxil commented Sep 4, 2020

cc @yuqian90

@KulykDmytro
Contributor

KulykDmytro commented Oct 21, 2020

The initial issue, which seemed to be fixed by AIRFLOW-4453, has come back (at least in 1.10.12).
PR: #7464
So the none_failed behavior is ignored, contrary to the documentation.

    t_ready = DummyOperator(
        task_id = 'calc_ready',
        trigger_rule = 'none_failed',
        dag=dag)

image

Used version: 1.10.12 with KubernetesExecutor

@KulykDmytro
Contributor

KulykDmytro commented Oct 21, 2020

Another example on 1.10.12 shows the same behavior (the join task being skipped) for downstream tasks regardless of the trigger_rule set: all_done, none_failed and none_failed_or_skipped all behave the same as all_success.

import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.dates import days_ago
#from airflow.utils.trigger_rule import TriggerRule

dag = DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=dt.timedelta(hours=4),
    start_date=days_ago(2),
    tags=['example']
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
task0 = DummyOperator(task_id='task0', dag=dag)
task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)

task0 >> [task1, task2]
latest_only >> task1
tr_list = ['all_done', 'none_failed', 'none_failed_or_skipped']

for tr in tr_list:
    taska = DummyOperator(dag=dag, task_id=f'taska_{tr}', trigger_rule=tr)
    taskb = DummyOperator(task_id=f'taskb_{tr_list.index(tr)}', dag=dag)
    taskc = DummyOperator(task_id=f'taskc_{tr_list.index(tr)}', dag=dag)

    task1 >> [taska, taskb] >> taskc
    task2 >> [taska, taskb]

image

@KulykDmytro
Contributor

KulykDmytro commented Oct 21, 2020

In any case, the behavior does not match the documentation.
Even with the code snippet given there, I get an unexpected result (task4 being skipped).
image

@yuqian90
Contributor

Hi @KulykDmytro, the original issue reported in #10686 and #10725 has been fixed in #10751 and will be included in an upcoming release of Airflow.

I assume latest_only is a LatestOnlyOperator? The issue you are reporting here is unlikely to be caused by "Clearing tasks skipped by SkipMixin will skip them". It looks more like a flaw in LatestOnlyOperator itself, and it must already have been happening in versions prior to 1.10.12. If you look at the LatestOnlyOperator implementation, it always skips all of its downstream tasks (including indirect downstream tasks). This is probably not what you are expecting. It would be more reasonable for LatestOnlyOperator to skip only its direct downstream tasks. If you create a new issue to address this problem in LatestOnlyOperator, someone will probably help you fix it.

        if not left_window < now <= right_window:
            self.log.info('Not latest execution, skipping downstream.')

            downstream_tasks = context['task'].get_flat_relatives(upstream=False)
            self.log.debug("Downstream task_ids %s", downstream_tasks)

            if downstream_tasks:
                self.skip(context['dag_run'],
                          context['ti'].execution_date,
                          downstream_tasks)
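
To illustrate the difference between skipping all flat relatives and skipping only direct downstream tasks, here is a small sketch on a plain dictionary that mirrors one iteration (none_failed) of the loop in your example DAG. The helper names `direct_downstream` and `flat_downstream` are made up for this illustration and are not Airflow APIs.

```python
# Hypothetical model of one loop iteration of the example DAG:
# task_id -> direct downstream task_ids.
DOWNSTREAM = {
    "latest_only": ["task1"],
    "task0": ["task1", "task2"],
    "task1": ["taska_none_failed", "taskb_1"],
    "task2": ["taska_none_failed", "taskb_1"],
    "taska_none_failed": ["taskc_1"],
    "taskb_1": ["taskc_1"],
    "taskc_1": [],
}


def direct_downstream(task_id):
    """Return only the immediate children of `task_id`."""
    return set(DOWNSTREAM[task_id])


def flat_downstream(task_id):
    """Return every task reachable from `task_id`, direct or indirect."""
    seen = set()
    stack = list(DOWNSTREAM[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(DOWNSTREAM[current])
    return seen


print(sorted(direct_downstream("latest_only")))
# ['task1']
print(sorted(flat_downstream("latest_only")))
# ['task1', 'taska_none_failed', 'taskb_1', 'taskc_1']
```

With the flat variant, taska_none_failed is marked as skipped directly by latest_only, so its trigger rule never gets a chance to matter. With the direct variant, only task1 would be skipped and the trigger rules of the tasks further downstream would decide the rest.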

@eladkal
Contributor

eladkal commented Jun 20, 2021

Tested the latest main branch with the reproduction code:

import random

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'Airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id='branch_test_issue_10686',
    default_args=args,
    schedule_interval="@daily",
    tags=['example']
)

run_this_first = DummyOperator(
    task_id='run_this_first',
    dag=dag,
)

options = ['branch_a']

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=lambda: random.choice(options),
    dag=dag,
)
run_this_first >> branching

join = DummyOperator(
    task_id='join',
    trigger_rule='none_failed',
    dag=dag,
)

for option in options:
    t = DummyOperator(
        task_id=option,
        dag=dag,
    )

    dummy_follow = DummyOperator(
        task_id='follow_' + option,
        dag=dag,
    )

    branching >> t >> dummy_follow >> join

options.append('join')
branching >> join

The branching task gives:
[2021-06-20, 08:33:47 UTC] {python.py:151} INFO - Done. Returned value was: branch_a
[2021-06-20, 08:33:47 UTC] {skipmixin.py:124} INFO - Following branch branch_a
[2021-06-20, 08:33:47 UTC] {skipmixin.py:155} INFO - Skipping tasks []

Screen Shot 2021-06-20 at 11 37 14

This seems fine to me.
Could the issue have already been resolved?

@github-actions

This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Aug 20, 2021
@github-actions

github-actions bot commented Sep 3, 2021

This issue has been closed because it has not received a response from the issue author.

@github-actions github-actions bot closed this as completed Sep 3, 2021