Skip to content
This repository has been archived by the owner on Feb 10, 2024. It is now read-only.

Commit

Permalink
Fix deadlock when chaining multiple empty mapped tasks (apache#27964)
Browse files Browse the repository at this point in the history
The fix here was to set changed_tis to True if there was an expansion.
  • Loading branch information
ephraimbuddy authored and jrggggg committed Dec 1, 2022
1 parent a683736 commit 88342a8
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 1 deletion.
1 change: 1 addition & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ def _filter_tis_and_exclude_removed(dag: DAG, tis: list[TI]) -> Iterable[TI]:
# During expansion we may change some tis into non-schedulable
# states, so we need to re-compute.
if expansion_happened:
changed_tis = True
new_unfinished_tis = [t for t in unfinished_tis if t.state in State.unfinished]
finished_tis.extend(t for t in unfinished_tis if t.state in State.finished)
unfinished_tis = new_unfinished_tis
Expand Down
49 changes: 48 additions & 1 deletion tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1943,7 +1943,9 @@ def say_hi():
tis["say_hi"].state = TaskInstanceState.SUCCESS
session.flush()

dr.update_state(session=session)
dr.update_state(session=session) # expands the mapped tasks
dr.update_state(session=session) # marks the task as skipped
dr.update_state(session=session) # marks dagrun as success
assert dr.state == DagRunState.SUCCESS
assert tis["add_one__1"].state == TaskInstanceState.SKIPPED

Expand Down Expand Up @@ -2099,3 +2101,48 @@ def tg(va):
ti.run()
assert len(results) == 1
assert list(results[("t3", -1)]) == [["a", "b"], [4], ["z"]]


def test_mapping_against_empty_list(dag_maker, session):
with dag_maker(session=session):

@task
def add_one(x: int):
return x + 1

@task
def say_hi():
print("Hi")

@task
def say_bye():
print("Bye")

added_values = add_one.expand(x=[])
added_more_values = add_one.expand(x=[])
added_more_more_values = add_one.expand(x=[])
say_hi() >> say_bye() >> added_values
added_values >> added_more_values >> added_more_more_values

dr: DagRun = dag_maker.create_dagrun()

tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
say_hi_ti = tis["say_hi"]
say_bye_ti = tis["say_bye"]
say_hi_ti.state = TaskInstanceState.SUCCESS
say_bye_ti.state = TaskInstanceState.SUCCESS
session.merge(say_hi_ti)
session.merge(say_bye_ti)
session.flush()

dr.update_state(session=session)
dr.update_state(session=session) # marks first empty mapped task as skipped
dr.update_state(session=session) # marks second empty mapped task as skipped
dr.update_state(session=session) # marks the third empty mapped task as skipped and dagrun as success
tis = {ti.task_id: ti.state for ti in dr.get_task_instances(session=session)}
assert tis["say_hi"] == TaskInstanceState.SUCCESS
assert tis["say_bye"] == TaskInstanceState.SUCCESS
assert tis["add_one"] == TaskInstanceState.SKIPPED
assert tis["add_one__1"] == TaskInstanceState.SKIPPED
assert tis["add_one__2"] == TaskInstanceState.SKIPPED
assert dr.state == State.SUCCESS

0 comments on commit 88342a8

Please sign in to comment.