diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 674542d4e63..acc86458619 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -18,8 +18,10 @@ jobs: strategy: matrix: os: ['ubuntu-latest'] - python: ['3.7', '3.8', '3.9', '3.10', '3.11'] + python: ['3.8', '3.9', '3.10', '3.11'] include: + - os: 'ubuntu-22.04' + python: '3.7' - os: 'macos-latest' python: '3.8' steps: diff --git a/.github/workflows/test_fast.yml b/.github/workflows/test_fast.yml index b58bc50ed99..dd65adc5e52 100644 --- a/.github/workflows/test_fast.yml +++ b/.github/workflows/test_fast.yml @@ -20,9 +20,10 @@ jobs: fail-fast: false # don't stop on first failure matrix: os: ['ubuntu-latest'] - python-version: ['3.7', '3.8', '3.10', '3.11', '3'] + python-version: ['3.8', '3.10', '3.11', '3'] include: - # mac os test + - os: 'ubuntu-22.04' + python-version: '3.7' - os: 'macos-latest' python-version: '3.9' # oldest supported version # non-utc timezone test diff --git a/.github/workflows/test_functional.yml b/.github/workflows/test_functional.yml index f055353d904..309e0f8b6fe 100644 --- a/.github/workflows/test_functional.yml +++ b/.github/workflows/test_functional.yml @@ -40,7 +40,7 @@ jobs: strategy: fail-fast: false matrix: - os: ['ubuntu-latest'] + os: ['ubuntu-22.04'] python-version: ['3.7'] test-base: ['tests/f'] chunk: ['1/4', '2/4', '3/4', '4/4'] @@ -56,20 +56,20 @@ jobs: platform: '_local_background*' # tests/k - name: 'flaky' - os: 'ubuntu-latest' + os: 'ubuntu-22.04' python-version: '3.7' test-base: 'tests/k' chunk: '1/1' platform: '_local_background* _local_at*' # remote platforms - name: '_remote_background_indep_poll' - os: 'ubuntu-latest' + os: 'ubuntu-22.04' python-version: '3.7' test-base: 'tests/f tests/k' chunk: '1/1' platform: '_remote_background_indep_poll _remote_at_indep_poll' - name: '_remote_background_indep_tcp' - os: 'ubuntu-latest' + os: 'ubuntu-22.04' test-base: 'tests/f tests/k' python-version: '3.7' chunk: '1/1' diff --git a/.github/workflows/test_tutorial_workflow.yml b/.github/workflows/test_tutorial_workflow.yml index 7859b8588e2..a0d2ec777b5 100644 --- a/.github/workflows/test_tutorial_workflow.yml +++ b/.github/workflows/test_tutorial_workflow.yml @@ -21,8 +21,12 @@ jobs: test: strategy: matrix: - python-version: ['3.7', '3'] - runs-on: ubuntu-latest + include: + - os: 'ubuntu-latest' + python-version: '3' + - os: 'ubuntu-22.04' + python-version: '3.7' + runs-on: ${{ matrix.os }} timeout-minutes: 10 steps: - name: configure python diff --git a/CHANGES.md b/CHANGES.md index b9fba716bb9..9f62c06f46f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -11,6 +11,18 @@ $ towncrier create ..md --content "Short description" +## __cylc-8.3.5 (Released 2024-10-15)__ + +### 🔧 Fixes + +[#6316](https://github.com/cylc/cylc-flow/pull/6316) - Fixed bug in `cylc vr` where an initial cycle point of `now`/`next()`/`previous()` would result in an error. + +[#6362](https://github.com/cylc/cylc-flow/pull/6362) - Fixed simulation mode bug where the task submit number would not increment + +[#6367](https://github.com/cylc/cylc-flow/pull/6367) - Fix bug where `cylc trigger` and `cylc set` would assign active flows to existing tasks by default. + +[#6397](https://github.com/cylc/cylc-flow/pull/6397) - Fix "dictionary changed size during iteration error" which could occur with broadcasts. + ## __cylc-8.3.4 (Released 2024-09-12)__ ### 🚀 Enhancements diff --git a/changes.d/6362.fix.md b/changes.d/6362.fix.md deleted file mode 100644 index d469ede6b95..00000000000 --- a/changes.d/6362.fix.md +++ /dev/null @@ -1 +0,0 @@ -Fixed simulation mode bug where the task submit number would not increment diff --git a/cylc/flow/command_validation.py b/cylc/flow/command_validation.py index bbdc7089fa1..7ba2dfb4f42 100644 --- a/cylc/flow/command_validation.py +++ b/cylc/flow/command_validation.py @@ -41,7 +41,7 @@ f"or '{FLOW_NONE}'" ) ERR_OPT_FLOW_VAL_2 = f"Flow values must be an integer, or '{FLOW_ALL}'" -ERR_OPT_FLOW_INT = "Multiple flow options must all be integer valued" +ERR_OPT_FLOW_COMBINE = "Cannot combine --flow={0} with other flow values" ERR_OPT_FLOW_WAIT = ( f"--wait is not compatible with --flow={FLOW_NEW} or --flow={FLOW_NONE}" ) @@ -54,10 +54,11 @@ def flow_opts( ) -> None: """Check validity of flow-related CLI options. - Note the schema defaults flows to ["all"]. + Note the schema defaults flows to []. Examples: Good: + >>> flow_opts([], False) >>> flow_opts(["new"], False) >>> flow_opts(["1", "2"], False) >>> flow_opts(["1", "2"], True) @@ -65,7 +66,8 @@ def flow_opts( Bad: >>> flow_opts(["none", "1"], False) Traceback (most recent call last): - cylc.flow.exceptions.InputError: ... must all be integer valued + cylc.flow.exceptions.InputError: Cannot combine --flow=none with other + flow values >>> flow_opts(["cheese", "2"], True) Traceback (most recent call last): @@ -73,25 +75,31 @@ def flow_opts( >>> flow_opts(["new"], True) Traceback (most recent call last): - cylc.flow.exceptions.InputError: --wait is not compatible ... + cylc.flow.exceptions.InputError: --wait is not compatible with + --flow=new or --flow=none >>> flow_opts(["new"], False, allow_new_or_none=False) Traceback (most recent call last): cylc.flow.exceptions.InputError: ... must be an integer, or 'all' """ + if not flows: + return + + flows = [val.strip() for val in flows] + for val in flows: val = val.strip() if val in {FLOW_NONE, FLOW_NEW, FLOW_ALL}: if len(flows) != 1: - raise InputError(ERR_OPT_FLOW_INT) + raise InputError(ERR_OPT_FLOW_COMBINE.format(val)) if not allow_new_or_none and val in {FLOW_NEW, FLOW_NONE}: raise InputError(ERR_OPT_FLOW_VAL_2) else: try: int(val) except ValueError: - raise InputError(ERR_OPT_FLOW_VAL.format(val)) from None + raise InputError(ERR_OPT_FLOW_VAL) from None if flow_wait and flows[0] in {FLOW_NEW, FLOW_NONE}: raise InputError(ERR_OPT_FLOW_WAIT) diff --git a/cylc/flow/config.py b/cylc/flow/config.py index df27078ee44..786095a215d 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -690,7 +690,7 @@ def process_initial_cycle_point(self) -> None: Sets: self.initial_point self.cfg['scheduling']['initial cycle point'] - self.options.icp + self.evaluated_icp Raises: WorkflowConfigError - if it fails to validate """ @@ -710,10 +710,11 @@ def process_initial_cycle_point(self) -> None: icp = ingest_time(orig_icp, get_current_time_string()) except IsodatetimeError as exc: raise WorkflowConfigError(str(exc)) from None - if orig_icp != icp: + self.evaluated_icp = None + if icp != orig_icp: # now/next()/previous() was used, need to store # evaluated point in DB - self.options.icp = icp + self.evaluated_icp = icp self.initial_point = get_point(icp).standardise() self.cfg['scheduling']['initial cycle point'] = str(self.initial_point) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index 29be92969f4..43ad90c582b 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -948,7 +948,7 @@ def increment_graph_window( ) for items in graph_children.values(): for child_name, child_point, _ in items: - if child_point > final_point: + if final_point and child_point > final_point: continue child_tokens = self.id_.duplicate( cycle=str(child_point), @@ -978,7 +978,7 @@ def increment_graph_window( taskdefs ).values(): for parent_name, parent_point, _ in items: - if parent_point > final_point: + if final_point and parent_point > final_point: continue parent_tokens = self.id_.duplicate( cycle=str(parent_point), @@ -1419,7 +1419,7 @@ def apply_task_proxy_db_history(self): itask, is_parent = self.db_load_task_proxies[relative_id] itask.submit_num = submit_num flow_nums = deserialise_set(flow_nums_str) - # Do not set states and outputs for future tasks in flow. + # Do not set states and outputs for inactive tasks in flow. if ( itask.flow_nums and flow_nums != itask.flow_nums and @@ -2250,7 +2250,9 @@ def delta_broadcast(self): def _generate_broadcast_node_deltas(self, node_data, node_type): cfg = self.schd.config.cfg - for node_id, node in node_data.items(): + # NOTE: node_data may change during operation so make a copy + # see https://github.com/cylc/cylc-flow/pull/6397 + for node_id, node in list(node_data.items()): tokens = Tokens(node_id) new_runtime = runtime_from_config( self._apply_broadcasts_to_runtime( diff --git a/cylc/flow/etc/tutorial/cylc-forecasting-workflow/lib/python/util.py b/cylc/flow/etc/tutorial/cylc-forecasting-workflow/lib/python/util.py index 6f24b28cbe0..6450bbc161d 100644 --- a/cylc/flow/etc/tutorial/cylc-forecasting-workflow/lib/python/util.py +++ b/cylc/flow/etc/tutorial/cylc-forecasting-workflow/lib/python/util.py @@ -282,13 +282,18 @@ def __call__(self, grid_x, grid_y): return z_val -def parse_domain(domain): - bbox = list(map(float, domain.split(','))) +def parse_domain(domain: str): + lng1, lat1, lng2, lat2 = list(map(float, domain.split(','))) + msg = "Invalid domain '{}' ({} {} >= {})" + if lng1 >= lng2: + raise ValueError(msg.format(domain, 'longitude', lng1, lng2)) + if lat1 >= lat2: + raise ValueError(msg.format(domain, 'latitude', lat1, lat2)) return { - 'lng1': bbox[0], - 'lat1': bbox[1], - 'lng2': bbox[2], - 'lat2': bbox[3] + 'lng1': lng1, + 'lat1': lat1, + 'lng2': lng2, + 'lat2': lat2, } diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 41d11ef7450..76cbdc323ad 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -2027,17 +2027,20 @@ class Arguments: class FlowMutationArguments: flow = graphene.List( graphene.NonNull(Flow), - default_value=[FLOW_ALL], + default_value=[], description=sstrip(f''' - The flow(s) to trigger these tasks in. - - This should be a list of flow numbers OR a single-item list - containing one of the following three strings: - - * {FLOW_ALL} - Triggered tasks belong to all active flows - (default). - * {FLOW_NEW} - Triggered tasks are assigned to a new flow. - * {FLOW_NONE} - Triggered tasks do not belong to any flow. + The flow(s) to trigger/set these tasks in. + + By default: + * active tasks (n=0) keep their existing flow assignment + * inactive tasks (n>0) get assigned all active flows + + Otherwise you can assign (inactive tasks) or add to (active tasks): + * a list of integer flow numbers + or one of the following strings: + * {FLOW_ALL} - all active flows + * {FLOW_NEW} - an automatically generated new flow number + * {FLOW_NONE} - (ignored for active tasks): no flow ''') ) flow_wait = Boolean( diff --git a/cylc/flow/scripts/show.py b/cylc/flow/scripts/show.py index da7d7e5723e..c9bf6670fcf 100755 --- a/cylc/flow/scripts/show.py +++ b/cylc/flow/scripts/show.py @@ -387,8 +387,10 @@ async def prereqs_and_outputs_query( if not task_proxies: ansiprint( - f"No matching active tasks found: {', '.join(ids_list)}", - file=sys.stderr) + "No matching active tasks found: " + f"{', '.join(ids_list)}", + file=sys.stderr, + ) return 1 return 0 diff --git a/cylc/flow/scripts/trigger.py b/cylc/flow/scripts/trigger.py index de788481cfe..1e6ef913696 100755 --- a/cylc/flow/scripts/trigger.py +++ b/cylc/flow/scripts/trigger.py @@ -17,19 +17,12 @@ """cylc trigger [OPTIONS] ARGS -Force tasks to run despite unsatisfied prerequisites. +Force tasks to run regardless of prerequisites. * Triggering an unqueued waiting task queues it, regardless of prerequisites. * Triggering a queued task submits it, regardless of queue limiting. * Triggering an active task has no effect (it already triggered). -Incomplete and active-waiting tasks in the n=0 window already belong to a flow. -Triggering them queues them to run (or rerun) in the same flow. - -Beyond n=0, triggered tasks get all current active flow numbers by default, or -specified flow numbers via the --flow option. Those flows - if/when they catch -up - will see tasks that ran after triggering event as having run already. - Examples: # trigger task foo in cycle 1234 in test $ cylc trigger test//1234/foo @@ -39,6 +32,21 @@ # start a new flow by triggering 1234/foo in test $ cylc trigger --flow=new test//1234/foo + +Flows: + Active tasks (in the n=0 window) already belong to a flow. + * by default, if triggered, they run in the same flow + * or with --flow=all, they are assigned all active flows + * or with --flow=INT or --flow=new, the original and new flows are merged + * (--flow=none is ignored for active tasks) + + Inactive tasks (n>0) do not already belong to a flow. + * by default they are assigned all active flows + * otherwise, they are assigned the --flow value + + Note --flow=new increments the global flow counter with each use. If it + takes multiple commands to start a new flow use the actual flow number + after the first command (you can read it from the scheduler log). """ from functools import partial diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index a866eccddc3..72e6545b73f 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -882,6 +882,11 @@ def remove(self, itask: 'TaskProxy', reason: Optional[str] = None) -> None: msg += " - active job orphaned" LOG.log(level, f"[{itask}] {msg}") + + # ensure this task is written to the DB before moving on + # https://github.com/cylc/cylc-flow/issues/6315 + self.workflow_db_mgr.process_queued_ops() + del itask def get_tasks(self) -> List[TaskProxy]: @@ -1314,7 +1319,7 @@ def hold_tasks(self, items: Iterable[str]) -> int: ) for itask in itasks: self.hold_active_task(itask) - # Set inactive/future tasks to be held: + # Set inactive tasks to be held: for name, cycle in inactive_tasks: self.data_store_mgr.delta_task_held((name, cycle, True)) self.tasks_to_hold.update(inactive_tasks) @@ -1332,7 +1337,7 @@ def release_held_tasks(self, items: Iterable[str]) -> int: ) for itask in itasks: self.release_held_active_task(itask) - # Unhold inactive/future tasks: + # Unhold inactive tasks: for name, cycle in inactive_tasks: self.data_store_mgr.delta_task_held((name, cycle, False)) self.tasks_to_hold.difference_update(inactive_tasks) @@ -1924,11 +1929,6 @@ def set_prereqs_and_outputs( flow_descr: description of new flow """ - flow_nums = self._get_flow_nums(flow, flow_descr) - if flow_nums is None: - # Illegal flow command opts - return - # Get matching pool tasks and inactive task definitions. itasks, inactive_tasks, unmatched = self.filter_task_proxies( items, @@ -1936,14 +1936,20 @@ def set_prereqs_and_outputs( warn_no_active=False, ) + flow_nums = self._get_flow_nums(flow, flow_descr) + + # Set existing task proxies. for itask in itasks: - # Existing task proxies. self.merge_flows(itask, flow_nums) if prereqs: self._set_prereqs_itask(itask, prereqs, flow_nums) else: self._set_outputs_itask(itask, outputs) + # Spawn and set inactive tasks. + if not flow: + # default: assign to all active flows + flow_nums = self._get_active_flow_nums() for name, point in inactive_tasks: tdef = self.config.get_taskdef(name) if prereqs: @@ -2031,7 +2037,7 @@ def _set_prereqs_itask( def _set_prereqs_tdef( self, point, taskdef, prereqs, flow_nums, flow_wait ): - """Spawn a future task and set prerequisites on it.""" + """Spawn an inactive task and set prerequisites on it.""" itask = self.spawn_task( taskdef.name, point, flow_nums, flow_wait=flow_wait @@ -2170,38 +2176,30 @@ def remove_tasks( self.release_runahead_tasks() def _get_flow_nums( - self, - flow: List[str], - meta: Optional[str] = None, - ) -> Optional[Set[int]]: - """Get correct flow numbers given user command options.""" - if set(flow).intersection({FLOW_ALL, FLOW_NEW, FLOW_NONE}): - if len(flow) != 1: - LOG.warning( - f'The "flow" values {FLOW_ALL}, {FLOW_NEW} & {FLOW_NONE}' - ' cannot be used in combination with integer flow numbers.' - ) - return None - if flow[0] == FLOW_ALL: - flow_nums = self._get_active_flow_nums() - elif flow[0] == FLOW_NEW: - flow_nums = {self.flow_mgr.get_flow_num(meta=meta)} - elif flow[0] == FLOW_NONE: - flow_nums = set() - else: - try: - flow_nums = { - self.flow_mgr.get_flow_num( - flow_num=int(n), meta=meta - ) - for n in flow - } - except ValueError: - LOG.warning( - f"Ignoring command: illegal flow values {flow}" - ) - return None - return flow_nums + self, + flow: List[str], + meta: Optional[str] = None, + ) -> Set[int]: + """Return flow numbers corresponding to user command options. + + Arg should have been validated already during command validation. + + In the default case (--flow option not provided), stick with the + existing flows (so return empty set) - NOTE this only applies for + active tasks. + + """ + if flow == [FLOW_NONE]: + return set() + if flow == [FLOW_ALL]: + return self._get_active_flow_nums() + if flow == [FLOW_NEW]: + return {self.flow_mgr.get_flow_num(meta=meta)} + # else specific flow numbers: + return { + self.flow_mgr.get_flow_num(flow_num=int(n), meta=meta) + for n in flow + } def _force_trigger(self, itask): """Assumes task is in the pool""" @@ -2264,17 +2262,14 @@ def force_trigger_tasks( unless flow-wait is set. """ - # Get flow numbers for the tasks to be triggered. - flow_nums = self._get_flow_nums(flow, flow_descr) - if flow_nums is None: - return - # Get matching tasks proxies, and matching inactive task IDs. existing_tasks, inactive_ids, unmatched = self.filter_task_proxies( items, inactive=True, warn_no_active=False, ) - # Trigger existing tasks. + flow_nums = self._get_flow_nums(flow, flow_descr) + + # Trigger active tasks. for itask in existing_tasks: if itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE): LOG.warning(f"[{itask}] ignoring trigger - already active") @@ -2283,11 +2278,12 @@ def force_trigger_tasks( self._force_trigger(itask) # Spawn and trigger inactive tasks. + if not flow: + # default: assign to all active flows + flow_nums = self._get_active_flow_nums() for name, point in inactive_ids: - if not self.can_be_spawned(name, point): continue - submit_num, _, prev_fwait = ( self._get_task_history(name, point, flow_nums) ) diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 762324c9d75..ec7ea0bdade 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -17,10 +17,17 @@ """Task definition.""" from collections import deque -from typing import TYPE_CHECKING, Dict, List +from typing import ( + TYPE_CHECKING, + Dict, + List, + NamedTuple, + Set, + Tuple, +) -import cylc.flow.flags from cylc.flow.exceptions import TaskDefError +import cylc.flow.flags from cylc.flow.task_id import TaskID from cylc.flow.task_outputs import ( SORT_ORDERS, @@ -31,17 +38,29 @@ if TYPE_CHECKING: - from cylc.flow.cycling import PointBase, SequenceBase - from cylc.flow.task_trigger import Dependency + from cylc.flow.cycling import ( + PointBase, + SequenceBase, + ) + from cylc.flow.task_trigger import ( + Dependency, + TaskTrigger, + ) -def generate_graph_children(tdef, point): +class TaskTuple(NamedTuple): + name: str + point: 'PointBase' + is_abs: bool + + +def generate_graph_children( + tdef: 'TaskDef', point: 'PointBase' +) -> Dict[str, List[TaskTuple]]: """Determine graph children of this task at point.""" - graph_children = {} + graph_children: Dict[str, List[TaskTuple]] = {} for seq, dout in tdef.graph_children.items(): for output, downs in dout.items(): - if output not in graph_children: - graph_children[output] = [] for name, trigger in downs: child_point = trigger.get_child_point(point, seq) is_abs = ( @@ -55,7 +74,9 @@ def generate_graph_children(tdef, point): # E.g.: foo should trigger only on T06: # PT6H = "waz" # T06 = "waz[-PT6H] => foo" - graph_children[output].append((name, child_point, is_abs)) + graph_children.setdefault(output, []).append( + TaskTuple(name, child_point, is_abs) + ) if tdef.sequential: # Add next-instance child. @@ -66,20 +87,21 @@ def generate_graph_children(tdef, point): # Within sequence bounds. nexts.append(nxt) if nexts: - if TASK_OUTPUT_SUCCEEDED not in graph_children: - graph_children[TASK_OUTPUT_SUCCEEDED] = [] - graph_children[TASK_OUTPUT_SUCCEEDED].append( - (tdef.name, min(nexts), False)) + graph_children.setdefault(TASK_OUTPUT_SUCCEEDED, []).append( + TaskTuple(tdef.name, min(nexts), False) + ) return graph_children -def generate_graph_parents(tdef, point, taskdefs): +def generate_graph_parents( + tdef: 'TaskDef', point: 'PointBase', taskdefs: Dict[str, 'TaskDef'] +) -> Dict['SequenceBase', List[TaskTuple]]: """Determine concrete graph parents of task tdef at point. Infer parents be reversing upstream triggers that lead to point/task. """ - graph_parents = {} + graph_parents: Dict['SequenceBase', List[TaskTuple]] = {} for seq, triggers in tdef.graph_parents.items(): if not seq.is_valid(point): # Don't infer parents if the trigger belongs to a sequence that @@ -104,7 +126,9 @@ def generate_graph_parents(tdef, point, taskdefs): # TODO ideally validation would flag this as an error. continue is_abs = trigger.offset_is_absolute or trigger.offset_is_from_icp - graph_parents[seq].append((parent_name, parent_point, is_abs)) + graph_parents[seq].append( + TaskTuple(parent_name, parent_point, is_abs) + ) if tdef.sequential: # Add implicit previous-instance parent. @@ -115,9 +139,9 @@ def generate_graph_parents(tdef, point, taskdefs): # Within sequence bounds. prevs.append(prev) if prevs: - if seq not in graph_parents: - graph_parents[seq] = [] - graph_parents[seq].append((tdef.name, min(prevs), False)) + graph_parents.setdefault(seq, []).append( + TaskTuple(tdef.name, min(prevs), False) + ) return graph_parents @@ -159,8 +183,12 @@ def __init__(self, name, rtcfg, run_mode, start_point, initial_point): self.namespace_hierarchy = [] self.dependencies: Dict[SequenceBase, List[Dependency]] = {} self.outputs = {} # {output: (message, is_required)} - self.graph_children = {} - self.graph_parents = {} + self.graph_children: Dict[ + SequenceBase, Dict[str, List[Tuple[str, TaskTrigger]]] + ] = {} + self.graph_parents: Dict[ + SequenceBase, Set[Tuple[str, TaskTrigger]] + ] = {} self.param_var = {} self.external_triggers = [] self.xtrig_labels = {} # {sequence: [labels]} @@ -211,7 +239,9 @@ def tweak_outputs(self): ]: self.set_required_output(output, True) - def add_graph_child(self, trigger, taskname, sequence): + def add_graph_child( + self, trigger: 'TaskTrigger', taskname: str, sequence: 'SequenceBase' + ) -> None: """Record child task instances that depend on my outputs. {sequence: { @@ -220,18 +250,20 @@ def add_graph_child(self, trigger, taskname, sequence): } """ self.graph_children.setdefault( - sequence, {}).setdefault( - trigger.output, []).append((taskname, trigger)) - - def add_graph_parent(self, trigger, parent, sequence): + sequence, {} + ).setdefault( + trigger.output, [] + ).append((taskname, trigger)) + + def add_graph_parent( + self, trigger: 'TaskTrigger', parent: str, sequence: 'SequenceBase' + ) -> None: """Record task instances that I depend on. { sequence: set([(a,t1), (b,t2), ...]) # (task-name, trigger) } """ - if sequence not in self.graph_parents: - self.graph_parents[sequence] = set() - self.graph_parents[sequence].add((parent, trigger)) + self.graph_parents.setdefault(sequence, set()).add((parent, trigger)) def add_dependency(self, dependency, sequence): """Add a dependency to a named sequence. diff --git a/cylc/flow/workflow_db_mgr.py b/cylc/flow/workflow_db_mgr.py index 29cc8e6fbf7..ebe176b85ac 100644 --- a/cylc/flow/workflow_db_mgr.py +++ b/cylc/flow/workflow_db_mgr.py @@ -350,8 +350,16 @@ def put_workflow_params(self, schd: 'Scheduler') -> None: {"key": self.KEY_STOP_CLOCK_TIME, "value": schd.stop_clock_time}, {"key": self.KEY_STOP_TASK, "value": schd.stop_task}, ]) - for key in ( + + # Store raw initial cycle point in the DB. + value = schd.config.evaluated_icp + value = None if value == 'reload' else value + self.put_workflow_params_1( self.KEY_INITIAL_CYCLE_POINT, + value or str(schd.config.initial_point) + ) + + for key in ( self.KEY_FINAL_CYCLE_POINT, self.KEY_START_CYCLE_POINT, self.KEY_STOP_CYCLE_POINT diff --git a/etc/bin/swarm b/etc/bin/swarm index 6e8c814d49e..77d5beba251 100755 --- a/etc/bin/swarm +++ b/etc/bin/swarm @@ -145,11 +145,9 @@ prompt () { case $USR in [Yy]) return 0 - break ;; [Nn]) return 1 - break ;; esac done diff --git a/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t b/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t index a01d42e2ab3..a7711318690 100755 --- a/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t +++ b/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t @@ -33,6 +33,7 @@ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" workflow_run_ok "${TEST_NAME_BASE}-run" \ cylc play --reference-test -v --no-detach "${WORKFLOW_NAME}" --timestamp #------------------------------------------------------------------------------- +# shellcheck disable=SC2317 cmp_times () { # Test if the times $1 and $2 are within $3 seconds of each other. python3 -u - "$@" <<'__PYTHON__' diff --git a/tests/flakyfunctional/database/00-simple.t b/tests/flakyfunctional/database/00-simple.t index edf86107e51..c3f1ad19faf 100644 --- a/tests/flakyfunctional/database/00-simple.t +++ b/tests/flakyfunctional/database/00-simple.t @@ -46,7 +46,7 @@ UTC_mode|0 cycle_point_format| cylc_version|$(cylc --version) fcp| -icp| +icp|1 is_paused|0 n_restart|0 run_mode| diff --git a/tests/flakyfunctional/xtriggers/00-wall_clock.t b/tests/flakyfunctional/xtriggers/00-wall_clock.t index 9e966f8cd52..49d9bbe20b8 100644 --- a/tests/flakyfunctional/xtriggers/00-wall_clock.t +++ b/tests/flakyfunctional/xtriggers/00-wall_clock.t @@ -18,6 +18,7 @@ # Test clock xtriggers . "$(dirname "$0")/test_header" +# shellcheck disable=SC2317 run_workflow() { cylc play --no-detach --debug "$1" \ -s "START='$2'" -s "HOUR='$3'" -s "OFFSET='$4'" diff --git a/tests/functional/cylc-combination-scripts/09-vr-icp-now.t b/tests/functional/cylc-combination-scripts/09-vr-icp-now.t new file mode 100644 index 00000000000..932e735fff1 --- /dev/null +++ b/tests/functional/cylc-combination-scripts/09-vr-icp-now.t @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +#------------------------------------------------------------------------------ +# Ensure that validate step of Cylc VR cannot change the options object. +# See https://github.com/cylc/cylc-flow/issues/6262 + +. "$(dirname "$0")/test_header" +set_test_number 2 + +WORKFLOW_ID=$(workflow_id) + +cp -r "${TEST_SOURCE_DIR}/${TEST_NAME_BASE}/flow.cylc" . + +run_ok "${TEST_NAME_BASE}-vip" \ + cylc vip . \ + --workflow-name "${WORKFLOW_ID}" \ + --no-detach \ + --no-run-name + +echo "# Some Comment" >> flow.cylc + +run_ok "${TEST_NAME_BASE}-vr" \ + cylc vr "${WORKFLOW_ID}" \ + --stop-cycle-point 2020-01-01T00:02Z diff --git a/tests/functional/cylc-combination-scripts/09-vr-icp-now/flow.cylc b/tests/functional/cylc-combination-scripts/09-vr-icp-now/flow.cylc new file mode 100644 index 00000000000..e9f6284769e --- /dev/null +++ b/tests/functional/cylc-combination-scripts/09-vr-icp-now/flow.cylc @@ -0,0 +1,9 @@ +[scheduling] + initial cycle point = 2020 + stop after cycle point = 2020-01-01T00:01Z + [[graph]] + PT1M = foo +[runtime] + [[foo]] + [[[simulation]]] + default run length = PT0S diff --git a/tests/functional/cylc-set/04-switch/flow.cylc b/tests/functional/cylc-set/04-switch/flow.cylc index 18402c7b64c..8f7c4329af6 100644 --- a/tests/functional/cylc-set/04-switch/flow.cylc +++ b/tests/functional/cylc-set/04-switch/flow.cylc @@ -1,4 +1,4 @@ -# Set outputs of future task to direct the flow at an optional branch point. +# Set outputs of inactive task to direct the flow at an optional branch point. [scheduler] [[events]] diff --git a/tests/functional/cylc-set/05-expire/flow.cylc b/tests/functional/cylc-set/05-expire/flow.cylc index 9717664132f..4e5ca9f0608 100644 --- a/tests/functional/cylc-set/05-expire/flow.cylc +++ b/tests/functional/cylc-set/05-expire/flow.cylc @@ -1,4 +1,4 @@ -# Expire a future task, so it won't run. +# Expire an inactive task, so it won't run. [scheduler] [[events]] diff --git a/tests/functional/data-store/00-prune-optional-break.t b/tests/functional/data-store/00-prune-optional-break.t index 9b09ac8d156..b50e5a51664 100755 --- a/tests/functional/data-store/00-prune-optional-break.t +++ b/tests/functional/data-store/00-prune-optional-break.t @@ -27,9 +27,9 @@ init_workflow "${TEST_NAME_BASE}" << __FLOW__ final cycle point = 1 [[graph]] P1 = """ -a? => b? => c? -d => e -""" + a => b? => c? + a => d => e + """ [runtime] [[a,c,e]] script = true @@ -37,15 +37,15 @@ d => e script = false [[d]] script = """ -cylc workflow-state \${CYLC_WORKFLOW_ID}//1/b:failed --interval=2 -cylc pause \$CYLC_WORKFLOW_ID -""" + cylc workflow-state \${CYLC_WORKFLOW_ID}//1/b:failed --interval=2 --max-polls=20 -v + cylc pause \$CYLC_WORKFLOW_ID + """ __FLOW__ # run workflow run_ok "${TEST_NAME_BASE}-run" cylc play "${WORKFLOW_NAME}" -cylc workflow-state "${WORKFLOW_NAME}/1/d:succeeded" --interval=2 --max-polls=60 +cylc workflow-state "${WORKFLOW_NAME}//1/d:succeeded" --interval=2 --max-polls=60 -v # query workflow TEST_NAME="${TEST_NAME_BASE}-prune-optional-break" diff --git a/tests/functional/flow-triggers/10-specific-flow.t b/tests/functional/flow-triggers/10-specific-flow.t index ce5e80a6c68..238f1e12670 100644 --- a/tests/functional/flow-triggers/10-specific-flow.t +++ b/tests/functional/flow-triggers/10-specific-flow.t @@ -1,7 +1,7 @@ #!/usr/bin/env bash # THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. # Copyright (C) NIWA & British Crown (Met Office) & Contributors. -# +# # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -15,6 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . #------------------------------------------------------------------------------- +# Test targeting a specific flow, with trigger --wait. . "$(dirname "$0")/test_header" set_test_number 2 diff --git a/tests/functional/flow-triggers/10-specific-flow/flow.cylc b/tests/functional/flow-triggers/10-specific-flow/flow.cylc index 46ba6dab4c1..ff6196a6871 100644 --- a/tests/functional/flow-triggers/10-specific-flow/flow.cylc +++ b/tests/functional/flow-triggers/10-specific-flow/flow.cylc @@ -17,6 +17,10 @@ [[trigger-happy]] script = """ cylc trigger --flow=2 --wait ${CYLC_WORKFLOW_ID}//1/f - cylc__job__poll_grep_workflow_log "1/d/01:submitted.*running" - cylc trigger --flow=2 ${CYLC_WORKFLOW_ID}//1/b + """ + [[d]] + script = """ + if [[ "$CYLC_TASK_SUBMIT_NUMBER" == "1" ]]; then + cylc trigger --flow=2 ${CYLC_WORKFLOW_ID}//1/b + fi """ diff --git a/tests/functional/lib/bash/test_header b/tests/functional/lib/bash/test_header index f9b58a35f75..ce0dd6165f7 100644 --- a/tests/functional/lib/bash/test_header +++ b/tests/functional/lib/bash/test_header @@ -1160,7 +1160,6 @@ for SKIP in ${CYLC_TEST_SKIP}; do # Deliberately print variable substitution syntax unexpanded # shellcheck disable=SC2016 skip_all 'this test is in $CYLC_TEST_SKIP.' - break fi done diff --git a/tests/functional/reload/17-graphing-change.t b/tests/functional/reload/17-graphing-change.t index 9df561384ff..41e0b4697c9 100755 --- a/tests/functional/reload/17-graphing-change.t +++ b/tests/functional/reload/17-graphing-change.t @@ -20,6 +20,7 @@ #------------------------------------------------------------------------------- set_test_number 12 +# shellcheck disable=SC2317 grep_workflow_log_n_times() { TEXT="$1" N_TIMES="$2" diff --git a/tests/functional/reload/26-stalled.t b/tests/functional/reload/26-stalled.t index 63dabb2ba81..8f6e7594a48 100644 --- a/tests/functional/reload/26-stalled.t +++ b/tests/functional/reload/26-stalled.t @@ -26,7 +26,7 @@ init_workflow "${TEST_NAME_BASE}" <<'__FLOW__' [scheduler] [[events]] stall handlers = cylc reload %(workflow)s - stall timeout = PT10S + stall timeout = PT30S abort on stall timeout = True # Prevent infinite loop if the bug resurfaces workflow timeout = PT3M diff --git a/tests/functional/triggering/08-fam-finish-any.t b/tests/functional/triggering/08-fam-finish-any.t index 2dda6132723..6849ee4a1c2 100644 --- a/tests/functional/triggering/08-fam-finish-any.t +++ b/tests/functional/triggering/08-fam-finish-any.t @@ -1,7 +1,7 @@ #!/usr/bin/env bash # THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. # Copyright (C) NIWA & British Crown (Met Office) & Contributors. -# +# # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -17,6 +17,7 @@ #------------------------------------------------------------------------------- # Test correct expansion of 'FAM:finish-any' . "$(dirname "$0")/test_header" +skip_macos_gh_actions set_test_number 2 reftest exit diff --git a/tests/functional/triggering/08-fam-finish-any/flow.cylc b/tests/functional/triggering/08-fam-finish-any/flow.cylc index 6d8790a829f..6ecb0bf9781 100644 --- a/tests/functional/triggering/08-fam-finish-any/flow.cylc +++ b/tests/functional/triggering/08-fam-finish-any/flow.cylc @@ -2,12 +2,19 @@ [[graph]] R1 = """FAM:finish-any => foo""" [runtime] + [[root]] + script = true [[FAM]] - script = sleep 10 - [[a,c]] + [[a]] inherit = FAM + script = """ + cylc__job__poll_grep_workflow_log -E "1/b.*succeeded" + """ [[b]] inherit = FAM - script = true + [[c]] + inherit = FAM + script = """ + cylc__job__poll_grep_workflow_log -E "1/b.*succeeded" + """ [[foo]] - script = true diff --git a/tests/integration/test_data_store_mgr.py b/tests/integration/test_data_store_mgr.py index 9e2d4cc938e..cb19fb1fe23 100644 --- a/tests/integration/test_data_store_mgr.py +++ b/tests/integration/test_data_store_mgr.py @@ -314,7 +314,7 @@ def test_delta_task_prerequisite(harness): [t.identity for t in schd.pool.get_tasks()], [(TASK_STATUS_SUCCEEDED,)], [], - "all" + flow=[] ) assert all(p.satisfied for p in get_pb_prereqs(schd)) for itask in schd.pool.get_tasks(): diff --git a/tests/integration/test_flow_assignment.py b/tests/integration/test_flow_assignment.py new file mode 100644 index 00000000000..5816b08527f --- /dev/null +++ b/tests/integration/test_flow_assignment.py @@ -0,0 +1,155 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Test for flow-assignment in triggered/set tasks.""" + +import functools +import time +from typing import Callable + +import pytest + +from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE +from cylc.flow.scheduler import Scheduler + + +async def test_trigger_no_flows(one, start): + """Test triggering a task with no flows present. + + It should get the flow numbers of the most recent active tasks. + """ + async with start(one): + + # Remove the task (flow 1) --> pool empty + task = one.pool.get_tasks()[0] + one.pool.remove(task) + assert len(one.pool.get_tasks()) == 0 + + # Trigger the task, with new flow nums. + time.sleep(2) # The flows need different timestamps! + one.pool.force_trigger_tasks([task.identity], flow=['5', '9']) + assert len(one.pool.get_tasks()) == 1 + + # Ensure the new flow is in the db. + one.pool.workflow_db_mgr.process_queued_ops() + + # Remove the task --> pool empty + task = one.pool.get_tasks()[0] + one.pool.remove(task) + assert len(one.pool.get_tasks()) == 0 + + # Trigger the task; it should get flow nums 5, 9 + one.pool.force_trigger_tasks([task.identity], [FLOW_ALL]) + assert len(one.pool.get_tasks()) == 1 + task = one.pool.get_tasks()[0] + assert task.flow_nums == {5, 9} + + +async def test_get_flow_nums(one: Scheduler, start): + """Test the task pool _get_flow_nums() method.""" + async with start(one): + # flow 1 is already present + task = one.pool.get_tasks()[0] + assert one.pool._get_flow_nums([FLOW_NEW]) == {2} + one.pool.merge_flows(task, {2}) + # now we have flows {1, 2}: + + assert one.pool._get_flow_nums([FLOW_NONE]) == set() + assert one.pool._get_flow_nums([FLOW_ALL]) == {1, 2} + assert one.pool._get_flow_nums([FLOW_NEW]) == {3} + assert one.pool._get_flow_nums(['4', '5']) == {4, 5} + # the only active task still only has flows {1, 2} + assert one.pool._get_flow_nums([FLOW_ALL]) == {1, 2} + + +@pytest.mark.parametrize('command', ['trigger', 'set']) +async def test_flow_assignment(flow, scheduler, start, command: str): + """Test flow assignment when triggering/setting tasks. + + Active tasks: + By default keep existing flows, else merge with requested flows. + Inactive tasks: + By default assign active flows; else assign requested flows. + + """ + conf = { + 'scheduler': { + 'allow implicit tasks': 'True' + }, + 'scheduling': { + 'graph': { + 'R1': "foo & bar => a & b & c & d & e" + } + }, + 'runtime': { + 'foo': { + 'outputs': {'x': 'x'} + } + }, + } + id_ = flow(conf) + schd: Scheduler = scheduler(id_, run_mode='simulation', paused_start=True) + async with start(schd): + if command == 'set': + do_command: Callable = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=['x'], prereqs=[] + ) + else: + do_command = schd.pool.force_trigger_tasks + + active_a, active_b = schd.pool.get_tasks() + schd.pool.merge_flows(active_b, schd.pool._get_flow_nums([FLOW_NEW])) + assert active_a.flow_nums == {1} + assert active_b.flow_nums == {1, 2} + + # -----(1. Test active tasks)----- + + # By default active tasks keep existing flow assignment. + do_command([active_a.identity], flow=[]) + assert active_a.flow_nums == {1} + + # Else merge existing flow with requested flows. + do_command([active_a.identity], flow=[FLOW_ALL]) + assert active_a.flow_nums == {1, 2} + + # (no-flow is ignored for active tasks) + do_command([active_a.identity], flow=[FLOW_NONE]) + assert active_a.flow_nums == {1, 2} + + do_command([active_a.identity], flow=[FLOW_NEW]) + assert active_a.flow_nums == {1, 2, 3} + + # -----(2. Test inactive tasks)----- + if command == 'set': + do_command = functools.partial( + schd.pool.set_prereqs_and_outputs, outputs=[], prereqs=['all'] + ) + + # By default inactive tasks get all active flows. + do_command(['1/a'], flow=[]) + assert schd.pool._get_task_by_id('1/a').flow_nums == {1, 2, 3} + + # Else assign requested flows. + do_command(['1/b'], flow=[FLOW_NONE]) + assert schd.pool._get_task_by_id('1/b').flow_nums == set() + + do_command(['1/c'], flow=[FLOW_NEW]) + assert schd.pool._get_task_by_id('1/c').flow_nums == {4} + + do_command(['1/d'], flow=[FLOW_ALL]) + assert schd.pool._get_task_by_id('1/d').flow_nums == {1, 2, 3, 4} + do_command(['1/e'], flow=[7]) + assert schd.pool._get_task_by_id('1/e').flow_nums == {7} diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index cb6c65efd4a..5681c9963c9 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -339,7 +339,7 @@ async def test_match_taskdefs( [ param( ['1/foo', '3/asd'], ['1/foo', '3/asd'], [], - id="Active & future tasks" + id="Active & inactive tasks" ), param( ['1/*', '2/*', '3/*', '6/*'], @@ -364,7 +364,7 @@ async def test_match_taskdefs( ['1/foo:waiting', '1/foo:failed', '6/bar:waiting'], ['1/foo'], ["No active tasks matching: 1/foo:failed", "No active tasks matching: 6/bar:waiting"], - id="Specifying task state works for active tasks, not future tasks" + id="Specifying task state works for active tasks, not inactive tasks" ) ] ) @@ -409,7 +409,7 @@ async def test_release_held_tasks( ) -> None: """Test TaskPool.release_held_tasks(). - For a workflow with held active tasks 1/foo & 1/bar, and held future task + For a workflow with held active tasks 1/foo & 1/bar, and held inactive task 3/asd. We skip testing the matching logic here because it would be slow using the @@ -1335,7 +1335,7 @@ async def test_set_prereqs( "20400101T0000Z/foo"] ) - # set one prereq of future task 20400101T0000Z/qux + # set one prereq of inactive task 20400101T0000Z/qux schd.pool.set_prereqs_and_outputs( ["20400101T0000Z/qux"], None, @@ -1505,7 +1505,7 @@ async def test_set_outputs_future( start, log_filter, ): - """Check manual setting of future task outputs. + """Check manual setting of inactive task outputs. """ id_ = flow( @@ -1532,7 +1532,7 @@ async def test_set_outputs_future( # it should start up with just 1/a assert pool_get_task_ids(schd.pool) == ["1/a"] - # setting future task b succeeded should spawn c but not b + # setting inactive task b succeeded should spawn c but not b schd.pool.set_prereqs_and_outputs( ["1/b"], ["succeeded"], None, ['all']) assert ( @@ -2110,3 +2110,58 @@ async def test_expire_dequeue_with_retries(flow, scheduler, start, expire_type): # the task should also have been removed from the queue assert not schd.pool.task_queue_mgr.remove_task(itask) + + +async def test_downstream_complete_before_upstream( + flow, scheduler, start, db_select +): + """It should handle an upstream task completing before a downstream task. + + See https://github.com/cylc/cylc-flow/issues/6315 + """ + id_ = flow( + { + 'scheduling': { + 'graph': { + 'R1': 'a => b', + }, + }, + } + ) + schd = scheduler(id_) + async with start(schd): + # 1/a should be pre-spawned (parentless) + a_1 = schd.pool.get_task(IntegerPoint('1'), 'a') + assert a_1 + + # spawn 1/b (this can happens as the result of request e.g. trigger) + b_1 = schd.pool.spawn_task('b', IntegerPoint('1'), {1}) + schd.pool.add_to_pool(b_1) + assert b_1 + + # mark 1/b as succeeded + schd.task_events_mgr.process_message(b_1, 'INFO', 'succeeded') + + # 1/b should be removed from the pool (completed) + assert schd.pool.get_tasks() == [a_1] + + # as a side effect the DB should have been updated + assert ( + TASK_OUTPUT_SUCCEEDED + in db_select( + schd, + # "False" means "do not run the DB update before checking it" + False, # do not change this to "True" + 'task_outputs', + 'outputs', + name='b', + cycle='1', + )[0][0] + ) + + # mark 1/a as succeeded + schd.task_events_mgr.process_message(a_1, 'INFO', 'succeeded') + + # 1/a should be removed from the pool (completed) + # 1/b should not be re-spawned by the success of 1/a + assert schd.pool.get_tasks() == [] diff --git a/tests/integration/test_trigger.py b/tests/integration/test_trigger.py deleted file mode 100644 index c907506e9ae..00000000000 --- a/tests/integration/test_trigger.py +++ /dev/null @@ -1,73 +0,0 @@ -# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. -# Copyright (C) NIWA & British Crown (Met Office) & Contributors. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import logging - -from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE - -import pytest -import time - - -@pytest.mark.parametrize( - 'flow_strs', - ( - [FLOW_ALL, '1'], - ['1', FLOW_ALL], - [FLOW_NEW, '1'], - [FLOW_NONE, '1'], - ['a'], - ['1', 'a'], - ) -) -async def test_trigger_invalid(mod_one, start, log_filter, flow_strs): - """Ensure invalid flow values are rejected.""" - async with start(mod_one) as log: - log.clear() - assert mod_one.pool.force_trigger_tasks(['*'], flow_strs) is None - assert len(log_filter(logging.WARNING)) == 1 - - -async def test_trigger_no_flows(one, start, log_filter): - """Test triggering a task with no flows present. - - It should get the flow numbers of the most recent active tasks. - """ - async with start(one): - - # Remove the task (flow 1) --> pool empty - task = one.pool.get_tasks()[0] - one.pool.remove(task) - assert len(one.pool.get_tasks()) == 0 - - # Trigger the task, with new flow nums. - time.sleep(2) # The flows need different timestamps! - one.pool.force_trigger_tasks([task.identity], [5, 9]) - assert len(one.pool.get_tasks()) == 1 - - # Ensure the new flow is in the db. - one.pool.workflow_db_mgr.process_queued_ops() - - # Remove the task --> pool empty - task = one.pool.get_tasks()[0] - one.pool.remove(task) - assert len(one.pool.get_tasks()) == 0 - - # Trigger the task; it should get flow nums 5, 9 - one.pool.force_trigger_tasks([task.identity], [FLOW_ALL]) - assert len(one.pool.get_tasks()) == 1 - task = one.pool.get_tasks()[0] - assert task.flow_nums == {5, 9} diff --git a/tests/unit/test_command_validation.py b/tests/unit/test_command_validation.py new file mode 100644 index 00000000000..42fdda5aedf --- /dev/null +++ b/tests/unit/test_command_validation.py @@ -0,0 +1,41 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import pytest + +from cylc.flow.command_validation import ( + ERR_OPT_FLOW_COMBINE, + ERR_OPT_FLOW_VAL, + flow_opts, +) +from cylc.flow.exceptions import InputError +from cylc.flow.flow_mgr import FLOW_ALL, FLOW_NEW, FLOW_NONE + + +@pytest.mark.parametrize('flow_strs, expected_msg', [ + ([FLOW_ALL, '1'], ERR_OPT_FLOW_COMBINE.format(FLOW_ALL)), + (['1', FLOW_ALL], ERR_OPT_FLOW_COMBINE.format(FLOW_ALL)), + ([FLOW_NEW, '1'], ERR_OPT_FLOW_COMBINE.format(FLOW_NEW)), + ([FLOW_NONE, '1'], ERR_OPT_FLOW_COMBINE.format(FLOW_NONE)), + ([FLOW_NONE, FLOW_ALL], ERR_OPT_FLOW_COMBINE.format(FLOW_NONE)), + (['a'], ERR_OPT_FLOW_VAL), + (['1', 'a'], ERR_OPT_FLOW_VAL), +]) +async def test_trigger_invalid(flow_strs, expected_msg): + """Ensure invalid flow values are rejected during command validation.""" + with pytest.raises(InputError) as exc_info: + flow_opts(flow_strs, False) + assert str(exc_info.value) == expected_msg diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 33d6241f278..18c5f37bd8b 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -267,7 +267,7 @@ def test_family_inheritance_and_quotes( @pytest.mark.parametrize( - ('cycling_type', 'scheduling_cfg', 'expected_icp', 'expected_opt_icp', + ('cycling_type', 'scheduling_cfg', 'expected_icp', 'expected_eval_icp', 'expected_err'), [ pytest.param( @@ -356,7 +356,7 @@ def test_process_icp( cycling_type: str, scheduling_cfg: Dict[str, Any], expected_icp: Optional[str], - expected_opt_icp: Optional[str], + expected_eval_icp: Optional[str], expected_err: Optional[Tuple[Type[Exception], str]], monkeypatch: pytest.MonkeyPatch, set_cycling_type: Fixture ) -> None: @@ -368,7 +368,7 @@ def test_process_icp( cycling_type: Workflow cycling type. scheduling_cfg: 'scheduling' section of workflow config. expected_icp: The expected icp value that gets set. - expected_opt_icp: The expected value of options.icp that gets set + expected_eval_icp: The expected value of options.icp that gets set (this gets stored in the workflow DB). expected_err: Exception class expected to be raised plus the message. """ @@ -396,10 +396,10 @@ def test_process_icp( assert mocked_config.cfg[ 'scheduling']['initial cycle point'] == expected_icp assert str(mocked_config.initial_point) == expected_icp - opt_icp = mocked_config.options.icp - if opt_icp is not None: - opt_icp = str(loader.get_point(opt_icp).standardise()) - assert opt_icp == expected_opt_icp + eval_icp = mocked_config.evaluated_icp + if eval_icp is not None: + eval_icp = str(loader.get_point(eval_icp).standardise()) + assert eval_icp == expected_eval_icp @pytest.mark.parametrize(