Merge branch 'RESTAPI-1185-fix-sbatch-export-parameter' into 'master'
Replace `--export-file` with the `--export` parameter in the sbatch scheduler

See merge request firecrest/firecrest!315
Juan Pablo Dorsch committed Aug 22, 2024
2 parents 5d35ee6 + 88b1766 commit e00c3b7
Showing 6 changed files with 46 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- SLURM scheduler now uses the `--export` option for passing environment variables to a job
- Variable `F7T_REALM_RSA_PUBLIC_KEYS` changed to `F7T_AUTH_PUBLIC_KEYS`
- Variable `F7T_REALM_RSA_TYPE_` changed to `F7T_AUTH_ALGORITHMS`
- Added default values on helm charts
14 changes: 7 additions & 7 deletions src/common/schedulers/README.md
@@ -178,7 +178,7 @@ spec = Job(
job_dir,
account,
additional_options=plugin_options,
env_file=env_file
job_env=job_env
)
```

@@ -189,7 +189,7 @@ def __init__(
dict job_script,
string job_dir,
string account=None,
string env_file=None,
dict job_env=None,
string [] additional_options=None
):
```
@@ -203,14 +203,14 @@ def __init__(
```
* *job_dir*: string with the path of the directory from which the job is launched.
* *account*: string with the username that owns the job.
* *env_file*: string with the environment variable source (typically a file or a file descriptor)
* *job_env*: dictionary with the environment variables to set (example: {"var1": "value1", "var2": "value2"})
* *additional_options*: list of strings to provide more parameters to the scheduler.

#### attributes
* job_dir = job_dir
* job_script = job_script
* account = account
* env_file = env_file
* job_env = job_env
* opts = additional_options if additional_options else []

**********
@@ -227,7 +227,7 @@ def __init__(
string dependency_id=None,
string account=None,
string constraint=None,
string env_file=None
dict job_env=None
):
```
##### parameters
@@ -238,7 +238,7 @@ def __init__(
* *dependency_id*: string to specify the dependency rule for another job. A common implementation requires that the job pointed to by this ID completes successfully before the current one starts. Default None, meaning no dependency.
* *account*: string to specify the account used to access resources within the running job. Default None.
* *constraint*: string defining node features to assign to the job. Default None.
* *env_file*: string indicating the file containing the environment setup to be included in the job. Default None.
* *job_env*: dictionary with the variables to include in the environment of the job. Example: {"var1": "value1", "var2": "value2"}. Default None.
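
As an illustration, a minimal sketch of building a job specification with the new `job_env` parameter. The keyword names before `dependency_id` are not shown in this hunk, and the class name `JobSpec` is assumed here; all values are hypothetical.

```python
# Sketch only: hypothetical values; job_env replaces the old env_file argument
spec = JobSpec(
    name="test_job",
    time="00:10:00",
    dependency_id=None,
    account="proj01",
    constraint=None,
    job_env={"var1": "value1", "var2": "value2"},
)
```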

**********
## Implementation Class SlurmScheduler
@@ -271,7 +271,7 @@ An example composing the sbatch command is the following, the attributes of Job
```python
cmd = ["sbatch"]
cmd.append(f"--account='{submission_spec.account}'")
cmd.append(f"--export-file='{submission_spec.env_file}'")
cmd.append(f"--export='{submission_spec.job_env}'")
cmd += [f"--chdir='{submission_spec.job_dir}'"]
cmd += self._opts
cmd += submission_spec.opts
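# For illustration (hypothetical values): with job_env serialized to Slurm's
# comma-separated form, the composed command would resemble:
#   sbatch --account='proj01' --export='var1=value1,var2=value2' --chdir='/path/to/job_dir' job.sh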
9 changes: 5 additions & 4 deletions src/common/schedulers/__init__.py
@@ -26,11 +26,12 @@ class Job:
"""Class with submission specifications
"""

def __init__(self, job_script, job_dir, account=None, env_file=None, additional_options=None):
def __init__(self, job_script, job_dir, account=None, job_env=None,
additional_options=None):
self.job_dir = job_dir
self.job_script = job_script
self.account = account
self.env_file = env_file
self.job_env = job_env
self.opts = additional_options if additional_options else []


@@ -47,7 +48,7 @@ def __init__(
dependency_id=None,
account=None,
constraint=None,
env_file=None
job_env=None
):
self.name = name
self.time = time
@@ -56,7 +57,7 @@
self.dependency_id = dependency_id
self.account = account
self.constraint = constraint
self.env_file = env_file
self.job_env = job_env


class JobScheduler(abc.ABC):
12 changes: 10 additions & 2 deletions src/common/schedulers/slurm.py
@@ -8,6 +8,7 @@
import logging
import re
import schedulers
import json


logger = logging.getLogger("compute")
@@ -25,8 +26,15 @@ def submit(self, submission_spec):
if submission_spec.account:
cmd.append(f"--account='{submission_spec.account}'")

if submission_spec.env_file:
cmd.append(f"--export-file='{submission_spec.env_file}'")
if submission_spec.job_env:
# convert to "key=value,key=value,.." for Slurm
j = json.loads(submission_spec.job_env)
text = ""
for k, v in j.items():
text += f"{k}={v},"
text = text[:-1]
job_env = text
cmd.append(f"--export='{job_env}'")

cmd += [f"--chdir='{submission_spec.job_dir}'"]
cmd += self._opts
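The conversion performed by the new `submit` code can be summarized in a standalone sketch (same logic as above, written with `str.join`; the function name is illustrative, not part of the module):

```python
import json

def job_env_to_export(job_env_json):
    """Convert a JSON object string such as '{"var1": "value1"}'
    into Slurm's --export format: 'var1=value1,var2=value2'."""
    env = json.loads(job_env_json)
    return ",".join(f"{k}={v}" for k, v in env.items())

# job_env_to_export('{"var1": "value1", "var2": "value2"}')
# -> 'var1=value1,var2=value2'
```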
76 changes: 21 additions & 55 deletions src/compute/compute.py
Original file line number Diff line number Diff line change
@@ -155,6 +155,7 @@ def submit_job_task(headers, system_name, system_addr, job_file, job_dir, accoun
update_task(task_id, headers, async_task.ERROR, retval["msg"])
return

# save the sbatch file in the target cluster FS
if job_file['content']:
action = f"ID={ID} cat > '{job_dir}/{job_file['filename']}'"
retval = exec_remote_command(headers, system_name, system_addr, action, file_transfer="upload", file_content=job_file['content'], no_home=use_plugin)
@@ -165,30 +166,13 @@

plugin_options = [SPANK_PLUGIN_OPTION] if use_plugin else None

env_file = None
if job_env:
env_file = f"/dev/shm/firecret.{task_id}.env"
action = f"ID={ID} cat > '{env_file}'"
retval = exec_remote_command(headers, system_name, system_addr, action, file_transfer="upload", file_content=job_env, no_home=use_plugin)
if retval["error"] != 0:
app.logger.error(f"(Error uploading environment file: {retval['msg']}")
update_task(task_id, headers, async_task.ERROR, "Failed to upload enviroment file")
return

spec = Job(job_file['filename'], job_dir, account, additional_options=plugin_options, env_file=env_file)
spec = Job(job_file['filename'], job_dir, account, additional_options=plugin_options, job_env=job_env)
scheduler_command = scheduler.submit(spec)
action=f"ID={ID} {scheduler_command}"
action = f"ID={ID} {scheduler_command}"
app.logger.info(action)

retval = exec_remote_command(headers, system_name, system_addr, action, no_home=use_plugin)

if job_env:
# delete env file, it was read when submitted
action = f"ID={ID} timeout {UTILITIES_TIMEOUT} rm -f -- '{env_file}'"
retval2 = exec_remote_command(headers, system_name, system_addr, action, no_home=use_plugin)
if retval2["error"] != 0:
app.logger.error(f"(Error deleting environment file: {retval2['msg']}")

if retval["error"] != 0:
app.logger.error(f"(Error: {retval['msg']}")
update_task(task_id, headers, async_task.ERROR, retval["msg"])
@@ -201,10 +185,10 @@ def submit_job_task(headers, system_name, system_addr, job_file, job_dir, accoun

jobid = scheduler.extract_jobid(outlines)

msg = {"result" : "Job submitted", "jobid" : jobid}
msg = {"result": "Job submitted", "jobid": jobid}

# now look for log and err files location
job_extra_info = get_job_files(headers, system_name, system_addr, msg,use_plugin=use_plugin)
job_extra_info = get_job_files(headers, system_name, system_addr, msg, use_plugin=use_plugin)

update_task(task_id, headers, async_task.SUCCESS, job_extra_info, True)

@@ -296,30 +280,15 @@ def submit_job_path_task(headers, system_name, system_addr, fileName, job_dir, a

ID = headers.get(TRACER_HEADER, '')
plugin_options = [SPANK_PLUGIN_OPTION] if use_plugin else None
env_file = None
if job_env:
env_file = f"/dev/shm/firecret.{task_id}.env"
action = f"ID={ID} cat > '{env_file}'"
retval = exec_remote_command(headers, system_name, system_addr, action, file_transfer="upload", file_content=job_env, no_home=use_plugin)
if retval["error"] != 0:
app.logger.error(f"(Error uploading environment file: {retval['msg']}")
update_task(task_id, headers, async_task.ERROR, "Failed to upload enviroment file")
return

spec = Job(fileName, job_dir, account, additional_options=plugin_options, env_file=env_file)

spec = Job(fileName, job_dir, account, additional_options=plugin_options,
job_env=job_env)
scheduler_command = scheduler.submit(spec)
action=f"ID={ID} {scheduler_command}"
app.logger.info(action)

resp = exec_remote_command(headers, system_name, system_addr, action, no_home=use_plugin)

if job_env:
# delete env file, it was read when submitted
action = f"ID={ID} timeout {UTILITIES_TIMEOUT} rm -f -- '{env_file}'"
retval2 = exec_remote_command(headers, system_name, system_addr, action, no_home=use_plugin)
if retval2["error"] != 0:
app.logger.error(f"(Error deleting environment file: {retval2['msg']}")

# in case of error:
if resp["error"] != 0:
if resp["error"] == -2:
@@ -448,23 +417,23 @@ def submit_job_upload():
use_plugin = SPANK_PLUGIN_ENABLED[system_idx]
job_env = request.form.get("env", None)
if job_env:
#convert to text for Slurm: key=value ending with null caracter
# check that the env variable can be decoded as JSON
try:
j = json.loads(job_env)
text = ""
for k,v in j.items():
text += f"{k}={v}\0"
job_env = text
json.loads(job_env)
except Exception as e:
app.logger.warning("Invalid JSON provided")
return jsonify(description="Failed to submit job", error='Invalid JSON environment provided'), 404
logger.error(f"Invalid JSON provided ({e}) in job_env " +
f"({job_env})")
return jsonify(description="Failed to submit job",
error=f"Invalid JSON in 'env' variable ({job_env})"), 404

app.logger.info(f"Job dir: {job_dir}")

try:
# asynchronous task creation
aTask = threading.Thread(target=submit_job_task, name=ID,
args=(headers, system_name, system_addr, job_file, job_dir, account, use_plugin, job_env, task_id))
args=(headers, system_name, system_addr,
job_file, job_dir, account, use_plugin,
job_env, task_id))

aTask.start()
retval = update_task(task_id, headers, async_task.QUEUED)
@@ -543,15 +512,12 @@ def submit_job_path():
job_dir = os.path.dirname(targetPath)
job_env = request.form.get("env", None)
if job_env:
#convert to text for Slurm: key=value ending with null caracter
# check that the env variable can be decoded as JSON
try:
j = json.loads(job_env)
text = ""
for k,v in j.items():
text += f"{k}={v}\0"
job_env = text
json.loads(job_env)
except Exception as e:
app.logger.warning("Invalid JSON provided")
logger.warning(f"Invalid JSON provided ({e}) in job_env " +
f"({job_env})")
return jsonify(description="Failed to submit job", error='Invalid JSON environment provided'), 404

try:
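For context, a hedged sketch of how a client could exercise the validated `env` form field. The `/compute/jobs/upload` path, header names, URL, and token are assumptions for illustration only:

```python
# Sketch: submitting a job with environment variables (placeholder values).
import json
import requests

resp = requests.post(
    "https://firecrest.example.org/compute/jobs/upload",  # assumed endpoint
    headers={
        "Authorization": "Bearer <ACCESS_TOKEN>",  # placeholder token
        "X-Machine-Name": "cluster01",             # placeholder system name
    },
    data={"env": json.dumps({"var1": "value1", "var2": "value2"})},
    files={"file": open("job.sh", "rb")},
)
print(resp.status_code, resp.json())
```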
4 changes: 2 additions & 2 deletions src/tests/automated_tests/integration/test_compute.py
@@ -39,8 +39,8 @@
JOBS_URL = COMPUTE_URL + "/jobs"
SERVER_COMPUTE = os.environ.get("F7T_SYSTEMS_PUBLIC_NAME").strip('\'"').split(";")[0]

JOB_ENV = json.dumps({'F7T_TEST_JOB_ENV': 'a', 'F7T_TEST_JOB_ENV2': '"b 1"'})
JOB_ENV_OUTPUT = 'a\n"b 1"\n'
JOB_ENV = json.dumps({'F7T_TEST_JOB_ENV': 'a', 'F7T_TEST_JOB_ENV2': 'b 1'})
JOB_ENV_OUTPUT = 'a\nb 1\n'
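
The updated constants line up with the new `--export` handling: values are passed through verbatim, so the extra quoting the old constants carried is no longer needed. A quick sketch of the round trip, assuming a test job script that echoes each variable on its own line:

```python
import json

JOB_ENV = json.dumps({'F7T_TEST_JOB_ENV': 'a', 'F7T_TEST_JOB_ENV2': 'b 1'})
env = json.loads(JOB_ENV)
print(",".join(f"{k}={v}" for k, v in env.items()))
# -> F7T_TEST_JOB_ENV=a,F7T_TEST_JOB_ENV2=b 1
# A job script echoing both variables then prints 'a' and 'b 1' on separate lines.
```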



