Skip to content

Commit

Permalink
Move CalcJob.presubmit call from CalcJob.run to Waiting.execute
Browse files Browse the repository at this point in the history
The `presubmit` calls through to `prepare_for_submission` which is the
place where the `CalcJob` implementation writes the input files based on
the inputs nodes in a temporary sandbox folder. The contents of which
are then stored in the repository of the node before the process
transitions to the waiting state in await of the upload task. This
upload task will then copy over the contents of the node repository to
the working directory on the remote computer.

There is a use case to limit which files are copied to the node's
repository from the sandbox folder used by `prepare_for_submission`.
This means that the sandbox folder will have to be passed to the
`upload_calculation`. However, since the creation of this happens in the
`CalcJob.run` call, which is quite far removed from the eventual use,
this opens it up for a lot of problems. The time between folder
population by `prepare_for_submission` and eventual use in the upload
task can be significant, among other things because the upload task will
have to wait for transport. To make this feasible the creation of the
sandbox folder has to be moved closer to the upload task.

Here we move the `presubmit` call from `CalcJob.run` to the upload
transport task which will create the sandbox folder and pass it into the
`upload_calculation`. This limits the time frame in which there is a
chance for the contents of the sandbox to get lost. In addition, this
now gives additional freedom of deciding which files from the sandbox
are permanently stored in the node's repository for extra provenance.
  • Loading branch information
sphuber committed Dec 14, 2019
1 parent bb82010 commit f405504
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 82 deletions.
14 changes: 5 additions & 9 deletions aiida/engine/daemon/execmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
the routines make reference to the suitable plugins for all
plugin-specific operations.
"""

import os


from aiida.common import AIIDA_LOGGER, exceptions
from aiida.common.datastructures import CalcJobState
from aiida.common.folders import SandboxFolder
Expand All @@ -31,14 +29,13 @@
execlogger = AIIDA_LOGGER.getChild('execmanager')


def upload_calculation(node, transport, calc_info, script_filename, inputs=None, dry_run=False):
def upload_calculation(node, transport, calc_info, folder, inputs=None, dry_run=False):
"""Upload a `CalcJob` instance
:param node: the `CalcJobNode`.
:param transport: an already opened transport to use to submit the calculation.
:param calc_info: the calculation info datastructure returned by `CalcJobNode.presubmit`
:param script_filename: the job launch script returned by `CalcJobNode.presubmit`
:return: tuple of ``calc_info`` and ``script_filename``
:param calc_info: the calculation info datastructure returned by `CalcJob.presubmit`
:param folder: temporary local file system folder containing the inputs written by `CalcJob.prepare_for_submission`
"""
from logging import LoggerAdapter
from tempfile import NamedTemporaryFile
Expand All @@ -65,7 +62,8 @@ def upload_calculation(node, transport, calc_info, script_filename, inputs=None,
raise ValueError('Cannot submit calculation {} because it has cached input links! If you just want to test the '
'submission, set `metadata.dry_run` to True in the inputs.'.format(node.pk))

folder = node._raw_input_folder
# After this call, no modifications to the folder should be done
node.put_object_from_tree(folder.abspath, force=True)

# If we are performing a dry-run, the working directory should actually be a local folder that should already exist
if dry_run:
Expand Down Expand Up @@ -251,8 +249,6 @@ def find_data_node(inputs, uuid):
remotedata.add_incoming(node, link_type=LinkType.CREATE, link_label='remote_folder')
remotedata.store()

return calc_info, script_filename


def submit_calculation(calculation, transport, calc_info, script_filename):
"""Submit a previously uploaded `CalcJob` to the scheduler.
Expand Down
117 changes: 50 additions & 67 deletions aiida/engine/processes/calcjobs/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,55 +214,31 @@ def run(self):
This means invoking the `presubmit` and storing the temporary folder in the node's repository. Then we move the
process in the `Wait` state, waiting for the `UPLOAD` transport task to be started.
"""
from aiida.orm import Code, load_node
from aiida.common.folders import SandboxFolder, SubmitTestFolder
from aiida.common.exceptions import InputValidationError

# The following conditional is required for the caching to properly work. Even if the source node has a process
# state of `Finished` the cached process will still enter the running state. The process state will have then
# been overridden by the engine to `Running` so we cannot check that, but if the `exit_status` is anything other
# than `None`, it should mean this node was taken from the cache, so the process should not be rerun.
if self.node.exit_status is not None:
return self.node.exit_status

if self.inputs.metadata.dry_run:
folder_class = SubmitTestFolder
else:
folder_class = SandboxFolder

with folder_class() as folder:
computer = self.node.computer

if not self.inputs.metadata.dry_run and self.node.has_cached_links():
raise exceptions.InvalidOperation('calculation node has unstored links in cache')

calc_info, script_filename = self.presubmit(folder)
calc_info.uuid = str(self.uuid)
input_codes = [load_node(_.code_uuid, sub_classes=(Code,)) for _ in calc_info.codes_info]

for code in input_codes:
if not code.can_run_on(computer):
raise InputValidationError(
'The selected code {} for calculation {} cannot run on computer {}'.format(
code.pk, self.node.pk, computer.name))

# After this call, no modifications to the folder should be done
self.node.put_object_from_tree(folder.abspath, force=True)
from aiida.common.folders import SubmitTestFolder
from aiida.engine.daemon.execmanager import upload_calculation
from aiida.transports.plugins.local import LocalTransport

if self.inputs.metadata.dry_run:
from aiida.engine.daemon.execmanager import upload_calculation
from aiida.transports.plugins.local import LocalTransport
with LocalTransport() as transport:
with LocalTransport() as transport:
with SubmitTestFolder() as folder:
calc_info, script_filename = self.presubmit(folder)
transport.chdir(folder.abspath)
upload_calculation(self.node, transport, calc_info, script_filename, self.inputs, dry_run=True)
upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True)
self.node.dry_run_info = {
'folder': folder.abspath,
'script_filename': script_filename
}
return plumpy.Stop(None, True)
return plumpy.Stop(None, True)

# The following conditional is required for the caching to properly work. Even if the source node has a process
# state of `Finished` the cached process will still enter the running state. The process state will have then
# been overridden by the engine to `Running` so we cannot check that, but if the `exit_status` is anything other
# than `None`, it should mean this node was taken from the cache, so the process should not be rerun.
if self.node.exit_status is not None:
return self.node.exit_status

# Launch the upload operation
return plumpy.Wait(msg='Waiting to upload', data=(UPLOAD_COMMAND, calc_info, script_filename))
return plumpy.Wait(msg='Waiting to upload', data=UPLOAD_COMMAND)

def prepare_for_submission(self, folder):
"""Prepare files for submission of calculation."""
Expand Down Expand Up @@ -304,7 +280,7 @@ def presubmit(self, folder):
# pylint: disable=too-many-locals,too-many-statements,too-many-branches
import os

from aiida.common.exceptions import PluginInternalError, ValidationError
from aiida.common.exceptions import PluginInternalError, ValidationError, InvalidOperation, InputValidationError
from aiida.common import json
from aiida.common.utils import validate_list_of_string_tuples
from aiida.common.datastructures import CodeInfo, CodeRunMode
Expand All @@ -315,16 +291,23 @@ def presubmit(self, folder):
computer = self.node.computer
inputs = self.node.get_incoming(link_type=LinkType.INPUT_CALC)

codes = [_ for _ in inputs.all_nodes() if isinstance(_, Code)]
if not self.inputs.metadata.dry_run and self.node.has_cached_links():
raise InvalidOperation('calculation node has unstored links in cache')

calcinfo = self.prepare_for_submission(folder)
scheduler = computer.get_scheduler()
codes = [_ for _ in inputs.all_nodes() if isinstance(_, Code)]

for code in codes:
if code.is_local():
if code.get_local_executable() in folder.get_content_list():
raise PluginInternalError('The plugin created a file {} that is also '
'the executable name!'.format(code.get_local_executable()))
if not code.can_run_on(computer):
raise InputValidationError('The selected code {} for calculation {} cannot run on computer {}'.format(
code.pk, self.node.pk, computer.name))

if code.is_local() and code.get_local_executable() in folder.get_content_list():
raise PluginInternalError('The plugin created a file {} that is also the executable name!'.format(
code.get_local_executable()))

calc_info = self.prepare_for_submission(folder)
calc_info.uuid = str(self.node.uuid)
scheduler = computer.get_scheduler()

# I create the job template to pass to the scheduler
job_tmpl = JobTemplate()
Expand All @@ -342,16 +325,16 @@ def presubmit(self, folder):
job_tmpl.sched_join_files = False

# Set retrieve path, add also scheduler STDOUT and STDERR
retrieve_list = (calcinfo.retrieve_list if calcinfo.retrieve_list is not None else [])
retrieve_list = (calc_info.retrieve_list if calc_info.retrieve_list is not None else [])
if (job_tmpl.sched_output_path is not None and job_tmpl.sched_output_path not in retrieve_list):
retrieve_list.append(job_tmpl.sched_output_path)
if not job_tmpl.sched_join_files:
if (job_tmpl.sched_error_path is not None and job_tmpl.sched_error_path not in retrieve_list):
retrieve_list.append(job_tmpl.sched_error_path)
self.node.set_retrieve_list(retrieve_list)

retrieve_singlefile_list = (calcinfo.retrieve_singlefile_list
if calcinfo.retrieve_singlefile_list is not None else [])
retrieve_singlefile_list = (calc_info.retrieve_singlefile_list
if calc_info.retrieve_singlefile_list is not None else [])
# a validation on the subclasses of retrieve_singlefile_list
for _, subclassname, _ in retrieve_singlefile_list:
file_sub_class = DataFactory(subclassname)
Expand All @@ -363,8 +346,8 @@ def presubmit(self, folder):
self.node.set_retrieve_singlefile_list(retrieve_singlefile_list)

# Handle the retrieve_temporary_list
retrieve_temporary_list = (calcinfo.retrieve_temporary_list
if calcinfo.retrieve_temporary_list is not None else [])
retrieve_temporary_list = (calc_info.retrieve_temporary_list
if calc_info.retrieve_temporary_list is not None else [])
self.node.set_retrieve_temporary_list(retrieve_temporary_list)

# the if is done so that if the method returns None, this is
Expand All @@ -375,10 +358,10 @@ def presubmit(self, folder):
# an exception
prepend_texts = [computer.get_prepend_text()] + \
[code.get_prepend_text() for code in codes] + \
[calcinfo.prepend_text, self.node.get_option('prepend_text')]
[calc_info.prepend_text, self.node.get_option('prepend_text')]
job_tmpl.prepend_text = '\n\n'.join(prepend_text for prepend_text in prepend_texts if prepend_text)

append_texts = [self.node.get_option('append_text'), calcinfo.append_text] + \
append_texts = [self.node.get_option('append_text'), calc_info.append_text] + \
[code.get_append_text() for code in codes] + \
[computer.get_append_text()]
job_tmpl.append_text = '\n\n'.join(append_text for append_text in append_texts if append_text)
Expand All @@ -398,11 +381,11 @@ def presubmit(self, folder):
extra_mpirun_params = self.node.get_option('mpirun_extra_params') # same for all codes in the same calc

# set the codes_info
if not isinstance(calcinfo.codes_info, (list, tuple)):
if not isinstance(calc_info.codes_info, (list, tuple)):
raise PluginInternalError('codes_info passed to CalcInfo must be a list of CalcInfo objects')

codes_info = []
for code_info in calcinfo.codes_info:
for code_info in calc_info.codes_info:

if not isinstance(code_info, CodeInfo):
raise PluginInternalError('Invalid codes_info, must be a list of CodeInfo objects')
Expand All @@ -415,7 +398,7 @@ def presubmit(self, folder):

this_withmpi = code_info.withmpi # to decide better how to set the default
if this_withmpi is None:
if len(calcinfo.codes_info) > 1:
if len(calc_info.codes_info) > 1:
raise PluginInternalError('For more than one code, it is '
'necessary to set withmpi in '
'codes_info')
Expand All @@ -439,7 +422,7 @@ def presubmit(self, folder):

if len(codes) > 1:
try:
job_tmpl.codes_run_mode = calcinfo.codes_run_mode
job_tmpl.codes_run_mode = calc_info.codes_run_mode
except KeyError:
raise PluginInternalError('Need to set the order of the code execution (parallel or serial?)')
else:
Expand Down Expand Up @@ -482,24 +465,24 @@ def presubmit(self, folder):

subfolder = folder.get_subfolder('.aiida', create=True)
subfolder.create_file_from_filelike(io.StringIO(json.dumps(job_tmpl)), 'job_tmpl.json', 'w', encoding='utf8')
subfolder.create_file_from_filelike(io.StringIO(json.dumps(calcinfo)), 'calcinfo.json', 'w', encoding='utf8')
subfolder.create_file_from_filelike(io.StringIO(json.dumps(calc_info)), 'calcinfo.json', 'w', encoding='utf8')

if calcinfo.local_copy_list is None:
calcinfo.local_copy_list = []
if calc_info.local_copy_list is None:
calc_info.local_copy_list = []

if calcinfo.remote_copy_list is None:
calcinfo.remote_copy_list = []
if calc_info.remote_copy_list is None:
calc_info.remote_copy_list = []

# Some validation
this_pk = self.node.pk if self.node.pk is not None else '[UNSTORED]'
local_copy_list = calcinfo.local_copy_list
local_copy_list = calc_info.local_copy_list
try:
validate_list_of_string_tuples(local_copy_list, tuple_length=3)
except ValidationError as exc:
raise PluginInternalError('[presubmission of calc {}] '
'local_copy_list format problem: {}'.format(this_pk, exc))

remote_copy_list = calcinfo.remote_copy_list
remote_copy_list = calc_info.remote_copy_list
try:
validate_list_of_string_tuples(remote_copy_list, tuple_length=3)
except ValidationError as exc:
Expand All @@ -519,4 +502,4 @@ def presubmit(self, folder):
'The destination path of the remote copy '
'is absolute! ({})'.format(this_pk, dest_rel_path))

return calcinfo, script_filename
return calc_info, script_filename
16 changes: 10 additions & 6 deletions aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from aiida.common.datastructures import CalcJobState
from aiida.common.exceptions import FeatureNotAvailable, TransportTaskException
from aiida.common.folders import SandboxFolder
from aiida.engine.daemon import execmanager
from aiida.engine.utils import exponential_backoff_retry, interruptable_task
from aiida.schedulers.datastructures import JobState
Expand All @@ -36,7 +37,7 @@


@coroutine
def task_upload_job(node, transport_queue, calc_info, script_filename, cancellable):
def task_upload_job(process, transport_queue, cancellable):
"""Transport task that will attempt to upload the files of a job calculation to the remote.
The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
Expand All @@ -46,13 +47,13 @@ def task_upload_job(node, transport_queue, calc_info, script_filename, cancellab
:param node: the node that represents the job calculation
:param transport_queue: the TransportQueue from which to request a Transport
:param calc_info: the calculation info datastructure returned by `CalcJobNode._presubmit`
:param script_filename: the job launch script returned by `CalcJobNode._presubmit`
:param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
:type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
:raises: Return if the tasks was successfully completed
:raises: TransportTaskException if after the maximum number of retries the transport task still excepted
"""
node = process.node

if node.get_state() == CalcJobState.SUBMITTING:
logger.warning('CalcJob<{}> already marked as SUBMITTING, skipping task_update_job'.format(node.pk))
raise Return(True)
Expand All @@ -66,7 +67,10 @@ def task_upload_job(node, transport_queue, calc_info, script_filename, cancellab
def do_upload():
with transport_queue.request_transport(authinfo) as request:
transport = yield cancellable.with_interrupt(request)
raise Return(execmanager.upload_calculation(node, transport, calc_info, script_filename))
with SandboxFolder() as folder:
calc_info, script_filename = process.presubmit(folder)
execmanager.upload_calculation(node, transport, calc_info, folder)
raise Return((calc_info, script_filename))

try:
logger.info('scheduled request to upload CalcJob<{}>'.format(node.pk))
Expand Down Expand Up @@ -330,7 +334,7 @@ def execute(self):

if command == UPLOAD_COMMAND:
node.set_process_status(process_status)
calc_info, script_filename = yield self._launch_task(task_upload_job, node, transport_queue, *args)
calc_info, script_filename = yield self._launch_task(task_upload_job, self.process, transport_queue)
raise Return(self.submit(calc_info, script_filename))

elif command == SUBMIT_COMMAND:
Expand Down Expand Up @@ -390,7 +394,7 @@ def _launch_task(self, coro, *args, **kwargs):
def upload(self, calc_info, script_filename):
"""Return the `Waiting` state that will `upload` the `CalcJob`."""
msg = 'Waiting for calculation folder upload'
return self.create_state(ProcessState.WAITING, None, msg=msg, data=(UPLOAD_COMMAND, calc_info, script_filename))
return self.create_state(ProcessState.WAITING, None, msg=msg, data=UPLOAD_COMMAND)

def submit(self, calc_info, script_filename):
"""Return the `Waiting` state that will `submit` the `CalcJob`."""
Expand Down

0 comments on commit f405504

Please sign in to comment.