File staging #450

Open · hategan wants to merge 33 commits into main from file_staging

Commits (33)
3e89c0e  Initial staging commit (hategan, Feb 2, 2024)
57ea7e7  Staging updates (hategan, Feb 24, 2024)
e55aedb  Spelling (hategan, Feb 24, 2024)
1571200  Hmm... (hategan, Feb 24, 2024)
4f42c97  Print IP address of GitHub CI runner. (hategan, Feb 25, 2024)
21374af  Need to install curl (hategan, Feb 25, 2024)
988da9c  Try a different way of connecting to CI VM (hategan, Feb 25, 2024)
eb19fcf  Second try (hategan, Feb 25, 2024)
56c4d33  and more (hategan, Feb 25, 2024)
d668832  Moved name in the proper place (hategan, Feb 25, 2024)
02576d1  Need something that doesn't block (hategan, Feb 25, 2024)
6e5eeb4  Dump thread stacks on SIGUSR1 when running tests to troubleshoot (hategan, Feb 25, 2024)
3fb1035  Disable output capturing temporarily (hategan, Feb 25, 2024)
2af91b2  More robustness in status updater (hategan, Feb 25, 2024)
307820f  Some more debugging info when dumping stacks (hategan, Feb 25, 2024)
3e16808  A bit of cleanup and a warning for the future (hategan, Feb 25, 2024)
318c112  Removed debugging stuff (hategan, Feb 25, 2024)
0faccf5  Don't print spurious notifications, since they mostly pop up for canc… (hategan, Feb 25, 2024)
9478857  Fixed race condition in test. (hategan, Feb 25, 2024)
7656b1a  formatting (hategan, Feb 25, 2024)
adc7486  Added some debugging statements. (hategan, Feb 25, 2024)
1969888  Trying to remove some seemingly spurious options for better portability (hategan, Feb 26, 2024)
10a1c24  Install netcat to test status updates (hategan, Feb 26, 2024)
3afb094  There is, of course, more than one type of netcat. (hategan, Feb 26, 2024)
2b64908  A version that should work with netcat-classic in the flux container. (hategan, Feb 26, 2024)
c4721b4  Poll the file, too, on flush. (hategan, Feb 26, 2024)
94a2c0e  Don't bother with netcat, which may not be installed, when we can use (hategan, Feb 27, 2024)
4e7678c  Simplified the error logic in launcher/batch scripts. (hategan, Feb 28, 2024)
b438a48  Create test directories in ~/.psij/test, since CNs may not have acces… (hategan, Mar 1, 2024)
5fd72d7  Also canonicalize job dir before testing cleanup inclusion in it. (hategan, Mar 1, 2024)
69fa3e0  Merge branch 'main' into file_staging (hategan, Mar 8, 2024)
30d0d34  Fixed exit code (hategan, Mar 9, 2024)
2190c87  - renamed internal psij functions to have a `_psij_` prefix in order … (hategan, Mar 12, 2024)
6 changes: 5 additions & 1 deletion .github/workflows/python-package.yml
@@ -37,6 +37,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v2
+      #- uses: mxschmitt/action-tmate@v3
+      #  with:
+      #    detached: true
+      #    limit-access-to-actor: true
       - name: Test with pytest
         uses: addnab/docker-run-action@v3
         with:
@@ -46,7 +50,7 @@ jobs:
           cd /workspace
           echo "Running in ${PWD}"
           sudo apt update
-          sudo apt install -y openssh-server openssh-client
+          sudo apt install -y openssh-server openssh-client netcat-traditional
           mkdir -p "$HOME/.ssh"
           chmod 0755 "$HOME"
           chmod 0700 "$HOME/.ssh"
3 changes: 2 additions & 1 deletion .gitignore
@@ -17,5 +17,6 @@ psi_j_python.egg-info/
 venv*
 .venv*
 build/
+.packages/
 docs/.web-build
-web-build/
+web-build/
2 changes: 1 addition & 1 deletion src/psij-descriptors/cobalt_descriptor.py
@@ -3,5 +3,5 @@
 from psij.descriptor import Descriptor


-__PSI_J_EXECUTORS__ = [Descriptor(name="cobalt", nice_name='Cobalt', version=StrictVersion("0.0.1"),
+__PSI_J_EXECUTORS__ = [Descriptor(name="cobalt", nice_name='Cobalt', version=StrictVersion("0.2.0"),
                                   cls='psij.executors.batch.cobalt.CobaltJobExecutor')]
8 changes: 4 additions & 4 deletions src/psij-descriptors/core_descriptors.py
@@ -2,15 +2,15 @@
 from psij.descriptor import Descriptor

 __PSI_J_EXECUTORS__ = [
-    Descriptor(name='local', nice_name='Local', version=StrictVersion('0.0.1'),
+    Descriptor(name='local', nice_name='Local', version=StrictVersion('0.2.0'),
                cls='psij.executors.local.LocalJobExecutor')
 ]

 __PSI_J_LAUNCHERS__ = [
-    Descriptor(name='single', version=StrictVersion('0.0.1'),
+    Descriptor(name='single', version=StrictVersion('0.2.0'),
                cls='psij.launchers.single.SingleLauncher'),
-    Descriptor(name='multiple', version=StrictVersion('0.0.1'),
+    Descriptor(name='multiple', version=StrictVersion('0.2.0'),
                cls='psij.launchers.multiple.MultipleLauncher'),
-    Descriptor(name='mpirun', version=StrictVersion('0.0.1'),
+    Descriptor(name='mpirun', version=StrictVersion('0.2.0'),
                cls='psij.launchers.mpirun.MPILauncher'),
 ]
2 changes: 1 addition & 1 deletion src/psij-descriptors/lsf_descriptor.py
@@ -3,5 +3,5 @@
 from psij.descriptor import Descriptor


-__PSI_J_EXECUTORS__ = [Descriptor(name='lsf', nice_name='LSF', version=StrictVersion('0.0.1'),
+__PSI_J_EXECUTORS__ = [Descriptor(name='lsf', nice_name='LSF', version=StrictVersion('0.2.0'),
                                   cls='psij.executors.batch.lsf.LsfJobExecutor')]
4 changes: 2 additions & 2 deletions src/psij-descriptors/pbs_descriptor.py
@@ -4,8 +4,8 @@


 __PSI_J_EXECUTORS__ = [Descriptor(name='pbs', nice_name='PBS Pro', aliases=['pbspro'],
-                                  version=StrictVersion('0.0.2'),
+                                  version=StrictVersion('0.2.0'),
                                   cls='psij.executors.batch.pbs.PBSJobExecutor'),
                        Descriptor(name='pbs_classic', nice_name='PBS Classic', aliases=['torque'],
-                                  version=StrictVersion('0.0.2'),
+                                  version=StrictVersion('0.2.0'),
                                   cls='psij.executors.batch.pbs_classic.PBSClassicJobExecutor')]
2 changes: 1 addition & 1 deletion src/psij-descriptors/slurm_descriptor.py
@@ -3,5 +3,5 @@
 from psij.descriptor import Descriptor


-__PSI_J_EXECUTORS__ = [Descriptor(name='slurm', nice_name='Slurm', version=StrictVersion('0.0.1'),
+__PSI_J_EXECUTORS__ = [Descriptor(name='slurm', nice_name='Slurm', version=StrictVersion('0.2.0'),
                                   cls='psij.executors.batch.slurm.SlurmJobExecutor')]
121 changes: 85 additions & 36 deletions src/psij/executors/batch/batch_scheduler_executor.py
@@ -1,19 +1,22 @@
 import logging
 import os
+import weakref
+
 import subprocess
 import time
 import traceback
 from abc import abstractmethod
 from datetime import timedelta
 from pathlib import Path
 from threading import Thread, RLock
-from typing import Optional, List, Dict, Collection, cast, Union, IO
+from typing import Optional, List, Dict, Collection, cast, Union, IO, Set

 from psij.launchers.script_based_launcher import ScriptBasedLauncher

 from psij import JobExecutor, JobExecutorConfig, Launcher, Job, SubmitException, \
     JobStatus, JobState
 from psij.executors.batch.template_function_library import ALL as FUNCTION_LIBRARY
+from psij.utils import _StatusUpdater

 UNKNOWN_ERROR = 'PSIJ: Unknown error'
@@ -199,18 +202,14 @@
             configuration is used.
         """
         super().__init__(url=url, config=config if config else BatchSchedulerExecutorConfig())
-        self._queue_poll_thread = self._start_queue_poll_thread()
         assert config
         self.work_directory = config.work_directory / self.name
+        self._queue_poll_thread = self._start_queue_poll_thread()

     def _ensure_work_dir(self) -> None:
         self.work_directory.mkdir(parents=True, exist_ok=True)

     def submit(self, job: Job) -> None:
         """See :func:`~psij.JobExecutor.submit`."""
         logger.info('Job %s: submitting', job.id)
-        self._ensure_work_dir()
-
         self._check_job(job)

         context = self._create_script_context(job)
@@ -489,7 +488,10 @@
             'psij': {
                 'lib': FUNCTION_LIBRARY,
                 'launch_command': launch_command,
-                'script_dir': str(self.work_directory)
+                'script_dir': str(self.work_directory),
+                'us_file': self._queue_poll_thread.status_updater.update_file_name,
+                'us_port': self._queue_poll_thread.status_updater.update_port,
+                'us_addrs': ', '.join(self._queue_poll_thread.status_updater.ips)
             }
         }
         assert job.spec is not None
@@ -531,9 +533,11 @@
         # is_greater_than returns T/F if the states are comparable and None if not, so
         # we have to check explicitly for the boolean value rather than truthiness
             return
-        if status.state.final and job.native_id:
-            self._clean_submit_script(job)
-            self._read_aux_files(job, status)
+        if status.state.final:
+            self._queue_poll_thread.unregister_job(job)
+            if job.native_id:
+                self._clean_submit_script(job)
+                self._read_aux_files(job, status)
         super()._set_job_status(job, status)

     def _clean_submit_script(self, job: Job) -> None:
@@ -573,9 +577,8 @@
             # already present
             out = self._read_aux_file(job, '.out')
             if out:
-                launcher = self._get_launcher_from_job(job)
-                if launcher.is_launcher_failure(out):
-                    status.message = launcher.get_launcher_failure_message(out)
+                if '_PSIJ_SCRIPT_DONE' not in out:
+                    status.message = out
                     logger.debug('Output from launcher: %s', status.message)
             else:
                 self._delete_aux_file(job, '.out')
[Codecov / codecov/patch: added line #L581 in src/psij/executors/batch/batch_scheduler_executor.py was not covered by tests]
@@ -638,29 +641,46 @@
         super().__init__()
         self.name = name
         self.daemon = True
-        self.config = config
-        self.executor = executor
+        # We don't at this time cache executor instances. Even if we did, it may be wise
+        # to shut down queue polling threads when their executors (the only entities that
+        # use them) are garbage collected. So we wrap the references to the executor and
+        # config in a weak ref and exit when the ref becomes invalid.
+        self.config = weakref.ref(config)
+        self.executor = weakref.ref(executor)
         # native_id -> job
-        self._jobs: Dict[str, List[Job]] = {}
+        self._jobs: Dict[str, Set[Job]] = {}
         # counts consecutive errors while invoking qstat or equivalent
         self._poll_error_count = 0
         self._jobs_lock = RLock()
+        self.status_updater = cast(_StatusUpdater, _StatusUpdater.get_instance())
+        self.active = True

     def run(self) -> None:
         logger.debug('Executor %s: queue poll thread started', self.executor)
-        time.sleep(self.config.initial_queue_polling_delay)
-        while True:
-            self._poll()
-            time.sleep(self.config.queue_polling_interval)
+        try:
+            time.sleep(self.get_config().initial_queue_polling_delay)
+            while self.active:
+                self._poll()
+                start = time.time()
+                now = start
+                while now - start < self.get_config().queue_polling_interval:
+                    time.sleep(1)
+                    now = time.time()
+        except StopIteration:
+            logger.info('Thread %s exiting due to executor collection' % self)

+    def stop(self) -> None:
+        self.active = False

[Codecov / codecov/patch: added line #L673 in src/psij/executors/batch/batch_scheduler_executor.py was not covered by tests]
     def _poll(self) -> None:
+        executor = self.get_executor()
         with self._jobs_lock:
             if len(self._jobs) == 0:
                 return
             jobs_copy = dict(self._jobs)
         logger.info('Polling for %s jobs', len(jobs_copy))
         try:
-            out = self.executor._run_command(self.executor.get_status_command(jobs_copy.keys()))
+            out = executor._run_command(executor.get_status_command(jobs_copy.keys()))
         except subprocess.CalledProcessError as ex:
             out = ex.output
             exit_code = ex.returncode
@@ -674,25 +694,23 @@
                 self._poll_error_count = 0
             logger.debug('Output from status command: %s', out)
             try:
-                status_map = self.executor.parse_status_output(exit_code, out)
+                status_map = executor.parse_status_output(exit_code, out)
             except Exception as ex:
                 self._handle_poll_error(False,
                                         ex,
                                         f'Failed to poll for job status: {traceback.format_exc()}')
                 return
             try:
-                for native_id, job_list in jobs_copy.items():
+                for native_id, job_set in jobs_copy.items():
                     try:
                         status = self._get_job_status(native_id, status_map)
                     except Exception:
                         status = JobStatus(JobState.FAILED,
                                            message='Failed to update job status: %s' %
                                                    traceback.format_exc())
-                    for job in job_list:
-                        self.executor._set_job_status(job, status)
-                    if status.state.final:
-                        with self._jobs_lock:
-                            del self._jobs[native_id]
-
+                    for job in job_set:
+                        executor._set_job_status(job, status)
             except Exception as ex:
                 msg = traceback.format_exc()
                 self._handle_poll_error(True, ex, 'Error updating job statuses {}'.format(msg))
@@ -706,7 +724,7 @@
     def _handle_poll_error(self, immediate: bool, ex: Exception, msg: str) -> None:
         logger.warning('Polling error: %s', msg)
         self._poll_error_count += 1
-        if immediate or (self._poll_error_count > self.config.queue_polling_error_threshold):
+        if immediate or (self._poll_error_count > self.get_config().queue_polling_error_threshold):
             self._poll_error_count = 0
             # fail all jobs
             with self._jobs_lock:
[Codecov / codecov/patch: added line #L727 in src/psij/executors/batch/batch_scheduler_executor.py was not covered by tests]
@@ -718,16 +736,47 @@
             assert len(self._jobs) > 0
             jobs_copy = dict(self._jobs)
             self._jobs.clear()
-        for job_list in jobs_copy.values():
-            for job in job_list:
-                self.executor._set_job_status(job, JobStatus(JobState.FAILED, message=msg))
+        for job_set in jobs_copy.values():
+            for job in job_set:
+                self.unregister_job(job)
+                self.get_executor()._set_job_status(job, JobStatus(JobState.FAILED,
+                                                                   message=msg))

     def register_job(self, job: Job) -> None:
+        self.status_updater.register_job(job, self.get_executor())
         assert job.native_id
+        logger.info('Job %s: registering', job.id)
         with self._jobs_lock:
             native_id = job.native_id
-            if native_id not in self._jobs:
-                self._jobs[native_id] = [job]
-            else:
-                self._jobs[job.native_id].append(job)
+            try:
+                self._jobs[native_id].add(job)
+            except KeyError:
+                self._jobs[native_id] = {job}
+
+    def unregister_job(self, job: Job) -> None:
+        self.status_updater.unregister_job(job)
+        assert job.native_id
+        logger.info('Job %s: unregistering', job.id)
+        with self._jobs_lock:
+            native_id = job.native_id
+            try:
+                del self._jobs[native_id]
+            except KeyError:
+                # If two or more jobs are attached to the same native ID, the
+                # first one being unregistered would already have removed
+                # the dict entry
+                pass
+
+    def get_config(self) -> BatchSchedulerExecutorConfig:
+        config = self.config()
+        if config:
+            return config
+        else:
+            raise StopIteration()
+
+    def get_executor(self) -> BatchSchedulerExecutor:
+        ex = self.executor()
+        if ex:
+            return ex
+        else:
+            raise StopIteration()
[Codecov / codecov/patch: added lines #L739-L742 in src/psij/executors/batch/batch_scheduler_executor.py were not covered by tests]
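Note on the weak-reference pattern above: the poll thread holds only weakrefs to its executor and config, and get_executor()/get_config() raise StopIteration once the executor has been collected, which unwinds run(). A minimal standalone sketch of the same pattern (Owner and PollThread are hypothetical names, not part of this PR):

# Standalone sketch, assuming nothing beyond the standard library.
import threading
import time
import weakref


class PollThread(threading.Thread):
    def __init__(self, owner: object) -> None:
        super().__init__(daemon=True)
        # A weak reference does not keep the owner alive; calling the
        # ref returns None once the owner has been garbage collected.
        self._owner_ref = weakref.ref(owner)

    def _owner(self) -> object:
        owner = self._owner_ref()
        if owner is None:
            # Mirrors get_executor()/get_config() above: signal shutdown.
            raise StopIteration()
        return owner

    def run(self) -> None:
        try:
            while True:
                self._owner()  # raises StopIteration when the owner is gone
                time.sleep(1)  # short sleeps so collection is noticed promptly
        except StopIteration:
            pass  # thread exits together with its owner


class Owner:
    def __init__(self) -> None:
        self._thread = PollThread(self)
        self._thread.start()

The one-second sleep increments mirror the run() loop in the diff: the thread can notice collection (or stop()) within about a second rather than only once per polling interval.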
21 changes: 16 additions & 5 deletions src/psij/executors/batch/cobalt/cobalt.mustache
@@ -46,16 +46,27 @@ only results in empty files that are not cleaned up}}
 #COBALT -e /dev/null
 #COBALT -o /dev/null

-{{!like PBS, this is also cheap and there is not need to check setting}}
-PSIJ_NODEFILE="$COBALT_NODEFILE"
-export PSIJ_NODEFILE
-
 {{!redirect output here instead of through #COBALT directive since COBALT_JOB_ID is not available
 when the directives are evaluated; the reason for using the job id in the first place being the
 same as for the exit code file.}}
 exec &>> "{{psij.script_dir}}/$COBALT_JOBID.out"


 {{> batch_lib}}

+{{!like PBS, this is also cheap and there is not need to check setting}}
+PSIJ_NODEFILE="$COBALT_NODEFILE"
+export PSIJ_NODEFILE
+
+{{> stagein}}
+_psij_update_status ACTIVE
+
 {{#psij.launch_command}}{{.}} {{/psij.launch_command}}
+_PSIJ_JOB_EC=$?
+
+{{> stageout}}
+{{> cleanup}}

 {{!we redirect to a file tied to the native ID so that we can reach the file with attach().}}
-echo "$?" > "{{psij.script_dir}}/$COBALT_JOBID.ec"
+echo $_PSIJ_JOB_EC > "{{psij.script_dir}}/$COBALT_JOBID.ec"
+echo "_PSIJ_SCRIPT_DONE"