Skip to content

Commit

Permalink
Scheduler: abstract generation of submit script env variables
Browse files Browse the repository at this point in the history
The environment variables defined in the `JobTemplate` need to be
written to the submit script header. The `Scheduler.get_submit_script`
relied on the `_get_submit_script_header` abstract method to do this.
This forces each plugin to write this code, even though it is very
likely to be scheduler independent.

Therefore it is best to abstract this functionality to the base class
such that each scheduler plugin automatically has this implemented. If
really needed, the plugin can still override the behavior by explicitly
reimplementing the `_get_submit_script_environment_variables` method.

Note that it looks like `_get_submit_script_environment_variables` could
be a `staticmethod`, but unfortunately it is not possible to call
`super()` when overriding a staticmethod in a subclass.
  • Loading branch information
sphuber committed Dec 17, 2021
1 parent 7fad822 commit 6d9199e
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 105 deletions.
41 changes: 15 additions & 26 deletions aiida/schedulers/plugins/direct.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,32 +150,6 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

env_lines = []

if job_tmpl.job_resource and job_tmpl.job_resource.num_cores_per_mpiproc:
# since this was introduced after the environment injection below,
# it is intentionally put before it to avoid breaking current users script by overruling
# any explicit OMP_NUM_THREADS they may have set in their job_environment
env_lines.append(f'export OMP_NUM_THREADS={job_tmpl.job_resource.num_cores_per_mpiproc}')

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# and.
if job_tmpl.job_environment:
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
env_lines.append(f'export {key.strip()}={escape_for_bash(value)}')

if env_lines:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
lines += env_lines
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

if job_tmpl.rerunnable:
self.logger.warning(
"The 'rerunnable' option is set to 'True', but has no effect when using the direct scheduler."
Expand All @@ -199,6 +173,21 @@ def _get_submit_script_header(self, job_tmpl):

return '\n'.join(lines)

def _get_submit_script_environment_variables(self, template):
    """Return the part of the submit script header that defines environment variables.

    Extends the base implementation by exporting ``OMP_NUM_THREADS`` when the
    job resource specifies the number of cores per MPI process.

    :parameter template: a `aiida.schedulers.datastructures.JobTemplate` instance.
    :return: string containing environment variable declarations.
    """
    declarations = super()._get_submit_script_environment_variables(template)

    resource = template.job_resource
    if resource and resource.num_cores_per_mpiproc:
        # Prepend rather than append, so that an explicit ``OMP_NUM_THREADS``
        # defined in ``template.job_environment`` is exported later and
        # therefore still takes precedence.
        omp_export = f'export OMP_NUM_THREADS={resource.num_cores_per_mpiproc}'
        declarations = '\n'.join([omp_export, declarations])

    return declarations

def _get_submit_command(self, submit_script):
"""
Return the string to execute to submit a given script.
Expand Down
19 changes: 0 additions & 19 deletions aiida/schedulers/plugins/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,6 @@ def _get_submit_script_header(self, job_tmpl):
import re
import string

empty_line = ''

lines = []
if job_tmpl.submit_as_hold:
lines.append('#BSUB -H')
Expand Down Expand Up @@ -434,23 +432,6 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# hand.
if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

lines.append(empty_line)

# The following seems to be the only way to copy the input files
# to the node where the computation are actually launched (the
# -f option of bsub that does not always work...)
Expand Down
16 changes: 0 additions & 16 deletions aiida/schedulers/plugins/pbsbaseclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,22 +297,6 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# and.

if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

# Required to change directory to the working directory, that is
# the one from which the job was submitted
lines.append('cd "$PBS_O_WORKDIR"')
Expand Down
18 changes: 0 additions & 18 deletions aiida/schedulers/plugins/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ def _get_submit_script_header(self, job_tmpl):
import re
import string

empty_line = ''

lines = []

# SGE provides flags for wd and cwd
Expand Down Expand Up @@ -267,22 +265,6 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# TAKEN FROM PBSPRO:
# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# and.
if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

return '\n'.join(lines)

def _get_submit_command(self, submit_script):
Expand Down
21 changes: 0 additions & 21 deletions aiida/schedulers/plugins/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
"""
import re

from aiida.common.escaping import escape_for_bash
from aiida.common.lang import type_check
from aiida.schedulers import Scheduler, SchedulerError
from aiida.schedulers.datastructures import JobInfo, JobState, NodeNumberJobResource
Expand Down Expand Up @@ -263,8 +262,6 @@ def _get_submit_script_header(self, job_tmpl):
# pylint: disable=too-many-statements,too-many-branches
import string

empty_line = ''

lines = []
if job_tmpl.submit_as_hold:
lines.append('#SBATCH -H')
Expand Down Expand Up @@ -398,24 +395,6 @@ def _get_submit_script_header(self, job_tmpl):
if job_tmpl.custom_scheduler_commands:
lines.append(job_tmpl.custom_scheduler_commands)

# Job environment variables are to be set on one single line.
# This is a tough job due to the escaping of commas, etc.
# moreover, I am having issues making it work.
# Therefore, I assume that this is bash and export variables by
# and.

if job_tmpl.job_environment:
lines.append(empty_line)
lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
if not isinstance(job_tmpl.job_environment, dict):
raise ValueError('If you provide job_environment, it must be a dictionary')
for key, value in job_tmpl.job_environment.items():
lines.append(f'export {key.strip()}={escape_for_bash(value)}')
lines.append('# ENVIRONMENT VARIABLES END ###')
lines.append(empty_line)

lines.append(empty_line)

return '\n'.join(lines)

def _get_submit_command(self, submit_script):
Expand Down
22 changes: 22 additions & 0 deletions aiida/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ def get_submit_script(self, job_tmpl):
script_lines.append(self._get_submit_script_header(job_tmpl))
script_lines.append(empty_line)

environment_variables = self._get_submit_script_environment_variables(job_tmpl)
if environment_variables:
script_lines.append('# ENVIRONMENT VARIABLES BEGIN ###')
script_lines.append(environment_variables)
script_lines.append('# ENVIRONMENT VARIABLES END ###')
script_lines.append(empty_line)

if job_tmpl.prepend_text:
script_lines.append(job_tmpl.prepend_text)
script_lines.append(empty_line)
Expand All @@ -170,6 +177,21 @@ def get_submit_script(self, job_tmpl):

return '\n'.join(script_lines)

def _get_submit_script_environment_variables(self, template):  # pylint: disable=no-self-use
    """Return the part of the submit script header that defines environment variables.

    :parameter template: a `aiida.schedulers.datastructures.JobTemplate` instance.
    :return: string containing environment variable declarations.
    :raises ValueError: if ``template.job_environment`` is defined but not a dictionary.
    """
    environment = template.job_environment

    if environment is None:
        return ''

    if not isinstance(environment, dict):
        raise ValueError('If you provide job_environment, it must be a dictionary')

    # Export each variable on its own line; values are escaped so they are
    # safe to interpolate into a bash script.
    exports = []
    for name, value in environment.items():
        exports.append(f'export {name.strip()}={escape_for_bash(value)}')

    return '\n'.join(exports)

@abc.abstractmethod
def _get_submit_script_header(self, job_tmpl):
"""Return the submit script header, using the parameters from the job template.
Expand Down
13 changes: 8 additions & 5 deletions tests/schedulers/test_sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import logging
import unittest

from aiida.common.datastructures import CodeRunMode
from aiida.schedulers.datastructures import JobState
from aiida.schedulers.plugins.sge import SgeScheduler
from aiida.schedulers.scheduler import SchedulerError, SchedulerParsingError
Expand Down Expand Up @@ -310,6 +311,8 @@ def test_submit_script(self):
sge = SgeScheduler()

job_tmpl = JobTemplate()
job_tmpl.codes_info = []
job_tmpl.codes_run_mode = CodeRunMode.SERIAL
job_tmpl.job_resource = sge.create_job_resource(parallel_env='mpi8', tot_num_mpiprocs=16)
job_tmpl.working_directory = '/home/users/dorigm7s/test'
job_tmpl.submit_as_hold = None
Expand All @@ -325,14 +328,12 @@ def test_submit_script(self):
job_tmpl.max_wallclock_seconds = '3600' # "23:59:59"
job_tmpl.job_environment = {'HOME': '/home/users/dorigm7s/', 'WIENROOT': '$HOME:/WIEN2k'}

submit_script_text = sge._get_submit_script_header(job_tmpl)
submit_script_text = sge.get_submit_script(job_tmpl)

self.assertTrue('#$ -wd /home/users/dorigm7s/test' in submit_script_text)
self.assertTrue('#$ -N BestJobEver' in submit_script_text)
self.assertTrue('#$ -q FavQ.q' in submit_script_text)
self.assertTrue('#$ -l h_rt=01:00:00' in submit_script_text)
# self.assertTrue( 'export HOME=/home/users/dorigm7s/'
# in submit_script_text )
self.assertTrue('# ENVIRONMENT VARIABLES BEGIN ###' in submit_script_text)
self.assertTrue("export HOME='/home/users/dorigm7s/'" in submit_script_text)
self.assertTrue("export WIENROOT='$HOME:/WIEN2k'" in submit_script_text)
Expand All @@ -345,15 +346,17 @@ def test_submit_script_rerunnable(self): # pylint: disable=no-self-use
sge = SgeScheduler()

job_tmpl = JobTemplate()
job_tmpl.codes_info = []
job_tmpl.codes_run_mode = CodeRunMode.SERIAL
job_tmpl.job_resource = sge.create_job_resource(parallel_env='mpi8', tot_num_mpiprocs=16)

job_tmpl.rerunnable = True
submit_script_text = sge._get_submit_script_header(job_tmpl)
submit_script_text = sge.get_submit_script(job_tmpl)
assert '#$ -r yes' in submit_script_text
assert '#$ -r no' not in submit_script_text

job_tmpl.rerunnable = False
submit_script_text = sge._get_submit_script_header(job_tmpl)
submit_script_text = sge.get_submit_script(job_tmpl)
assert '#$ -r yes' not in submit_script_text
assert '#$ -r no' in submit_script_text

Expand Down

0 comments on commit 6d9199e

Please sign in to comment.