update to run_sorter_jobs() and slurm #3105

Open
wants to merge 30 commits into base: main
Changes from 6 commits
Commits
efa0751
added option to pass extra arguments to `sbatch` when using `run_sort…
Jun 29, 2024
4bdf244
added option to pass extra arguments to `sbatch` when using `run_sort…
Jun 29, 2024
2d0a83b
Merge remote-tracking branch 'origin/main'
Jun 29, 2024
ce87e68
mistake in Popen
Jun 29, 2024
ebabfec
Merge remote-tracking branch 'origin/main'
Jun 29, 2024
4e59f23
cleaned up code
MarinManuel Jun 29, 2024
407c488
Merge branch 'SpikeInterface:main' into slurm_updates
MarinManuel Jul 2, 2024
d204642
updated to use sbatch_kwargs instead of putting all slurm arguments d…
Jul 2, 2024
8fedd95
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 2, 2024
0733589
removed sbatch_executable from the list of kwargs
Jul 3, 2024
54a7b8f
Merge remote-tracking branch 'origin/slurm_updates' into slurm_updates
Jul 3, 2024
9c3ff1d
removed sbatch_executable from the list of kwargs
Jul 3, 2024
c86f3b2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jul 3, 2024
97198de
Merge branch 'main' into slurm_updates
MarinManuel Aug 16, 2024
f12461f
clarified docstring and added error for cpus_per_taks
MarinManuel Aug 16, 2024
002e959
added test
MarinManuel Aug 16, 2024
88ca2f1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 16, 2024
e4f9f1f
added test
MarinManuel Aug 16, 2024
e4b0b81
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 16, 2024
492c4d0
Merge branch 'SpikeInterface:main' into slurm_updates
MarinManuel Sep 11, 2024
5b6d560
docstring fix
MarinManuel Sep 11, 2024
35a2a7e
docstring fix
MarinManuel Sep 11, 2024
910fa61
docstring fix
MarinManuel Sep 11, 2024
6ba8423
added slurm_kwargs argument
MarinManuel Sep 11, 2024
0ec9af5
fixed test
MarinManuel Sep 11, 2024
e707170
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 11, 2024
e7a89c9
fixed test failing on Windows by limiting to slurm test to Linux
MarinManuel Sep 12, 2024
48aee1b
fixed test failing on Windows by limiting to slurm test to Linux
MarinManuel Sep 12, 2024
6bc8be2
reverted slurm_kwargs and improved docstring
MarinManuel Sep 12, 2024
d2ac504
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 12, 2024
45 changes: 33 additions & 12 deletions src/spikeinterface/sorters/launcher.py
@@ -26,14 +26,14 @@
joblib=dict(n_jobs=-1, backend="loky"),
processpoolexecutor=dict(max_workers=2, mp_context=None),
dask=dict(client=None),
slurm=dict(tmp_script_folder=None, cpus_per_task=1, mem="1G"),
slurm={"tmp_script_folder": None, "sbatch_executable_path": "sbatch", "cpus-per-task": 1, "mem": "1G"},
)


_implemented_engine = list(_default_engine_kwargs.keys())


def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=False):
def run_sorter_jobs(job_list, engine="loop", engine_kwargs=None, return_output=False):
"""
Run several :py:func:`run_sorter()` sequentially or in parallel given a list of jobs.

@@ -55,11 +55,10 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal

Where *blocking* means that this function is blocking until the results are returned.
This is in opposition to *asynchronous*, where the function returns `None` almost immediately (aka non-blocking),
but the results must be retrieved by hand when jobs are finished. No mechanisim is provided here to be know
when jobs are finish.
but the results must be retrieved by hand when jobs are finished. No mechanism is provided here to know
when jobs are finished.
In this *asynchronous* case, the :py:func:`~spikeinterface.sorters.read_sorter_folder()` helps to retrieve individual results.


Parameters
----------
job_list : list of dict
@@ -68,10 +67,18 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal
The engine to run the list.
* "loop" : a simple loop. This engine is
engine_kwargs : dict

return_output : bool, dfault False
In the case of engine="slurm", possible kwargs are:
- tmp_script_folder: str, default None
the folder in which the job scripts are created. Default: directory created by
the `tempfile` library
- sbatch_executable_path: str, default 'sbatch'
the path to the `sbatch` executable
- other kwargs are interpreted as arguments to sbatch, and are translated to the --args to be passed to sbatch.
see the [documentation for `sbatch`](https://slurm.schedmd.com/sbatch.html) for a list of possible arguments

return_output : bool, default False
Return a sortings or None.
This also overwrite kwargs in in run_sorter(with_sorting=True/False)
This also overwrites kwargs in run_sorter(with_sorting=True/False)

Returns
-------
@@ -81,6 +88,8 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal

assert engine in _implemented_engine, f"engine must be in {_implemented_engine}"

if engine_kwargs is None:
engine_kwargs = dict()
engine_kwargs_ = dict()
engine_kwargs_.update(_default_engine_kwargs[engine])
engine_kwargs_.update(engine_kwargs)
@@ -146,12 +155,18 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal

elif engine == "slurm":
# generate python script for slurm
tmp_script_folder = engine_kwargs["tmp_script_folder"]
tmp_script_folder = engine_kwargs.pop("tmp_script_folder")
if tmp_script_folder is None:
tmp_script_folder = tempfile.mkdtemp(prefix="spikeinterface_slurm_")
tmp_script_folder = Path(tmp_script_folder)
cpus_per_task = engine_kwargs["cpus_per_task"]
mem = engine_kwargs["mem"]
sbatch_executable = engine_kwargs.pop("sbatch_executable_path")

# for backward compatibility with previous version
if "cpus_per_task" in engine_kwargs:
warnings.warn("cpus_per_task is deprecated, use cpus-per-task instead", DeprecationWarning)
cpus_per_task = engine_kwargs.pop("cpus_per_task")
if "cpus-per-task" not in engine_kwargs:
engine_kwargs["cpus-per-task"] = cpus_per_task

tmp_script_folder.mkdir(exist_ok=True, parents=True)
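
The backward-compatibility branch above can be read as a small key migration: warn about the legacy `cpus_per_task` spelling, then carry its value over only if the caller did not also pass `cpus-per-task`. A standalone sketch of that logic follows. `migrate_cpus_per_task` is a hypothetical name, and unlike the diff it works on a copy rather than popping from the dict in place.

```python
import warnings


def migrate_cpus_per_task(engine_kwargs):
    """Hypothetical helper: map the legacy `cpus_per_task` key onto `cpus-per-task`."""
    kwargs = dict(engine_kwargs)  # work on a copy; the caller's dict is left alone
    if "cpus_per_task" in kwargs:
        warnings.warn(
            "cpus_per_task is deprecated, use cpus-per-task instead",
            DeprecationWarning,
            stacklevel=2,  # point the warning at the caller, not at this helper
        )
        legacy_value = kwargs.pop("cpus_per_task")
        # An explicitly supplied `cpus-per-task` takes precedence over the legacy key.
        kwargs.setdefault("cpus-per-task", legacy_value)
    return kwargs


print(migrate_cpus_per_task({"cpus_per_task": 4, "mem": "8G"}))
# prints: {'mem': '8G', 'cpus-per-task': 4}
```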

@@ -181,7 +196,13 @@ def run_sorter_jobs(job_list, engine="loop", engine_kwargs={}, return_output=Fal
f.write(slurm_script)
os.fchmod(f.fileno(), mode=stat.S_IRWXU)

subprocess.Popen(["sbatch", str(script_name.absolute()), f"-cpus-per-task={cpus_per_task}", f"-mem={mem}"])
progr = [sbatch_executable]
for k, v in engine_kwargs.items():
progr.append(f"--{k}")
progr.append(f"{v}")
progr.append(str(script_name.absolute()))
p = subprocess.run(progr, capture_output=True, text=True)
print(p.stdout)

return out
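
For reference, the command assembly in the last hunk can be sketched on its own, without invoking Slurm. Each remaining key becomes a `--key value` pair, and the script path goes last, since `sbatch` treats anything after the script name as arguments to the script itself. `build_sbatch_command` and the `/tmp/job_0.sh` path are hypothetical and only illustrate the argument list that would be handed to `subprocess.run`.

```python
def build_sbatch_command(script_path, sbatch_kwargs, sbatch_executable="sbatch"):
    """Hypothetical helper: turn a kwargs dict into an sbatch argument list."""
    cmd = [sbatch_executable]
    for key, value in sbatch_kwargs.items():
        cmd.append(f"--{key}")   # e.g. "--cpus-per-task"
        cmd.append(str(value))   # values are passed as separate string arguments
    cmd.append(str(script_path))  # options must come before the script name
    return cmd


cmd = build_sbatch_command("/tmp/job_0.sh", {"cpus-per-task": 4, "mem": "8G"})
print(cmd)
# prints: ['sbatch', '--cpus-per-task', '4', '--mem', '8G', '/tmp/job_0.sh']
```

Passing the list form to `subprocess.run` avoids shell quoting issues, at the cost of requiring every option to be expressible as a `--key value` pair.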
