diff --git a/CHANGES.md b/CHANGES.md index 0c33e6ce915..15e89f118a2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -29,8 +29,6 @@ $ towncrier create ..md --content "Short description" ### 🔧 Fixes -[#6178](https://github.com/cylc/cylc-flow/pull/6178) - Fix an issue where Tui could hang when closing. - [#6186](https://github.com/cylc/cylc-flow/pull/6186) - Fixed bug where using flow numbers with `cylc set` would not work correctly. [#6200](https://github.com/cylc/cylc-flow/pull/6200) - Fixed bug where a stalled paused workflow would be incorrectly reported as running, not paused @@ -47,6 +45,8 @@ $ towncrier create ..md --content "Short description" [#6176](https://github.com/cylc/cylc-flow/pull/6176) - Fix bug where jobs which fail to submit are not shown in GUI/TUI if submission retries are set. +[#6178](https://github.com/cylc/cylc-flow/pull/6178) - Fix an issue where Tui could hang when closing. + ## __cylc-8.3.0 (Released 2024-06-18)__ ### ⚠ Breaking Changes diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 30e81f26f93..adc342dcab5 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -95,6 +95,7 @@ requests_). - Mark Dawson - Diquan Jabbour - Shixian Sheng + - Paul Earnshaw (All contributors are identifiable with email addresses in the git version diff --git a/changes.d/6137.feat.md b/changes.d/6137.feat.md new file mode 100644 index 00000000000..d947999de99 --- /dev/null +++ b/changes.d/6137.feat.md @@ -0,0 +1 @@ +New Cylc lint rule: S014: Don't use job runner specific execution time limit directives, use execution time limit. \ No newline at end of file diff --git a/changes.d/6168.feat.md b/changes.d/6168.feat.md new file mode 100644 index 00000000000..84ccc73b236 --- /dev/null +++ b/changes.d/6168.feat.md @@ -0,0 +1 @@ +Allow symlinking log/job separately from log diff --git a/cylc/flow/__init__.py b/cylc/flow/__init__.py index 3f0bfe5c0b1..71a45f6bbf3 100644 --- a/cylc/flow/__init__.py +++ b/cylc/flow/__init__.py @@ -15,9 +15,8 @@ # along with this program. If not, see . """Set up the cylc environment.""" -import os import logging - +import os CYLC_LOG = 'cylc' @@ -53,7 +52,7 @@ def environ_init(): environ_init() -__version__ = '8.3.4.dev' +__version__ = '8.4.0.dev' def iter_entry_points(entry_point_name): diff --git a/cylc/flow/broadcast_report.py b/cylc/flow/broadcast_report.py index 72fedb4cbef..cb44f2212f3 100644 --- a/cylc/flow/broadcast_report.py +++ b/cylc/flow/broadcast_report.py @@ -72,7 +72,7 @@ def get_broadcast_change_iter(modified_settings, is_cancel=False): value = setting keys_str = "" while isinstance(value, dict): - key, value = list(value.items())[0] + key, value = next(iter(value.items())) if isinstance(value, dict): keys_str += "[" + key + "]" else: diff --git a/cylc/flow/cfgspec/globalcfg.py b/cylc/flow/cfgspec/globalcfg.py index 7dc04c99a02..4988da885e2 100644 --- a/cylc/flow/cfgspec/globalcfg.py +++ b/cylc/flow/cfgspec/globalcfg.py @@ -30,6 +30,7 @@ from cylc.flow.exceptions import GlobalConfigError from cylc.flow.hostuserutil import get_user_home from cylc.flow.network.client_factory import CommsMeth +from cylc.flow.pathutil import SYMLINKABLE_LOCATIONS from cylc.flow.parsec.config import ( ConfigNode as Conf, ParsecConfig, @@ -1148,55 +1149,21 @@ def default_for( .. versionadded:: 8.0.0 """) - Conf('log', VDR.V_STRING, None, desc=""" - Alternative location for the log dir. - - If specified the workflow log directory will be created in - ``/cylc-run//log`` and a - symbolic link will be created from - ``$HOME/cylc-run//log``. If not specified - the workflow log directory will be created in - ``$HOME/cylc-run//log``. - - .. versionadded:: 8.0.0 - """) - Conf('share', VDR.V_STRING, None, desc=""" - Alternative location for the share dir. - - If specified the workflow share directory will be - created in ``/cylc-run//share`` - and a symbolic link will be created from - ``<$HOME/cylc-run//share``. If not specified - the workflow share directory will be created in - ``$HOME/cylc-run//share``. - - .. versionadded:: 8.0.0 - """) - Conf('share/cycle', VDR.V_STRING, None, desc=""" - Alternative directory for the share/cycle dir. - - If specified the workflow share/cycle directory - will be created in - ``/cylc-run//share/cycle`` - and a symbolic link will be created from - ``$HOME/cylc-run//share/cycle``. If not - specified the workflow share/cycle directory will be - created in ``$HOME/cylc-run//share/cycle``. - - .. versionadded:: 8.0.0 - """) - Conf('work', VDR.V_STRING, None, desc=""" - Alternative directory for the work dir. - - If specified the workflow work directory will be created in - ``/cylc-run//work`` and a - symbolic link will be created from - ``$HOME/cylc-run//work``. If not specified - the workflow work directory will be created in - ``$HOME/cylc-run//work``. - - .. versionadded:: 8.0.0 - """) + for folder, versionadded in SYMLINKABLE_LOCATIONS.items(): + Conf(folder, VDR.V_STRING, None, desc=f""" + Alternative location for the log dir. + + If specified the workflow {folder} directory will + be created in + ``/cylc-run//{folder}`` + and a symbolic link will be created from + ``$HOME/cylc-run//{folder}``. + If not specified the workflow log directory will + be created in + ``$HOME/cylc-run//{folder}``. + + .. versionadded:: {versionadded} + """) with Conf('platforms', desc=''' Platforms allow you to define compute resources available at your site. @@ -1311,7 +1278,7 @@ def default_for( The means by which task progress messages are reported back to the running workflow. - Options: + ..rubric:: Options: zmq Direct client-server TCP communication via network ports @@ -1320,6 +1287,8 @@ def default_for( ssh Use non-interactive ssh for task communications + For more information, see :ref:`TaskComms`. + .. versionchanged:: 8.0.0 {REPLACES}``global.rc[hosts][]task communication diff --git a/cylc/flow/clean.py b/cylc/flow/clean.py index b38f01b12fc..b4597780fd3 100644 --- a/cylc/flow/clean.py +++ b/cylc/flow/clean.py @@ -129,7 +129,7 @@ def _clean_check(opts: 'Values', id_: str, run_dir: Path) -> None: except ContactFileExists as exc: raise ServiceFileError( f"Cannot clean running workflow {id_}.\n\n{exc}" - ) + ) from None def init_clean(id_: str, opts: 'Values') -> None: @@ -173,7 +173,7 @@ def init_clean(id_: str, opts: 'Values') -> None: try: platform_names = get_platforms_from_db(local_run_dir) except ServiceFileError as exc: - raise ServiceFileError(f"Cannot clean {id_} - {exc}") + raise ServiceFileError(f"Cannot clean {id_} - {exc}") from None except sqlite3.OperationalError as exc: # something went wrong with the query # e.g. the table/field we need isn't there @@ -186,7 +186,7 @@ def init_clean(id_: str, opts: 'Values') -> None: ' local files (you may need to remove files on other' ' platforms manually).' ) - raise ServiceFileError(f"Cannot clean {id_} - {exc}") + raise ServiceFileError(f"Cannot clean {id_} - {exc}") from exc if platform_names and platform_names != {'localhost'}: remote_clean( @@ -361,7 +361,8 @@ def remote_clean( except PlatformLookupError as exc: raise PlatformLookupError( f"Cannot clean {id_} on remote platforms as the workflow database " - f"is out of date/inconsistent with the global config - {exc}") + f"is out of date/inconsistent with the global config - {exc}" + ) from None queue: Deque[RemoteCleanQueueTuple] = deque() remote_clean_cmd = partial( diff --git a/cylc/flow/command_validation.py b/cylc/flow/command_validation.py index 1c57d452c43..ea67f00206e 100644 --- a/cylc/flow/command_validation.py +++ b/cylc/flow/command_validation.py @@ -70,7 +70,7 @@ def flow_opts(flows: List[str], flow_wait: bool) -> None: try: int(val) except ValueError: - raise InputError(ERR_OPT_FLOW_VAL.format(val)) + raise InputError(ERR_OPT_FLOW_VAL.format(val)) from None if flow_wait and flows[0] in [FLOW_NEW, FLOW_NONE]: raise InputError(ERR_OPT_FLOW_WAIT) diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index 134681bdfd5..173984f17e0 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -211,7 +211,7 @@ async def stop( try: mode = StopMode(mode) except ValueError: - raise CommandFailedError(f"Invalid stop mode: '{mode}'") + raise CommandFailedError(f"Invalid stop mode: '{mode}'") from None schd._set_stop(mode) if mode is StopMode.REQUEST_KILL: schd.time_next_kill = time() @@ -309,7 +309,7 @@ async def set_verbosity(schd: 'Scheduler', level: Union[int, str]): lvl = int(level) LOG.setLevel(lvl) except (TypeError, ValueError) as exc: - raise CommandFailedError(exc) + raise CommandFailedError(exc) from None cylc.flow.flags.verbosity = log_level_to_verbosity(lvl) diff --git a/cylc/flow/config.py b/cylc/flow/config.py index a3da01011a5..d18ee89a0d4 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -199,11 +199,11 @@ def interpolate_template(tmpl, params_dict): try: return tmpl % params_dict except KeyError: - raise ParamExpandError('bad parameter') + raise ParamExpandError('bad parameter') from None except TypeError: - raise ParamExpandError('wrong data type for parameter') + raise ParamExpandError('wrong data type for parameter') from None except ValueError: - raise ParamExpandError('bad template syntax') + raise ParamExpandError('bad template syntax') from None class WorkflowConfig: @@ -480,8 +480,8 @@ def __init__( get_interval(offset_string).standardise()) except IntervalParsingError: raise WorkflowConfigError( - "Illegal %s spec: %s" % ( - s_type, offset_string)) + "Illegal %s spec: %s" % (s_type, offset_string) + ) from None extn = "(" + offset_string + ")" # Replace family names with members. @@ -710,10 +710,12 @@ def process_initial_cycle_point(self) -> None: icp = ingest_time(orig_icp, get_current_time_string()) except IsodatetimeError as exc: raise WorkflowConfigError(str(exc)) + # Save un-evaluated point for DB self.raw_icp = None if icp != orig_icp: self.raw_icp = icp + self.initial_point = get_point(icp).standardise() self.cfg['scheduling']['initial cycle point'] = str(self.initial_point) @@ -761,7 +763,7 @@ def process_start_cycle_point(self) -> None: for taskid in self.options.starttask ] except ValueError as exc: - raise InputError(str(exc)) + raise InputError(str(exc)) from None self.start_point = min( get_point(cycle).standardise() for cycle in cycle_points if cycle @@ -1114,7 +1116,7 @@ def _check_completion_expression(self, task_name: str, expr: str) -> None: f'\n {expr}' '\nThe "finished" output cannot be used in completion' ' expressions, use "succeeded or failed".' - ) + ) from None for alt_qualifier, qualifier in ALT_QUALIFIERS.items(): _alt_compvar = trigger_to_completion_variable(alt_qualifier) @@ -1125,21 +1127,21 @@ def _check_completion_expression(self, task_name: str, expr: str) -> None: f'\n {expr}' f'\nUse "{_compvar}" not "{_alt_compvar}" ' 'in completion expressions.' - ) + ) from None raise WorkflowConfigError( # NOTE: str(exc) == "name 'x' is not defined" tested in # tests/integration/test_optional_outputs.py f'Error in [runtime][{task_name}]completion:' f'\n{error}' - ) + ) from None except Exception as exc: # includes InvalidCompletionExpression # expression contains non-whitelisted syntax or any other error in # the expression e.g. SyntaxError raise WorkflowConfigError( f'Error in [runtime][{task_name}]completion:' f'\n{str(exc)}' - ) + ) from None # ensure consistency between the graph and the completion expression for compvar in ( @@ -1415,11 +1417,12 @@ def compute_family_tree(self): c3_single.mro(name)) except RecursionError: raise WorkflowConfigError( - "circular [runtime] inheritance?") + "circular [runtime] inheritance?" + ) from None except Exception as exc: # catch inheritance errors # TODO - specialise MRO exceptions - raise WorkflowConfigError(str(exc)) + raise WorkflowConfigError(str(exc)) from None for name in self.cfg['runtime']: ancestors = self.runtime['linearized ancestors'][name] @@ -1771,7 +1774,7 @@ def _check_task_event_handlers(self): f' {taskdef.name}:' f' {handler_template}:' f' {repr(exc)}' - ) + ) from None def _check_special_tasks(self): """Check declared special tasks are valid, and detect special @@ -1878,7 +1881,9 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, try: expr_list = listify(lexpression) except SyntaxError: - raise WorkflowConfigError('Error in expression "%s"' % lexpression) + raise WorkflowConfigError( + 'Error in expression "%s"' % lexpression + ) from None triggers = {} xtrig_labels = set() @@ -1955,7 +1960,9 @@ def generate_triggers(self, lexpression, left_nodes, right, seq, xtrig = xtrigs[label] except KeyError: if label != 'wall_clock': - raise WorkflowConfigError(f"xtrigger not defined: {label}") + raise WorkflowConfigError( + f"xtrigger not defined: {label}" + ) from None else: # Allow "@wall_clock" in graph as implicit zero-offset. xtrig = SubFuncContext('wall_clock', 'wall_clock', [], {}) @@ -2289,7 +2296,7 @@ def load_graph(self): msg += ' (final cycle point=%s)' % fcp if isinstance(exc, CylcError): msg += ' %s' % exc.args[0] - raise WorkflowConfigError(msg) + raise WorkflowConfigError(msg) from None self.sequences.append(seq) parser = GraphParser( family_map, @@ -2444,7 +2451,7 @@ def get_taskdef( except TaskDefError as exc: if orig_expr: LOG.error(orig_expr) - raise WorkflowConfigError(str(exc)) + raise WorkflowConfigError(str(exc)) from None else: # Record custom message outputs from [runtime]. messages = set(self.cfg['runtime'][name]['outputs'].values()) @@ -2458,7 +2465,7 @@ def get_taskdef( 'Duplicate task message in' f' "[runtime][{name}][outputs]' f'{output} = {message}" - messages must be unique' - ) + ) from None valid, msg = TaskOutputValidator.validate(output) if not valid: raise WorkflowConfigError( @@ -2484,7 +2491,7 @@ def _get_taskdef(self, name: str) -> TaskDef: try: rtcfg = self.cfg['runtime'][name] except KeyError: - raise WorkflowConfigError("Task not defined: %s" % name) + raise WorkflowConfigError("Task not defined: %s" % name) from None # We may want to put in some handling for cases of changing the # initial cycle via restart (accidentally or otherwise). @@ -2576,7 +2583,9 @@ def process_metadata_urls(self): 'workflow': self.workflow, } except (KeyError, ValueError): - raise InputError(f'Invalid template [meta]URL: {url}') + raise InputError( + f'Invalid template [meta]URL: {url}' + ) from None else: LOG.warning( 'Detected deprecated template variables in [meta]URL.' @@ -2612,7 +2621,9 @@ def process_metadata_urls(self): 'task': name, } except (KeyError, ValueError): - raise InputError(f'Invalid template [meta]URL: {url}') + raise InputError( + f'Invalid template [meta]URL: {url}' + ) from None else: LOG.warning( 'Detected deprecated template variables in' diff --git a/cylc/flow/cycling/integer.py b/cylc/flow/cycling/integer.py index 749c651fc08..20cc3765670 100644 --- a/cylc/flow/cycling/integer.py +++ b/cylc/flow/cycling/integer.py @@ -150,7 +150,7 @@ def standardise(self): try: self.value = str(int(self)) except (TypeError, ValueError) as exc: - raise PointParsingError(type(self), self.value, exc) + raise PointParsingError(type(self), self.value, exc) from None return self def __int__(self): diff --git a/cylc/flow/cycling/iso8601.py b/cylc/flow/cycling/iso8601.py index a66ce3f5ba0..2ab311df425 100644 --- a/cylc/flow/cycling/iso8601.py +++ b/cylc/flow/cycling/iso8601.py @@ -102,7 +102,7 @@ def standardise(self): WorkflowSpecifics.NUM_EXPANDED_YEAR_DIGITS) else: message = str(exc) - raise PointParsingError(type(self), self.value, message) + raise PointParsingError(type(self), self.value, message) from None return self def sub(self, other): @@ -176,7 +176,7 @@ def standardise(self): try: self.value = str(interval_parse(self.value)) except IsodatetimeError: - raise IntervalParsingError(type(self), self.value) + raise IntervalParsingError(type(self), self.value) from None return self def add(self, other): @@ -782,7 +782,7 @@ def prev_next( raise WorkflowConfigError( f'Invalid offset: {my_time}:' f' Offset lists are semicolon separated, try {suggest}' - ) + ) from None timepoints.append(parsed_point + now) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index b98a055f882..459abf77e56 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -116,6 +116,7 @@ if TYPE_CHECKING: from cylc.flow.cycling import PointBase from cylc.flow.flow_mgr import FlowNums + from cylc.flow.prerequisite import Prerequisite EDGES = 'edges' FAMILIES = 'families' @@ -1444,7 +1445,7 @@ def apply_task_proxy_db_history(self): prereq_ids.add(f'{relative_id}/{flow_nums_str}') # Batch load prerequisites of tasks according to flow. - prereqs_map = {} + prereqs_map: Dict[str, dict] = {} for ( cycle, name, prereq_name, prereq_cycle, prereq_output, satisfied @@ -1458,16 +1459,14 @@ def apply_task_proxy_db_history(self): ] = satisfied if satisfied != '0' else False for ikey, prereqs in prereqs_map.items(): + itask_prereq: Prerequisite for itask_prereq in ( - self.db_load_task_proxies[ikey][0].state.prerequisites + self.db_load_task_proxies[ikey][0].state.prerequisites ): - for key in itask_prereq.satisfied.keys(): - try: - itask_prereq.satisfied[key] = prereqs[key] - except KeyError: - # This prereq is not in the DB: new dependencies - # added to an already-spawned task before restart. - itask_prereq.satisfied[key] = False + for key in itask_prereq: + itask_prereq[key] = prereqs.get(key, False) + # (False if prereq is not in the DB: new dependencies + # added to an already-spawned task before restart.) # Extract info from itasks to data-store. for task_info in self.db_load_task_proxies.values(): diff --git a/cylc/flow/dbstatecheck.py b/cylc/flow/dbstatecheck.py index b38c394ccdb..88dd5de3be1 100644 --- a/cylc/flow/dbstatecheck.py +++ b/cylc/flow/dbstatecheck.py @@ -86,7 +86,7 @@ def __init__(self, rund, workflow, db_path=None): except sqlite3.OperationalError: with suppress(Exception): self.conn.close() - raise exc # original error + raise exc from None # original error def __enter__(self): return self @@ -137,7 +137,7 @@ def adjust_point_to_db(self, cycle, offset): raise InputError( f'Cycle point "{cycle}" is not compatible' f' with DB point format "{self.db_point_fmt}"' - ) + ) from None return cycle @staticmethod diff --git a/cylc/flow/etc/examples/extending-workflow/.validate b/cylc/flow/etc/examples/extending-workflow/.validate new file mode 100755 index 00000000000..43c810372ce --- /dev/null +++ b/cylc/flow/etc/examples/extending-workflow/.validate @@ -0,0 +1,80 @@ +#!/bin/bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +set -eux + +test_simple () { + local ID + ID="$(< /dev/urandom tr -dc A-Za-z | head -c6)" + + # lint + cylc lint ./simple + + # copy into a temp directory + local SRC_DIR + SRC_DIR="$(mktemp -d)" + cp simple/flow.cylc "$SRC_DIR" + + # speed things up with simulation mode + cat >>"${SRC_DIR}/flow.cylc" <<__HERE__ + [runtime] + [[root]] + [[[simulation]]] + default run length = PT0S +__HERE__ + + # start the workflow + cylc vip \ + --check-circular \ + --no-run-name \ + --no-detach \ + --workflow-name "$ID" \ + --mode=simulation \ + "$SRC_DIR" + + # it should have reached the 2002 cycle + grep '2002/a' "${HOME}/cylc-run/${ID}/log/scheduler/log" + if grep '2003/a' "${HOME}/cylc-run/${ID}/log/scheduler/log"; then + exit 1 + fi + + # edit the "stop after cycle point" + sed -i \ + 's/stop after cycle point.*/stop after cycle point = 2004/' \ + "${SRC_DIR}/flow.cylc" + + # continue the run + cylc vr \ + --no-detach \ + --mode=simulation \ + --yes \ + "$ID" + + # it should have reached the 2004 cycle + grep '2004/a' "${HOME}/cylc-run/${ID}/log/scheduler/log" + if grep '2005/a' "${HOME}/cylc-run/${ID}/log/scheduler/log"; then + exit 1 + fi + + # clean up + cylc clean "$ID" + + rm -r "${SRC_DIR}" +} + + +test_simple diff --git a/cylc/flow/etc/examples/extending-workflow/index.rst b/cylc/flow/etc/examples/extending-workflow/index.rst new file mode 100644 index 00000000000..036438ff466 --- /dev/null +++ b/cylc/flow/etc/examples/extending-workflow/index.rst @@ -0,0 +1,105 @@ +Extending Workflow +------------------ + +.. cylc-scope:: flow.cylc[scheduling] + +Sometimes we may run a workflow to :term:`completion `, +but subsequently wish to run it for a few more cycles. + +With Cylc 7 this was often done by changing the `final cycle point` and +restarting the workflow. This approach worked, but was a little awkward. +It's possible with Cylc 8, but we would recommend moving away from this +pattern instead. + +The recommended approach to this problem (Cylc 6+) is to use the +`stop after cycle point` rather than the `final cycle point`. + +The `stop after cycle point` tells Cylc to **stop** after the workflow passes +the specified point, whereas the `final cycle point` tells Cylc that the +workflow **finishes** at the specified point. + +When a workflow **finishes**, it is a little awkward to restart as you have to +tell Cylc which tasks to continue on from. The `stop after cycle point` +solution avoids this issue. + + +Example +^^^^^^^ + +.. admonition:: Get a copy of this example + :class: hint + + .. code-block:: console + + $ cylc get-resources examples/extending-workflow/simple + +This workflow will stop at the end of the ``2002`` cycle: + +.. literalinclude:: simple/flow.cylc + :language: cylc + +After it has run and shut down, change the `stop after cycle point` to +the desired value and restart it. E.g: + +.. code-block:: bash + + # install and run the workflow: + cylc vip + + # then later edit "stop after cycle point" to "2004" + + # then reinstall and restart the workflow: + cylc vr + +The workflow will continue from where it left off and run until the end of the +``2004`` cycle. Because the workflow never hit the `final cycle point` it +never "finished" so no special steps are required to restart the workflow. + +You can also set the `stop after cycle point` when you start the workflow: + +.. code-block:: bash + + cylc play --stop-cycle-point=2020 myworkflow + +Or change it at any point whilst the workflow is running: + +.. code-block:: bash + + cylc stop myworkflow//2030 # change the stop after cycle point to 2030 + +.. note:: + + If you set the `stop after cycle point` on the command line, this value will + take precedence over the one in the workflow configuration. Use + ``cylc play --stop-cycle-point=reload`` to restart the workflow using the + `stop after cycle point` configured in the workflow configuration. + + +Running Tasks At The `stop after cycle point` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If you have tasks that you want to run before the workflow shuts down at the +`stop after cycle point`, use the recurrence ``R1/`` to schedudule +them, e.g: + +.. code-block:: cylc + + #!Jinja2 + + {% set stop_cycle = '3000' %} + + [scheduling] + initial cycle point = 2000 + stop after cycle point = {{ stop_cycle }} + [[graph]] + R1/{{ stop_cycle }} = """ + # these tasks will run *before* the workflow shuts down + z => run_me => and_me + """ + +When the workflow is subsequently restarted with a later +`stop after cycle point`, these tasks will be re-scheduled at the new +stop point. + + +.. cylc-scope:: diff --git a/cylc/flow/etc/examples/extending-workflow/simple/flow.cylc b/cylc/flow/etc/examples/extending-workflow/simple/flow.cylc new file mode 100644 index 00000000000..2de2f28f2ea --- /dev/null +++ b/cylc/flow/etc/examples/extending-workflow/simple/flow.cylc @@ -0,0 +1,24 @@ +[meta] + title = "Basic extendable workflow" + description = """ + Use the "stop after cycle point" rather than the "final cycle point" + to allow this workflow to be easily extended at a later date. + """ + +[scheduler] + # use the year for the cycle point + # (strip off the month, day, hour and minute) + cycle point format = CCYY + +[scheduling] + initial cycle point = 2000 + stop after cycle point = 2002 # stop after two years of simulated time + [[graph]] + P1Y = """ + z[-P1Y] => a + a => z + """ + +[runtime] + [[a]] + [[z]] diff --git a/cylc/flow/etc/syntax/cylc.lang b/cylc/flow/etc/syntax/cylc.lang index 0270c7f3cfc..c3f43da2c95 100644 --- a/cylc/flow/etc/syntax/cylc.lang +++ b/cylc/flow/etc/syntax/cylc.lang @@ -219,7 +219,7 @@ #\} - \{\{.*\}\} + \{\{.*?\}\} \{% diff --git a/cylc/flow/graph_parser.py b/cylc/flow/graph_parser.py index efbce16fb36..3fbfd8c1754 100644 --- a/cylc/flow/graph_parser.py +++ b/cylc/flow/graph_parser.py @@ -346,7 +346,7 @@ def parse_graph(self, graph_string: str) -> None: raise GraphParseError( f"Dangling {seq}:" f"{this_line}" - ) + ) from None part_lines.append(this_line) # Check that a continuation sequence doesn't end this line and @@ -638,7 +638,8 @@ def _proc_dep_pair( except KeyError: # "FAM:bad => foo" in LHS (includes "FAM => bar" too). raise GraphParseError( - f"Illegal family trigger in {expr}") + f"Illegal family trigger in {expr}" + ) from None else: # Not a family. if trig in self.__class__.fam_to_mem_trigger_map: @@ -911,7 +912,8 @@ def _compute_triggers( except KeyError: # Illegal family trigger on RHS of a pair. raise GraphParseError( - f"Illegal family trigger: {name}:{output}") + f"Illegal family trigger: {name}:{output}" + ) from None else: fam = False if not output: diff --git a/cylc/flow/host_select.py b/cylc/flow/host_select.py index 0eb34d088ca..69e32c68a71 100644 --- a/cylc/flow/host_select.py +++ b/cylc/flow/host_select.py @@ -373,7 +373,7 @@ def _filter_by_ranking(hosts, rankings, results, data=None): f'\n Expression: {item}' f'\n Configuration: {GLBL_CFG_STR}' f'\n Error: {exc}' - ) + ) from None if isinstance(result, bool): host_rankings[item] = result data[host][item] = result diff --git a/cylc/flow/id.py b/cylc/flow/id.py index 58fff7fa7bc..f2c8b05b4a1 100644 --- a/cylc/flow/id.py +++ b/cylc/flow/id.py @@ -128,7 +128,7 @@ def __getitem__(self, key): return dict.__getitem__(self, key) except KeyError: if key not in self._KEYS: - raise ValueError(f'Invalid token: {key}') + raise ValueError(f'Invalid token: {key}') from None return None def __str__(self): diff --git a/cylc/flow/id_cli.py b/cylc/flow/id_cli.py index 9c7493fd612..35afbf80d5a 100644 --- a/cylc/flow/id_cli.py +++ b/cylc/flow/id_cli.py @@ -167,9 +167,9 @@ def _parse_cli(*ids: str) -> List[Tokens]: # this ID is invalid with or without the trailing slash tokens = cli_tokenise(id_[:-1]) except ValueError: - raise InputError(f'Invalid ID: {id_}') + raise InputError(f'Invalid ID: {id_}') from None else: - raise InputError(f'Invalid ID: {id_}') + raise InputError(f'Invalid ID: {id_}') from None is_partial = tokens.get('workflow') and not tokens.get('cycle') is_relative = not tokens.get('workflow') @@ -347,7 +347,7 @@ async def parse_ids_async( if src: if not flow_file_path: # get the workflow file path from the run dir - flow_file_path = get_flow_file(list(workflows)[0]) + flow_file_path = get_flow_file(next(iter(workflows))) return workflows, flow_file_path return workflows, multi_mode @@ -375,7 +375,7 @@ async def parse_id_async( 'max_tasks': 1, }, ) - workflow_id = list(workflows)[0] + workflow_id = next(iter(workflows)) tokens_list = workflows[workflow_id] tokens: Optional[Tokens] if tokens_list: diff --git a/cylc/flow/install.py b/cylc/flow/install.py index eb59fdeb3d4..2a57ba0098b 100644 --- a/cylc/flow/install.py +++ b/cylc/flow/install.py @@ -331,7 +331,7 @@ def install_workflow( # This occurs when the file exists but is _not_ a directory. raise WorkflowFilesError( f"Cannot install as there is an existing file at {rundir}." - ) + ) from None if relink: link_runN(rundir) rsync_cmd = get_rsync_rund_cmd(source, rundir) @@ -532,7 +532,7 @@ def parse_cli_sym_dirs(symlink_dirs: str) -> Dict[str, Dict[str, Any]]: 'There is an error in --symlink-dirs option:' f' {pair}. Try entering option in the form ' '--symlink-dirs=\'log=$DIR, share=$DIR2, ...\'' - ) + ) from None if key not in possible_symlink_dirs: dirs = ', '.join(possible_symlink_dirs) raise InputError( diff --git a/cylc/flow/install_plugins/log_vc_info.py b/cylc/flow/install_plugins/log_vc_info.py index 29d861f7654..41479d6f1ed 100644 --- a/cylc/flow/install_plugins/log_vc_info.py +++ b/cylc/flow/install_plugins/log_vc_info.py @@ -253,7 +253,7 @@ def _run_cmd( except FileNotFoundError as exc: # This will only be raised if the VCS command is not installed, # otherwise Popen() will succeed with a non-zero return code - raise VCSNotInstalledError(vcs, exc) + raise VCSNotInstalledError(vcs, exc) from None if stdout == PIPE: out, err = pipe_poller(proc, proc.stdout, proc.stderr) else: diff --git a/cylc/flow/job_runner_handlers/loadleveler.py b/cylc/flow/job_runner_handlers/loadleveler.py index ee8203b2b47..d097ff85faf 100644 --- a/cylc/flow/job_runner_handlers/loadleveler.py +++ b/cylc/flow/job_runner_handlers/loadleveler.py @@ -83,6 +83,7 @@ class LoadlevelerHandler(): re.compile("^llsubmit: Processed command file through Submit Filter:")] SUBMIT_CMD_TMPL = "llsubmit '%(job)s'" VACATION_SIGNAL = "USR1" + TIME_LIMIT_DIRECTIVE = "wall_clock_limit" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -96,8 +97,8 @@ def format_directives(self, job_conf): directives["output"] = job_file_path + ".out" directives["error"] = job_file_path + ".err" if (job_conf["execution_time_limit"] and - directives.get("wall_clock_limit") is None): - directives["wall_clock_limit"] = "%d,%d" % ( + directives.get(self.TIME_LIMIT_DIRECTIVE) is None): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d,%d" % ( job_conf["execution_time_limit"] + 60, job_conf["execution_time_limit"]) for key, value in list(job_conf["directives"].items()): diff --git a/cylc/flow/job_runner_handlers/lsf.py b/cylc/flow/job_runner_handlers/lsf.py index a465c9b7924..534d1205a9d 100644 --- a/cylc/flow/job_runner_handlers/lsf.py +++ b/cylc/flow/job_runner_handlers/lsf.py @@ -70,6 +70,7 @@ class LSFHandler(): POLL_CMD = "bjobs" REC_ID_FROM_SUBMIT_OUT = re.compile(r"^Job <(?P\d+)>") SUBMIT_CMD_TMPL = "bsub" + TIME_LIMIT_DIRECTIVE = "-W" @classmethod def format_directives(cls, job_conf): @@ -82,8 +83,11 @@ def format_directives(cls, job_conf): ) directives["-o"] = job_file_path + ".out" directives["-e"] = job_file_path + ".err" - if job_conf["execution_time_limit"] and directives.get("-W") is None: - directives["-W"] = str(math.ceil( + if ( + job_conf["execution_time_limit"] + and directives.get(cls.TIME_LIMIT_DIRECTIVE) is None + ): + directives[cls.TIME_LIMIT_DIRECTIVE] = str(math.ceil( job_conf["execution_time_limit"] / 60)) for key, value in list(job_conf["directives"].items()): directives[key] = value diff --git a/cylc/flow/job_runner_handlers/moab.py b/cylc/flow/job_runner_handlers/moab.py index 839d246ccbc..ec068d48420 100644 --- a/cylc/flow/job_runner_handlers/moab.py +++ b/cylc/flow/job_runner_handlers/moab.py @@ -78,6 +78,7 @@ class MoabHandler: POLL_CMD = "checkjob" REC_ID_FROM_SUBMIT_OUT = re.compile(r"""\A\s*(?P\S+)\s*\Z""") SUBMIT_CMD_TMPL = "msub '%(job)s'" + TIME_LIMIT_DIRECTIVE = "-l walltime" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -91,8 +92,9 @@ def format_directives(self, job_conf): directives["-o"] = job_file_path + ".out" directives["-e"] = job_file_path + ".err" if (job_conf["execution_time_limit"] and - directives.get("-l walltime") is None): - directives["-l walltime"] = "%d" % job_conf["execution_time_limit"] + directives.get(self.TIME_LIMIT_DIRECTIVE) is None): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d" % job_conf[ + "execution_time_limit"] # restartable? directives.update(job_conf["directives"]) lines = [] diff --git a/cylc/flow/job_runner_handlers/pbs.py b/cylc/flow/job_runner_handlers/pbs.py index aa264311fc4..ac0d6c47a00 100644 --- a/cylc/flow/job_runner_handlers/pbs.py +++ b/cylc/flow/job_runner_handlers/pbs.py @@ -84,6 +84,7 @@ class PBSHandler: POLL_CANT_CONNECT_ERR = "Connection refused" REC_ID_FROM_SUBMIT_OUT = re.compile(r"^\s*(?P\d+)", re.M) SUBMIT_CMD_TMPL = "qsub '%(job)s'" + TIME_LIMIT_DIRECTIVE = "-l walltime" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -105,9 +106,12 @@ def format_directives(self, job_conf): directives["-o"] = job_file_path + ".out" directives["-e"] = job_file_path + ".err" - if (job_conf["execution_time_limit"] and - directives.get("-l walltime") is None): - directives["-l walltime"] = "%d" % job_conf["execution_time_limit"] + if ( + job_conf["execution_time_limit"] + and directives.get(self.TIME_LIMIT_DIRECTIVE) is None + ): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d" % job_conf[ + "execution_time_limit"] for key, value in list(job_conf["directives"].items()): directives[key] = value lines = [] diff --git a/cylc/flow/job_runner_handlers/sge.py b/cylc/flow/job_runner_handlers/sge.py index 33f7d5a26d7..c7c50956fb9 100644 --- a/cylc/flow/job_runner_handlers/sge.py +++ b/cylc/flow/job_runner_handlers/sge.py @@ -37,7 +37,6 @@ -cwd = -q = foo -l h_data = 1024M - -l h_rt = 24:00:00 These are written to the top of the job script like this: @@ -76,6 +75,7 @@ class SGEHandler: POLL_CMD = "qstat" REC_ID_FROM_SUBMIT_OUT = re.compile(r"\D+(?P\d+)\D+") SUBMIT_CMD_TMPL = "qsub '%(job)s'" + TIME_LIMIT_DIRECTIVE = "-l h_rt" def format_directives(self, job_conf): """Format the job directives for a job file.""" @@ -88,8 +88,8 @@ def format_directives(self, job_conf): directives['-o'] = job_file_path + ".out" directives['-e'] = job_file_path + ".err" if (job_conf["execution_time_limit"] and - directives.get("-l h_rt") is None): - directives["-l h_rt"] = "%d:%02d:%02d" % ( + directives.get(self.TIME_LIMIT_DIRECTIVE) is None): + directives[self.TIME_LIMIT_DIRECTIVE] = "%d:%02d:%02d" % ( job_conf["execution_time_limit"] / 3600, (job_conf["execution_time_limit"] / 60) % 60, job_conf["execution_time_limit"] % 60) diff --git a/cylc/flow/job_runner_handlers/slurm.py b/cylc/flow/job_runner_handlers/slurm.py index 4ec6be20471..33df2ebc926 100644 --- a/cylc/flow/job_runner_handlers/slurm.py +++ b/cylc/flow/job_runner_handlers/slurm.py @@ -135,6 +135,8 @@ class SLURMHandler(): # Separator between het job directive sections SEP_HETJOB = "#SBATCH hetjob" + TIME_LIMIT_DIRECTIVE = "--time" + @classmethod def filter_poll_many_output(cls, out): """Return list of job IDs extracted from job poll stdout. @@ -161,8 +163,8 @@ def format_directives(cls, job_conf): directives['--output'] = job_file_path.replace('%', '%%') + ".out" directives['--error'] = job_file_path.replace('%', '%%') + ".err" if (job_conf["execution_time_limit"] and - directives.get("--time") is None): - directives["--time"] = "%d:%02d" % ( + directives.get(cls.TIME_LIMIT_DIRECTIVE) is None): + directives[cls.TIME_LIMIT_DIRECTIVE] = "%d:%02d" % ( job_conf["execution_time_limit"] / 60, job_conf["execution_time_limit"] % 60) for key, value in list(job_conf['directives'].items()): diff --git a/cylc/flow/loggingutil.py b/cylc/flow/loggingutil.py index cb645a929ff..55701cf3fe4 100644 --- a/cylc/flow/loggingutil.py +++ b/cylc/flow/loggingutil.py @@ -211,7 +211,7 @@ def should_rollover(self, record: logging.LogRecord) -> bool: self.stream.seek(0, 2) except ValueError as exc: # intended to catch - ValueError: I/O operation on closed file - raise SystemExit(exc) + raise SystemExit(exc) from None return self.stream.tell() + len(msg.encode('utf8')) >= self.max_bytes @property diff --git a/cylc/flow/main_loop/__init__.py b/cylc/flow/main_loop/__init__.py index 2350153842c..e9f9f35f5da 100644 --- a/cylc/flow/main_loop/__init__.py +++ b/cylc/flow/main_loop/__init__.py @@ -329,14 +329,14 @@ def load(config, additional_plugins=None): f'No main-loop plugin: "{plugin_name}"\n' + ' Available plugins:\n' + indent('\n'.join(sorted(entry_points)), ' ') - ) + ) from None # load plugin try: module = entry_point.load() except Exception as exc: raise PluginError( 'cylc.main_loop', entry_point.name, exc - ) + ) from None # load coroutines log = [] for coro_name, coro in getmembers(module): diff --git a/cylc/flow/main_loop/health_check.py b/cylc/flow/main_loop/health_check.py index b488c878c0e..6e42724ba91 100644 --- a/cylc/flow/main_loop/health_check.py +++ b/cylc/flow/main_loop/health_check.py @@ -52,8 +52,8 @@ def _check_contact_file(scheduler): scheduler.workflow) if contact_data != scheduler.contact_data: raise CylcError('contact file modified') - except (AssertionError, IOError, ValueError, ServiceFileError): + except (AssertionError, IOError, ValueError, ServiceFileError) as exc: raise CylcError( '%s: contact file corrupted/modified and may be left' % workflow_files.get_contact_file_path(scheduler.workflow) - ) + ) from exc diff --git a/cylc/flow/network/__init__.py b/cylc/flow/network/__init__.py index 916b129e244..42b79475ca5 100644 --- a/cylc/flow/network/__init__.py +++ b/cylc/flow/network/__init__.py @@ -78,7 +78,7 @@ def get_location(workflow: str) -> Tuple[str, int, int]: contact = load_contact_file(workflow) except (IOError, ValueError, ServiceFileError): # Contact file does not exist or corrupted, workflow should be dead - raise WorkflowStopped(workflow) + raise WorkflowStopped(workflow) from None host = contact[ContactFileFields.HOST] host = get_fqdn_by_host(host) @@ -176,16 +176,14 @@ def _socket_bind(self, min_port, max_port, srv_prv_key_loc=None): srv_prv_key_info.full_key_path) except ValueError: raise ServiceFileError( - f"Failed to find server's public " - f"key in " + "Failed to find server's public key in " f"{srv_prv_key_info.full_key_path}." - ) - except OSError: + ) from None + except OSError as exc: raise ServiceFileError( - f"IO error opening server's private " - f"key from " + "IO error opening server's private key from " f"{srv_prv_key_info.full_key_path}." - ) + ) from exc if server_private_key is None: # this can't be caught by exception raise ServiceFileError( f"Failed to find server's private " @@ -204,7 +202,9 @@ def _socket_bind(self, min_port, max_port, srv_prv_key_loc=None): self.port = self.socket.bind_to_random_port( 'tcp://*', min_port, max_port) except (zmq.error.ZMQError, zmq.error.ZMQBindError) as exc: - raise CylcError(f'could not start Cylc ZMQ server: {exc}') + raise CylcError( + f'could not start Cylc ZMQ server: {exc}' + ) from None # Keeping srv_public_key_loc as optional arg so as to not break interface def _socket_connect(self, host, port, srv_public_key_loc=None): @@ -236,8 +236,10 @@ def _socket_connect(self, host, port, srv_public_key_loc=None): try: client_public_key, client_priv_key = zmq.auth.load_certificate( client_priv_key_info.full_key_path) - except (OSError, ValueError): - raise ClientError(error_msg) + except ValueError: + raise ClientError(error_msg) from None + except OSError as exc: + raise ClientError(error_msg) from exc if client_priv_key is None: # this can't be caught by exception raise ClientError(error_msg) self.socket.curve_publickey = client_public_key @@ -245,6 +247,9 @@ def _socket_connect(self, host, port, srv_public_key_loc=None): # A client can only connect to the server if it knows its public key, # so we grab this from the location it was created on the filesystem: + error_msg = ( + "Failed to load the workflow's public key, so cannot connect." + ) try: # 'load_certificate' will try to load both public & private keys # from a provided file but will return None, not throw an error, @@ -254,9 +259,10 @@ def _socket_connect(self, host, port, srv_public_key_loc=None): server_public_key = zmq.auth.load_certificate( srv_pub_key_info.full_key_path)[0] self.socket.curve_serverkey = server_public_key - except (OSError, ValueError): # ValueError raised w/ no public key - raise ClientError( - "Failed to load the workflow's public key, so cannot connect.") + except ValueError: # ValueError raised w/ no public key + raise ClientError(error_msg) from None + except OSError as exc: + raise ClientError(error_msg) from exc self.socket.connect(f'tcp://{host}:{port}') diff --git a/cylc/flow/network/client.py b/cylc/flow/network/client.py index e7e26954d56..099ef8bc0ff 100644 --- a/cylc/flow/network/client.py +++ b/cylc/flow/network/client.py @@ -326,7 +326,7 @@ async def async_request( raise ClientError( error.get('message'), # type: ignore error.get('traceback'), # type: ignore - ) + ) from None def get_header(self) -> dict: """Return "header" data to attach to each request for traceability. diff --git a/cylc/flow/network/multi.py b/cylc/flow/network/multi.py index 2b9ea418976..9c190f68799 100644 --- a/cylc/flow/network/multi.py +++ b/cylc/flow/network/multi.py @@ -235,7 +235,7 @@ def _report( """ try: ret: List[Tuple[Optional[str], Optional[str], bool]] = [] - for _mutation_name, mutation_response in response.items(): + for mutation_response in response.values(): # extract the result of each mutation result in the response success, msg = mutation_response['result'][0]['response'] out = None diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 11eafd8bea5..fc9b67eeef5 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -750,7 +750,7 @@ async def _mutation_mapper( try: meth = COMMANDS[command] except KeyError: - raise ValueError(f"Command '{command}' not found") + raise ValueError(f"Command '{command}' not found") from None try: # Initiate the command. Validation may be performed at this point, diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index 70e40232c1d..355f12c4981 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -271,7 +271,7 @@ def field_name_from_type( try: return NODE_MAP[named_type.name] except KeyError: - raise ValueError(f"'{named_type.name}' is not a node type") + raise ValueError(f"'{named_type.name}' is not a node type") from None def get_resolvers(info: 'ResolveInfo') -> 'BaseResolvers': @@ -577,9 +577,10 @@ def resolve_mapping_to_list(root, info, **args): # Types: class NodeMeta(ObjectType): class Meta: - description = """ -Meta data fields, -including custom fields in a generic user-defined dump""" + description = sstrip(""" + Meta data fields, including custom fields in a generic user-defined + dump. + """) title = String(default_value=None) description = String(default_value=None) URL = String(default_value=None) @@ -588,7 +589,7 @@ class Meta: class TimeZone(ObjectType): class Meta: - description = """Time zone info.""" + description = 'Time zone info.' hours = Int() minutes = Int() string_basic = String() @@ -597,7 +598,7 @@ class Meta: class Workflow(ObjectType): class Meta: - description = """Global workflow info.""" + description = 'Global workflow info.' id = ID() # noqa: A003 (required for definition) name = String( description='The workflow ID with the ~user/ prefix removed.', @@ -730,7 +731,7 @@ class Meta: latest_state_tasks = GenericScalar( states=graphene.List( String, - description="List of task states to show", + description="List of task states to show.", default_value=TASK_STATUSES_ORDERED), resolver=resolve_state_tasks, description='The latest tasks to have entered each task state.', @@ -769,8 +770,9 @@ class Meta: ids=graphene.List( ID, description=sstrip(''' - Node IDs, cycle point and/or-just family/task namespace: - ["1234/foo", "1234/FAM", "*/FAM"] + Node IDs, cycle point and/or-just family/task namespace. + + E.g: `["1234/foo", "1234/FAM", "*/FAM"]` '''), default_value=[] ), @@ -858,7 +860,7 @@ class Meta: ) task_proxy = Field( lambda: TaskProxy, - description="The TaskProxy of the task which submitted this job", + description="The TaskProxy of the task which submitted this job.", strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, delta_type=DELTA_TYPE_DEFAULT, @@ -919,7 +921,7 @@ class Meta: ) runtime = Field( Runtime, - description="The task's `[runtime`] section.", + description="The task's `[runtime]` section.", ) mean_elapsed_time = Float( description="The task's average runtime." @@ -955,7 +957,7 @@ class Meta: class PollTask(ObjectType): class Meta: - description = """Polling task edge""" + description = 'Polling task edge.' local_proxy = ID() workflow = String() remote_proxy = ID() @@ -965,11 +967,11 @@ class Meta: class Condition(ObjectType): class Meta: - description = """Prerequisite conditions.""" + description = 'Prerequisite conditions.' task_id = String() task_proxy = Field( lambda: TaskProxy, - description="""Associated Task Proxy""", + description='Associated Task Proxy.', strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, delta_type=DELTA_TYPE_DEFAULT, @@ -985,18 +987,19 @@ def resolve_task_id(root, info, **args): class Prerequisite(ObjectType): class Meta: - description = """Task prerequisite.""" + description = 'Task prerequisite.' expression = String() conditions = graphene.List( Condition, - description="""Condition monomers of a task prerequisites.""") + description='Condition monomers of a task prerequisites.' + ) cycle_points = graphene.List(String) satisfied = Boolean() class Output(ObjectType): class Meta: - description = """Task output""" + description = 'Task output.' label = String() message = String() satisfied = Boolean() @@ -1004,16 +1007,16 @@ class Meta: class OutputLabel(String): - """Task output, e.g. "succeeded".""" + """Task output, e.g. `succeeded`.""" class PrerequisiteString(String): - """A task prerequisite, e.g. "2040/foo:succeeded".""" + """A task prerequisite, e.g. `2040/foo:succeeded`.""" class XTrigger(ObjectType): class Meta: - description = """Task trigger""" + description = 'Task trigger.' id = String() # noqa: A003 (required for definition) label = String() message = String() @@ -1023,7 +1026,7 @@ class Meta: class TaskProxy(ObjectType): class Meta: - description = """Task cycle instance.""" + description = 'Task cycle instance.' id = ID() # noqa: A003 (required for schema definition) task = Field( Task, @@ -1073,7 +1076,7 @@ class Meta: '''), ) depth = Int( - description='The family inheritance depth', + description='The family inheritance depth.', ) graph_depth = Int( description=sstrip(''' @@ -1158,7 +1161,7 @@ class Meta: class Family(ObjectType): class Meta: - description = """Task definition, static fields""" + description = 'Task definition, static fields.' id = ID() # noqa: A003 (required for schema definition) name = String() meta = Field(NodeMeta) @@ -1166,7 +1169,7 @@ class Meta: depth = Int() proxies = graphene.List( lambda: FamilyProxy, - description="""Associated cycle point proxies""", + description='Associated cycle point proxies.', args=PROXY_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1174,7 +1177,7 @@ class Meta: resolver=get_nodes_by_ids) parents = graphene.List( lambda: Family, - description="""Family definition parent.""", + description='Family definition parent.', args=DEF_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1182,7 +1185,7 @@ class Meta: resolver=get_nodes_by_ids) child_tasks = graphene.List( Task, - description="""Descendant definition tasks.""", + description='Descendant definition tasks.', args=DEF_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1190,7 +1193,7 @@ class Meta: resolver=get_nodes_by_ids) child_families = graphene.List( lambda: Family, - description="""Descendant desc families.""", + description='Descendant desc families.', args=DEF_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1198,7 +1201,7 @@ class Meta: resolver=get_nodes_by_ids) first_parent = Field( lambda: Family, - description="""Family first parent.""", + description='Family first parent.', strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, delta_type=DELTA_TYPE_DEFAULT, @@ -1207,14 +1210,14 @@ class Meta: class FamilyProxy(ObjectType): class Meta: - description = """Family composite.""" + description = 'Family composite.' id = ID() # noqa: A003 (required for schema definition) cycle_point = String() # name & namespace for filtering/sorting name = String() family = Field( Family, - description="""Family definition""", + description='Family definition.', strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, delta_type=DELTA_TYPE_DEFAULT, @@ -1238,7 +1241,7 @@ class Meta: ) child_tasks = graphene.List( TaskProxy, - description="""Descendant task proxies.""", + description='Descendant task proxies.', args=PROXY_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1246,7 +1249,7 @@ class Meta: resolver=get_nodes_by_ids) child_families = graphene.List( lambda: FamilyProxy, - description="""Descendant family proxies.""", + description='Descendant family proxies.', args=PROXY_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1254,7 +1257,7 @@ class Meta: resolver=get_nodes_by_ids) first_parent = Field( lambda: FamilyProxy, - description="""Task first parent.""", + description='Task first parent.', args=PROXY_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1262,7 +1265,7 @@ class Meta: resolver=get_node_by_id) ancestors = graphene.List( lambda: FamilyProxy, - description="""First parent ancestors.""", + description='First parent ancestors.', args=PROXY_ARGS, strip_null=STRIP_NULL_DEFAULT, delta_store=DELTA_STORE_DEFAULT, @@ -1283,7 +1286,7 @@ def resolve_type(cls, instance, info): class Edge(ObjectType): class Meta: - description = """Dependency edge task/family proxies""" + description = 'Dependency edge task/family proxies.' id = ID() # noqa: A003 (required for schema definition) source = ID() source_node = Field( @@ -1305,7 +1308,7 @@ class Meta: class Edges(ObjectType): class Meta: - description = """Dependency edge""" + description = 'Dependency edge.' edges = graphene.List( Edge, args=EDGE_ARGS, @@ -1320,19 +1323,19 @@ class Meta: class NodesEdges(ObjectType): class Meta: - description = """Related Nodes & Edges.""" + description = 'Related Nodes & Edges.' nodes = graphene.List( TaskProxy, - description="""Task nodes from and including root.""") + description='Task nodes from and including root.') edges = graphene.List( Edge, - description="""Edges associated with the nodes.""") + description='Edges associated with the nodes.') # Query declaration class Queries(ObjectType): class Meta: - description = """Multi-Workflow root level queries.""" + description = 'Multi-Workflow root level queries.' workflows = graphene.List( Workflow, description=Workflow._meta.description, @@ -1420,7 +1423,7 @@ class Meta: # Generic containers class GenericResponse(ObjectType): class Meta: - description = """Container for command queued response""" + description = 'Container for command queued response.' result = GenericScalar() @@ -1455,6 +1458,7 @@ async def mutator( method). If None, uses mutation class name converted to snake_case. workflows: List of workflow IDs. exworkflows: List of workflow IDs. + """ if command is None: command = to_snake_case(info.field_name) @@ -1567,13 +1571,13 @@ class TaskState(InputObjectType): status = TaskStatus() is_held = Boolean(description=sstrip(''' - If a task is held no new job submissions will be made + If a task is held no new job submissions will be made. ''')) is_queued = Boolean(description=sstrip(''' - Task is queued for job submission + Task is queued for job submission. ''')) is_runahead = Boolean(description=sstrip(''' - Task is runahead limited + Task is runahead limited. ''')) @@ -1792,7 +1796,7 @@ class Arguments: event_time = String(default_value=None) messages = graphene.List( graphene.List(String), - description="""List in the form `[[severity, message], ...]`.""", + description='List in the form `[[severity, message], ...]`.', default_value=None ) @@ -1820,6 +1824,7 @@ class Resume(Mutation): class Meta: description = sstrip(''' Resume a paused workflow. + See also the opposite command `pause`. Valid for: paused workflows. @@ -1952,8 +1957,8 @@ class Meta: trigger event. When an incoming message satisfies a task's external trigger the message ID is broadcast to all downstream tasks in the cycle point as - ``$CYLC_EXT_TRIGGER_ID``. Tasks can use - ``$CYLC_EXT_TRIGGER_ID``, for example, to + `$CYLC_EXT_TRIGGER_ID`. Tasks can use + `$CYLC_EXT_TRIGGER_ID`, for example, to identify a new data file that the external triggering system is responding to. @@ -2113,9 +2118,9 @@ class Meta: Setting outputs contributes to the task's completion, sets the corresponding prerequisites of child tasks, and sets any implied outputs: - - ``started`` implies ``submitted``. - - ``succeeded`` and ``failed`` imply ``started``. - - custom outputs and ``expired`` do not imply any other outputs. + - `started` implies `submitted`. + - `succeeded` and `failed` imply `started`. + - custom outputs and `expired` do not imply any other outputs. Valid for: paused, running, stopping workflows. """) @@ -2224,7 +2229,10 @@ def delta_subs(root, info: 'ResolveInfo', **args) -> AsyncGenerator[Any, None]: class Pruned(ObjectType): class Meta: - description = """WFS Nodes/Edges that have been removed.""" + description = sstrip(''' + Objects (e.g. workflows, tasks, jobs) which have been removed from + the store. + ''') workflow = String() families = graphene.List(String, default_value=[]) family_proxies = graphene.List(String, default_value=[]) @@ -2246,7 +2254,7 @@ class Delta(Interface): families = graphene.List( Family, - description="""Family definitions.""", + description='Family definitions.', args=DEF_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2255,7 +2263,7 @@ class Delta(Interface): ) family_proxies = graphene.List( FamilyProxy, - description="""Family cycle instances.""", + description='Family cycle instances.', args=PROXY_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2264,7 +2272,7 @@ class Delta(Interface): ) jobs = graphene.List( Job, - description="""Jobs.""", + description='Jobs.', args=JOB_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2273,7 +2281,7 @@ class Delta(Interface): ) tasks = graphene.List( Task, - description="""Task definitions.""", + description='Task definitions.', args=DEF_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2282,7 +2290,7 @@ class Delta(Interface): ) task_proxies = graphene.List( TaskProxy, - description="""Task cycle instances.""", + description='Task cycle instances.', args=PROXY_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2291,7 +2299,7 @@ class Delta(Interface): ) edges = graphene.List( Edge, - description="""Graph edges""", + description='Graph edges', args=EDGE_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2310,18 +2318,18 @@ class Delta(Interface): class Added(ObjectType): class Meta: - description = """Added node/edge deltas.""" + description = 'Added node/edge deltas.' interfaces = (Delta,) class Updated(ObjectType): class Meta: - description = """Updated node/edge deltas.""" + description = 'Updated node/edge deltas.' interfaces = (Delta,) families = graphene.List( Family, - description="""Family definitions.""", + description='Family definitions.', args=DEF_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2330,7 +2338,7 @@ class Meta: ) family_proxies = graphene.List( FamilyProxy, - description="""Family cycle instances.""", + description='Family cycle instances.', args=PROXY_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2339,7 +2347,7 @@ class Meta: ) jobs = graphene.List( Job, - description="""Jobs.""", + description='Jobs.', args=JOB_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2348,7 +2356,7 @@ class Meta: ) tasks = graphene.List( Task, - description="""Task definitions.""", + description='Task definitions.', args=DEF_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2357,7 +2365,7 @@ class Meta: ) task_proxies = graphene.List( TaskProxy, - description="""Task cycle instances.""", + description='Task cycle instances.', args=PROXY_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2366,7 +2374,7 @@ class Meta: ) edges = graphene.List( Edge, - description="""Graph edges""", + description='Graph edges.', args=EDGE_ARGS, strip_null=Boolean(), delta_store=Boolean(default_value=True), @@ -2385,7 +2393,7 @@ class Meta: class Deltas(ObjectType): class Meta: - description = """Grouped deltas of the WFS publish""" + description = 'Grouped deltas of the WFS publish.' id = ID() # noqa: A003 (required for schema definition) shutdown = Boolean(default_value=False) added = Field( @@ -2408,13 +2416,13 @@ class Meta: class Subscriptions(ObjectType): """Defines the subscriptions available in the schema.""" class Meta: - description = """Multi-Workflow root level subscriptions.""" + description = 'Multi-Workflow root level subscriptions.' deltas = Field( Deltas, description=Deltas._meta.description, workflows=graphene.List( - ID, description="List of full ID, i.e. `~user/workflow_id`" + ID, description="List of full ID, i.e. `~user/workflow_id`." ), strip_null=Boolean(default_value=False), initial_burst=Boolean(default_value=True), diff --git a/cylc/flow/network/ssh_client.py b/cylc/flow/network/ssh_client.py index d1dd4fb6da4..d2ed0dd33e9 100644 --- a/cylc/flow/network/ssh_client.py +++ b/cylc/flow/network/ssh_client.py @@ -90,7 +90,7 @@ async def async_request( f"Command exceeded the timeout {timeout}s. " "This could be due to network problems. " "Check the workflow log." - ) + ) from None def prepare_command( self, command: str, args: Optional[dict], timeout: Union[float, str] diff --git a/cylc/flow/param_expand.py b/cylc/flow/param_expand.py index 0707a46e1a3..22cf1aa8665 100644 --- a/cylc/flow/param_expand.py +++ b/cylc/flow/param_expand.py @@ -195,8 +195,9 @@ def _expand_name(self, results, tmpl, params, spec_vals=None): try: results.append((tmpl % current_values, current_values)) except KeyError as exc: - raise ParamExpandError('parameter %s is not ' - 'defined.' % str(exc.args[0])) + raise ParamExpandError( + 'parameter %s is not defined.' % str(exc.args[0]) + ) from None else: for param_val in params[0][1]: spec_vals[params[0][0]] = param_val @@ -306,8 +307,8 @@ def expand_parent_params(self, parent, param_values, origin): used[item] = param_values[item] except KeyError: raise ParamExpandError( - "parameter '%s' undefined in '%s'" % ( - item, origin)) + "parameter '%s' undefined in '%s'" % (item, origin) + ) from None # For each parameter substitute the param_tmpl_cfg. tmpl = tmpl.format(**self.param_tmpl_cfg) @@ -425,8 +426,9 @@ def _expand_graph(self, line, all_params, try: repl = tmpl % param_values except KeyError as exc: - raise ParamExpandError('parameter %s is not ' - 'defined.' % str(exc.args[0])) + raise ParamExpandError( + 'parameter %s is not defined.' % str(exc.args[0]) + ) from None line = line.replace('<' + p_group + '>', repl) if line: line_set.add(line) diff --git a/cylc/flow/parsec/config.py b/cylc/flow/parsec/config.py index 19f937d8e5b..29944c03b30 100644 --- a/cylc/flow/parsec/config.py +++ b/cylc/flow/parsec/config.py @@ -150,10 +150,12 @@ def get(self, keys: Optional[Iterable[str]] = None, sparse: bool = False): # setting not present in __MANY__ section: key in self.spec.get(*parents) ): - raise ItemNotFoundError(itemstr(parents, key)) + raise ItemNotFoundError( + itemstr(parents, key) + ) from None raise InvalidConfigError( itemstr(parents, key), self.spec.name - ) + ) from None else: parents.append(key) diff --git a/cylc/flow/parsec/empysupport.py b/cylc/flow/parsec/empysupport.py index b4164894e0f..e3dc5e28df4 100644 --- a/cylc/flow/parsec/empysupport.py +++ b/cylc/flow/parsec/empysupport.py @@ -66,7 +66,7 @@ def empyprocess( raise EmPyError( str(exc), lines={'