From afae29dcdcd97082325b594da8cc84584d5b197c Mon Sep 17 00:00:00 2001 From: Blake Caldwell Date: Sun, 28 Mar 2021 23:26:53 -0400 Subject: [PATCH] MAINT: handle terminating simulations Depends on terminate function in hnn-core 76162b10 Update exception handling for threads with a custom excepthook that doesn't emit an extra done signal. Also protect SimThread.killed variable with a lock because the main GUI thread accesses it via the stop method. --- hnn/qt_main.py | 19 +---- hnn/qt_thread.py | 200 +++++++++++++++++------------------------------ 2 files changed, 74 insertions(+), 145 deletions(-) diff --git a/hnn/qt_main.py b/hnn/qt_main.py index 16505aca..ff55f0d8 100644 --- a/hnn/qt_main.py +++ b/hnn/qt_main.py @@ -10,7 +10,6 @@ import multiprocessing import numpy as np import traceback -from collections import namedtuple from copy import deepcopy from psutil import cpu_count @@ -36,7 +35,7 @@ write_gids_param, get_fname) from .simdata import SimData from .qt_sim import SIMCanvas -from .qt_thread import SimThread, OptThread +from .qt_thread import SimThread, OptThread, _add_missing_frames from .qt_lib import (getmplDPI, getscreengeom, lookupresource, setscalegeomcenter) from .specfn import spec_dpl_kernel, save_spec_data @@ -85,18 +84,6 @@ def getPyComm(): return 'python3' -def _add_missing_frames(tb): - fake_tb = namedtuple( - 'fake_tb', ('tb_frame', 'tb_lasti', 'tb_lineno', 'tb_next') - ) - result = fake_tb(tb.tb_frame, tb.tb_lasti, tb.tb_lineno, tb.tb_next) - frame = tb.tb_frame.f_back - while frame: - result = fake_tb(frame, frame.f_lasti, frame.f_lineno, result) - frame = frame.f_back - return result - - def bringwintobot(win): # win.show() # win.lower() @@ -966,9 +953,7 @@ def stopsim(self): print('Terminating simulation. . .') self.statusBar().showMessage('Terminating sim. . .') self.runningsim = False - self.runthread.stop() # killed = True # terminate() - self.runthread.wait(1000) - self.runthread.terminate() + self.runthread.stop() self.btnsim.setText("Run Simulation") self.qbtn.setEnabled(True) self.statusBar().showMessage('') diff --git a/hnn/qt_thread.py b/hnn/qt_thread.py index 8fe38ddd..50b10ad6 100755 --- a/hnn/qt_thread.py +++ b/hnn/qt_thread.py @@ -8,11 +8,11 @@ import sys from math import ceil, isclose from contextlib import redirect_stdout -from psutil import wait_procs, process_iter, NoSuchProcess import traceback from queue import Queue -from threading import Event +from threading import Event, Lock import numpy as np +from collections import namedtuple import nlopt from PyQt5 import QtCore @@ -72,94 +72,37 @@ def __init__(self, data, params): self.params = params -def _kill_list_of_procs(process_list): - """Try killing processes - - Parameters - ---------- - process_list : list of psutil.Process objects - List containing processes to terminate - - Returns - ---------- - alive: list of psutil.Process objects - List containing processes that are still alive - """ - - # try terminate first - for p in process_list: - try: - p.terminate() - except NoSuchProcess: - pass - _, alive = wait_procs(process_list, timeout=3) - - # now try kill - for p in alive: - p.kill() - _, alive = wait_procs(process_list, timeout=3) - - return alive +def _add_missing_frames(tb): + fake_tb = namedtuple( + 'fake_tb', ('tb_frame', 'tb_lasti', 'tb_lineno', 'tb_next') + ) + result = fake_tb(tb.tb_frame, tb.tb_lasti, tb.tb_lineno, tb.tb_next) + frame = tb.tb_frame.f_back + while frame: + result = fake_tb(frame, frame.f_lasti, frame.f_lineno, result) + frame = frame.f_back + return result -def _get_nrniv_procs_running(): - """return a list of nrniv processes running - - Returns - ---------- - process_list : list of psutil.Process objects - List containing processes matching name 'nrniv' - """ - process_list = [] - name = 'nrniv' - for p in process_iter(attrs=["name", "exe", "cmdline"]): - if name == p.info['name'] or \ - p.info['exe'] and os.path.basename(p.info['exe']) == name or \ - p.info['cmdline'] and p.info['cmdline'][0] == name: - process_list.append(p) - return process_list - - -def _kill_and_check_nrniv_procs(): - """Kill all running processes named nrniv""" - procs = _get_nrniv_procs_running() - if len(procs) > 0: - running = _kill_list_of_procs(procs) - if len(running) > 0: - pids = [str(proc.pid) for proc in running] - print("ERROR: failed to kill nrniv process(es) %s" % - ','.join(pids)) - - -def simulate(params, n_procs=None): +def simulate(net): """Start the simulation with hnn_core.simulate Parameters ---------- - params : dict - The parameters - - n_procs : int | None - The number of MPI processes requested by the user. If None, then will - attempt to detect number of cores (including hyperthreads) and start - parallel simulation over all of them. + net : Network object + The constructed Network object from hnn-core """ - # create the network from the parameter file. note, NEURON objects haven't - # been created yet - net = Network(params, add_drives_from_params=True) - sim_data = {} # run the simulation with MPIBackend for faster completion time - with MPIBackend(n_procs=n_procs, mpi_cmd='mpiexec'): - record_vsoma = bool(params['record_vsoma']) - sim_data['raw_dpls'] = simulate_dipole(net, params['N_trials'], - postproc=False, - record_vsoma=record_vsoma) + record_vsoma = bool(net.params['record_vsoma']) + sim_data['raw_dpls'] = simulate_dipole(net, net.params['N_trials'], + postproc=False, + record_vsoma=record_vsoma) # hnn-core changes this to bool, change back to int - if isinstance(params['record_vsoma'], bool): - params['record_vsoma'] = int(params['record_vsoma']) + if isinstance(net.params['record_vsoma'], bool): + net.params['record_vsoma'] = int(record_vsoma) sim_data['gid_ranges'] = net.gid_ranges sim_data['spikes'] = net.cell_response sim_data['vsoma'] = net.cell_response.vsoma @@ -210,16 +153,23 @@ class SimThread(QtCore.QThread): done_signal : TextSignal object Signal to be emitted at completion of a simulation. Not emitted for when running an optimization simulation. + killed_lock : threading.Lock + Lock to protect killed variable mutual exclusion + backend : MPIBackend + The hnn-core backend responsible for running simulations """ def __init__(self, ncore, params, result_callback, mainwin): QtCore.QThread.__init__(self) + sys.excepthook = self._excepthook self.ncore = ncore self.params = params self.mainwin = mainwin self.is_optimization = self.mainwin.is_optimization self.baseparamwin = self.mainwin.baseparamwin self.killed = False + self.backend = None + self.killed_lock = Lock() self.paramfn = os.path.join(get_output_dir(), 'param', self.params['sim_prefix'] + '.param') @@ -236,6 +186,10 @@ def __init__(self, ncore, params, result_callback, mainwin): self.done_signal = TextSignal() self.done_signal.tsig.connect(self.mainwin.done) + def _excepthook(self, exc_type, exc_value, exc_tb): + enriched_tb = _add_missing_frames(exc_tb) if exc_tb else exc_tb + traceback.print_exception(exc_type, exc_value, enriched_tb) + def _updatewaitsimwin(self, txt): """Used to write messages to simulation window""" self.msg_signal.tsig.emit(txt) @@ -257,8 +211,11 @@ def flush(self): def stop(self): """Terminate running simulation""" - _kill_and_check_nrniv_procs() - self.killed = True + with self.killed_lock: + self.killed = True + + if self.backend is not None: + self.backend.terminate() def run(self, sim_length=None): """Start simulation @@ -272,13 +229,7 @@ def run(self, sim_length=None): msg = '' banner = not self.is_optimization - try: - self._run(banner=banner, sim_length=sim_length) # run simulation - # update params in all windows (optimization) - except RuntimeError as e: - msg = str(e) - self.done_signal.tsig.emit(msg) - return + self._run(banner=banner, sim_length=sim_length) # run simulation if not self.is_optimization: self.param_signal.psig.emit(self.params) @@ -288,7 +239,8 @@ def run(self, sim_length=None): self.quit() def _run(self, banner=True, sim_length=None): - self.killed = False + with self.killed_lock: + self.killed = False sim_params = hnn_core_compat_params(self.params) if sim_length is not None: @@ -301,27 +253,35 @@ def _run(self, banner=True, sim_length=None): try: sim_log = self._log_sim_status(parent=self) with redirect_stdout(sim_log): - sim_data = simulate(sim_params, self.ncore) + # create the network from the parameter file + # Note: NEURON objects haven't been created yet + net = Network(sim_params, add_drives_from_params=True) + with MPIBackend( + n_procs=self.ncore, mpi_cmd='mpiexec') as backend: + self.backend = backend + with self.killed_lock: + if self.killed: + raise RuntimeError("Terminated") + sim_data = simulate(net) + self.backend = None break except RuntimeError as e: if self.ncore == 1: # can't reduce ncore any more print(str(e)) self._updatewaitsimwin(str(e)) - _kill_and_check_nrniv_procs() raise RuntimeError("Simulation failed to start") # check if proc was killed before retrying with fewer cores - if self.killed: - # exit using RuntimeError - raise RuntimeError("Terminated") + with self.killed_lock: + if self.killed: + raise RuntimeError("Terminated") self.ncore = ceil(self.ncore / 2) txt = "INFO: Failed starting simulation, retrying with %d cores" \ % self.ncore print(txt) self._updatewaitsimwin(txt) - _kill_and_check_nrniv_procs() # put sim_data into the val attribute of a ResultObj self.result_signal.sig.emit(ResultObj(sim_data, self.params)) @@ -468,19 +428,15 @@ def __init__(self, ncore, params, num_steps, seed, sim_data, def run(self): msg = '' - try: - self._run() # run optimization - except RuntimeError as e: - msg = str(e) - + self._run() # run optimization self.done_signal.tsig.emit(msg) def stop(self): """Terminate running simulation""" - self.sim_thread.stop() - self.sim_thread.terminate() - self.sim_thread.wait() - self.killed = True + with self.killed_lock: + self.killed = True + self.sim_thread.stop() + self.done_signal.tsig.emit("Optimization terminated") def _run(self): @@ -585,16 +541,12 @@ def _run(self): self.result_callback, mainwin=self.mainwin) self.sim_running = True - try: - self.sim_thread.run() - self.sim_thread.wait() + self.sim_thread.run() + self.sim_thread.wait() + with self.killed_lock: if self.killed: self.quit() - self.sim_running = False - except Exception: - traceback.print_exc() - raise RuntimeError("Failed to run final simulation. " - "See previous traceback.") + self.sim_running = False def _get_initial_data(self): """Run an initial simulation if necessary""" @@ -617,16 +569,12 @@ def _get_initial_data(self): self.result_callback, mainwin=self.mainwin) self.sim_running = True - try: - self.sim_thread.run() - self.sim_thread.wait() + self.sim_thread.run() + self.sim_thread.wait() + with self.killed_lock: if self.killed: self.quit() - self.sim_running = False - except Exception: - traceback.print_exc() - raise RuntimeError("Failed to run initial simulation. " - "See previous traceback.") + self.sim_running = False # results are in self.sim_data now @@ -686,17 +634,13 @@ def _opt_sim(self, new_params, grad=0): mainwin=self.mainwin) self.sim_running = True - try: - # may not need to run the entire simulation - self.sim_thread.run(sim_length=self.opt_end) - self.sim_thread.wait() + # may not need to run the entire simulation + self.sim_thread.run(sim_length=self.opt_end) + self.sim_thread.wait() + with self.killed_lock: if self.killed: self.quit() - self.sim_running = False - except Exception: - traceback.print_exc() - raise RuntimeError("Failed to run simulation. " - "See previous traceback.") + self.sim_running = False # calculate wRMSE for all steps err_queue = Queue()