Skip to content

Commit

Permalink
MAINT: handle terminating simulations
Browse files Browse the repository at this point in the history
Depends on terminate function in hnn-core 76162b10

Update exception handling for threads with a custom excepthook
that doesn't emit an extra done signal. Also protect SimThread.killed
variable with a lock because the main GUI thread accesses it
via the stop method.
  • Loading branch information
Blake Caldwell committed Mar 29, 2021
1 parent b2803ff commit afae29d
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 145 deletions.
19 changes: 2 additions & 17 deletions hnn/qt_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import multiprocessing
import numpy as np
import traceback
from collections import namedtuple
from copy import deepcopy
from psutil import cpu_count

Expand All @@ -36,7 +35,7 @@
write_gids_param, get_fname)
from .simdata import SimData
from .qt_sim import SIMCanvas
from .qt_thread import SimThread, OptThread
from .qt_thread import SimThread, OptThread, _add_missing_frames
from .qt_lib import (getmplDPI, getscreengeom, lookupresource,
setscalegeomcenter)
from .specfn import spec_dpl_kernel, save_spec_data
Expand Down Expand Up @@ -85,18 +84,6 @@ def getPyComm():
return 'python3'


def _add_missing_frames(tb):
    """Extend a traceback with the frames of its callers

    Starting from the first frame referenced by ``tb``, follow ``f_back``
    to the outermost caller and build a chain of traceback-like records,
    outermost caller first, that ends with the entries of ``tb`` itself.

    Parameters
    ----------
    tb : traceback | None
        The traceback to extend, e.g. ``sys.exc_info()[2]``

    Returns
    -------
    result : fake_tb | None
        Head of the extended chain. Each added record is a namedtuple with
        the fields ``tb_frame``, ``tb_lasti``, ``tb_lineno`` and
        ``tb_next``. None is returned when ``tb`` is None.
    """
    if tb is None:
        # an exception without a traceback has no frames to extend
        return None

    fake_tb = namedtuple(
        'fake_tb', ('tb_frame', 'tb_lasti', 'tb_lineno', 'tb_next')
    )
    # copy the first entry so that caller frames can be linked in front of it
    result = fake_tb(tb.tb_frame, tb.tb_lasti, tb.tb_lineno, tb.tb_next)
    frame = tb.tb_frame.f_back
    while frame:
        # each caller becomes the new head, pointing at the chain so far
        result = fake_tb(frame, frame.f_lasti, frame.f_lineno, result)
        frame = frame.f_back
    return result


def bringwintobot(win):
# win.show()
# win.lower()
Expand Down Expand Up @@ -966,9 +953,7 @@ def stopsim(self):
print('Terminating simulation. . .')
self.statusBar().showMessage('Terminating sim. . .')
self.runningsim = False
self.runthread.stop() # killed = True # terminate()
self.runthread.wait(1000)
self.runthread.terminate()
self.runthread.stop()

This comment has been minimized.

Copy link
@ntolley

ntolley Mar 31, 2021

Contributor

What does self.runthread.terminate() do? Is it redundant with stop?

This comment has been minimized.

Copy link
@blakecaldwell

blakecaldwell Mar 31, 2021

Member

Only stop is overridden in qt_thread.py. terminate was called as a failsafe, but I decided it was bad practice to call it here (kind of like a catch-all except).

self.btnsim.setText("Run Simulation")
self.qbtn.setEnabled(True)
self.statusBar().showMessage('')
Expand Down
200 changes: 72 additions & 128 deletions hnn/qt_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import sys
from math import ceil, isclose
from contextlib import redirect_stdout
from psutil import wait_procs, process_iter, NoSuchProcess
import traceback
from queue import Queue
from threading import Event
from threading import Event, Lock
import numpy as np
from collections import namedtuple

import nlopt
from PyQt5 import QtCore
Expand Down Expand Up @@ -72,94 +72,37 @@ def __init__(self, data, params):
self.params = params


def _kill_list_of_procs(process_list):
    """Try killing processes

    Each process is first asked to terminate. Those still alive after a
    3 second wait are then killed, followed by a second 3 second wait.

    Parameters
    ----------
    process_list : list of psutil.Process objects
        List containing processes to terminate

    Returns
    -------
    alive : list of psutil.Process objects
        List containing processes that are still alive
    """

    # try terminate first
    for p in process_list:
        try:
            p.terminate()
        except NoSuchProcess:
            pass
    _, alive = wait_procs(process_list, timeout=3)

    # now try kill
    for p in alive:
        try:
            p.kill()
        except NoSuchProcess:
            # the process exited after the wait above timed out; keep going
            # so the remaining processes are still killed
            pass
    _, alive = wait_procs(process_list, timeout=3)

    return alive
def _add_missing_frames(tb):

This comment has been minimized.

Copy link
@ntolley

ntolley Mar 31, 2021

Contributor

quick docstring to explain the function would be useful

This comment has been minimized.

Copy link
@blakecaldwell

blakecaldwell Mar 31, 2021

Member

do you know what it does? I don't!

fake_tb = namedtuple(
'fake_tb', ('tb_frame', 'tb_lasti', 'tb_lineno', 'tb_next')
)
result = fake_tb(tb.tb_frame, tb.tb_lasti, tb.tb_lineno, tb.tb_next)
frame = tb.tb_frame.f_back
while frame:
result = fake_tb(frame, frame.f_lasti, frame.f_lineno, result)
frame = frame.f_back
return result


def _get_nrniv_procs_running():
    """Collect the running processes that match 'nrniv'

    Returns
    -------
    process_list : list of psutil.Process objects
        Processes whose name, executable basename, or first command line
        entry equals 'nrniv'
    """
    target = 'nrniv'

    def _matches(info):
        # checks run in the same order as the fields are requested below
        if info['name'] == target:
            return True
        exe = info['exe']
        if exe and os.path.basename(exe) == target:
            return True
        cmdline = info['cmdline']
        return bool(cmdline and cmdline[0] == target)

    return [proc
            for proc in process_iter(attrs=["name", "exe", "cmdline"])
            if _matches(proc.info)]


def _kill_and_check_nrniv_procs():
    """Kill all running nrniv processes and report any that survive"""
    nrniv_procs = _get_nrniv_procs_running()
    if not nrniv_procs:
        # nothing to clean up
        return

    survivors = _kill_list_of_procs(nrniv_procs)
    if not survivors:
        return

    pid_txt = ','.join(str(proc.pid) for proc in survivors)
    print("ERROR: failed to kill nrniv process(es) %s" % pid_txt)


def simulate(params, n_procs=None):
def simulate(net):
"""Start the simulation with hnn_core.simulate
Parameters
----------
params : dict
The parameters
n_procs : int | None
The number of MPI processes requested by the user. If None, then will
attempt to detect number of cores (including hyperthreads) and start
parallel simulation over all of them.
net : Network object
The constructed Network object from hnn-core
"""

# create the network from the parameter file. note, NEURON objects haven't
# been created yet
net = Network(params, add_drives_from_params=True)

sim_data = {}
# run the simulation with MPIBackend for faster completion time
with MPIBackend(n_procs=n_procs, mpi_cmd='mpiexec'):
record_vsoma = bool(params['record_vsoma'])
sim_data['raw_dpls'] = simulate_dipole(net, params['N_trials'],
postproc=False,
record_vsoma=record_vsoma)
record_vsoma = bool(net.params['record_vsoma'])
sim_data['raw_dpls'] = simulate_dipole(net, net.params['N_trials'],
postproc=False,
record_vsoma=record_vsoma)

# hnn-core changes this to bool, change back to int
if isinstance(params['record_vsoma'], bool):
params['record_vsoma'] = int(params['record_vsoma'])
if isinstance(net.params['record_vsoma'], bool):
net.params['record_vsoma'] = int(record_vsoma)
sim_data['gid_ranges'] = net.gid_ranges
sim_data['spikes'] = net.cell_response
sim_data['vsoma'] = net.cell_response.vsoma
Expand Down Expand Up @@ -210,16 +153,23 @@ class SimThread(QtCore.QThread):
done_signal : TextSignal object
Signal to be emitted at completion of a simulation. Not emitted for
when running an optimization simulation.
killed_lock : threading.Lock
Lock to protect killed variable mutual exclusion
backend : MPIBackend
The hnn-core backend responsible for running simulations
"""

def __init__(self, ncore, params, result_callback, mainwin):
QtCore.QThread.__init__(self)
sys.excepthook = self._excepthook
self.ncore = ncore
self.params = params
self.mainwin = mainwin
self.is_optimization = self.mainwin.is_optimization
self.baseparamwin = self.mainwin.baseparamwin
self.killed = False
self.backend = None
self.killed_lock = Lock()

self.paramfn = os.path.join(get_output_dir(), 'param',
self.params['sim_prefix'] + '.param')
Expand All @@ -236,6 +186,10 @@ def __init__(self, ncore, params, result_callback, mainwin):
self.done_signal = TextSignal()
self.done_signal.tsig.connect(self.mainwin.done)

def _excepthook(self, exc_type, exc_value, exc_tb):
    """Print an exception, including the frames of its callers

    Parameters
    ----------
    exc_type : type
        The exception class
    exc_value : BaseException
        The exception instance
    exc_tb : traceback | None
        The traceback of the exception. When falsy it is passed to the
        printer unchanged.
    """
    if exc_tb:
        # prepend the caller frames so the printed trace is complete
        exc_tb = _add_missing_frames(exc_tb)
    traceback.print_exception(exc_type, exc_value, exc_tb)

def _updatewaitsimwin(self, txt):
"""Used to write messages to simulation window"""
self.msg_signal.tsig.emit(txt)
Expand All @@ -257,8 +211,11 @@ def flush(self):

def stop(self):
"""Terminate running simulation"""
_kill_and_check_nrniv_procs()
self.killed = True
with self.killed_lock:
self.killed = True

if self.backend is not None:
self.backend.terminate()

def run(self, sim_length=None):
"""Start simulation
Expand All @@ -272,13 +229,7 @@ def run(self, sim_length=None):

msg = ''
banner = not self.is_optimization
try:
self._run(banner=banner, sim_length=sim_length) # run simulation
# update params in all windows (optimization)
except RuntimeError as e:
msg = str(e)
self.done_signal.tsig.emit(msg)
return
self._run(banner=banner, sim_length=sim_length) # run simulation

if not self.is_optimization:
self.param_signal.psig.emit(self.params)
Expand All @@ -288,7 +239,8 @@ def run(self, sim_length=None):
self.quit()

def _run(self, banner=True, sim_length=None):
self.killed = False
with self.killed_lock:
self.killed = False

sim_params = hnn_core_compat_params(self.params)
if sim_length is not None:
Expand All @@ -301,27 +253,35 @@ def _run(self, banner=True, sim_length=None):
try:
sim_log = self._log_sim_status(parent=self)
with redirect_stdout(sim_log):
sim_data = simulate(sim_params, self.ncore)
# create the network from the parameter file
# Note: NEURON objects haven't been created yet
net = Network(sim_params, add_drives_from_params=True)
with MPIBackend(
n_procs=self.ncore, mpi_cmd='mpiexec') as backend:
self.backend = backend
with self.killed_lock:
if self.killed:
raise RuntimeError("Terminated")
sim_data = simulate(net)
self.backend = None
break
except RuntimeError as e:
if self.ncore == 1:
# can't reduce ncore any more
print(str(e))
self._updatewaitsimwin(str(e))
_kill_and_check_nrniv_procs()
raise RuntimeError("Simulation failed to start")

# check if proc was killed before retrying with fewer cores
if self.killed:
# exit using RuntimeError
raise RuntimeError("Terminated")
with self.killed_lock:
if self.killed:
raise RuntimeError("Terminated")

self.ncore = ceil(self.ncore / 2)
txt = "INFO: Failed starting simulation, retrying with %d cores" \
% self.ncore
print(txt)
self._updatewaitsimwin(txt)
_kill_and_check_nrniv_procs()

# put sim_data into the val attribute of a ResultObj
self.result_signal.sig.emit(ResultObj(sim_data, self.params))
Expand Down Expand Up @@ -468,19 +428,15 @@ def __init__(self, ncore, params, num_steps, seed, sim_data,

def run(self):
msg = ''
try:
self._run() # run optimization
except RuntimeError as e:
msg = str(e)

self._run() # run optimization
self.done_signal.tsig.emit(msg)

def stop(self):
"""Terminate running simulation"""
self.sim_thread.stop()
self.sim_thread.terminate()
self.sim_thread.wait()
self.killed = True
with self.killed_lock:
self.killed = True
self.sim_thread.stop()

self.done_signal.tsig.emit("Optimization terminated")

def _run(self):
Expand Down Expand Up @@ -585,16 +541,12 @@ def _run(self):
self.result_callback,
mainwin=self.mainwin)
self.sim_running = True
try:
self.sim_thread.run()
self.sim_thread.wait()
self.sim_thread.run()
self.sim_thread.wait()
with self.killed_lock:
if self.killed:
self.quit()
self.sim_running = False
except Exception:
traceback.print_exc()
raise RuntimeError("Failed to run final simulation. "
"See previous traceback.")
self.sim_running = False

def _get_initial_data(self):
"""Run an initial simulation if necessary"""
Expand All @@ -617,16 +569,12 @@ def _get_initial_data(self):
self.result_callback,
mainwin=self.mainwin)
self.sim_running = True
try:
self.sim_thread.run()
self.sim_thread.wait()
self.sim_thread.run()
self.sim_thread.wait()

This comment has been minimized.

Copy link
@ntolley

ntolley Mar 31, 2021

Contributor

Is there a timeout for this wait or does it run until some signal is received?

This comment has been minimized.

Copy link
@blakecaldwell

blakecaldwell Mar 31, 2021

Member

It will run until the simulation completes. This is a bit problematic because the user cannot press 'stop simulation' at this point, but I don't have a better alternative: how would you choose a useful timeout value without knowing in advance how long the simulation will run?

with self.killed_lock:
if self.killed:
self.quit()
self.sim_running = False
except Exception:
traceback.print_exc()
raise RuntimeError("Failed to run initial simulation. "
"See previous traceback.")
self.sim_running = False

# results are in self.sim_data now

Expand Down Expand Up @@ -686,17 +634,13 @@ def _opt_sim(self, new_params, grad=0):
mainwin=self.mainwin)

self.sim_running = True
try:
# may not need to run the entire simulation
self.sim_thread.run(sim_length=self.opt_end)
self.sim_thread.wait()
# may not need to run the entire simulation
self.sim_thread.run(sim_length=self.opt_end)
self.sim_thread.wait()
with self.killed_lock:
if self.killed:
self.quit()
self.sim_running = False
except Exception:
traceback.print_exc()
raise RuntimeError("Failed to run simulation. "
"See previous traceback.")
self.sim_running = False

# calculate wRMSE for all steps
err_queue = Queue()
Expand Down

0 comments on commit afae29d

Please sign in to comment.