
JSON output for hq job list and hq submit #24

Merged · 2 commits · Jul 17, 2024
71 changes: 36 additions & 35 deletions aiida_hyperqueue/scheduler.py
@@ -9,7 +9,7 @@
###########################################################################
"""Plugin for the HyperQueue meta scheduler."""

import re
import json
import typing as t

from aiida.common.extendeddicts import AttributeDict
@@ -148,7 +148,7 @@ def _get_submit_command(self, submit_script: str) -> str:
submit_script: the path of the submit script relative to the working
directory.
"""
submit_command = f"hq submit {submit_script}"
submit_command = f"hq submit {submit_script} --output-mode=json"

self.logger.info(f"Submitting with: {submit_command}")

@@ -178,23 +178,18 @@ def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str:
f"in _parse_submit_output{transport_string}: there was some text in stderr: {stderr}"
)

job_id_pattern = re.compile(
r"Job\ssubmitted\ssuccessfully,\sjob\sID:\s(?P<jobid>\d+)"
)

for line in stdout.split("\n"):
match = job_id_pattern.match(line.strip())
if match:
return match.group("jobid")

# If no valid line is found, log and raise an error
self.logger.error(
f"in _parse_submit_output{transport_string}: unable to find the job id: {stdout}"
)
raise SchedulerError(
"Error during submission, could not retrieve the jobID from "
"hq submit output; see log for more info."
)
hq_job_dict = json.loads(stdout)
try:
return str(hq_job_dict["id"])
except KeyError:
# If the job id cannot be found in the JSON output, log and raise an error
self.logger.error(
f"in _parse_submit_output{transport_string}: unable to find the job id: {stdout}"
)
raise SchedulerError(
"Error during submission, could not retrieve the jobID from "
"hq submit output; see log for more info."
)
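
For reference, the parsing above relies only on a top-level "id" key in the JSON printed by `hq submit --output-mode=json`. A minimal sketch of the round trip; the stdout payload here is an assumed example, not verbatim HyperQueue output:

```python
import json

# Assumed example of `hq submit _aiidasubmit.sh --output-mode=json` stdout;
# the real payload may carry more fields, but only "id" is read here.
stdout = '{"id": 42}'

hq_job_dict = json.loads(stdout)
job_id = str(hq_job_dict["id"])  # "42", returned to AiiDA as the scheduler job id
```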

def _get_joblist_command(
self, jobs: t.Optional[list] = None, user: t.Optional[str] = None
@@ -205,7 +200,7 @@
These could in principle be passed to the ``hq job`` command, but this has an entirely different format.
"""

return "hq job list --filter waiting,running"
return "hq job list --filter waiting,running --output-mode=json"

def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list:
"""Parse the stdout for the joblist command.
@@ -222,24 +217,30 @@ def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list:

if stderr.strip():
self.logger.warning(
f"hq job list returned exit code 0 (_parse_joblist_output function) but non-empty stderr='{stderr.strip()}'"
f"`hq job list` returned exit code 0 (_parse_joblist_output function) but non-empty stderr='{stderr.strip()}'"
)

job_info_pattern = re.compile(
r"\|\s+(?P<id>[\d]+)\s\|\s+(?P<name>[^|]+)\s+\|\s(?P<state>[\w]+)\s+\|\s(?P<tasks>[\d]+)\s+\|"
)
job_info_list = []
# Convert the job list returned by hq into a list of JobInfo objects.
# HQ supports multiple tasks per hq job, but aiida-hq uses hq with a 1-1 match
# between hq job and hq task, so we only parse one task per aiida job.
hq_job_info_list = json.loads(stdout)

for line in stdout.split("\n"):
match = job_info_pattern.match(line)
if match:
job_dict = match.groupdict()
job_info = JobInfo()
job_info.job_id = job_dict["id"]
job_info.title = job_dict["name"]
job_info.job_state = _MAP_STATUS_HYPERQUEUE[job_dict["state"].upper()]
# TODO: In principle more detailed information can be parsed for each job by `hq job info`, such as cpu, wall_time etc.
job_info_list.append(job_info)
job_info_list = []
for hq_job_dict in hq_job_info_list:
job_info = JobInfo()
job_info.job_id = hq_job_dict["id"]
job_info.title = hq_job_dict["name"]
stats: t.List[str] = [
stat for stat, v in hq_job_dict["task_stats"].items() if v > 0
]
if hq_job_dict["task_count"] != 1 or len(stats) != 1:
self.logger.error("Unable to parse an hq job with multiple tasks.")
else:
job_info.job_state = _MAP_STATUS_HYPERQUEUE[stats[0].upper()]

job_info_list.append(job_info)

Contributor (review comment on the `stats: t.List[str]` annotation):
Mainly out of curiosity, why do you explicitly add a type hint here? I'm mainly thinking about consistency, as I didn't see it anywhere else in the plugin apart from the function definitions.

Member (author):
Just to make my IDE happy. I am using Neovim with ruff check, which is quite strict about typing. The element type of the list is deduced from the keys of hq_job_dict["task_stats"]; by telling the LSP the element type explicitly, it knows that stats[0] is a str and can hint that upper() is a valid method to call.

# TODO: In principle more detailed information can be parsed for each job by `hq job info`, such as cpu, wall_time etc.

return job_info_list
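
To make the new parsing concrete: `hq job list --filter waiting,running --output-mode=json` is expected to print a JSON array of job objects, and the loop above reads the "id", "name", "task_count" and "task_stats" keys. Because the plugin submits one task per job, exactly one non-zero counter in "task_stats" is expected, and that state name is mapped through `_MAP_STATUS_HYPERQUEUE`. A small sketch with a hand-written payload; the field values are illustrative assumptions, not captured output:

```python
import json

# Hand-written stand-in for `hq job list --output-mode=json` output; only the
# keys read by _parse_joblist_output are included.
stdout = json.dumps([
    {
        "id": 7,
        "name": "aiida-1234",
        "task_count": 1,
        # A single non-zero counter is expected, because each AiiDA job maps
        # to a single-task HyperQueue job.
        "task_stats": {"waiting": 0, "running": 1, "finished": 0, "failed": 0},
    }
])

for hq_job_dict in json.loads(stdout):
    stats = [stat for stat, v in hq_job_dict["task_stats"].items() if v > 0]
    if hq_job_dict["task_count"] == 1 and len(stats) == 1:
        print(hq_job_dict["id"], stats[0].upper())  # prints: 7 RUNNING
```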

6 changes: 4 additions & 2 deletions tests/test_scheduler.py
@@ -69,7 +69,7 @@ def test_submit_command():
"""Test submit command"""
scheduler = HyperQueueScheduler()

assert scheduler._get_submit_command("job.sh") == "hq submit job.sh"
assert "hq submit job.sh" in scheduler._get_submit_command("job.sh")


def test_parse_submit_command_output(hq_env: HqEnv, valid_submit_script):
@@ -79,7 +79,9 @@ def test_parse_submit_command_output(hq_env: HqEnv, valid_submit_script):
Path("_aiidasubmit.sh").write_text(valid_submit_script)

process = hq_env.command(
["submit", "_aiidasubmit.sh"], wait=False, ignore_stderr=True
["submit", "--output-mode=json", "_aiidasubmit.sh"],
wait=False,
ignore_stderr=True,
)
stdout = process.communicate()[0].decode()
stderr = ""
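
Beyond the live `hq_env` test above, the JSON code path can also be exercised without a running HyperQueue server by feeding hand-crafted stdout to the scheduler. A sketch of such a test, assuming the module path `aiida_hyperqueue.scheduler` and the minimal `{"id": ...}` payload shape used by the parser:

```python
# Sketch only (not part of this PR): exercising the new JSON parsing directly.
from aiida_hyperqueue.scheduler import HyperQueueScheduler


def test_parse_submit_output_from_json_sketch():
    scheduler = HyperQueueScheduler()
    # Assumed minimal shape of `hq submit --output-mode=json` stdout.
    stdout = '{"id": 3}'
    assert scheduler._parse_submit_output(0, stdout, "") == "3"
```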