BranchPythonOperator skipping following tasks #10725

Closed
CatarinaSilva opened this issue Sep 4, 2020 · 15 comments · Fixed by #10751
Labels
kind:bug This is a clearly a bug


@CatarinaSilva
Contributor

CatarinaSilva commented Sep 4, 2020

Apache Airflow version:

1.10.12

What happened:

It seems that, starting with 1.10.12, the behavior of BranchPythonOperator was reversed. Previously, a task placed after all branches was excluded from the skipped tasks; now it is skipped. This breaks empty branches (a branch with no tasks that leads straight to the task after the branches).

What you expected to happen:

Tasks after all branches should respect their trigger_rule and not be automatically skipped by the branch operator.

How to reproduce it:

Run the following code in a DAG:

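    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator

    # dag_id and start_date are illustrative; any DAG definition will do.
    dag = DAG(
        dag_id="branch_skip_repro",
        start_date=datetime(2020, 9, 1),
        schedule_interval=None,
    )


    def needs_some_extra_task(some_bool_field, **kwargs):
        if some_bool_field:
            return "extra_task"
        else:
            return "final_task"


    branch_op = BranchPythonOperator(
        dag=dag,
        task_id="branch_task",
        provide_context=True,
        python_callable=needs_some_extra_task,
        op_kwargs={"some_bool_field": True},  # hard-coded to show the problem
    )

    # always runs in this example
    extra_op = DummyOperator(
        dag=dag,
        task_id="extra_task",
    )
    extra_op.set_upstream(branch_op)

    # should run via its trigger_rule, but is skipped in 1.10.12
    final_op = DummyOperator(
        dag=dag,
        task_id="final_task",
        trigger_rule="none_failed_or_skipped",
    )
    final_op.set_upstream([extra_op, branch_op])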

Temporary workaround:

Add a dummy operator to every branch that has no tasks.
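The workaround helps because final_task stops being a direct child of the branch task. Below is a minimal pure-Python sketch of that effect, assuming the 1.10.12 rule stated in the docstring (every direct child that was not returned is skipped). The placeholder task name `skip_extra_task` and the helper `skipped_by_branch` are hypothetical, and with this layout the callable would return `"skip_extra_task"` instead of `"final_task"` in its else branch:

```python
from typing import Dict, List, Set

# Hypothetical model of the example DAG after the workaround: the empty
# branch now goes through a placeholder task instead of pointing straight
# at final_task. Each task_id maps to its direct downstream task_ids.
EDGES: Dict[str, List[str]] = {
    "branch_task": ["extra_task", "skip_extra_task"],
    "extra_task": ["final_task"],
    "skip_extra_task": ["final_task"],
    "final_task": [],
}


def skipped_by_branch(branch_id: str, followed: List[str]) -> Set[str]:
    # Assumed 1.10.12 rule: skip every direct child that was not returned.
    return {child for child in EDGES[branch_id] if child not in followed}


print(sorted(skipped_by_branch("branch_task", ["extra_task"])))  # ['skip_extra_task']
```

Since final_task is no longer a direct child, the branch never marks it as skipped; its none_failed_or_skipped trigger rule then decides, and one upstream in success with the other skipped is enough for it to run.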

CatarinaSilva added the kind:bug label on Sep 4, 2020
@boring-cyborg

boring-cyborg bot commented Sep 4, 2020

Thanks for opening your first issue here! Be sure to follow the issue template!

@kaxil
Member

kaxil commented Sep 4, 2020

@yuqian90 Do you think this might have been caused by #7276?

If you have time, can you try to reproduce this bug?

@yuqian90
Contributor

yuqian90 commented Sep 4, 2020

Thanks for bringing this up. Definitely looks related to #7276.

Let me clarify the problem first. In @CatarinaSilva's example, the expected end state is that both extra_task and final_task are in success. The actual behaviour is that extra_task is in success while final_task is skipped.

[Image: graph view of the example DAG after the run, with extra_task in success and final_task skipped]

I'm sorry that I did not anticipate the empty-branch case when working on #7276. That said, if we look at the docstring of BranchPythonOperator, it says this:

    ... expects a Python function that returns
    a single task_id or list of task_ids to follow. The task_id(s) returned
    should point to a task directly downstream from {self}. All other "branches"
    or directly downstream tasks are marked with a state of ``skipped`` so that
    these paths can't move forward.

In this case, both extra_task and final_task are directly downstream of branch_task, and its python_callable returned extra_task. So it now does exactly what its docstring says: it follows extra_task and skips the others.
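To make the docstring's rule concrete, here is a minimal pure-Python sketch that models the example DAG as a dict of downstream edges. It is not Airflow's code; `DAG_EDGES` and `skipped_by_branch_1_10_12` are hypothetical names, and the function only reproduces the rule as the docstring states it:

```python
from typing import Dict, Iterable, List, Set

# Hypothetical model of the example DAG: task_id -> direct downstream task_ids.
DAG_EDGES: Dict[str, List[str]] = {
    "branch_task": ["extra_task", "final_task"],
    "extra_task": ["final_task"],
    "final_task": [],
}


def skipped_by_branch_1_10_12(
    edges: Dict[str, List[str]], branch_id: str, followed: Iterable[str]
) -> Set[str]:
    """Skip every direct child of the branch task that was not returned."""
    followed_set = set(followed)
    return {child for child in edges[branch_id] if child not in followed_set}


print(sorted(skipped_by_branch_1_10_12(DAG_EDGES, "branch_task", ["extra_task"])))
# ['final_task']
```

Under this rule, final_task counts as a "branch" of its own simply because it is a direct child, so returning only extra_task skips it.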

I understand this sounds counter-intuitive. The problem is that NotPreviouslySkippedDep tells Airflow that final_task should be skipped, because it is directly downstream of a BranchPythonOperator that decided to follow another branch. At the same time, TriggerRuleDep says that final_task can run, because its trigger_rule none_failed_or_skipped is satisfied.
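The two checks can be sketched side by side to show how they disagree about final_task. These are rough, hypothetical stand-ins written for illustration, not the real NotPreviouslySkippedDep and TriggerRuleDep classes; `none_failed_or_skipped_ok` approximates the documented semantics of that trigger rule (no upstream in failed or upstream_failed, and at least one upstream in success):

```python
from typing import Dict, List, Set

# Hypothetical upstream map for the example DAG (not Airflow's data model).
UPSTREAM: Dict[str, List[str]] = {
    "extra_task": ["branch_task"],
    "final_task": ["extra_task", "branch_task"],
}


def branch_says_skip(task_id: str, branch_id: str, followed: Set[str]) -> bool:
    """Stand-in for NotPreviouslySkippedDep: a direct child of the branch
    task that was not followed must be skipped."""
    return branch_id in UPSTREAM[task_id] and task_id not in followed


def none_failed_or_skipped_ok(task_id: str, states: Dict[str, str]) -> bool:
    """Stand-in for TriggerRuleDep with none_failed_or_skipped."""
    upstream_states = [states[t] for t in UPSTREAM[task_id]]
    no_failures = all(s not in ("failed", "upstream_failed") for s in upstream_states)
    return no_failures and "success" in upstream_states


states = {"branch_task": "success", "extra_task": "success"}
print(branch_says_skip("final_task", "branch_task", {"extra_task"}))  # True
print(none_failed_or_skipped_ok("final_task", states))  # True
```

Both answers are True for final_task: one check says "skip it", the other says "it may run", which is exactly the ambiguity described above.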

To remove this ambiguity, may I suggest changing the python_callable to this:

    def needs_some_extra_task(some_bool_field, **kwargs):
        if some_bool_field:
            return ["extra_task", "final_task"]
        else:
            return ["final_task"]

That is, always include "final_task" in the return value, because the intention is to run "final_task" no matter what the branching outcome is.

The DAG will then look like this and the behaviour will be as expected:

[Image: graph view of the example DAG after the run with the updated callable]
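A small pure-Python check of why the suggested callable avoids the problem, assuming the 1.10.12 rule from the docstring (skip every direct child that was not returned). `EDGES` and `skipped_children` are hypothetical names used only for this sketch:

```python
from typing import Dict, List, Set

# Hypothetical model of the example DAG: task_id -> direct downstream task_ids.
EDGES: Dict[str, List[str]] = {
    "branch_task": ["extra_task", "final_task"],
    "extra_task": ["final_task"],
    "final_task": [],
}


def needs_some_extra_task(some_bool_field: bool, **kwargs) -> List[str]:
    # The suggested callable: "final_task" is returned on both paths.
    if some_bool_field:
        return ["extra_task", "final_task"]
    return ["final_task"]


def skipped_children(branch_id: str, followed: List[str]) -> Set[str]:
    # Assumed 1.10.12 rule: skip direct children that were not returned.
    return {child for child in EDGES[branch_id] if child not in followed}


for flag in (True, False):
    print(flag, sorted(skipped_children("branch_task", needs_some_extra_task(flag))))
# True []
# False ['extra_task']
```

final_task is never in the skipped set, so only its trigger_rule decides whether it runs.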

If this solution is not satisfactory, please let me know and I'll think about something better.

@HenryLinTw

Just want to bring this up because we also encountered this.

If the approach mentioned above is finalized, the example in the docs here: https://airflow.apache.org/docs/stable/concepts.html?highlight=branch#trigger-rules should be updated accordingly to avoid confusion.

@dimon222
Contributor

dimon222 commented Sep 4, 2020

Okay, I thought I was alone, but after seeing #10686 and now this, I'm starting to think it's a regression.
I'm also affected. What is the proposed way around this? Adding dummy operators doesn't sound great.

@kaxil
Member

kaxil commented Sep 4, 2020

@yuqian90 Looks like more people rely on this feature. Would you be able to make a PR that restores this behavior to allow empty branches, please?

@yuqian90
Contributor

yuqian90 commented Sep 5, 2020

@kaxil will do

@CatarinaSilva
Contributor Author

Thanks @yuqian90, the logic you proposed:

    def needs_some_extra_task(some_bool_field, **kwargs):
        if some_bool_field:
            return ["extra_task", "final_task"]
        else:
            return ["final_task"]

seems good to me. For some reason I didn't think I could have the same task on both sides of the branch 😅 I will use it until the behavior is restored, and honestly maybe even after. Which version will your PR be included in?

@yuqian90
Contributor

yuqian90 commented Sep 8, 2020

Hi @CatarinaSilva, yes, the workaround will work.

> for some reason I didn't think I could have the same task in both sides of the branch 😅 I will use that until the behavior is restored and maybe even after honestly. What version will your PR be included in?

The workaround should continue to work after the fix is merged, so you can start using it right now and keep using it afterwards.

@kaxil I'm hoping the fix in #10751 can be included in the next 1.10.* release, even though the PR targets master.

@kaxil
Member

kaxil commented Sep 8, 2020

Thanks for the quick workaround, @yuqian90. I will look at the PR and merge it soon.

@marcoaaguiar
Contributor

Regarding what @yuqian90 said, the question is whether we should fix the documentation to match the behavior (and the example), or fix the behavior and the examples to match the documentation.

TBH the previous behavior seems more logical: a task is only skipped if no task that is running, or yet to run, points to it.
At least from a user perspective that is what I expect.

kaxil pushed a commit that referenced this issue Sep 22, 2020
closes: #10725

Make sure SkipMixin.skip_all_except() handles empty branches properly: when "task1" is followed, "join" must not be skipped, even though it is considered to be immediately downstream of "branch".
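The idea behind that commit message can be sketched in plain Python: skip the direct children that were not followed, except those still reachable from a followed task. This is a simplified model written under my own assumptions, not the actual SkipMixin.skip_all_except() code from #10751; `EDGES`, `descendants` and `tasks_to_skip` are hypothetical names:

```python
from collections import deque
from typing import Dict, Iterable, List, Set

# Hypothetical DAG from the commit message: "branch" chooses between "task1"
# and an empty branch that goes straight to "join".
EDGES: Dict[str, List[str]] = {
    "branch": ["task1", "join"],
    "task1": ["join"],
    "join": [],
}


def descendants(edges: Dict[str, List[str]], roots: Iterable[str]) -> Set[str]:
    """All task_ids reachable from `roots`, including the roots (BFS)."""
    seen: Set[str] = set(roots)
    queue = deque(seen)
    while queue:
        for child in edges[queue.popleft()]:
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


def tasks_to_skip(
    edges: Dict[str, List[str]], branch_id: str, followed: Iterable[str]
) -> Set[str]:
    """Skip direct children that were not followed, unless they are still
    reachable from a followed task (e.g. a join task)."""
    keep = descendants(edges, followed)
    return {child for child in edges[branch_id] if child not in keep}


print(sorted(tasks_to_skip(EDGES, "branch", ["task1"])))  # []
print(sorted(tasks_to_skip(EDGES, "branch", ["join"])))  # ['task1']
```

When "task1" is followed, "join" is reachable from it and is therefore left to its trigger rule instead of being skipped.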
@kaxil
Member

kaxil commented Sep 22, 2020

Thanks, @yuqian90. If it is not much work, can you create a backport PR for this targeting v1-10-test?

@yuqian90
Contributor

> Thanks, @yuqian90 -- If it is not much work can you create a backport PR for this targeting v1-10-test?

Will do.

@yuqian90
Contributor

> Regarding of what @yuqian90 said, the question is should we fix the documentation to match the behavior (and the example) or should we fix the behavior and the examples to match the documentation.
>
> TBH the previous behavior seems more logical, where task are only skipped if there is no task running or to be run that points to it.
> At least from a user perspective that is what I expect.

BranchPythonOperator skips child tasks that are not returned by python_callable. The ambiguity only arises when there are empty branches, and neither approach will be perfect. This PR aims to keep the behaviour the same as before 1.10.12.
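Structurally, an empty branch shows up as a direct child of the branch task that is also reachable through another direct child. A hypothetical helper illustrating that check on a plain dict of downstream edges (not part of Airflow; `strictly_downstream` and `empty_branch_targets` are names made up for this sketch):

```python
from typing import Dict, List, Set

# Example DAG from this issue: final_task is reached both directly and via extra_task.
EDGES: Dict[str, List[str]] = {
    "branch_task": ["extra_task", "final_task"],
    "extra_task": ["final_task"],
    "final_task": [],
}

# A DAG without empty branches, for comparison.
NO_EMPTY: Dict[str, List[str]] = {
    "branch": ["a", "b"],
    "a": ["join"],
    "b": ["join"],
    "join": [],
}


def strictly_downstream(edges: Dict[str, List[str]], start: str) -> Set[str]:
    """Task_ids reachable from `start`, not counting `start` itself (DFS)."""
    seen: Set[str] = set()
    stack = list(edges[start])
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(edges[node])
    return seen


def empty_branch_targets(edges: Dict[str, List[str]], branch_id: str) -> Set[str]:
    """Direct children of the branch task that are also reachable through
    another direct child, i.e. the targets of empty branches."""
    children = edges[branch_id]
    return {
        child
        for child in children
        for other in children
        if other != child and child in strictly_downstream(edges, other)
    }


print(sorted(empty_branch_targets(EDGES, "branch_task")))  # ['final_task']
print(sorted(empty_branch_targets(NO_EMPTY, "branch")))  # []
```

Only tasks flagged by a check like this are affected by the ambiguity; in DAGs without empty branches, both interpretations skip the same tasks.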

@yuqian90
Contributor

> @yuqian90 Looks like more people rely on this feature, would you be able to make a PR that restores this behavior to allow empty branch please

@kaxil please see #11120

6 participants