Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconstruct xtriggers at reload time. #6263

Merged
merged 3 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6263.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug that prevented changes to user-defined xtriggers taking effect after a reload.
2 changes: 1 addition & 1 deletion cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ async def reload_workflow(schd: 'Scheduler'):
# Reset the remote init map to trigger fresh file installation
schd.task_job_mgr.task_remote_mgr.remote_init_map.clear()
schd.task_job_mgr.task_remote_mgr.is_reload = True
schd.pool.reload_taskdefs(config)
schd.pool.reload(config)
# Load jobs from DB
schd.workflow_db_mgr.pri_dao.select_jobs_for_restart(
schd.data_store_mgr.insert_db_job
Expand Down
13 changes: 10 additions & 3 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,18 +995,25 @@ def set_max_future_offset(self):
if max_offset != orig and self.compute_runahead(force=True):
self.release_runahead_tasks()

def reload_taskdefs(self, config: 'WorkflowConfig') -> None:
def reload(self, config: 'WorkflowConfig') -> None:
    """Apply a freshly loaded workflow config to the task pool.

    Stores the new config, passes its xtrigger collator to the xtrigger
    manager in reload mode, and then reloads the task definitions of the
    proxies in the pool.

    Args:
        config: The workflow configuration produced by the reload.

    """
    # Store the updated config first: the steps below read self.config.
    self.config = config
    collator = self.config.xtrigger_collator
    self.xtrigger_mgr.add_xtriggers(collator, reload=True)
    self._reload_taskdefs()

def _reload_taskdefs(self) -> None:
"""Reload the definitions of task proxies in the pool.

Orphaned tasks (whose definitions were removed from the workflow):
- remove if not active yet
- if active, leave them but prevent them from spawning children on
subsequent outputs

Otherwise: replace task definitions but copy over existing outputs etc.

self.config should already be updated for the reload.
"""
self.config = config
self.stop_point = config.stop_point or config.final_point
self.stop_point = self.config.stop_point or self.config.final_point

# find any old tasks that have been removed from the workflow
old_task_name_list = self.task_name_list
Expand Down
9 changes: 6 additions & 3 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,11 +368,14 @@ def copy_to_reload_successor(self, reload_successor, check_output):
)

reload_successor.state.xtriggers.update({
# copy across any special "_cylc" xtriggers which were added
# dynamically at runtime (i.e. execution retry xtriggers)
# Copy across any auto-defined "_cylc" xtriggers added at runtime (retries),
# but avoid "_cylc_wallclock" xtriggers which are user-defined.
key: value
for key, value in self.state.xtriggers.items()
if key.startswith('_cylc')
if (
key.startswith('_cylc') and not
key.startswith('_cylc_wallclock')
)
})
reload_successor.jobs = self.jobs

Expand Down
33 changes: 29 additions & 4 deletions cylc/flow/xtrigger_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,29 @@ def update(self, xtriggers: 'XtriggerCollator'):
self.sequential_xtrigger_labels.update(
xtriggers.sequential_xtrigger_labels)

def purge_user_xtriggers(self):
    """Drop all user-defined xtriggers ahead of a reload.

    User-defined xtriggers are rebuilt from the config file, so the old
    ones are removed here. Auto-defined xtriggers (retries) are retained.

    A label counts as user-defined if it starts with "_cylc_wallclock" or
    does not start with "_cylc" at all; every other "_cylc" label is
    treated as automatic.

    """
    def _is_user_defined(name):
        return (
            name.startswith("_cylc_wallclock")
            or not name.startswith("_cylc")
        )

    # Snapshot the labels first so the map is not mutated mid-iteration.
    doomed = [name for name in self.functx_map if _is_user_defined(name)]

    label_registries = (
        self.wall_clock_labels,
        self.sequential_xtrigger_labels,
    )
    for name in doomed:
        del self.functx_map[name]
        for registry in label_registries:
            # Not every label is present in every registry.
            with suppress(KeyError):
                registry.remove(name)

def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
"""Add a new xtrigger function.

Expand All @@ -201,8 +224,8 @@ def add_trig(self, label: str, fctx: 'SubFuncContext', fdir: str) -> None:
return

if (
not label.startswith('_cylc_retry_') and not
label.startswith('_cylc_submit_retry_')
not label.startswith('_cylc_retry_') and
not label.startswith('_cylc_submit_retry_')
):
# (the "_wall_clock" function fails "wall_clock" validation)
self._validate(label, fctx, fdir)
Expand Down Expand Up @@ -526,8 +549,10 @@ def __init__(
self.do_housekeeping = False
self.xtriggers = XtriggerCollator()

def add_xtriggers(self, xtriggers: 'XtriggerCollator'):
"""Add pre-collated and validated xtriggers."""
def add_xtriggers(self, xtriggers: 'XtriggerCollator', reload=False):
"""Add validated xtriggers, parsed from the workflow config."""
if reload:
self.xtriggers.purge_user_xtriggers()
self.xtriggers.update(xtriggers)
self.xtriggers.sequential_xtriggers_default = (
xtriggers.sequential_xtriggers_default
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_sequential_xtriggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async def test_reload(sequential, start):
assert pre_reload.is_xtrigger_sequential is True

# reload the workflow
sequential.pool.reload_taskdefs(sequential.config)
sequential.pool.reload(sequential.config)

# the original task proxy should have been replaced
post_reload = sequential.pool.get_task(ISO8601Point('2000'), 'foo')
Expand Down
79 changes: 79 additions & 0 deletions tests/integration/test_task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2100,7 +2100,86 @@ async def test_trigger_queue(one, run, db_select, complete):
await complete(one, timeout=2)
assert db_select(one, False, 'task_outputs', 'flow_nums') == [('[1, 2]',), ('[1]',)]

async def test_reload_xtriggers(flow, scheduler, start):
    """Xtriggers should be reconstructed from the config on reload.

    After a reload, a removed xtrigger must be gone, a modified one must
    carry its new arguments and a newly added one must be present - both
    in the xtrigger manager and in the data store.

    See https://github.com/cylc/cylc-flow/pull/6263
    """
    config = {
        'scheduling': {
            'initial cycle point': '2000',
            'graph': {
                'R1': '''
                    @a => foo
                    @b => foo
                '''
            },
            'xtriggers': {
                'a': 'wall_clock(offset="P0D")',
                'b': 'wall_clock(offset="P5D")',
            },
        }
    }
    id_ = flow(config)
    schd: Scheduler = scheduler(id_)

    def xtrig_mgr_view():
        """Map each label held by the xtrigger manager to its repr."""
        functx_map = schd.xtrigger_mgr.xtriggers.functx_map
        return {label: repr(ctx) for label, ctx in functx_map.items()}

    async def data_store_view():
        """Map xtrigger labels to data store keys.

        Reads the task proxy for foo at cycle 20000101T0000Z.
        """
        await schd.update_data_structure()
        task_proxies = schd.data_store_mgr.data[schd.tokens.id][TASK_PROXIES]
        foo_id = schd.tokens.duplicate(
            cycle='20000101T0000Z', task='foo'
        ).id
        return {
            xtrig.label: signature
            for signature, xtrig in task_proxies[foo_id].xtriggers.items()
        }

    async with start(schd):
        # state on startup: both configured xtriggers are present
        assert xtrig_mgr_view() == {
            'a': '<SubFuncContext wall_clock(offset=P0D):10.0>',
            'b': '<SubFuncContext wall_clock(offset=P5D):10.0>',
        }
        assert await data_store_view() == {
            'a': 'wall_clock(trigger_time=946684800)',
            'b': 'wall_clock(trigger_time=947116800)',
        }

        xtrigs = config['scheduling']['xtriggers']
        graph = config['scheduling']['graph']
        # drop @a
        del xtrigs['a']
        # change the offset of @b
        xtrigs['b'] = 'wall_clock(offset="PT12H")'
        # introduce @c and use it in place of @a in the graph
        xtrigs['c'] = 'wall_clock(offset="PT1H")'
        graph['R1'] = graph['R1'].replace('@a', '@c')

        # write the new config to the same workflow and reload it
        flow(config, id_=id_)
        await commands.run_cmd(commands.reload_workflow, schd)

        # state after reload: @a gone, @b modified, @c added
        assert xtrig_mgr_view() == {
            'b': '<SubFuncContext wall_clock(offset=PT12H):10.0>',
            'c': '<SubFuncContext wall_clock(offset=PT1H):10.0>',
        }
        assert await data_store_view() == {
            'b': 'wall_clock(trigger_time=946728000)',
            'c': 'wall_clock(trigger_time=946688400)',
        }


async def test_trigger_unqueued(flow, scheduler, start):
"""Test triggering an unqueued active task.

Expand Down