Skip to content

Commit

Permalink
Merge pull request #6436 from cylc/8.3.x-sync
Browse files Browse the repository at this point in the history
🤖 Merge 8.3.x-sync into master
  • Loading branch information
hjoliver authored Oct 22, 2024
2 parents a536629 + df89a77 commit 4aa492d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 4 deletions.
1 change: 1 addition & 0 deletions changes.d/6433.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ignore requests to trigger or set active tasks with --flow=none.
14 changes: 13 additions & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1926,6 +1926,12 @@ def set_prereqs_and_outputs(

# Set existing task proxies.
for itask in itasks:
if flow == ['none'] and itask.flow_nums != set():
LOG.error(
f"[{itask}] ignoring 'flow=none' set: task already has"
f" {stringify_flow_nums(itask.flow_nums, full=True)}"
)
continue
self.merge_flows(itask, flow_nums)
if prereqs:
self._set_prereqs_itask(itask, prereqs, flow_nums)
Expand Down Expand Up @@ -2168,8 +2174,14 @@ def force_trigger_tasks(

# Trigger active tasks.
for itask in existing_tasks:
if flow == ['none'] and itask.flow_nums != set():
LOG.error(
f"[{itask}] ignoring 'flow=none' trigger: task already has"
f" {stringify_flow_nums(itask.flow_nums, full=True)}"
)
continue
if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
LOG.warning(f"[{itask}] ignoring trigger - already active")
LOG.error(f"[{itask}] ignoring trigger - already active")
continue
self.merge_flows(itask, flow_nums)
self._force_trigger(itask)
Expand Down
22 changes: 19 additions & 3 deletions tests/integration/test_flow_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@
"""Test for flow-assignment in triggered/set tasks."""

import functools
import logging
import time
from typing import Callable

import pytest

from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE
from cylc.flow.flow_mgr import (
FLOW_ALL,
FLOW_NEW,
FLOW_NONE,
stringify_flow_nums
)
from cylc.flow.scheduler import Scheduler


Expand Down Expand Up @@ -76,7 +82,9 @@ async def test_get_flow_nums(one: Scheduler, start):


@pytest.mark.parametrize('command', ['trigger', 'set'])
async def test_flow_assignment(flow, scheduler, start, command: str):
async def test_flow_assignment(
flow, scheduler, start, command: str, log_filter: Callable
):
"""Test flow assignment when triggering/setting tasks.
Active tasks:
Expand All @@ -102,7 +110,7 @@ async def test_flow_assignment(flow, scheduler, start, command: str):
}
id_ = flow(conf)
schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True)
async with start(schd):
async with start(schd) as log:
if command == 'set':
do_command: Callable = functools.partial(
schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[]
Expand All @@ -128,6 +136,14 @@ async def test_flow_assignment(flow, scheduler, start, command: str):
# (no-flow is ignored for active tasks)
do_command([active_a.identity], flow=[FLOW_NONE])
assert active_a.flow_nums == {1, 2}
assert log_filter(
log,
contains=(
f'[{active_a}] ignoring \'flow=none\' {command}: '
f'task already has {stringify_flow_nums(active_a.flow_nums)}'
),
level=logging.ERROR
)

do_command([active_a.identity], flow=[FLOW_NEW])
assert active_a.flow_nums == {1, 2, 3}
Expand Down

0 comments on commit 4aa492d

Please sign in to comment.