Skip to content

Commit

Permalink
Fix clear future recursive when ExternalTaskMarker is used (#9515)
Browse files Browse the repository at this point in the history
(cherry picked from commit 4454224)
  • Loading branch information
yuqian90 authored and kaxil committed Aug 15, 2020
1 parent ee345bb commit 0ab6139
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
2 changes: 1 addition & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ def clear(
instances = tis.all()
for ti in instances:
if ti.operator == ExternalTaskMarker.__name__:
ti.task = self.get_task(ti.task_id)
ti.task = copy.copy(self.get_task(ti.task_id))

if recursion_depth == 0:
# Maximum recursion depth allowed is the recursion_depth of the first
Expand Down
28 changes: 26 additions & 2 deletions tests/sensors/test_external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,12 +433,12 @@ def assert_ti_state_equal(task_instance, state):
assert task_instance.state == state


def clear_tasks(dag_bag, dag, task):
def clear_tasks(dag_bag, dag, task, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE):
"""
Clear the task and its downstream tasks recursively for the dag in the given dagbag.
"""
subdag = dag.sub_dag(task_regex="^{}$".format(task.task_id), include_downstream=True)
subdag.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, dag_bag=dag_bag)
subdag.clear(start_date=start_date, end_date=end_date, dag_bag=dag_bag)


# pylint: disable=redefined-outer-name
Expand All @@ -456,6 +456,30 @@ def test_external_task_marker_transitive(dag_bag_ext):
assert_ti_state_equal(ti_b_3, State.NONE)


def test_external_task_marker_future(dag_bag_ext):
"""
Test clearing tasks with no end_date. This is the case when users clear tasks with
Future, Downstream and Recursive selected.
"""
date_0 = DEFAULT_DATE
date_1 = DEFAULT_DATE + timedelta(days=1)

tis_date_0 = run_tasks(dag_bag_ext, execution_date=date_0)
tis_date_1 = run_tasks(dag_bag_ext, execution_date=date_1)

dag_0 = dag_bag_ext.get_dag("dag_0")
task_a_0 = dag_0.get_task("task_a_0")
# This should clear all tasks on dag_0 to dag_3 on both date_0 and date_1
clear_tasks(dag_bag_ext, dag_0, task_a_0, end_date=None)

ti_a_0_date_0 = tis_date_0["task_a_0"]
ti_b_3_date_0 = tis_date_0["task_b_3"]
ti_b_3_date_1 = tis_date_1["task_b_3"]
assert_ti_state_equal(ti_a_0_date_0, State.NONE)
assert_ti_state_equal(ti_b_3_date_0, State.NONE)
assert_ti_state_equal(ti_b_3_date_1, State.NONE)


def test_external_task_marker_exception(dag_bag_ext):
"""
Clearing across multiple DAGs should raise AirflowException if more levels are being cleared
Expand Down

0 comments on commit 0ab6139

Please sign in to comment.