import submitit
def add(a, b):
    return a + b
# the AutoExecutor class is your interface for submitting functions to a cluster or running them locally.
# The specified folder is used to dump job information, logs and result when finished
# %j is replaced by the job id at runtime
log_folder = "log_test/%j"
executor = submitit.AutoExecutor(folder=log_folder)
# The AutoExecutor provides a simple abstraction over SLURM to simplify switching between local and slurm jobs (or other clusters if plugins are available).
# specify sbatch parameters (here it will timeout after 4min, and run on dev)
# This is where you would specify `gpus_per_node=1` for instance
# Cluster-specific options must be prefixed with the cluster name:
# e.g. the slurm partition can be specified using the `slurm_partition` argument. It
# will be ignored on other clusters:
executor.update_parameters(timeout_min=4, slurm_partition="dev")
# The submission interface is identical to concurrent.futures.Executor
job = executor.submit(add, 5, 7) # will compute add(5, 7)
print(job.job_id) # ID of your job
output = job.result() # waits for the submitted function to complete and returns its output
# if ever the job failed, job.result() will raise an error with the corresponding trace
assert output == 12 # 5 + 7 = 12... your addition was computed in the cluster
submitit supports the submission of Slurm job arrays through the executor.map_array method.
If you want to submit many jobs at once, this is the preferred way to go because:
- it can submit all jobs in only 1 call to slurm (avoids flooding it).
- it is faster than submitting all jobs independently.
- it lets you define a cap on how many jobs can run in parallel at any given time, so you can send thousands of jobs without breaking the scheduler, as long as you leave a reasonable value for this parallelism.
Here is an example of how to submit 4 additions at once, with at most 2 jobs running in parallel at any given time:
a = [1, 2, 3, 4]
b = [10, 20, 30, 40]
executor = submitit.AutoExecutor(folder=log_folder)
# the following line tells the scheduler to run
# at most 2 jobs at once. By default, this is several hundred
executor.update_parameters(slurm_array_parallelism=2)
jobs = executor.map_array(add, a, b) # just a list of jobs
In comparison to standard jobs, job arrays have IDs formatted as <array job id>_<array task id> (e.g. 17390420_15), where the array job id is common to all the submitted jobs and the array task id goes from 0 to N - 1, where N is the number of submitted jobs.
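As a small illustration of this ID format, the sketch below splits such an ID into its two parts. The names `ArrayJobId` and `parse_array_job_id` are hypothetical helpers written for this example; they are not part of submitit.

```python
from typing import NamedTuple


class ArrayJobId(NamedTuple):
    """Hypothetical container for the two parts of a job array ID."""

    array_job_id: str  # common to all the jobs of the array
    array_task_id: int  # index of the job within the array, from 0 to N - 1


def parse_array_job_id(job_id: str) -> ArrayJobId:
    """Split '<array job id>_<array task id>' into its two components."""
    array_job_id, sep, task_id = job_id.partition("_")
    if not sep or not task_id.isdigit():
        raise ValueError(f"not a job array ID: {job_id!r}")
    return ArrayJobId(array_job_id, int(task_id))


parsed = parse_array_job_id("17390420_15")
print(parsed.array_job_id, parsed.array_task_id)  # prints: 17390420 15
```

Keeping the array job id as a string avoids making assumptions about how the scheduler formats it.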
Note: map_array has no equivalent in concurrent.futures (map is similar but has a different return type).
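To make the difference in return type concrete, here is a sketch that uses only the standard library (no cluster involved). `Executor.map` yields the results directly, whereas submitting one call per pair of arguments gives one handle per call, which is closer in spirit to the list of jobs returned by map_array.

```python
from concurrent import futures


def add(a, b):
    return a + b


a = [1, 2, 3, 4]
b = [10, 20, 30, 40]

with futures.ThreadPoolExecutor(max_workers=2) as pool:
    # Executor.map returns an iterator over the results themselves
    results = list(pool.map(add, a, b))
    # one submit per pair of arguments returns one handle (future) per call
    handles = [pool.submit(add, x, y) for x, y in zip(a, b)]
    results_from_handles = [h.result() for h in handles]

print(results)  # [11, 22, 33, 44]
```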
Warning: when running map_array, submitit will create one pickle per job.
If your functions hold large objects (like a full pytorch model), you should serialize them once and only pass their path to the submitted function.
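A minimal sketch of this pattern, using only the standard library: the large object is written to disk once, and the function to submit receives its path. The helpers `save_once` and `process_item` are hypothetical names for this example, and the calls are made locally here; on a cluster, the file would need to live on a filesystem that the compute nodes can read.

```python
import pickle
import tempfile
from pathlib import Path


def save_once(obj, folder: Path) -> Path:
    """Serialize a large object a single time and return the file path."""
    path = folder / "big_object.pkl"
    with path.open("wb") as f:
        pickle.dump(obj, f)
    return path


def process_item(object_path: Path, index: int) -> int:
    """Function to submit: it receives a small path, not the object itself."""
    with Path(object_path).open("rb") as f:
        big_object = pickle.load(f)
    return big_object[index] * 2


with tempfile.TemporaryDirectory() as tmp:
    big_object = list(range(100_000))  # stand-in for e.g. a full model
    path = save_once(big_object, Path(tmp))
    # each call only carries (path, index) as arguments, which stays tiny
    outputs = [process_item(path, i) for i in (0, 10, 99_999)]

print(outputs)  # [0, 20, 199998]
```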
If you submit multiple jobs through a for loop like this one:
jobs = []
for arg in whatever:
    job = executor.submit(myfunc, arg)
    jobs.append(job)
You can easily update it to batch the jobs into one array with exactly one extra line, by adding a batch context manager:
jobs = []
with executor.batch():
    for arg in whatever:
        job = executor.submit(myfunc, arg)
        jobs.append(job)
This way, adding the with context to any existing code will convert it to an array submission, the submission being triggered when leaving the context.
This makes it possible to submit job arrays when the functions need many arguments and keyword arguments.
Disclaimers:
- within the context, you won't be allowed to interact with the jobs' methods and attributes (nor even print them)! This is because the jobs are only submitted when leaving the context: inside the context, the jobs are like empty shells. You can however store the jobs in a list for instance, and access their attributes and methods after leaving the batch context.
- within the context, you can't update the executor parameters either (since all jobs must be submitted with the same settings)
- any error within the context will just cancel the whole submission.
- this option is still experimental and may undergo some changes in the future.
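The "empty shell" behaviour can be pictured with the toy model below. This is not submitit's implementation: `ToyJob` and `ToyExecutor` are hypothetical classes written for this illustration, and they run functions locally. They only mimic the rules listed above: jobs are unusable inside the context, everything is sent when leaving it, and an error cancels the whole batch.

```python
from contextlib import contextmanager


class ToyJob:
    """Placeholder that only becomes usable once the batch is submitted."""

    def __init__(self, func, args):
        self._func, self._args = func, args
        self._submitted = False
        self._result = None

    def result(self):
        if not self._submitted:
            raise RuntimeError("job not submitted yet: still inside the batch context")
        return self._result


class ToyExecutor:
    def __init__(self):
        self._pending = None  # list of delayed jobs while a batch is open

    def submit(self, func, *args):
        job = ToyJob(func, args)
        if self._pending is None:
            self._run(job)  # outside a batch: run right away
        else:
            self._pending.append(job)  # inside a batch: delay the submission
        return job

    def _run(self, job):
        job._result = job._func(*job._args)
        job._submitted = True

    @contextmanager
    def batch(self):
        self._pending = []
        try:
            yield
        except Exception:
            self._pending = None  # any error cancels the whole submission
            raise
        pending, self._pending = self._pending, None
        for job in pending:  # everything is sent when leaving the context
            self._run(job)


toy_executor = ToyExecutor()
jobs = []
with toy_executor.batch():
    for k in range(3):
        jobs.append(toy_executor.submit(pow, k, 2))
print([job.result() for job in jobs])  # [0, 1, 4]
```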
You can submit several jobs in parallel, and check their completion with the done method:
import submitit
import time
executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(timeout_min=1, slurm_partition="dev")
jobs = [executor.submit(time.sleep, k) for k in range(1, 11)]
# wait and check how many have finished
time.sleep(5)
num_finished = sum(job.done() for job in jobs)
print(num_finished) # probably around 2 have finished, given the overhead
# then you may want to wait until all jobs are completed:
outputs = [job.result() for job in jobs]
Notice that this is straightforward to convert to multi-threading:
import time
from concurrent import futures
with futures.ThreadPoolExecutor(max_workers=10) as executor: # This is the only real difference
    jobs = [executor.submit(time.sleep, k) for k in range(1, 11)]
    time.sleep(5)
    print(sum(job.done() for job in jobs)) # around 4 or 5 should be over
    [job.result() for job in jobs]
    assert sum(job.done() for job in jobs) == 10 # all done
You can also use asyncio coroutines if you want:
import asyncio
import random
import submitit
import time
def slow_multiplication(x, y):
    time.sleep(x*y)
    return x*y
executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(timeout_min=1, slurm_partition="dev")
async def main():
    # await a single result
    job = executor.submit(slow_multiplication, 10, 2)
    await job.awaitable().result()
    # print results as they become available
    jobs = [executor.submit(slow_multiplication, k, random.randint(1, 4)) for k in range(1, 5)]
    for aws in asyncio.as_completed([j.awaitable().result() for j in jobs]):
        result = await aws
        print(result)

# "await" is only valid inside a coroutine, so the calls are wrapped in main()
asyncio.run(main())
Note that you can also use submitit.helpers.as_completed if you don't want to use coroutines.
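For reference, the standard library offers the same pattern through `concurrent.futures.as_completed`, sketched below with threads. It is shown only as an analogue: the exact signature of the submitit helper is not detailed here and should be checked in the library before relying on it.

```python
import time
from concurrent import futures


def slow_square(x):
    time.sleep(0.05 * x)
    return x * x


with futures.ThreadPoolExecutor(max_workers=4) as pool:
    jobs = [pool.submit(slow_square, k) for k in (4, 1, 3, 2)]
    # iterate over the jobs in completion order rather than submission order
    completed = [job.result() for job in futures.as_completed(jobs)]

print(sorted(completed))  # [1, 4, 9, 16]
```

The order of `completed` depends on timing, which is why the example sorts it before printing.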
Errors are caught and their stacktrace is recorded. When calling job.result(), a FailedJobError is raised with the available information:
import submitit
from operator import truediv
executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(timeout_min=1, slurm_partition="dev")
job = executor.submit(truediv, 1, 0)
job.result() # will raise a FailedJobError stating the ZeroDivisionError with its stacktrace
full_stderr = job.stderr() # recover the full stack trace if need be
# the stderr log is written in file job.get_logs_path("stderr")
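Since job.result() raises, any code that needs to run after a failure has to catch the error first. The sketch below shows this try/except pattern with the standard library, where `result()` re-raises the original exception; with submitit, the exception to catch would be the FailedJobError described above.

```python
from concurrent import futures
from operator import truediv

error_message = None
with futures.ThreadPoolExecutor(max_workers=1) as pool:
    job = pool.submit(truediv, 1, 0)
    try:
        job.result()
    except ZeroDivisionError as e:
        # concurrent.futures re-raises the original exception unchanged
        error_message = str(e)
        print(f"job failed: {error_message}")
    # the exception also stays available on the future after the failure
    stored_exception = job.exception()

print(type(stored_exception).__name__)  # ZeroDivisionError
```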
You should preferably submit pure Python functions whenever you can. This will probably save you a lot of hassle.
Still, this is not always feasible. The class submitit.helpers.CommandFunction can help you in this case. It runs a command in a subprocess and returns its stdout. Its main benefit is that it handles failures and reports explicit errors.
(Note: CommandFunction runs locally, so you still need to submit it with an Executor if you want to run it on slurm, see "Understanding the environment" below).
Note however that, because we use subprocess with shell=False under the hood, the command must be provided as a list and not a string.
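The list requirement comes from the standard subprocess module itself, as the following sketch shows. It does not use CommandFunction; it only illustrates how a command is passed as a list with shell=False, and how `shlex.split` can convert a command written as a single string.

```python
import shlex
import subprocess
import sys

# with shell=False (the default), the command is a list of arguments
command = [sys.executable, "-c", "print('hello from a subprocess')"]
completed = subprocess.run(command, capture_output=True, text=True, check=True)
stdout = completed.stdout.strip()
print(stdout)  # hello from a subprocess

# a command written as a single string can be converted with shlex.split
as_list = shlex.split("which python")
print(as_list)  # ['which', 'python']
```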
By default, the function hides stdout and returns it at the end:
import submitit
function = submitit.helpers.CommandFunction(["which", "python"]) # commands must be provided as a list!
print(function()) # This returns your python path (which should be inside your virtualenv)
# for me: /private/home/jrapin/.conda/envs/dfconda/bin/python
Some useful parameters of the CommandFunction class:
- cwd: to choose from which directory the command is run.
- env: to provide specific environment variables.
- verbose: set to False if you do not want any logging.
As an experimental feature, you can also provide arguments when calling the instance:
print(submitit.helpers.CommandFunction(["which"])("pip")) # will run "which pip"
Understanding the environment - Make sure everything you need is installed in your conda environment, because the job runs on Slurm with the conda environment that is active when you submit it:
import submitit
function = submitit.helpers.CommandFunction(["which", "python"])
executor = submitit.AutoExecutor(folder="log_test")
executor.update_parameters(timeout_min=1, slurm_partition="dev")
job = executor.submit(function)
# The returned python path is the one used in slurm.
# It should be the same as when running out of slurm!
# This means that everything that is installed in your
# conda environment should work just as well in the cluster
print(job.result())
submitit supports multi-task jobs (on one or several nodes).
You just need to use the tasks_per_node and nodes parameters.
import submitit
from operator import add
executor = submitit.AutoExecutor(folder="log_test")
# 3 * 2 = 6 tasks
executor.update_parameters(tasks_per_node=3, nodes=2, timeout_min=1, slurm_partition="dev")
job = executor.submit(add, 5, 7) # will compute add(5, 7)
print(job.result()) # returns [12, 12, 12, 12, 12, 12]
The same function will be executed in each task. The typical usage is to use the task rank inside your submitted Callable to chunk the inputs, and assign one chunk to each task.
We provide a JobEnvironment class that gives access to this information (in a cluster-agnostic way).
import submitit
from math import ceil
def my_func(inputs):
    job_env = submitit.JobEnvironment()
    print(f"There are {job_env.num_tasks} tasks in this job")
    print(f"I'm the task #{job_env.local_rank} on the node {job_env.node}")
    print(f"I'm the task #{job_env.global_rank} in the job")
    num_items_per_task = int(ceil(len(inputs) / job_env.num_tasks))
    # use the global rank: the local rank restarts at 0 on every node,
    # so it would assign the same chunks on each node of a multi-node job
    r = job_env.global_rank
    task_chunk = inputs[r * num_items_per_task: (r + 1) * num_items_per_task]
    return process(task_chunk) # process only this chunk (process is your own function).
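The chunking arithmetic can be checked outside of any cluster by passing the rank explicitly. In the sketch below, `chunk_for_rank` is a hypothetical helper that takes a rank between 0 and num_tasks - 1, unique across the whole job; it shows that the chunks cover every input exactly once, and that the last tasks may receive fewer items or none at all.

```python
from math import ceil


def chunk_for_rank(inputs, rank, num_tasks):
    """Return the slice of inputs assigned to the task with the given rank."""
    size = ceil(len(inputs) / num_tasks)
    return inputs[rank * size : (rank + 1) * size]


inputs = list(range(10))
num_tasks = 6  # e.g. tasks_per_node=3 and nodes=2
chunks = [chunk_for_rank(inputs, rank, num_tasks) for rank in range(num_tasks)]
print(chunks)  # [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], []]
```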
You can use the task method of a Job instance to access task-specific information. A task is also a Job, so the Job's methods are available.
import submitit
from operator import add
executor = submitit.AutoExecutor(folder="log_test")
# 3 * 2 = 6 tasks
executor.update_parameters(tasks_per_node=3, nodes=2, timeout_min=1, slurm_partition="dev")
job = executor.submit(add, 5, 7) # will compute add(5, 7)
print(job.task(2).result()) # Wait for task #2 result
print(job.task(2).stdout()) # Show task #2 stdout
print(job.result()) # Wait for all tasks and returns a list of results.
print(job.stdout()) # Concatenated stdout of all tasks
Call the export() method of the submitit.helpers.TorchDistributedEnvironment class to set up all the required environment variables for PyTorch distributed with the env:// initialization method. See this code example.
TODO: share more examples, eg grid search over CIFAR-10