From e93acba2034cd5be0025795d38368c87f87ad66f Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Tue, 22 Oct 2024 13:02:33 +1300 Subject: [PATCH 1/3] Reconstruct user-defined xtriggers at reload. --- cylc/flow/commands.py | 2 +- cylc/flow/task_pool.py | 13 ++- cylc/flow/task_proxy.py | 9 ++- cylc/flow/xtrigger_mgr.py | 32 +++++++- .../integration/test_sequential_xtriggers.py | 2 +- tests/integration/test_task_pool.py | 80 +++++++++++++++++++ 6 files changed, 126 insertions(+), 12 deletions(-) diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py index fef523b0fa6..73acda5a550 100644 --- a/cylc/flow/commands.py +++ b/cylc/flow/commands.py @@ -413,7 +413,7 @@ async def reload_workflow(schd: 'Scheduler'): # Reset the remote init map to trigger fresh file installation schd.task_job_mgr.task_remote_mgr.remote_init_map.clear() schd.task_job_mgr.task_remote_mgr.is_reload = True - schd.pool.reload_taskdefs(config) + schd.pool.reload(config) # Load jobs from DB schd.workflow_db_mgr.pri_dao.select_jobs_for_restart( schd.data_store_mgr.insert_db_job diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index e149630d322..79b8eba6d68 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -995,18 +995,25 @@ def set_max_future_offset(self): if max_offset != orig and self.compute_runahead(force=True): self.release_runahead_tasks() - def reload_taskdefs(self, config: 'WorkflowConfig') -> None: + def reload(self, config: 'WorkflowConfig') -> None: + self.config = config # store the updated config + self.xtrigger_mgr.add_xtriggers( + self.config.xtrigger_collator, reload=True) + self._reload_taskdefs() + + def _reload_taskdefs(self) -> None: """Reload the definitions of task proxies in the pool. Orphaned tasks (whose definitions were removed from the workflow): - remove if not active yet - if active, leave them but prevent them from spawning children on subsequent outputs + Otherwise: replace task definitions but copy over existing outputs etc. + self.config should already be updated for the reload. """ - self.config = config - self.stop_point = config.stop_point or config.final_point + self.stop_point = self.config.stop_point or self.config.final_point # find any old tasks that have been removed from the workflow old_task_name_list = self.task_name_list diff --git a/cylc/flow/task_proxy.py b/cylc/flow/task_proxy.py index 0ba67438fcd..5f834e1518d 100644 --- a/cylc/flow/task_proxy.py +++ b/cylc/flow/task_proxy.py @@ -368,11 +368,14 @@ def copy_to_reload_successor(self, reload_successor, check_output): ) reload_successor.state.xtriggers.update({ - # copy across any special "_cylc" xtriggers which were added - # dynamically at runtime (i.e. execution retry xtriggers) + # Copy across any auto-defined "_cylc" xtriggers runtime (retries), + # but avoid "_cylc_wallclock" xtriggers which are user-defined. key: value for key, value in self.state.xtriggers.items() - if key.startswith('_cylc') + if ( + key.startswith('_cylc') and not + key.startswith('_cylc_wallclock') + ) }) reload_successor.jobs = self.jobs diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 5ba90738a01..256f14e7d53 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -187,6 +187,28 @@ def update(self, xtriggers: 'XtriggerCollator'): self.sequential_xtrigger_labels.update( xtriggers.sequential_xtrigger_labels) + def purge_user_xtriggers(self): + """Purge user-defined triggers before a reload. + + User-defined triggers need to be recreated from the config file. + Auto-defined triggers (retries) need to be kept. + + """ + nuke = [] + for label in self.functx_map: + if ( + label.startswith("_cylc_wallclock") + or not label.startswith("_cylc") + ): + # _cylc_wallclock xtriggers are user-defined + # otherwise all _cylc xtriggers are automatic. + nuke.append(label) + for label in nuke: + del self.functx_map[label] + with suppress(KeyError): + self.wall_clock_labels.remove(label) + self.sequential_xtrigger_labels.remove(label) + def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None: """Add a new xtrigger function. @@ -201,8 +223,8 @@ def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None: return if ( - not label.startswith('_cylc_retry_') and not - label.startswith('_cylc_submit_retry_') + not label.startswith('_cylc_retry_') and + not label.startswith('_cylc_submit_retry_') ): # (the "_wall_clock" function fails "wall_clock" validation) self._validate(label, fctx, fdir) @@ -526,8 +548,10 @@ def __init__( self.do_housekeeping = False self.xtriggers = XtriggerCollator() - def add_xtriggers(self, xtriggers: 'XtriggerCollator'): - """Add pre-collated and validated xtriggers.""" + def add_xtriggers(self, xtriggers: 'XtriggerCollator', reload=False): + """Add validated xtriggers, parsed from the workflow config.""" + if reload: + self.xtriggers.purge_user_xtriggers() self.xtriggers.update(xtriggers) self.xtriggers.sequential_xtriggers_default = ( xtriggers.sequential_xtriggers_default diff --git a/tests/integration/test_sequential_xtriggers.py b/tests/integration/test_sequential_xtriggers.py index d8bfb99aa92..1c98437bc7c 100644 --- a/tests/integration/test_sequential_xtriggers.py +++ b/tests/integration/test_sequential_xtriggers.py @@ -116,7 +116,7 @@ async def test_reload(sequential, start): assert pre_reload.is_xtrigger_sequential is True # reload the workflow - sequential.pool.reload_taskdefs(sequential.config) + sequential.pool.reload(sequential.config) # the original task proxy should have been replaced post_reload = sequential.pool.get_task(ISO8601Point('2000'), 'foo') diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index c8ac305c09a..580e0fedf2e 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -2100,7 +2100,86 @@ async def test_trigger_queue(one, run, db_select, complete): await complete(one, timeout=2) assert db_select(one, False, 'task_outputs', 'flow_nums') == [('[1, 2]',), ('[1]',)] +async def test_reload_xtriggers(flow, scheduler, start): + """It should rebuild xtriggers when the workflow is reloaded. + See https://github.com/cylc/cylc-flow/pull/6263 + """ + config = { + 'scheduling': { + 'initial cycle point': '2000', + 'graph': { + 'R1': ''' + @a => foo + @b => foo + ''' + }, + 'xtriggers': { + 'a': 'wall_clock(offset="P0D")', + 'b': 'wall_clock(offset="P5D")', + }, + } + } + id_ = flow(config) + schd: Scheduler = scheduler(id_) + + def list_xtrig_mgr(): + """List xtrigs from the xtrigger_mgr.""" + nonlocal schd + return { + key: repr(value) + for key, value in schd.xtrigger_mgr.xtriggers.functx_map.items() + } + + async def list_data_store(): + """List xtrigs from the data_store_mgr.""" + nonlocal schd + await schd.update_data_structure() + return { + value.label: key + for key, value in schd.data_store_mgr.data[schd.tokens.id][ + TASK_PROXIES + ][ + schd.tokens.duplicate(cycle='20000101T0000Z', task='foo').id + ].xtriggers.items() + } + + async with start(schd): + # check xtrigs on startup + assert list_xtrig_mgr() == { + 'a': '', + 'b': '', + } + assert await list_data_store() == { + 'a': 'wall_clock(trigger_time=946684800)', + 'b': 'wall_clock(trigger_time=947116800)', + } + + # remove @a + config['scheduling']['xtriggers'].pop('a') + # modify @b + config['scheduling']['xtriggers']['b'] = 'wall_clock(offset="PT12H")' + # add @c + config['scheduling']['xtriggers']['c'] = 'wall_clock(offset="PT1H")' + config['scheduling']['graph']['R1'] = config['scheduling']['graph'][ + 'R1' + ].replace('@a', '@c') + + # reload + flow(config, id_=id_) + await commands.run_cmd(commands.reload_workflow, schd) + + # check xtrigs post-reload + assert list_xtrig_mgr() == { + 'b': '', + 'c': '', + } + assert await list_data_store() == { + 'b': 'wall_clock(trigger_time=946728000)', + 'c': 'wall_clock(trigger_time=946688400)', + } + + async def test_trigger_unqueued(flow, scheduler, start): """Test triggering an unqueued active task. @@ -2243,3 +2322,4 @@ async def test_downstream_complete_before_upstream( # 1/a should be removed from the pool (completed) # 1/b should not be re-spawned by the success of 1/a assert schd.pool.get_tasks() == [] + From 3b13d717cb59dab68fe9665d17750ec6fedbea10 Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Tue, 22 Oct 2024 13:09:56 +1300 Subject: [PATCH 2/3] Update change log. --- changes.d/6263.fix.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes.d/6263.fix.md diff --git a/changes.d/6263.fix.md b/changes.d/6263.fix.md new file mode 100644 index 00000000000..499d1863113 --- /dev/null +++ b/changes.d/6263.fix.md @@ -0,0 +1 @@ +Fix bug preventing changes to user-defined xtriggers taking effect after a reload. From 72b663a13f6b9a35765670f1762a78cf69ca91bd Mon Sep 17 00:00:00 2001 From: Hilary James Oliver Date: Wed, 23 Oct 2024 18:35:54 +1300 Subject: [PATCH 3/3] Apply suggestions from code review [skip ci] Co-authored-by: Oliver Sanders --- changes.d/6263.fix.md | 2 +- cylc/flow/xtrigger_mgr.py | 1 + tests/integration/test_task_pool.py | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/changes.d/6263.fix.md b/changes.d/6263.fix.md index 499d1863113..86a6c01efe5 100644 --- a/changes.d/6263.fix.md +++ b/changes.d/6263.fix.md @@ -1 +1 @@ -Fix bug preventing changes to user-defined xtriggers taking effect after a reload. +Fix bug that prevented changes to user-defined xtriggers taking effect after a reload. diff --git a/cylc/flow/xtrigger_mgr.py b/cylc/flow/xtrigger_mgr.py index 256f14e7d53..7825beb0e73 100644 --- a/cylc/flow/xtrigger_mgr.py +++ b/cylc/flow/xtrigger_mgr.py @@ -207,6 +207,7 @@ def purge_user_xtriggers(self): del self.functx_map[label] with suppress(KeyError): self.wall_clock_labels.remove(label) + with suppress(KeyError): self.sequential_xtrigger_labels.remove(label) def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None: diff --git a/tests/integration/test_task_pool.py b/tests/integration/test_task_pool.py index 580e0fedf2e..b0ef1211b6f 100644 --- a/tests/integration/test_task_pool.py +++ b/tests/integration/test_task_pool.py @@ -2322,4 +2322,3 @@ async def test_downstream_complete_before_upstream( # 1/a should be removed from the pool (completed) # 1/b should not be re-spawned by the success of 1/a assert schd.pool.get_tasks() == [] -