From 00b2a97aeec8e2aada5918230c7bd727a8168c9f Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Mon, 4 Nov 2024 17:48:18 +0000
Subject: [PATCH] Refactor task killing
---
cylc/flow/commands.py | 20 ++++----
cylc/flow/data_store_mgr.py | 5 +-
cylc/flow/scheduler.py | 46 +++++++++++++++++--
cylc/flow/task_job_mgr.py | 29 +++++-------
cylc/flow/task_pool.py | 1 -
..._states.filter-not-waiting-or-expired.html | 10 ++--
.../test_task_states.filter-not-waiting.html | 10 ++--
.../test_task_states.filter-submitted.html | 2 +-
.../test_task_states.unfiltered.html | 10 ++--
9 files changed, 82 insertions(+), 51 deletions(-)
diff --git a/cylc/flow/commands.py b/cylc/flow/commands.py
index b8b777e0957..3759f119a1b 100644
--- a/cylc/flow/commands.py
+++ b/cylc/flow/commands.py
@@ -77,7 +77,6 @@
from cylc.flow.network.schema import WorkflowStopMode
from cylc.flow.parsec.exceptions import ParsecError
from cylc.flow.task_id import TaskID
-from cylc.flow.task_state import TASK_STATUSES_ACTIVE, TASK_STATUS_FAILED
from cylc.flow.workflow_status import RunMode, StopMode
from metomi.isodatetime.parsers import TimePointParser
@@ -256,19 +255,16 @@ async def poll_tasks(schd: 'Scheduler', tasks: Iterable[str]):
@_command('kill_tasks')
async def kill_tasks(schd: 'Scheduler', tasks: Iterable[str]):
- """Kill all tasks or a task/family if options are provided."""
+ """Kill tasks.
+
+ Args:
+ tasks: Tasks/families/globs to kill.
+ """
validate.is_tasks(tasks)
yield
- itasks, _, bad_items = schd.pool.filter_task_proxies(tasks)
- if schd.get_run_mode() == RunMode.SIMULATION:
- for itask in itasks:
- if itask.state(*TASK_STATUSES_ACTIVE):
- itask.state_reset(TASK_STATUS_FAILED)
- schd.data_store_mgr.delta_task_state(itask)
- yield len(bad_items)
- else:
- schd.task_job_mgr.kill_task_jobs(schd.workflow, itasks)
- yield len(bad_items)
+ active, _, unmatched = schd.pool.filter_task_proxies(tasks)
+ num_unkillable = schd.kill_tasks(active)
+ yield len(unmatched) + num_unkillable
@_command('hold')
diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index a4b28d44fdc..e7252549f4e 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -2275,7 +2275,7 @@ def _generate_broadcast_node_deltas(self, node_data, node_type):
# -----------
# Task Deltas
# -----------
- def delta_task_state(self, itask):
+ def delta_task_state(self, itask: 'TaskProxy') -> None:
"""Create delta for change in task proxy state.
Args:
@@ -2291,10 +2291,11 @@ def delta_task_state(self, itask):
update_time = time()
# update task instance
- tp_delta = self.updated[TASK_PROXIES].setdefault(
+ tp_delta: PbTaskProxy = self.updated[TASK_PROXIES].setdefault(
tp_id, PbTaskProxy(id=tp_id))
tp_delta.stamp = f'{tp_id}@{update_time}'
tp_delta.state = itask.state.status
+ tp_delta.is_held = itask.state.is_held
self.state_update_families.add(tproxy.first_parent)
if tp_delta.state in self.latest_state_tasks:
tp_ref = itask.identity
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index d3f7fd18a36..a6893b6213e 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -122,12 +122,13 @@
REMOTE_INIT_FAILED,
)
from cylc.flow.task_state import (
- TASK_STATUSES_ACTIVE,
- TASK_STATUSES_NEVER_ACTIVE,
+ TASK_STATUS_FAILED,
TASK_STATUS_PREPARING,
TASK_STATUS_RUNNING,
TASK_STATUS_SUBMITTED,
TASK_STATUS_WAITING,
+ TASK_STATUSES_ACTIVE,
+ TASK_STATUSES_NEVER_ACTIVE,
)
from cylc.flow.taskdef import TaskDef
from cylc.flow.templatevars import eval_var
@@ -145,12 +146,15 @@
from cylc.flow.xtrigger_mgr import XtriggerManager
if TYPE_CHECKING:
+ from optparse import Values
+
# BACK COMPAT: typing_extensions.Literal
# FROM: Python 3.7
# TO: Python 3.8
from typing_extensions import Literal
- from optparse import Values
+
from cylc.flow.network.resolvers import TaskMsg
+ from cylc.flow.task_proxy import TaskProxy
class SchedulerStop(CylcError):
@@ -1010,6 +1014,42 @@ def _set_stop(self, stop_mode: Optional[StopMode] = None) -> None:
self.stop_mode = stop_mode
self.update_data_store()
+ def kill_tasks(
+ self, itasks: 'Iterable[TaskProxy]', warn: bool = True
+ ) -> int:
+ """Kill tasks if they are in a killable state.
+
+ Args:
+ itasks: Tasks to kill.
+ warn: Whether to warn about tasks that are not in a killable state.
+
+ Returns number of tasks that could not be killed.
+ """
+ jobless = self.get_run_mode() == RunMode.SIMULATION
+ to_kill: List[TaskProxy] = []
+ unkillable: List[TaskProxy] = []
+ for itask in itasks:
+ if itask.state(*TASK_STATUSES_ACTIVE):
+ itask.state_reset(
+ # directly reset to failed in sim mode, else let
+ # task_job_mgr handle it
+ status=(TASK_STATUS_FAILED if jobless else None),
+ is_held=True,
+ )
+ self.data_store_mgr.delta_task_state(itask)
+ to_kill.append(itask)
+ else:
+ unkillable.append(itask)
+ if warn and unkillable:
+ LOG.warning(
+ "Tasks not killable: "
+ f"{', '.join(sorted(t.identity for t in unkillable))}"
+ )
+ if not jobless:
+ self.task_job_mgr.kill_task_jobs(self.workflow, to_kill)
+
+ return len(unkillable)
+
def get_restart_num(self) -> int:
"""Return the number of the restart, else 0 if not a restart.
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 500aa830b2c..8691a5efb5f 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -35,7 +35,13 @@
)
from shutil import rmtree
from time import time
-from typing import TYPE_CHECKING, Any, Union, Optional
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Iterable,
+ Optional,
+ Union,
+)
from cylc.flow import LOG
from cylc.flow.job_runner_mgr import JobPollContext
@@ -99,7 +105,6 @@
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING,
TASK_STATUS_WAITING,
- TASK_STATUSES_ACTIVE
)
from cylc.flow.wallclock import (
get_current_time_string,
@@ -172,22 +177,12 @@ def check_task_jobs(self, workflow, task_pool):
if poll_tasks:
self.poll_task_jobs(workflow, poll_tasks)
- def kill_task_jobs(self, workflow, itasks):
- """Kill jobs of active tasks, and hold the tasks.
-
- If items is specified, kill active tasks matching given IDs.
-
- """
- to_kill_tasks = []
- for itask in itasks:
- if itask.state(*TASK_STATUSES_ACTIVE):
- itask.state_reset(is_held=True)
- self.data_store_mgr.delta_task_held(itask)
- to_kill_tasks.append(itask)
- else:
- LOG.warning(f"[{itask}] not killable")
+ def kill_task_jobs(
+ self, workflow: str, itasks: 'Iterable[TaskProxy]'
+ ) -> None:
+ """Issue the command to kill jobs of active tasks."""
self._run_job_cmd(
- self.JOBS_KILL, workflow, to_kill_tasks,
+ self.JOBS_KILL, workflow, itasks,
self._kill_task_jobs_callback,
self._kill_task_jobs_callback_255
)
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index f645403df15..ee093c92dcc 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -245,7 +245,6 @@ def create_data_store_elements(self, itask):
itask=itask
)
self.data_store_mgr.delta_task_state(itask)
- self.data_store_mgr.delta_task_held(itask)
self.data_store_mgr.delta_task_queued(itask)
self.data_store_mgr.delta_task_runahead(itask)
diff --git a/tests/integration/tui/screenshots/test_task_states.filter-not-waiting-or-expired.html b/tests/integration/tui/screenshots/test_task_states.filter-not-waiting-or-expired.html
index 3f9d6ee97e5..8cc7056da94 100644
--- a/tests/integration/tui/screenshots/test_task_states.filter-not-waiting-or-expired.html
+++ b/tests/integration/tui/screenshots/test_task_states.filter-not-waiting-or-expired.html
@@ -8,13 +8,13 @@
̿● a
- ̿⊗ Y
̿⊗ b
- - ̿⊘ 2
+ - ̎⊘ 2
- ̿⊙ X
̿⊙ a
- - ̿⊘ Y
- - ̿⊘ Y1
- ̿⊘ c
- ̿⊙ b
+ - ̎⊘ Y
+ - ̎⊘ Y1
+ ̎⊘ c
+ ̎⊙ b
diff --git a/tests/integration/tui/screenshots/test_task_states.filter-not-waiting.html b/tests/integration/tui/screenshots/test_task_states.filter-not-waiting.html
index 40bb126756b..78ec577a43a 100644
--- a/tests/integration/tui/screenshots/test_task_states.filter-not-waiting.html
+++ b/tests/integration/tui/screenshots/test_task_states.filter-not-waiting.html
@@ -10,13 +10,13 @@
- ̿◌ Y1
̿◌ c
̿⊗ b
- - ̿⊘ 2
+ - ̎⊘ 2
- ̿⊙ X
̿⊙ a
- - ̿⊘ Y
- - ̿⊘ Y1
- ̿⊘ c
- ̿⊙ b
+ - ̎⊘ Y
+ - ̎⊘ Y1
+ ̎⊘ c
+ ̎⊙ b
diff --git a/tests/integration/tui/screenshots/test_task_states.filter-submitted.html b/tests/integration/tui/screenshots/test_task_states.filter-submitted.html
index 7c00522a291..2cf989253d4 100644
--- a/tests/integration/tui/screenshots/test_task_states.filter-submitted.html
+++ b/tests/integration/tui/screenshots/test_task_states.filter-submitted.html
@@ -3,7 +3,7 @@
- ~cylc
- test_task_states - paused 1■ 1■ 1■ 1■ 1■
- - ̿⊘ 2
+ - ̎⊘ 2
- ̿⊙ X
̿⊙ a
diff --git a/tests/integration/tui/screenshots/test_task_states.unfiltered.html b/tests/integration/tui/screenshots/test_task_states.unfiltered.html
index 9a9161d8ca7..a5fdb9fd128 100644
--- a/tests/integration/tui/screenshots/test_task_states.unfiltered.html
+++ b/tests/integration/tui/screenshots/test_task_states.unfiltered.html
@@ -9,13 +9,13 @@
- ̿◌ Y1
̿◌ c
̿⊗ b
- - ̿⊘ 2
+ - ̎⊘ 2
- ̿⊙ X
̿⊙ a
- - ̿⊘ Y
- - ̿⊘ Y1
- ̿⊘ c
- ̿⊙ b
+ - ̎⊘ Y
+ - ̎⊘ Y1
+ ̎⊘ c
+ ̎⊙ b
- ̊○ 3
- ̊○ X
̊○ a