Skip to content

Commit

Permalink
Merge pull request #6263 from hjoliver/reload-xtriggers
Browse files Browse the repository at this point in the history
Reconstruct xtriggers at reload time.
  • Loading branch information
hjoliver authored Oct 23, 2024
2 parents df89a77 + 72b663a commit 28ea786
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 12 deletions.
1 change: 1 addition & 0 deletions changes.d/6263.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug that prevented changes to user-defined xtriggers taking effect after a reload.
2 changes: 1 addition & 1 deletion cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ async def reload_workflow(schd: 'Scheduler'):
# Reset the remote init map to trigger fresh file installation
schd.task_job_mgr.task_remote_mgr.remote_init_map.clear()
schd.task_job_mgr.task_remote_mgr.is_reload = True
schd.pool.reload_taskdefs(config)
schd.pool.reload(config)
# Load jobs from DB
schd.workflow_db_mgr.pri_dao.select_jobs_for_restart(
schd.data_store_mgr.insert_db_job
Expand Down
13 changes: 10 additions & 3 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,18 +995,25 @@ def set_max_future_offset(self):
if max_offset != orig and self.compute_runahead(force=True):
self.release_runahead_tasks()

def reload_taskdefs(self, config: 'WorkflowConfig') -> None:
def reload(self, config: 'WorkflowConfig') -> None:
self.config = config # store the updated config
self.xtrigger_mgr.add_xtriggers(
self.config.xtrigger_collator, reload=True)
self._reload_taskdefs()

def _reload_taskdefs(self) -> None:
"""Reload the definitions of task proxies in the pool.
Orphaned tasks (whose definitions were removed from the workflow):
- remove if not active yet
- if active, leave them but prevent them from spawning children on
subsequent outputs
Otherwise: replace task definitions but copy over existing outputs etc.
self.config should already be updated for the reload.
"""
self.config = config
self.stop_point = config.stop_point or config.final_point
self.stop_point = self.config.stop_point or self.config.final_point

# find any old tasks that have been removed from the workflow
old_task_name_list = self.task_name_list
Expand Down
9 changes: 6 additions & 3 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,14 @@ def copy_to_reload_successor(self, reload_successor, check_output):
)

reload_successor.state.xtriggers.update({
# copy across any special "_cylc" xtriggers which were added
# dynamically at runtime (i.e. execution retry xtriggers)
# Copy across any auto-defined "_cylc" xtriggers runtime (retries),
# but avoid "_cylc_wallclock" xtriggers which are user-defined.
key: value
for key, value in self.state.xtriggers.items()
if key.startswith('_cylc')
if (
key.startswith('_cylc') and not
key.startswith('_cylc_wallclock')
)
})
reload_successor.jobs = self.jobs

Expand Down
33 changes: 29 additions & 4 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,29 @@ def update(self, xtriggers: 'XtriggerCollator'):
self.sequential_xtrigger_labels.update(
xtriggers.sequential_xtrigger_labels)

def purge_user_xtriggers(self):
"""Purge user-defined triggers before a reload.
User-defined triggers need to be recreated from the config file.
Auto-defined triggers (retries) need to be kept.
"""
nuke = []
for label in self.functx_map:
if (
label.startswith("_cylc_wallclock")
or not label.startswith("_cylc")
):
# _cylc_wallclock xtriggers are user-defined
# otherwise all _cylc xtriggers are automatic.
nuke.append(label)
for label in nuke:
del self.functx_map[label]
with suppress(KeyError):
self.wall_clock_labels.remove(label)
with suppress(KeyError):
self.sequential_xtrigger_labels.remove(label)

def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
"""Add a new xtrigger function.
Expand All @@ -201,8 +224,8 @@ def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
return

if (
not label.startswith('_cylc_retry_') and not
label.startswith('_cylc_submit_retry_')
not label.startswith('_cylc_retry_') and
not label.startswith('_cylc_submit_retry_')
):
# (the "_wall_clock" function fails "wall_clock" validation)
self._validate(label, fctx, fdir)
Expand Down Expand Up @@ -526,8 +549,10 @@ def __init__(
self.do_housekeeping = False
self.xtriggers = XtriggerCollator()

def add_xtriggers(self, xtriggers: 'XtriggerCollator'):
"""Add pre-collated and validated xtriggers."""
def add_xtriggers(self, xtriggers: 'XtriggerCollator', reload=False):
"""Add validated xtriggers, parsed from the workflow config."""
if reload:
self.xtriggers.purge_user_xtriggers()
self.xtriggers.update(xtriggers)
self.xtriggers.sequential_xtriggers_default = (
xtriggers.sequential_xtriggers_default
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_sequential_xtriggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async def test_reload(sequential, start):
assert pre_reload.is_xtrigger_sequential is True

# reload the workflow
sequential.pool.reload_taskdefs(sequential.config)
sequential.pool.reload(sequential.config)

# the original task proxy should have been replaced
post_reload = sequential.pool.get_task(ISO8601Point('2000'), 'foo')
Expand Down
79 changes: 79 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2100,7 +2100,86 @@ async def test_trigger_queue(one, run, db_select, complete):
await complete(one, timeout=2)
assert db_select(one, False, 'task_outputs', 'flow_nums') == [('[1, 2]',), ('[1]',)]

async def test_reload_xtriggers(flow, scheduler, start):
"""It should rebuild xtriggers when the workflow is reloaded.
See https://github.com/cylc/cylc-flow/pull/6263
"""
config = {
'scheduling': {
'initial cycle point': '2000',
'graph': {
'R1': '''
@a => foo
@b => foo
'''
},
'xtriggers': {
'a': 'wall_clock(offset="P0D")',
'b': 'wall_clock(offset="P5D")',
},
}
}
id_ = flow(config)
schd: Scheduler = scheduler(id_)

def list_xtrig_mgr():
"""List xtrigs from the xtrigger_mgr."""
nonlocal schd
return {
key: repr(value)
for key, value in schd.xtrigger_mgr.xtriggers.functx_map.items()
}

async def list_data_store():
"""List xtrigs from the data_store_mgr."""
nonlocal schd
await schd.update_data_structure()
return {
value.label: key
for key, value in schd.data_store_mgr.data[schd.tokens.id][
TASK_PROXIES
][
schd.tokens.duplicate(cycle='20000101T0000Z', task='foo').id
].xtriggers.items()
}

async with start(schd):
# check xtrigs on startup
assert list_xtrig_mgr() == {
'a': '<SubFuncContext wall_clock(offset=P0D):10.0>',
'b': '<SubFuncContext wall_clock(offset=P5D):10.0>',
}
assert await list_data_store() == {
'a': 'wall_clock(trigger_time=946684800)',
'b': 'wall_clock(trigger_time=947116800)',
}

# remove @a
config['scheduling']['xtriggers'].pop('a')
# modify @b
config['scheduling']['xtriggers']['b'] = 'wall_clock(offset="PT12H")'
# add @c
config['scheduling']['xtriggers']['c'] = 'wall_clock(offset="PT1H")'
config['scheduling']['graph']['R1'] = config['scheduling']['graph'][
'R1'
].replace('@a', '@c')

# reload
flow(config, id_=id_)
await commands.run_cmd(commands.reload_workflow, schd)

# check xtrigs post-reload
assert list_xtrig_mgr() == {
'b': '<SubFuncContext wall_clock(offset=PT12H):10.0>',
'c': '<SubFuncContext wall_clock(offset=PT1H):10.0>',
}
assert await list_data_store() == {
'b': 'wall_clock(trigger_time=946728000)',
'c': 'wall_clock(trigger_time=946688400)',
}


async def test_trigger_unqueued(flow, scheduler, start):
"""Test triggering an unqueued active task.
Expand Down

0 comments on commit 28ea786

Please sign in to comment.