diff --git a/changes.d/6433.fix.md b/changes.d/6433.fix.md new file mode 100644 index 0000000000..92531f0d89 --- /dev/null +++ b/changes.d/6433.fix.md @@ -0,0 +1 @@ +Ignore requests to trigger or set active tasks with --flow=none. diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 3ddf991cb9..e149630d32 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1927,6 +1927,12 @@ def set_prereqs_and_outputs( # Set existing task proxies. for itask in itasks: + if flow == ['none'] and itask.flow_nums != set(): + LOG.error( + f"[{itask}] ignoring 'flow=none' set: task already has" + f" {stringify_flow_nums(itask.flow_nums, full=True)}" + ) + continue self.merge_flows(itask, flow_nums) if prereqs: self._set_prereqs_itask(itask, prereqs, flow_nums) @@ -2169,8 +2175,14 @@ def force_trigger_tasks( # Trigger active tasks. for itask in existing_tasks: + if flow == ['none'] and itask.flow_nums != set(): + LOG.error( + f"[{itask}] ignoring 'flow=none' trigger: task already has" + f" {stringify_flow_nums(itask.flow_nums, full=True)}" + ) + continue if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE): - LOG.warning(f"[{itask}] ignoring trigger - already active") + LOG.error(f"[{itask}] ignoring trigger - already active") continue self.merge_flows(itask, flow_nums) self._force_trigger(itask) diff --git a/tests/integration/test_flow_assignment.py b/tests/integration/test_flow_assignment.py index 5816b08527..6c0c58a875 100644 --- a/tests/integration/test_flow_assignment.py +++ b/tests/integration/test_flow_assignment.py @@ -17,12 +17,18 @@ """Test for flow-assignment in triggered/set tasks.""" import functools +import logging import time from typing import Callable import pytest -from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE +from cylc.flow.flow_mgr import ( + FLOW_ALL, + FLOW_NEW, + FLOW_NONE, + stringify_flow_nums +) from cylc.flow.scheduler import Scheduler @@ -76,7 +82,9 @@ async def test_get_flow_nums(one: Scheduler, start): @pytest.mark.parametrize('command', ['trigger', 'set']) -async def test_flow_assignment(flow, scheduler, start, command: str): +async def test_flow_assignment( + flow, scheduler, start, command: str, log_filter: Callable +): """Test flow assignment when triggering/setting tasks. Active tasks: @@ -102,7 +110,7 @@ async def test_flow_assignment(flow, scheduler, start, command: str): } id_ = flow(conf) schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True) - async with start(schd): + async with start(schd) as log: if command == 'set': do_command: Callable = functools.partial( schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[] @@ -128,6 +136,14 @@ async def test_flow_assignment(flow, scheduler, start, command: str): # (no-flow is ignored for active tasks) do_command([active_a.identity], flow=[FLOW_NONE]) assert active_a.flow_nums == {1, 2} + assert log_filter( + log, + contains=( + f'[{active_a}] ignoring \'flow=none\' {command}: ' + f'task already has {stringify_flow_nums(active_a.flow_nums)}' + ), + level=logging.ERROR + ) do_command([active_a.identity], flow=[FLOW_NEW]) assert active_a.flow_nums == {1, 2, 3}