Skip to content

Commit

Permalink
cylc remove: update data store with changed prereqs; add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Oct 30, 2024
1 parent 82861ea commit 97f09c7
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 20 deletions.
4 changes: 4 additions & 0 deletions cylc/flow/scripts/show.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from cylc.flow.option_parsers import (
CylcOptionParser as COP,
ID_MULTI_ARG_DOC,
Options,
)
from cylc.flow.terminal import cli_function
from cylc.flow.util import BOOL_SYMBOLS
Expand Down Expand Up @@ -246,6 +247,9 @@ def get_option_parser():
return parser


ShowOptions = Options(get_option_parser())


async def workflow_meta_query(workflow_id, pclient, options, json_filter):
query = WORKFLOW_META_QUERY
query_kwargs = {
Expand Down
21 changes: 19 additions & 2 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,7 @@ def log_unsatisfied_prereqs(self) -> bool:
LOG.warning(
"Partially satisfied prerequisites:\n"
+ "\n".join(
f" * {id_} is waiting on {others}"
f" * {id_} is waiting on {sorted(others)}"
for id_, others in unsat.items()
)
)
Expand Down Expand Up @@ -2149,16 +2149,33 @@ def remove_tasks(
fnums_to_remove = child_itask.match_flows(flow_nums)
if not fnums_to_remove:
continue

Check warning on line 2151 in cylc/flow/task_pool.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/task_pool.py#L2151

Added line #L2151 was not covered by tests
prereq_changed = False
for prereq in (
*child_itask.state.prerequisites,
*child_itask.state.suicide_prerequisites,
):
for msg in prereq.naturally_satisfied_dependencies():
if msg.get_id() == id_:
prereq[msg] = False
self.unqueue_task(child_itask)
prereq_changed = True
if id_ not in removed:
removed[id_] = fnums_to_remove
break
if (not prereq_changed) or all(
pre.is_satisfied()
for pre in child_itask.state.prerequisites
):
continue
self.data_store_mgr.delta_task_prerequisite(child_itask)
if (
# Skip tasks we are already dealing with:
child_itask.identity in matched_task_ids
# Or tasks that are also in other flows:
or child_itask.flow_nums != fnums_to_remove
):
continue
# Downstream task no longer ready to run
self.unqueue_task(child_itask)

# Remove from DB tables:
db_removed_fnums = self.workflow_db_mgr.remove_task_from_flows(
Expand Down
26 changes: 25 additions & 1 deletion tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,18 @@

from cylc.flow.config import WorkflowConfig
from cylc.flow.id import Tokens
from cylc.flow.network.client import WorkflowRuntimeClient
from cylc.flow.option_parsers import Options
from cylc.flow.pathutil import get_cylc_run_dir
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.scripts.install import (
get_option_parser as install_gop,
install as cylc_install,
)
from cylc.flow.scripts.show import (
ShowOptions,
prereqs_and_outputs_query,
)
from cylc.flow.scripts.validate import ValidateOptions
from cylc.flow.util import serialise_set
from cylc.flow.wallclock import get_current_time_string
Expand All @@ -57,7 +62,6 @@


if TYPE_CHECKING:
from cylc.flow.network.client import WorkflowRuntimeClient
from cylc.flow.scheduler import Scheduler
from cylc.flow.task_proxy import TaskProxy

Expand Down Expand Up @@ -697,3 +701,23 @@ async def _reftest(
return triggers

return _reftest


@pytest.fixture
def cylc_show():
"""Fixture that runs `cylc show` on a scheduler, returning JSON object."""

async def _cylc_show(schd: 'Scheduler', *task_ids: str) -> dict:
pclient = WorkflowRuntimeClient(schd.workflow)
await schd.update_data_structure()
json_filter: dict = {}
await prereqs_and_outputs_query(
schd.id,
[Tokens(id_, relative=True) for id_ in task_ids],
pclient,
ShowOptions(json=True),
json_filter,
)
return json_filter

return _cylc_show
10 changes: 10 additions & 0 deletions tests/integration/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import asyncio
import logging
from pathlib import Path

import pytest

from cylc.flow import __version__
from cylc.flow.scheduler import Scheduler


async def test_create_flow(flow, run_dir):
Expand Down Expand Up @@ -286,3 +288,11 @@ async def test_reftest(flow, scheduler, reftest):
('1/a', None),
('1/b', ('1/a',)),
}


async def test_show(one: Scheduler, start, cylc_show):
"""Demonstrate the `cylc_show` fixture"""
async with start(one):
out = await cylc_show(one, '1/one')
assert list(out.keys()) == ['1/one']
assert out['1/one']['state'] == 'waiting'
80 changes: 63 additions & 17 deletions tests/integration/test_remove.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def example_workflow(flow):
return flow({
'scheduling': {
'graph': {
# Note: test both `&` and separate arrows for combining
'R1': '''
a1 & a2 => b
a3 => b
Expand All @@ -42,7 +43,8 @@ def example_workflow(flow):

def get_data_store_flow_nums(schd: Scheduler, itask: TaskProxy):
_, ds_tproxy = schd.data_store_mgr.store_node_fetcher(itask.tokens)
return ds_tproxy.flow_nums
if ds_tproxy:
return ds_tproxy.flow_nums


async def test_basic(
Expand All @@ -52,7 +54,9 @@ async def test_basic(
schd: Scheduler = scheduler(example_workflow)
async with start(schd):
a1 = schd.pool._get_task_by_id('1/a1')
a3 = schd.pool._get_task_by_id('1/a3')
schd.pool.spawn_on_output(a1, TASK_OUTPUT_SUCCEEDED)
schd.pool.spawn_on_output(a3, TASK_OUTPUT_SUCCEEDED)
await schd.update_data_structure()

assert a1 in schd.pool.get_tasks()
Expand Down Expand Up @@ -253,8 +257,8 @@ async def test_logging_flow_nums(
assert schd.pool._get_task_by_id('1/a1').flow_nums == {1}


async def test_ref1(flow, scheduler, run, reflog, complete, log_filter):
"""Test prereqs/stall & re-run behaviour when removing tasks."""
async def test_retrigger(flow, scheduler, run, reflog, complete):
"""Test prereqs & re-run behaviour when removing tasks."""
schd: Scheduler = scheduler(
flow({
'scheduling': {
Expand All @@ -268,28 +272,70 @@ async def test_ref1(flow, scheduler, run, reflog, complete, log_filter):
async with run(schd):
reflog_triggers: set = reflog(schd)
await complete(schd, '1/b')
assert not schd.pool.is_stalled()
assert len(schd.pool.task_queue_mgr.queues['default'].deque)

await run_cmd(remove_tasks(schd, ['1/a', '1/b'], [FLOW_ALL]))
schd.process_workflow_db_queue()
# Removing 1/b should cause stall because it is prereq of 1/c:
# Removing 1/b should un-queue 1/c:
assert len(schd.pool.task_queue_mgr.queues['default'].deque) == 0
assert schd.pool.is_stalled()
assert log_filter(
logging.WARNING, "1/c is waiting on ['1/b:succeeded']"
)

assert reflog_triggers == {
('1/a', None),
('1/b', ('1/a',)),
}
reflog_triggers.clear()

await run_cmd(force_trigger_tasks(schd, ['1/a'], [FLOW_ALL]))
await complete(schd, '1/b')
await run_cmd(force_trigger_tasks(schd, ['1/a'], []))
await complete(schd)

assert reflog_triggers == {
('1/a', None),
# 1/b should have run again after 1/a on the re-trigger in flow 1:
('1/b', ('1/a',)),
('1/c', ('1/b',)),
}


async def test_prereqs(
flow, scheduler, run, reflog, complete, cylc_show, log_filter
):
"""Test prereqs & stall behaviour when removing tasks."""
schd: Scheduler = scheduler(
flow({
'scheduling': {
'graph': {
'R1': '(a1 | a2) & b => c',
},
},
}),
paused_start=False,
)
async with run(schd):
reflog_triggers: set = reflog(schd)
await complete(schd, '1/a1', '1/a2', '1/b')
assert not schd.pool.is_stalled()
assert reflog_triggers == {
('1/a', None),
# 1/b should have run again after 1/a on the re-trigger in flow 1:
('1/b', ('1/a',)),
}
assert len(schd.pool.task_queue_mgr.queues['default'].deque)
assert {
i['satisfied']
for i in (await cylc_show(schd, '1/c'))['1/c']['prerequisites']
} == {True}

await run_cmd(remove_tasks(schd, ['1/a1', '1/b',], [FLOW_ALL]))
schd.process_workflow_db_queue()
# Removing the tasks should cause stall because they are prereqs of 1/c
assert len(schd.pool.task_queue_mgr.queues['default'].deque) == 0
assert schd.pool.is_stalled()
assert log_filter(
logging.WARNING,
"1/c is waiting on ['1/a1:succeeded', '1/b:succeeded']",
)
# `cylc show` should reflect the now-unsatisfied prereq:
assert {
i['satisfied']
for i in (await cylc_show(schd, '1/c'))['1/c']['prerequisites']
} == {False}

assert reflog_triggers == {
('1/a1', None),
('1/a2', None),
('1/b', None),
}

0 comments on commit 97f09c7

Please sign in to comment.