Fix pre-mature evaluation of tasks in mapped task group #34337

Merged: 5 commits, Nov 1, 2023

Conversation

ephraimbuddy
Contributor

The relevant upstream map indexes of a task instance in a mapped task group should only be computed once the task has expanded. If the task has not expanded yet, we should return None so that the task waits for its upstreams before trying to run.
The issue is most noticeable when the trigger rule is ONE_FAILED, because the task instance is then marked as SKIPPED.
This commit fixes this issue.
closes: #34023
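
A minimal sketch of the guard described above, written against simplified stand-ins rather than Airflow's real classes. `FakeTI` and `relevant_upstream_map_indexes` are hypothetical names introduced for illustration; the only details taken from this PR are the `map_index < 0` check and the decision to return None for a task that has not expanded. Returning the task's own map index in the expanded case is an assumption made to keep the example short.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeTI:
    """Hypothetical stand-in for a task instance (not Airflow's TaskInstance)."""

    task_id: str
    map_index: int  # a negative value means "not expanded yet" in this sketch


def relevant_upstream_map_indexes(ti: FakeTI, expanded_ti_count: int | None) -> int | range | None:
    """Return the upstream map index this ti cares about, or None for "all of them"."""
    # No expansion information available: depend on the whole upstream.
    if expanded_ti_count is None:
        return None
    # The group may be expandable while this ti is still unexpanded.
    # Returning None makes the ti wait for its upstreams in their entirety.
    if ti.map_index < 0:
        return None
    # Assumption: inside the same mapped group, the matching index is the ti's own.
    return ti.map_index


print(relevant_upstream_map_indexes(FakeTI("t3", -1), 3))  # unexpanded ti
print(relevant_upstream_map_indexes(FakeTI("t3", 2), 3))   # expanded ti
```

The point of the guard is that None means "wait for every instance of the upstream", which is the conservative choice while the task instance has no map index of its own.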

@@ -146,6 +146,11 @@ def _get_relevant_upstream_map_indexes(upstream_id: str) -> int | range | None:
    expanded_ti_count = _get_expanded_ti_count()
except (NotFullyPopulated, NotMapped):
    return None
if ti.map_index < 0:
Member

I wonder why this case does not raise NotFullyPopulated

Contributor Author

From the docstring ("Get how many tis the current task is supposed to be expanded into."), it seems like this is the number the task could expand into, but I think this case arises because the group has expanded but the task hasn't?

Member

@uranusjr uranusjr Sep 15, 2023

I dug around a bit. This counts as expanded because there is indeed enough information to expand the operator, but the other task it depends on (t2) has not finished. This makes me think the fix is not right; we should instead investigate why t2 is not considered relevant. We need to somehow distinguish upstreams that are depended on through the DAG structure (e.g. with >>) from those depended on for expansion (what _get_relevant_upstream_map_indexes is for); for >> dependencies we need to depend on the entire task regardless of map indexes.

@uranusjr
Member

uranusjr commented Sep 15, 2023

I think the fix is to check whether an upstream is a dependency used for task expansion. If it is, we go through the normal “relevant map index” check; if not, we need to depend on it in its entirety, i.e. _get_relevant_upstream_map_indexes should return None.

This change seems to work for me:

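def _iter_expansion_dependencies() -> Iterator[str]:
    # Yield the task IDs of upstreams used to expand the current ti's task
    # or any mapped task group enclosing it. `ti` comes from the enclosing scope.
    from airflow.models.mappedoperator import MappedOperator

    if isinstance(ti.task, MappedOperator):
        for op in ti.task.iter_mapped_dependencies():
            yield op.task_id
    for tg in ti.task.task_group.iter_mapped_task_groups():
        for op in tg.iter_mapped_dependencies():
            yield op.task_id

def _get_relevant_upstream_map_indexes(upstream_id: str) -> int | range | None:
    # Upstreams not used for expansion are depended on in their entirety.
    if upstream_id not in set(_iter_expansion_dependencies()):
        return None
    # Same as before...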
    # Same as before...

This is obviously not optimised and I want to add a better abstraction for this. But @ephraimbuddy maybe you can help check if this can fix the actual DAG in the reported issue.
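
To make the distinction between structural and expansion dependencies concrete, here is a self-contained sketch that mirrors the shape of the suggestion above without importing Airflow. `FakeGroup`, `FakeTask`, and the task IDs `make_list`, `t2`, and `t3` are hypothetical; the real code walks `MappedOperator` and mapped task group objects instead. The "normal" map-index check is represented by simply returning the map index, which is an assumption for brevity.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Iterator


@dataclass
class FakeGroup:
    """Hypothetical task group; expansion_upstreams is non-empty if it is mapped."""

    expansion_upstreams: list[str] = field(default_factory=list)
    parent: FakeGroup | None = None

    def iter_mapped_task_groups(self) -> Iterator[FakeGroup]:
        # Walk from this group outwards, yielding only the mapped ones.
        group: FakeGroup | None = self
        while group is not None:
            if group.expansion_upstreams:
                yield group
            group = group.parent


@dataclass
class FakeTask:
    """Hypothetical task; expansion_upstreams is non-empty if the task is mapped."""

    task_id: str
    group: FakeGroup
    expansion_upstreams: list[str] = field(default_factory=list)


def iter_expansion_dependencies(task: FakeTask) -> Iterator[str]:
    # Upstreams that feed the task's own expansion ...
    yield from task.expansion_upstreams
    # ... plus upstreams that feed any enclosing mapped task group.
    for group in task.group.iter_mapped_task_groups():
        yield from group.expansion_upstreams


def relevant_upstream_map_indexes(task: FakeTask, upstream_id: str, map_index: int) -> int | None:
    # A purely structural upstream (wired with >>) is depended on as a whole.
    if upstream_id not in set(iter_expansion_dependencies(task)):
        return None
    # Placeholder for the normal "relevant map index" check.
    return map_index


t3 = FakeTask("t3", FakeGroup(expansion_upstreams=["make_list"]))
print(relevant_upstream_map_indexes(t3, "t2", 1))         # structural upstream
print(relevant_upstream_map_indexes(t3, "make_list", 1))  # expansion upstream
```

In this model `t2` reaches `t3` only through the DAG structure, so the function returns None for it, while `make_list` drives the group's expansion and goes through the index check.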

@ephraimbuddy
Contributor Author

ephraimbuddy commented Sep 15, 2023

It works. Feel free to optimize it on this PR. I also modified the code to make mypy happy.

Edit:
@uranusjr, this works; however, all the upstream tasks have to complete before the task can run. This might not go well with the fast-fail behaviour, where with multiple upstreams the first failure triggers the downstream task even if some upstream tasks are still running. If this behaviour is OK for mapped task groups, I can go ahead and address the failing test.

@uranusjr
Member

all the upstream tasks have to be completed before the task can run. This might not go well with the fast fail feature where if we have multiple upstream then the first fail will trigger a failure on its downstream even if some upstream tasks are still running.

Does this mean with this PR the downstream will run when the first failure happens, or when all tasks (success or not) finish?

@ephraimbuddy
Contributor Author

ephraimbuddy commented Oct 16, 2023

when all tasks (success or not) finish?

Yes. The downstream runs only after all upstream tasks have completed.
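
The two timings discussed here can be contrasted with a small model of a ONE_FAILED decision. This is a simplified sketch, not Airflow's trigger-rule code: `one_failed_decision`, the `wait_for_all` flag, and the string states are hypothetical, and states such as upstream_failed are left out.

```python
from __future__ import annotations

# Simplified set of terminal states for this sketch.
FINISHED = {"success", "failed", "skipped"}


def one_failed_decision(upstream_states: list[str], wait_for_all: bool) -> str:
    """Return 'run', 'skip', or 'wait' for a downstream with a ONE_FAILED-style rule."""
    any_failed = "failed" in upstream_states
    all_done = all(state in FINISHED for state in upstream_states)
    # Behaviour described in the reply above: nothing happens until every
    # upstream has finished, whether it succeeded or not.
    if wait_for_all and not all_done:
        return "wait"
    # Fast-fail behaviour: a single failure is enough to run the downstream.
    if any_failed:
        return "run"
    # No failure seen: skip once everything is done, otherwise keep waiting.
    return "skip" if all_done else "wait"


states = ["failed", "running"]
print(one_failed_decision(states, wait_for_all=False))
print(one_failed_decision(states, wait_for_all=True))
```

With one upstream failed and another still running, the fast-fail variant runs the downstream immediately, while the wait-for-all variant holds it until the second upstream finishes.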

@ephraimbuddy
Contributor Author

@uranusjr, do you think I should fix the tests and get this ready?

@ephraimbuddy ephraimbuddy force-pushed the fix-mapped-tg branch 3 times, most recently from 4dce93c to a9aa3bb Compare October 19, 2023 08:49
@eladkal eladkal added this to the Airflow 2.7.3 milestone Oct 27, 2023
@eladkal eladkal added the type:bug-fix Changelog: Bug Fixes label Oct 27, 2023
@ephraimbuddy ephraimbuddy merged commit 69938fd into apache:main Nov 1, 2023
45 checks passed
@ephraimbuddy ephraimbuddy deleted the fix-mapped-tg branch November 1, 2023 20:37
ephraimbuddy added a commit that referenced this pull request Nov 1, 2023
* Fix pre-mature evaluation of tasks in mapped task group

* fixup! Fix pre-mature evaluation of tasks in mapped task group

* fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

* fixup! fixup! fixup! Fix pre-mature evaluation of tasks in mapped task group

* Fix tests

(cherry picked from commit 69938fd)
romsharon98 pushed a commit to romsharon98/airflow that referenced this pull request Nov 10, 2023
ephraimbuddy added a commit to astronomer/airflow that referenced this pull request Nov 15, 2023
ephraimbuddy added a commit that referenced this pull request Nov 15, 2023
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Nov 20, 2023
@ephraimbuddy ephraimbuddy removed the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Nov 20, 2023
Labels
type:bug-fix Changelog: Bug Fixes
Linked issue: Trigger Rule ONE_FAILED does not work in task group with mapped tasks
3 participants