Skip to content

Commit

Permalink
Refactor test_dag and add new test for DAG pause behavior
Browse files Browse the repository at this point in the history
Refactored `test_dag.py` by abstracting repeated code into `add_failed_dag_run` function. This makes the tests more readable and maintainable. Furthermore, a new test was added to verify the 'pause' behavior of a DAG after a limit of consecutive failures has been reached. This ensures that the DAG behaves as expected under failure conditions.
  • Loading branch information
pateash committed Feb 13, 2024
1 parent 835562f commit 081176f
Showing 1 changed file with 8 additions and 11 deletions.
19 changes: 8 additions & 11 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -1352,34 +1352,31 @@ def test_new_dag_is_paused_upon_creation(self):
session.close()

def test_existing_dag_is_paused_after_limit(self):
def add_failed_dag_run(dag, id, session):
op1 = BashOperator(task_id="task"+id, bash_command="exit 1;")
dag.add_task(op1)
def add_failed_dag_run(id, execution_date):
dr = dag.create_dagrun(
run_type=DagRunType.MANUAL,
run_id="run_id_"+id,
execution_date=TEST_DATE,
execution_date=execution_date,
state=State.FAILED,
)
ti_op1 = dr.get_task_instance(task_id=op1.task_id, session=session)
ti_op1.set_state(state=TaskInstanceState.FAILED, session=session)
dr.update_state(session=session)
# assert State.FAILED == dr.state

dag_id = "dag_paused_after_limit"
dag = DAG(dag_id, is_paused_upon_creation=False, max_consecutive_failed_dag_runs=1)
dag = DAG(dag_id, is_paused_upon_creation=False, max_consecutive_failed_dag_runs=2)
op1 = BashOperator(task_id="task", bash_command="exit 1;")
dag.add_task(op1)
session = settings.Session()
dag.sync_to_db(session=session)
# it should not follow the pause flag upon creation
assert not dag.get_is_paused()

add_failed_dag_run(dag, "1", session=session)
# add_failed_dag_run(dag, "2", session=session)

# dag should be paused after 2 failed dag_runs
add_failed_dag_run("1", TEST_DATE,)
add_failed_dag_run("2", TEST_DATE + timedelta(days=1))
assert dag.get_is_paused()
dag.clear()
self._clean_up(dag_id)
# now, we will run with 2 failed tasks and check this should be paused

def test_existing_dag_default_view(self):
with create_session() as session:
Expand Down

0 comments on commit 081176f

Please sign in to comment.