From a5fe590322f8e08454564e9edb8c9528e9453e9b Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Wed, 13 Sep 2023 13:14:07 +0100 Subject: [PATCH 1/5] Fix pre-mature evaluation of tasks in mapped task group The relevant upstream map indexes of a task instance in a mapped task group should only be computed once the task itself has expanded. If the task has not expanded yet, we should return None so that the task can wait for the upstreams before trying to run. The problem is most noticeable when the trigger rule is ONE_FAILED, because the task instance is then prematurely marked as SKIPPED. This commit fixes that. closes: https://github.com/apache/airflow/issues/34023 --- airflow/ti_deps/deps/trigger_rule_dep.py | 5 ++++ tests/ti_deps/deps/test_trigger_rule_dep.py | 31 +++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py index ca2a6100a27846..5349cd2e4c6e4b 100644 --- a/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow/ti_deps/deps/trigger_rule_dep.py @@ -146,6 +146,11 @@ def _get_relevant_upstream_map_indexes(upstream_id: str) -> int | range | None: expanded_ti_count = _get_expanded_ti_count() except (NotFullyPopulated, NotMapped): return None + if ti.map_index < 0: + # This can happen in mapped task groups. + # The current task is not expanded yet even though the mapped group has expanded. 
+ # We should return None + return None return ti.get_relevant_upstream_map_indexes( upstream=ti.task.dag.task_dict[upstream_id], ti_count=expanded_ti_count, diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py index 00cbcd449af3ef..0917c827cc1b27 100644 --- a/tests/ti_deps/deps/test_trigger_rule_dep.py +++ b/tests/ti_deps/deps/test_trigger_rule_dep.py @@ -1407,3 +1407,34 @@ def w2(): (status,) = self.get_dep_statuses(dr, "w2", flag_upstream_failed=True, session=session) assert status.reason.startswith("All setup tasks must complete successfully") assert self.get_ti(dr, "w2").state == expected + + +def test_mapped_tasks_in_mapped_task_group_waits_for_upstreams_to_complete(dag_maker, session): + """Test that one failed trigger rule works well in mapped task group""" + with dag_maker() as dag: + + @dag.task + def t1(): + return [1, 2, 3] + + @task_group("tg1") + def tg1(a): + @dag.task() + def t2(a): + return a + + @dag.task(trigger_rule=TriggerRule.ONE_FAILED) + def t3(a): + return a + + t2(a) >> t3(a) + + t = t1() + tg1.expand(a=t) + + dr = dag_maker.create_dagrun() + ti = dr.get_task_instance(task_id="t1") + ti.run() + dr.task_instance_scheduling_decisions() + ti3 = dr.get_task_instance(task_id="tg1.t3") + assert not ti3.state From 6a43f267a03fe93bb1a8da729182d8efb22bed60 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 15 Sep 2023 18:16:29 +0100 Subject: [PATCH 2/5] fixup! 
Fix pre-mature evaluation of tasks in mapped task group --- airflow/ti_deps/deps/trigger_rule_dep.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py index 5349cd2e4c6e4b..4f6910873d4815 100644 --- a/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow/ti_deps/deps/trigger_rule_dep.py @@ -132,6 +132,18 @@ def _get_expanded_ti_count() -> int: """ return ti.task.get_mapped_ti_count(ti.run_id, session=session) + def _iter_expansion_dependencies() -> Iterator[str]: + from airflow.models.mappedoperator import MappedOperator + + if isinstance(ti.task, MappedOperator): + for op in ti.task.iter_mapped_dependencies(): + yield op.task_id + task_group = ti.task.task_group + if task_group: + groups = task_group.iter_mapped_task_groups() + if groups: + yield from (op.task_id for tg in groups for op in tg.iter_mapped_dependencies()) + @functools.lru_cache def _get_relevant_upstream_map_indexes(upstream_id: str) -> int | range | None: """Get the given task's map indexes relevant to the current ti. @@ -142,15 +154,12 @@ def _get_relevant_upstream_map_indexes(upstream_id: str) -> int | range | None: """ if TYPE_CHECKING: assert isinstance(ti.task.dag, DAG) + if upstream_id not in set(_iter_expansion_dependencies()): + return None try: expanded_ti_count = _get_expanded_ti_count() except (NotFullyPopulated, NotMapped): return None - if ti.map_index < 0: - # This can happen in mapped task groups. - # The current task is not expanded yet even though the mapped group has expanded. - # We should return None - return None return ti.get_relevant_upstream_map_indexes( upstream=ti.task.dag.task_dict[upstream_id], ti_count=expanded_ti_count, From be8202c144bbe2883794b0f207df67f81bcb3784 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 9 Oct 2023 15:02:24 +0100 Subject: [PATCH 3/5] fixup! fixup! 
Fix pre-mature evaluation of tasks in mapped task group --- airflow/ti_deps/deps/trigger_rule_dep.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py index 4f6910873d4815..2e4b6be1cc783a 100644 --- a/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow/ti_deps/deps/trigger_rule_dep.py @@ -139,10 +139,12 @@ def _iter_expansion_dependencies() -> Iterator[str]: for op in ti.task.iter_mapped_dependencies(): yield op.task_id task_group = ti.task.task_group - if task_group: - groups = task_group.iter_mapped_task_groups() - if groups: - yield from (op.task_id for tg in groups for op in tg.iter_mapped_dependencies()) + if task_group and task_group.iter_mapped_task_groups(): + yield from ( + op.task_id + for tg in task_group.iter_mapped_task_groups() + for op in tg.iter_mapped_dependencies() + ) @functools.lru_cache def _get_relevant_upstream_map_indexes(upstream_id: str) -> int | range | None: From cb1237b24ce9e9c6d2adccd3201bef4b82b601ee Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 16 Oct 2023 07:32:59 +0100 Subject: [PATCH 4/5] fixup! fixup! fixup! 
Fix pre-mature evaluation of tasks in mapped task group --- airflow/ti_deps/deps/trigger_rule_dep.py | 6 ++++-- tests/ti_deps/deps/test_trigger_rule_dep.py | 16 ++++++++++------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py index 2e4b6be1cc783a..6203b2a79b6d11 100644 --- a/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow/ti_deps/deps/trigger_rule_dep.py @@ -27,6 +27,7 @@ from airflow.models.taskinstance import PAST_DEPENDS_MET from airflow.ti_deps.deps.base_ti_dep import BaseTIDep from airflow.utils.state import TaskInstanceState +from airflow.utils.task_group import MappedTaskGroup from airflow.utils.trigger_rule import TriggerRule as TR if TYPE_CHECKING: @@ -156,8 +157,9 @@ def _get_relevant_upstream_map_indexes(upstream_id: str) -> int | range | None: """ if TYPE_CHECKING: assert isinstance(ti.task.dag, DAG) - if upstream_id not in set(_iter_expansion_dependencies()): - return None + if isinstance(ti.task.task_group, MappedTaskGroup): + if upstream_id not in set(_iter_expansion_dependencies()): + return None try: expanded_ti_count = _get_expanded_ti_count() except (NotFullyPopulated, NotMapped): diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py index 0917c827cc1b27..1bc8808cb8b2ba 100644 --- a/tests/ti_deps/deps/test_trigger_rule_dep.py +++ b/tests/ti_deps/deps/test_trigger_rule_dep.py @@ -1165,19 +1165,23 @@ def _one_scheduling_decision_iteration() -> dict[tuple[str, int], TaskInstance]: tis = _one_scheduling_decision_iteration() assert sorted(tis) == [("tg.t1", 0), ("tg.t1", 1), ("tg.t1", 2)] - # After running the first t1, the first t2 becomes immediately available. + # After running the first t1, the remaining t1 must be run before t2 is available. 
tis["tg.t1", 0].run() tis = _one_scheduling_decision_iteration() - assert sorted(tis) == [("tg.t1", 1), ("tg.t1", 2), ("tg.t2", 0)] + assert sorted(tis) == [("tg.t1", 1), ("tg.t1", 2)] - # Similarly for the subsequent t2 instances. + # After running all t1, t2 is available. + tis["tg.t1", 1].run() tis["tg.t1", 2].run() tis = _one_scheduling_decision_iteration() - assert sorted(tis) == [("tg.t1", 1), ("tg.t2", 0), ("tg.t2", 2)] + assert sorted(tis) == [("tg.t2", 0), ("tg.t2", 1), ("tg.t2", 2)] - # But running t2 partially does not make t3 available. - tis["tg.t1", 1].run() + # Similarly for t2 instances. They both have to complete before t3 is available tis["tg.t2", 0].run() + tis = _one_scheduling_decision_iteration() + assert sorted(tis) == [("tg.t2", 1), ("tg.t2", 2)] + + # But running t2 partially does not make t3 available. tis["tg.t2", 2].run() tis = _one_scheduling_decision_iteration() assert sorted(tis) == [("tg.t2", 1)] From ff281d0bb2150a2a751140f2f7cd39fcaaadcef2 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Tue, 17 Oct 2023 10:49:53 +0100 Subject: [PATCH 5/5] Fix tests --- tests/models/test_mappedoperator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/models/test_mappedoperator.py b/tests/models/test_mappedoperator.py index 7244c55774840c..5c2e23c1f9e30c 100644 --- a/tests/models/test_mappedoperator.py +++ b/tests/models/test_mappedoperator.py @@ -1305,8 +1305,8 @@ def file_transforms(filename): states = self.get_states(dr) expected = { "file_transforms.my_setup": {0: "success", 1: "failed", 2: "skipped"}, - "file_transforms.my_work": {0: "success", 1: "upstream_failed", 2: "skipped"}, - "file_transforms.my_teardown": {0: "success", 1: "upstream_failed", 2: "skipped"}, + "file_transforms.my_work": {2: "upstream_failed", 1: "upstream_failed", 0: "upstream_failed"}, + "file_transforms.my_teardown": {2: "success", 1: "success", 0: "success"}, } assert states == expected