From 97f09c7bc807bf7df578b27671acf47a2394c75c Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Tue, 29 Oct 2024 15:29:24 +0000 Subject: [PATCH] `cylc remove`: update data store with changed prereqs; add tests --- cylc/flow/scripts/show.py | 4 ++ cylc/flow/task_pool.py | 21 +++++++- tests/integration/conftest.py | 26 +++++++++- tests/integration/test_examples.py | 10 ++++ tests/integration/test_remove.py | 80 +++++++++++++++++++++++------- 5 files changed, 121 insertions(+), 20 deletions(-) diff --git a/cylc/flow/scripts/show.py b/cylc/flow/scripts/show.py index 7d7bab1dfdc..c9bf6670fcf 100755 --- a/cylc/flow/scripts/show.py +++ b/cylc/flow/scripts/show.py @@ -60,6 +60,7 @@ from cylc.flow.option_parsers import ( CylcOptionParser as COP, ID_MULTI_ARG_DOC, + Options, ) from cylc.flow.terminal import cli_function from cylc.flow.util import BOOL_SYMBOLS @@ -246,6 +247,9 @@ def get_option_parser(): return parser +ShowOptions = Options(get_option_parser()) + + async def workflow_meta_query(workflow_id, pclient, options, json_filter): query = WORKFLOW_META_QUERY query_kwargs = { diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index b7e02e9d68c..8634fb5f902 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -1266,7 +1266,7 @@ def log_unsatisfied_prereqs(self) -> bool: LOG.warning( "Partially satisfied prerequisites:\n" + "\n".join( - f" * {id_} is waiting on {others}" + f" * {id_} is waiting on {sorted(others)}" for id_, others in unsat.items() ) ) @@ -2149,6 +2149,7 @@ def remove_tasks( fnums_to_remove = child_itask.match_flows(flow_nums) if not fnums_to_remove: continue + prereq_changed = False for prereq in ( *child_itask.state.prerequisites, *child_itask.state.suicide_prerequisites, @@ -2156,9 +2157,25 @@ def remove_tasks( for msg in prereq.naturally_satisfied_dependencies(): if msg.get_id() == id_: prereq[msg] = False - self.unqueue_task(child_itask) + prereq_changed = True if id_ not in removed: removed[id_] = fnums_to_remove + break + if (not prereq_changed) or all( + pre.is_satisfied() + for pre in child_itask.state.prerequisites + ): + continue + self.data_store_mgr.delta_task_prerequisite(child_itask) + if ( + # Skip tasks we are already dealing with: + child_itask.identity in matched_task_ids + # Or tasks that are also in other flows: + or child_itask.flow_nums != fnums_to_remove + ): + continue + # Downstream task no longer ready to run + self.unqueue_task(child_itask) # Remove from DB tables: db_removed_fnums = self.workflow_db_mgr.remove_task_from_flows( diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 0abc57024bc..fe4b19ab92b 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -33,6 +33,7 @@ from cylc.flow.config import WorkflowConfig from cylc.flow.id import Tokens +from cylc.flow.network.client import WorkflowRuntimeClient from cylc.flow.option_parsers import Options from cylc.flow.pathutil import get_cylc_run_dir from cylc.flow.rundb import CylcWorkflowDAO @@ -40,6 +41,10 @@ get_option_parser as install_gop, install as cylc_install, ) +from cylc.flow.scripts.show import ( + ShowOptions, + prereqs_and_outputs_query, +) from cylc.flow.scripts.validate import ValidateOptions from cylc.flow.util import serialise_set from cylc.flow.wallclock import get_current_time_string @@ -57,7 +62,6 @@ if TYPE_CHECKING: - from cylc.flow.network.client import WorkflowRuntimeClient from cylc.flow.scheduler import Scheduler from cylc.flow.task_proxy import TaskProxy @@ -697,3 +701,23 @@ async def _reftest( return triggers return _reftest + + +@pytest.fixture +def cylc_show(): + """Fixture that runs `cylc show` on a scheduler, returning JSON object.""" + + async def _cylc_show(schd: 'Scheduler', *task_ids: str) -> dict: + pclient = WorkflowRuntimeClient(schd.workflow) + await schd.update_data_structure() + json_filter: dict = {} + await prereqs_and_outputs_query( + schd.id, + [Tokens(id_, relative=True) for id_ in task_ids], + pclient, + ShowOptions(json=True), + json_filter, + ) + return json_filter + + return _cylc_show diff --git a/tests/integration/test_examples.py b/tests/integration/test_examples.py index d3f6436b729..a0d15ee7289 100644 --- a/tests/integration/test_examples.py +++ b/tests/integration/test_examples.py @@ -23,9 +23,11 @@ import asyncio import logging from pathlib import Path + import pytest from cylc.flow import __version__ +from cylc.flow.scheduler import Scheduler async def test_create_flow(flow, run_dir): @@ -286,3 +288,11 @@ async def test_reftest(flow, scheduler, reftest): ('1/a', None), ('1/b', ('1/a',)), } + + +async def test_show(one: Scheduler, start, cylc_show): + """Demonstrate the `cylc_show` fixture""" + async with start(one): + out = await cylc_show(one, '1/one') + assert list(out.keys()) == ['1/one'] + assert out['1/one']['state'] == 'waiting' diff --git a/tests/integration/test_remove.py b/tests/integration/test_remove.py index f0cb3409b43..858f10f85c3 100644 --- a/tests/integration/test_remove.py +++ b/tests/integration/test_remove.py @@ -31,6 +31,7 @@ def example_workflow(flow): return flow({ 'scheduling': { 'graph': { + # Note: test both `&` and separate arrows for combining 'R1': ''' a1 & a2 => b a3 => b @@ -42,7 +43,8 @@ def example_workflow(flow): def get_data_store_flow_nums(schd: Scheduler, itask: TaskProxy): _, ds_tproxy = schd.data_store_mgr.store_node_fetcher(itask.tokens) - return ds_tproxy.flow_nums + if ds_tproxy: + return ds_tproxy.flow_nums async def test_basic( @@ -52,7 +54,9 @@ async def test_basic( schd: Scheduler = scheduler(example_workflow) async with start(schd): a1 = schd.pool._get_task_by_id('1/a1') + a3 = schd.pool._get_task_by_id('1/a3') schd.pool.spawn_on_output(a1, TASK_OUTPUT_SUCCEEDED) + schd.pool.spawn_on_output(a3, TASK_OUTPUT_SUCCEEDED) await schd.update_data_structure() assert a1 in schd.pool.get_tasks() @@ -253,8 +257,8 @@ async def test_logging_flow_nums( assert schd.pool._get_task_by_id('1/a1').flow_nums == {1} -async def test_ref1(flow, scheduler, run, reflog, complete, log_filter): - """Test prereqs/stall & re-run behaviour when removing tasks.""" +async def test_retrigger(flow, scheduler, run, reflog, complete): + """Test prereqs & re-run behaviour when removing tasks.""" schd: Scheduler = scheduler( flow({ 'scheduling': { @@ -268,28 +272,70 @@ async def test_ref1(flow, scheduler, run, reflog, complete, log_filter): async with run(schd): reflog_triggers: set = reflog(schd) await complete(schd, '1/b') - assert not schd.pool.is_stalled() - assert len(schd.pool.task_queue_mgr.queues['default'].deque) await run_cmd(remove_tasks(schd, ['1/a', '1/b'], [FLOW_ALL])) schd.process_workflow_db_queue() - # Removing 1/b should cause stall because it is prereq of 1/c: + # Removing 1/b should un-queue 1/c: assert len(schd.pool.task_queue_mgr.queues['default'].deque) == 0 - assert schd.pool.is_stalled() - assert log_filter( - logging.WARNING, "1/c is waiting on ['1/b:succeeded']" - ) + assert reflog_triggers == { ('1/a', None), ('1/b', ('1/a',)), } reflog_triggers.clear() - await run_cmd(force_trigger_tasks(schd, ['1/a'], [FLOW_ALL])) - await complete(schd, '1/b') + await run_cmd(force_trigger_tasks(schd, ['1/a'], [])) + await complete(schd) + + assert reflog_triggers == { + ('1/a', None), + # 1/b should have run again after 1/a on the re-trigger in flow 1: + ('1/b', ('1/a',)), + ('1/c', ('1/b',)), + } + + +async def test_prereqs( + flow, scheduler, run, reflog, complete, cylc_show, log_filter +): + """Test prereqs & stall behaviour when removing tasks.""" + schd: Scheduler = scheduler( + flow({ + 'scheduling': { + 'graph': { + 'R1': '(a1 | a2) & b => c', + }, + }, + }), + paused_start=False, + ) + async with run(schd): + reflog_triggers: set = reflog(schd) + await complete(schd, '1/a1', '1/a2', '1/b') assert not schd.pool.is_stalled() - assert reflog_triggers == { - ('1/a', None), - # 1/b should have run again after 1/a on the re-trigger in flow 1: - ('1/b', ('1/a',)), - } + assert len(schd.pool.task_queue_mgr.queues['default'].deque) + assert { + i['satisfied'] + for i in (await cylc_show(schd, '1/c'))['1/c']['prerequisites'] + } == {True} + + await run_cmd(remove_tasks(schd, ['1/a1', '1/b',], [FLOW_ALL])) + schd.process_workflow_db_queue() + # Removing the tasks should cause stall because they are prereqs of 1/c + assert len(schd.pool.task_queue_mgr.queues['default'].deque) == 0 + assert schd.pool.is_stalled() + assert log_filter( + logging.WARNING, + "1/c is waiting on ['1/a1:succeeded', '1/b:succeeded']", + ) + # `cylc show` should reflect the now-unsatisfied prereq: + assert { + i['satisfied'] + for i in (await cylc_show(schd, '1/c'))['1/c']['prerequisites'] + } == {False} + + assert reflog_triggers == { + ('1/a1', None), + ('1/a2', None), + ('1/b', None), + }