Skip to content

Commit

Permalink
Refactor task killing
Browse files Browse the repository at this point in the history
  • Loading branch information
MetRonnie committed Nov 4, 2024
1 parent 478b0d0 commit 00b2a97
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 51 deletions.
20 changes: 8 additions & 12 deletions cylc/flow/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
from cylc.flow.network.schema import WorkflowStopMode
from cylc.flow.parsec.exceptions import ParsecError
from cylc.flow.task_id import TaskID
from cylc.flow.task_state import TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED
from cylc.flow.workflow_status import RunMode, StopMode

from metomi.isodatetime.parsers import TimePointParser
Expand Down Expand Up @@ -256,19 +255,16 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):

@_command('kill_tasks')
async def kill_tasks(schd: 'Scheduler', tasks: Iterable[str]):
"""Kill all tasks or a task/family if options are provided."""
"""Kill tasks.
Args:
tasks: Tasks/families/globs to kill.
"""
validate.is_tasks(tasks)
yield
itasks, _, bad_items = schd.pool.filter_task_proxies(tasks)
if schd.get_run_mode() == RunMode.SIMULATION:
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
itask.state_reset(TASK_STATUS_FAILED)
schd.data_store_mgr.delta_task_state(itask)
yield len(bad_items)
else:
schd.task_job_mgr.kill_task_jobs(schd.workflow, itasks)
yield len(bad_items)
active, _, unmatched = schd.pool.filter_task_proxies(tasks)
num_unkillable = schd.kill_tasks(active)
yield len(unmatched) + num_unkillable


@_command('hold')
Expand Down
5 changes: 3 additions & 2 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2275,7 +2275,7 @@ def _generate_broadcast_node_deltas(self, node_data, node_type):
# -----------
# Task Deltas
# -----------
def delta_task_state(self, itask):
def delta_task_state(self, itask: 'TaskProxy') -> None:
"""Create delta for change in task proxy state.
Args:
Expand All @@ -2291,10 +2291,11 @@ def delta_task_state(self, itask):
update_time = time()

# update task instance
tp_delta = self.updated[TASK_PROXIES].setdefault(
tp_delta: PbTaskProxy = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{update_time}'
tp_delta.state = itask.state.status
tp_delta.is_held = itask.state.is_held
self.state_update_families.add(tproxy.first_parent)
if tp_delta.state in self.latest_state_tasks:
tp_ref = itask.identity
Expand Down
46 changes: 43 additions & 3 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@
REMOTE_INIT_FAILED,
)
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUSES_NEVER_ACTIVE,
TASK_STATUS_FAILED,
TASK_STATUS_PREPARING,
TASK_STATUS_RUNNING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_WAITING,
TASK_STATUSES_ACTIVE,
TASK_STATUSES_NEVER_ACTIVE,
)
from cylc.flow.taskdef import TaskDef
from cylc.flow.templatevars import eval_var
Expand All @@ -145,12 +146,15 @@
from cylc.flow.xtrigger_mgr import XtriggerManager

if TYPE_CHECKING:
from optparse import Values

# BACK COMPAT: typing_extensions.Literal
# FROM: Python 3.7
# TO: Python 3.8
from typing_extensions import Literal
from optparse import Values

from cylc.flow.network.resolvers import TaskMsg
from cylc.flow.task_proxy import TaskProxy


class SchedulerStop(CylcError):
Expand Down Expand Up @@ -1010,6 +1014,42 @@ def _set_stop(self, stop_mode: Optional[StopMode] = None) -> None:
self.stop_mode = stop_mode
self.update_data_store()

def kill_tasks(
self, itasks: 'Iterable[TaskProxy]', warn: bool = True
) -> int:
"""Kill tasks if they are in a killable state.
Args:
itasks: Tasks to kill.
warn: Whether to warn about tasks that are not in a killable state.
Returns number of tasks that could not be killed.
"""
jobless = self.get_run_mode() == RunMode.SIMULATION
to_kill: List[TaskProxy] = []
unkillable: List[TaskProxy] = []
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
itask.state_reset(
# directly reset to failed in sim mode, else let
# task_job_mgr handle it
status=(TASK_STATUS_FAILED if jobless else None),
is_held=True,
)
self.data_store_mgr.delta_task_state(itask)
to_kill.append(itask)
else:
unkillable.append(itask)
if warn:
LOG.warning(
"Tasks not killable: "
f"{', '.join(sorted(t.identity for t in unkillable))}"
)
if not jobless:
self.task_job_mgr.kill_task_jobs(self.workflow, to_kill)

return len(unkillable)

def get_restart_num(self) -> int:
"""Return the number of the restart, else 0 if not a restart.
Expand Down
29 changes: 12 additions & 17 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@
)
from shutil import rmtree
from time import time
from typing import TYPE_CHECKING, Any, Union, Optional
from typing import (
TYPE_CHECKING,
Any,
Iterable,
Optional,
Union,
)

from cylc.flow import LOG
from cylc.flow.job_runner_mgr import JobPollContext
Expand Down Expand Up @@ -99,7 +105,6 @@
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_WAITING,
TASK_STATUSES_ACTIVE
)
from cylc.flow.wallclock import (
get_current_time_string,
Expand Down Expand Up @@ -172,22 +177,12 @@ def check_task_jobs(self, workflow, task_pool):
if poll_tasks:
self.poll_task_jobs(workflow, poll_tasks)

def kill_task_jobs(self, workflow, itasks):
"""Kill jobs of active tasks, and hold the tasks.
If items is specified, kill active tasks matching given IDs.
"""
to_kill_tasks = []
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
itask.state_reset(is_held=True)
self.data_store_mgr.delta_task_held(itask)
to_kill_tasks.append(itask)
else:
LOG.warning(f"[{itask}] not killable")
def kill_task_jobs(
self, workflow: str, itasks: 'Iterable[TaskProxy]'
) -> None:
"""Issue the command to kill jobs of active tasks."""
self._run_job_cmd(
self.JOBS_KILL, workflow, to_kill_tasks,
self.JOBS_KILL, workflow, itasks,
self._kill_task_jobs_callback,
self._kill_task_jobs_callback_255
)
Expand Down
1 change: 0 additions & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,6 @@ def create_data_store_elements(self, itask):
itask=itask
)
self.data_store_mgr.delta_task_state(itask)
self.data_store_mgr.delta_task_held(itask)
self.data_store_mgr.delta_task_queued(itask)
self.data_store_mgr.delta_task_runahead(itask)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿● a </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊗ Y </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊗ b </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊘ 2 </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̎⊘ 2 </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊙ X </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊙ a </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊘ Y </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊘ Y1 </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊘ c </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊙ b </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̎⊘ Y </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̎⊘ Y1 </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̎⊘ c </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̎⊙ b </span>
<span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿◌ Y1 </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿◌ c </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊗ b </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊘ 2 </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̎⊘ 2 </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊙ X </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊙ a </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊘ Y </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊘ Y1 </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊘ c </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊙ b </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̎⊘ Y </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̎⊘ Y1 </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̎⊘ c </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̎⊙ b </span>
<span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"></span><span style="color:#e5e5e5;background:#000000">-</span><span style="color:#000000;background:#e5e5e5"></span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">~cylc </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5;font-weight:bold">test_task_states</span><span style="color:#000000;background:#e5e5e5"> - </span><span style="color:#cdcd00;background:#e5e5e5">paused</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#00cdcd;background:#e5e5e5">1■</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#5c5cff;background:#e5e5e5">1■</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#00cd00;background:#e5e5e5">1■</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#ff0000;background:#e5e5e5">1■</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#ff00ff;background:#e5e5e5">1■</span><span style="color:#000000;background:#e5e5e5"> </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊘ 2 </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̎⊘ 2 </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">-</span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊙ X </span>
<span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5"> </span><span style="color:#000000;background:#e5e5e5">̿⊙ a </span>
<span style="color:#000000;background:#e5e5e5"> </span>
Expand Down
Loading

0 comments on commit 00b2a97

Please sign in to comment.