Skip to content

Commit

Permalink
work on spawn_job
Browse files Browse the repository at this point in the history
  • Loading branch information
rythorpe committed Jul 17, 2022
1 parent ec728b4 commit 283847d
Showing 1 changed file with 50 additions and 21 deletions.
71 changes: 50 additions & 21 deletions hnn_core/parallel_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,54 @@ def _get_mpi_env():
return my_env


def spawn_job(parent_comm, command, n_procs, info=None):
"""Spawn child simulation job from an existing MPI communicator"""
def spawn_job(command, obj, n_procs, info=None):
"""Spawn child simulation jobs from an existing MPI communicator
Parameters
----------
command : list of str | str
Command to spawn job from new MPI communicator.
obj : object
The object containing network and other simulation-specific information
to broadcast to each child process via MPI.
n_procs : int
The number of MPI processes to parallelize (by cell, see Neuron
documentation) the simulation over.
info: mpi4py.MPI.Info | None
Appliable only when mpi_comm_spawn is True: an mpi4py.MPI.Info instance
that grants the user control over how openMPI configures spawned jobs.
Returns
-------
child_data : object
The data returned by the child process.
"""

from mpi4py import MPI

if info is None:
subcomm = parent_comm.Spawn('nrniv', args=command,
maxprocs=n_procs)
intercomm = MPI.COMM_SELF.Spawn('nrniv', args=command,
maxprocs=n_procs)
else:
subcomm = parent_comm.Spawn('nrniv', args=command,
info=info,
maxprocs=n_procs)
intercomm = MPI.COMM_SELF.Spawn('nrniv', args=command,
info=info,
maxprocs=n_procs)

return subcomm
# send Network + sim param objects to each child process
intercomm.bcast(obj, root=MPI.ROOT)

# receive data
child_data = intercomm.recv(source=0)

# close spawned communicator
intercomm.Disconnect()

return child_data


def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs):
"""Run process and communicate with it.
"""Run simulation process and communicate with it.
Parameters
----------
command : list of str | str
Expand All @@ -119,6 +151,7 @@ def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs):
The number of seconds to wait for a process without output.
*args, **kwargs : arguments
Additional arguments to pass to subprocess.Popen.
Returns
-------
child_data : object
Expand Down Expand Up @@ -621,7 +654,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False,
self.mpi_comm_spawn_info = mpi_comm_spawn_info
self._intracomm = None
self._selfcomm = None
self._child_intercomm = None
self._intercomm = None

n_logical_cores = multiprocessing.cpu_count()
if n_procs is None:
Expand All @@ -641,6 +674,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False,
hyperthreading = False

if _has_mpi4py() and _has_psutil():
from mpi4py import MPI
import psutil

n_physical_cores = psutil.cpu_count(logical=False)
Expand All @@ -659,14 +693,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False,
warn(f'{packages} not installed. Will run on single processor')
self.n_procs = 1

self.command = 'nrniv -python -mpi -nobanner ' + \
sys.executable + ' ' + \
os.path.join(os.path.dirname(sys.modules[__name__].__file__),
'mpi_child.py')

if self.mpi_comm_spawn and _has_mpi4py():
from mpi4py import MPI

if self.mpi_comm_spawn:
self._intracomm = MPI.COMM_WORLD
self._selfcomm = MPI.COMM_SELF
if self._intracomm.Get_rank() != 0:
Expand Down Expand Up @@ -750,17 +777,19 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False):

print("Running %d trials..." % (n_trials))

sim_objs = [net, tstop, dt, n_trials]

if self.mpi_comm_spawn:
self._child_intercomm = spawn_job(
sim_data = spawn_job(
parent_comm=self._selfcomm,
command=self.command,
obj=sim_objs,
n_procs=self.n_procs,
info=self.mpi_comm_spawn_info)

else:
env = _get_mpi_env()
self.proc, sim_data = run_subprocess(
command=self.command, obj=[net, tstop, dt, n_trials], timeout=30,
command=self.command, obj=sim_objs, timeout=30,
proc_queue=self.proc_queue, env=env, cwd=os.getcwd(),
universal_newlines=True)

Expand Down

0 comments on commit 283847d

Please sign in to comment.