diff --git a/hnn_core/parallel_backends.py b/hnn_core/parallel_backends.py index 99318ba79..3f1394efd 100644 --- a/hnn_core/parallel_backends.py +++ b/hnn_core/parallel_backends.py @@ -92,22 +92,54 @@ def _get_mpi_env(): return my_env -def spawn_job(parent_comm, command, n_procs, info=None): - """Spawn child simulation job from an existing MPI communicator""" +def spawn_job(command, obj, n_procs, info=None): + """Spawn child simulation jobs from an existing MPI communicator + + Parameters + ---------- + command : list of str | str + Command to spawn job from new MPI communicator. + obj : object + The object containing network and other simulation-specific information + to broadcast to each child process via MPI. + n_procs : int + The number of MPI processes to parallelize (by cell, see Neuron + documentation) the simulation over. + info: mpi4py.MPI.Info | None + Appliable only when mpi_comm_spawn is True: an mpi4py.MPI.Info instance + that grants the user control over how openMPI configures spawned jobs. + + Returns + ------- + child_data : object + The data returned by the child process. + """ + + from mpi4py import MPI if info is None: - subcomm = parent_comm.Spawn('nrniv', args=command, - maxprocs=n_procs) + intercomm = MPI.COMM_SELF.Spawn('nrniv', args=command, + maxprocs=n_procs) else: - subcomm = parent_comm.Spawn('nrniv', args=command, - info=info, - maxprocs=n_procs) + intercomm = MPI.COMM_SELF.Spawn('nrniv', args=command, + info=info, + maxprocs=n_procs) - return subcomm + # send Network + sim param objects to each child process + intercomm.bcast(obj, root=MPI.ROOT) + + # receive data + child_data = intercomm.recv(source=0) + + # close spawned communicator + intercomm.Disconnect() + + return child_data def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs): - """Run process and communicate with it. + """Run simulation process and communicate with it. + Parameters ---------- command : list of str | str @@ -119,6 +151,7 @@ def run_subprocess(command, obj, timeout, proc_queue=None, *args, **kwargs): The number of seconds to wait for a process without output. *args, **kwargs : arguments Additional arguments to pass to subprocess.Popen. + Returns ------- child_data : object @@ -621,7 +654,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, self.mpi_comm_spawn_info = mpi_comm_spawn_info self._intracomm = None self._selfcomm = None - self._child_intercomm = None + self._intercomm = None n_logical_cores = multiprocessing.cpu_count() if n_procs is None: @@ -641,6 +674,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, hyperthreading = False if _has_mpi4py() and _has_psutil(): + from mpi4py import MPI import psutil n_physical_cores = psutil.cpu_count(logical=False) @@ -659,14 +693,7 @@ def __init__(self, n_procs=None, mpi_cmd='mpiexec', mpi_comm_spawn=False, warn(f'{packages} not installed. Will run on single processor') self.n_procs = 1 - self.command = 'nrniv -python -mpi -nobanner ' + \ - sys.executable + ' ' + \ - os.path.join(os.path.dirname(sys.modules[__name__].__file__), - 'mpi_child.py') - - if self.mpi_comm_spawn and _has_mpi4py(): - from mpi4py import MPI - + if self.mpi_comm_spawn: self._intracomm = MPI.COMM_WORLD self._selfcomm = MPI.COMM_SELF if self._intracomm.Get_rank() != 0: @@ -750,17 +777,19 @@ def simulate(self, net, tstop, dt, n_trials, postproc=False): print("Running %d trials..." % (n_trials)) + sim_objs = [net, tstop, dt, n_trials] + if self.mpi_comm_spawn: - self._child_intercomm = spawn_job( + sim_data = spawn_job( parent_comm=self._selfcomm, command=self.command, + obj=sim_objs, n_procs=self.n_procs, info=self.mpi_comm_spawn_info) - else: env = _get_mpi_env() self.proc, sim_data = run_subprocess( - command=self.command, obj=[net, tstop, dt, n_trials], timeout=30, + command=self.command, obj=sim_objs, timeout=30, proc_queue=self.proc_queue, env=env, cwd=os.getcwd(), universal_newlines=True)