From f4055042e3ca878d0fae8a5d1d6686a3eb553ea7 Mon Sep 17 00:00:00 2001
From: Sebastiaan Huber
Date: Fri, 13 Dec 2019 14:08:32 +0100
Subject: [PATCH] Move `CalcJob.presubmit` call from `CalcJob.run` to
 `Waiting.execute`

The `presubmit` method calls through to `prepare_for_submission`, which is
where the `CalcJob` implementation writes the input files, based on the input
nodes, to a temporary sandbox folder. The contents of that folder are then
stored in the repository of the node before the process transitions to the
waiting state, where it awaits the upload task. The upload task then copies
the contents of the node repository to the working directory on the remote
computer.

There is a use case for limiting which files are copied from the sandbox
folder used by `prepare_for_submission` to the node's repository. This means
that the sandbox folder itself has to be passed to `upload_calculation`.
However, since the folder is created in the `CalcJob.run` call, which is
quite far removed from its eventual use, this opens the door to a number of
problems. The time between the folder being populated by
`prepare_for_submission` and its eventual use in the upload task can be
significant, among other things because the upload task has to wait for a
transport.

To make this feasible, the creation of the sandbox folder has to be moved
closer to the upload task. Here we move the `presubmit` call from
`CalcJob.run` to the upload transport task, which creates the sandbox folder
and passes it into `upload_calculation`. This limits the window in which the
contents of the sandbox could be lost. In addition, it gives the freedom to
decide which files from the sandbox are permanently stored in the node's
repository, for additional provenance.
---
 aiida/engine/daemon/execmanager.py         |  14 +--
 aiida/engine/processes/calcjobs/calcjob.py | 117 +++++++++------------
 aiida/engine/processes/calcjobs/tasks.py   |  16 +--
 3 files changed, 65 insertions(+), 82 deletions(-)

diff --git a/aiida/engine/daemon/execmanager.py b/aiida/engine/daemon/execmanager.py
index e23edc49e4..5357221288 100644
--- a/aiida/engine/daemon/execmanager.py
+++ b/aiida/engine/daemon/execmanager.py
@@ -13,10 +13,8 @@
 the routines make reference to the suitable plugins for all
 plugin-specific operations.
 """
-
 import os
-
 from aiida.common import AIIDA_LOGGER, exceptions
 from aiida.common.datastructures import CalcJobState
 from aiida.common.folders import SandboxFolder
 
@@ -31,14 +29,13 @@
 execlogger = AIIDA_LOGGER.getChild('execmanager')
 
 
-def upload_calculation(node, transport, calc_info, script_filename, inputs=None, dry_run=False):
+def upload_calculation(node, transport, calc_info, folder, inputs=None, dry_run=False):
     """Upload a `CalcJob` instance
 
     :param node: the `CalcJobNode`.
     :param transport: an already opened transport to use to submit the calculation.
-    :param calc_info: the calculation info datastructure returned by `CalcJobNode.presubmit`
-    :param script_filename: the job launch script returned by `CalcJobNode.presubmit`
-    :return: tuple of ``calc_info`` and ``script_filename``
+    :param calc_info: the calculation info datastructure returned by `CalcJob.presubmit`
+    :param folder: temporary local file system folder containing the inputs written by `CalcJob.prepare_for_submission`
     """
     from logging import LoggerAdapter
     from tempfile import NamedTemporaryFile
@@ -65,7 +62,8 @@ def upload_calculation(node, transport, calc_info, script_filename, inputs=None,
         raise ValueError('Cannot submit calculation {} because it has cached input links! If you just want to test the '
                          'submission, set `metadata.dry_run` to True in the inputs.'.format(node.pk))
 
-    folder = node._raw_input_folder
+    # After this call, no modifications to the folder should be done
+    node.put_object_from_tree(folder.abspath, force=True)
 
     # If we are performing a dry-run, the working directory should actually be a local folder that should already exist
     if dry_run:
@@ -251,8 +249,6 @@ def find_data_node(inputs, uuid):
         remotedata.add_incoming(node, link_type=LinkType.CREATE, link_label='remote_folder')
         remotedata.store()
 
-    return calc_info, script_filename
-
 
 def submit_calculation(calculation, transport, calc_info, script_filename):
     """Submit a previously uploaded `CalcJob` to the scheduler.
diff --git a/aiida/engine/processes/calcjobs/calcjob.py b/aiida/engine/processes/calcjobs/calcjob.py
index 469e3121c5..efa9112d75 100644
--- a/aiida/engine/processes/calcjobs/calcjob.py
+++ b/aiida/engine/processes/calcjobs/calcjob.py
@@ -214,55 +214,31 @@ def run(self):
         This means invoking the `presubmit` and storing the temporary folder in the node's repository. Then we move the
         process in the `Wait` state, waiting for the `UPLOAD` transport task to be started.
         """
-        from aiida.orm import Code, load_node
-        from aiida.common.folders import SandboxFolder, SubmitTestFolder
-        from aiida.common.exceptions import InputValidationError
-
-        # The following conditional is required for the caching to properly work. Even if the source node has a process
-        # state of `Finished` the cached process will still enter the running state. The process state will have then
-        # been overridden by the engine to `Running` so we cannot check that, but if the `exit_status` is anything other
-        # than `None`, it should mean this node was taken from the cache, so the process should not be rerun.
-        if self.node.exit_status is not None:
-            return self.node.exit_status
-
         if self.inputs.metadata.dry_run:
-            folder_class = SubmitTestFolder
-        else:
-            folder_class = SandboxFolder
-
-        with folder_class() as folder:
-            computer = self.node.computer
-
-            if not self.inputs.metadata.dry_run and self.node.has_cached_links():
-                raise exceptions.InvalidOperation('calculation node has unstored links in cache')
-
-            calc_info, script_filename = self.presubmit(folder)
-            calc_info.uuid = str(self.uuid)
-            input_codes = [load_node(_.code_uuid, sub_classes=(Code,)) for _ in calc_info.codes_info]
-
-            for code in input_codes:
-                if not code.can_run_on(computer):
-                    raise InputValidationError(
-                        'The selected code {} for calculation {} cannot run on computer {}'.format(
-                            code.pk, self.node.pk, computer.name))
-
-            # After this call, no modifications to the folder should be done
-            self.node.put_object_from_tree(folder.abspath, force=True)
+            from aiida.common.folders import SubmitTestFolder
+            from aiida.engine.daemon.execmanager import upload_calculation
+            from aiida.transports.plugins.local import LocalTransport
+
-            if self.inputs.metadata.dry_run:
-                from aiida.engine.daemon.execmanager import upload_calculation
-                from aiida.transports.plugins.local import LocalTransport
-                with LocalTransport() as transport:
+            with LocalTransport() as transport:
+                with SubmitTestFolder() as folder:
+                    calc_info, script_filename = self.presubmit(folder)
                     transport.chdir(folder.abspath)
-                    upload_calculation(self.node, transport, calc_info, script_filename, self.inputs, dry_run=True)
+                    upload_calculation(self.node, transport, calc_info, folder, inputs=self.inputs, dry_run=True)
                     self.node.dry_run_info = {
                         'folder': folder.abspath,
                         'script_filename': script_filename
                     }
-                return plumpy.Stop(None, True)
+            return plumpy.Stop(None, True)
+
+        # The following conditional is required for the caching to properly work. Even if the source node has a process
+        # state of `Finished` the cached process will still enter the running state. The process state will have then
+        # been overridden by the engine to `Running` so we cannot check that, but if the `exit_status` is anything other
+        # than `None`, it should mean this node was taken from the cache, so the process should not be rerun.
+        if self.node.exit_status is not None:
+            return self.node.exit_status
 
         # Launch the upload operation
-        return plumpy.Wait(msg='Waiting to upload', data=(UPLOAD_COMMAND, calc_info, script_filename))
+        return plumpy.Wait(msg='Waiting to upload', data=UPLOAD_COMMAND)
 
     def prepare_for_submission(self, folder):
         """Prepare files for submission of calculation."""
@@ -304,7 +280,7 @@ def presubmit(self, folder):
         # pylint: disable=too-many-locals,too-many-statements,too-many-branches
         import os
 
-        from aiida.common.exceptions import PluginInternalError, ValidationError
+        from aiida.common.exceptions import PluginInternalError, ValidationError, InvalidOperation, InputValidationError
         from aiida.common import json
         from aiida.common.utils import validate_list_of_string_tuples
         from aiida.common.datastructures import CodeInfo, CodeRunMode
@@ -315,16 +291,23 @@ def presubmit(self, folder):
         computer = self.node.computer
         inputs = self.node.get_incoming(link_type=LinkType.INPUT_CALC)
 
-        codes = [_ for _ in inputs.all_nodes() if isinstance(_, Code)]
+        if not self.inputs.metadata.dry_run and self.node.has_cached_links():
+            raise InvalidOperation('calculation node has unstored links in cache')
 
-        calcinfo = self.prepare_for_submission(folder)
-        scheduler = computer.get_scheduler()
+        codes = [_ for _ in inputs.all_nodes() if isinstance(_, Code)]
 
         for code in codes:
-            if code.is_local():
-                if code.get_local_executable() in folder.get_content_list():
-                    raise PluginInternalError('The plugin created a file {} that is also '
-                                              'the executable name!'.format(code.get_local_executable()))
+            if not code.can_run_on(computer):
+                raise InputValidationError('The selected code {} for calculation {} cannot run on computer {}'.format(
+                    code.pk, self.node.pk, computer.name))
+
+            if code.is_local() and code.get_local_executable() in folder.get_content_list():
+                raise PluginInternalError('The plugin created a file {} that is also the executable name!'.format(
+                    code.get_local_executable()))
+
+        calc_info = self.prepare_for_submission(folder)
+        calc_info.uuid = str(self.node.uuid)
+        scheduler = computer.get_scheduler()
 
         # I create the job template to pass to the scheduler
         job_tmpl = JobTemplate()
@@ -342,7 +325,7 @@ def presubmit(self, folder):
         job_tmpl.sched_join_files = False
 
         # Set retrieve path, add also scheduler STDOUT and STDERR
-        retrieve_list = (calcinfo.retrieve_list if calcinfo.retrieve_list is not None else [])
+        retrieve_list = (calc_info.retrieve_list if calc_info.retrieve_list is not None else [])
         if (job_tmpl.sched_output_path is not None and job_tmpl.sched_output_path not in retrieve_list):
             retrieve_list.append(job_tmpl.sched_output_path)
         if not job_tmpl.sched_join_files:
@@ -350,8 +333,8 @@ def presubmit(self, folder):
             retrieve_list.append(job_tmpl.sched_error_path)
         self.node.set_retrieve_list(retrieve_list)
 
-        retrieve_singlefile_list = (calcinfo.retrieve_singlefile_list
-                                    if calcinfo.retrieve_singlefile_list is not None else [])
+        retrieve_singlefile_list = (calc_info.retrieve_singlefile_list
+                                    if calc_info.retrieve_singlefile_list is not None else [])
         # a validation on the subclasses of retrieve_singlefile_list
         for _, subclassname, _ in retrieve_singlefile_list:
             file_sub_class = DataFactory(subclassname)
@@ -363,8 +346,8 @@ def presubmit(self, folder):
         self.node.set_retrieve_singlefile_list(retrieve_singlefile_list)
 
         # Handle the retrieve_temporary_list
-        retrieve_temporary_list = (calcinfo.retrieve_temporary_list
-                                   if calcinfo.retrieve_temporary_list is not None else [])
+        retrieve_temporary_list = (calc_info.retrieve_temporary_list
+                                   if calc_info.retrieve_temporary_list is not None else [])
         self.node.set_retrieve_temporary_list(retrieve_temporary_list)
 
         # the if is done so that if the method returns None, this is
@@ -375,10 +358,10 @@ def presubmit(self, folder):
         # an exception
         prepend_texts = [computer.get_prepend_text()] + \
             [code.get_prepend_text() for code in codes] + \
-            [calcinfo.prepend_text, self.node.get_option('prepend_text')]
+            [calc_info.prepend_text, self.node.get_option('prepend_text')]
         job_tmpl.prepend_text = '\n\n'.join(prepend_text for prepend_text in prepend_texts if prepend_text)
 
-        append_texts = [self.node.get_option('append_text'), calcinfo.append_text] + \
+        append_texts = [self.node.get_option('append_text'), calc_info.append_text] + \
             [code.get_append_text() for code in codes] + \
             [computer.get_append_text()]
         job_tmpl.append_text = '\n\n'.join(append_text for append_text in append_texts if append_text)
@@ -398,11 +381,11 @@ def presubmit(self, folder):
         extra_mpirun_params = self.node.get_option('mpirun_extra_params')  # same for all codes in the same calc
 
         # set the codes_info
-        if not isinstance(calcinfo.codes_info, (list, tuple)):
+        if not isinstance(calc_info.codes_info, (list, tuple)):
            raise PluginInternalError('codes_info passed to CalcInfo must be a list of CalcInfo objects')
 
         codes_info = []
-        for code_info in calcinfo.codes_info:
+        for code_info in calc_info.codes_info:
 
             if not isinstance(code_info, CodeInfo):
                 raise PluginInternalError('Invalid codes_info, must be a list of CodeInfo objects')
@@ -415,7 +398,7 @@ def presubmit(self, folder):
 
                 this_withmpi = code_info.withmpi  # to decide better how to set the default
                 if this_withmpi is None:
-                    if len(calcinfo.codes_info) > 1:
+                    if len(calc_info.codes_info) > 1:
                         raise PluginInternalError('For more than one code, it is '
                                                   'necessary to set withmpi in '
                                                   'codes_info')
@@ -439,7 +422,7 @@ def presubmit(self, folder):
 
         if len(codes) > 1:
             try:
-                job_tmpl.codes_run_mode = calcinfo.codes_run_mode
+                job_tmpl.codes_run_mode = calc_info.codes_run_mode
             except KeyError:
                 raise PluginInternalError('Need to set the order of the code execution (parallel or serial?)')
         else:
@@ -482,24 +465,24 @@ def presubmit(self, folder):
 
         subfolder = folder.get_subfolder('.aiida', create=True)
         subfolder.create_file_from_filelike(io.StringIO(json.dumps(job_tmpl)), 'job_tmpl.json', 'w', encoding='utf8')
-        subfolder.create_file_from_filelike(io.StringIO(json.dumps(calcinfo)), 'calcinfo.json', 'w', encoding='utf8')
+        subfolder.create_file_from_filelike(io.StringIO(json.dumps(calc_info)), 'calcinfo.json', 'w', encoding='utf8')
 
-        if calcinfo.local_copy_list is None:
-            calcinfo.local_copy_list = []
+        if calc_info.local_copy_list is None:
+            calc_info.local_copy_list = []
 
-        if calcinfo.remote_copy_list is None:
-            calcinfo.remote_copy_list = []
+        if calc_info.remote_copy_list is None:
+            calc_info.remote_copy_list = []
 
         # Some validation
         this_pk = self.node.pk if self.node.pk is not None else '[UNSTORED]'
-        local_copy_list = calcinfo.local_copy_list
+        local_copy_list = calc_info.local_copy_list
         try:
             validate_list_of_string_tuples(local_copy_list, tuple_length=3)
         except ValidationError as exc:
             raise PluginInternalError('[presubmission of calc {}] '
                                       'local_copy_list format problem: {}'.format(this_pk, exc))
 
-        remote_copy_list = calcinfo.remote_copy_list
+        remote_copy_list = calc_info.remote_copy_list
         try:
             validate_list_of_string_tuples(remote_copy_list, tuple_length=3)
         except ValidationError as exc:
@@ -519,4 +502,4 @@ def presubmit(self, folder):
                                       'The destination path of the remote copy '
                                       'is absolute! ({})'.format(this_pk, dest_rel_path))
 
-        return calcinfo, script_filename
+        return calc_info, script_filename
diff --git a/aiida/engine/processes/calcjobs/tasks.py b/aiida/engine/processes/calcjobs/tasks.py
index fc19d2df61..3accf7f142 100644
--- a/aiida/engine/processes/calcjobs/tasks.py
+++ b/aiida/engine/processes/calcjobs/tasks.py
@@ -17,6 +17,7 @@
 
 from aiida.common.datastructures import CalcJobState
 from aiida.common.exceptions import FeatureNotAvailable, TransportTaskException
+from aiida.common.folders import SandboxFolder
 from aiida.engine.daemon import execmanager
 from aiida.engine.utils import exponential_backoff_retry, interruptable_task
 from aiida.schedulers.datastructures import JobState
 
@@ -36,7 +37,7 @@
 
 
 @coroutine
-def task_upload_job(node, transport_queue, calc_info, script_filename, cancellable):
+def task_upload_job(process, transport_queue, cancellable):
     """Transport task that will attempt to upload the files of a job calculation to the remote.
 
     The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
@@ -46,13 +47,13 @@ def task_upload_job(node, transport_queue, calc_info, script_filename, cancellab
 
     :param node: the node that represents the job calculation
     :param transport_queue: the TransportQueue from which to request a Transport
-    :param calc_info: the calculation info datastructure returned by `CalcJobNode._presubmit`
-    :param script_filename: the job launch script returned by `CalcJobNode._presubmit`
     :param cancellable: the cancelled flag that will be queried to determine whether the task was cancelled
     :type cancellable: :class:`aiida.engine.utils.InterruptableFuture`
     :raises: Return if the task was successfully completed
     :raises: TransportTaskException if after the maximum number of retries the transport task still excepted
     """
+    node = process.node
+
     if node.get_state() == CalcJobState.SUBMITTING:
         logger.warning('CalcJob<{}> already marked as SUBMITTING, skipping task_update_job'.format(node.pk))
         raise Return(True)
@@ -66,7 +67,10 @@ def task_upload_job(node, transport_queue, calc_info, script_filename, cancellab
     def do_upload():
         with transport_queue.request_transport(authinfo) as request:
             transport = yield cancellable.with_interrupt(request)
-            raise Return(execmanager.upload_calculation(node, transport, calc_info, script_filename))
+            with SandboxFolder() as folder:
+                calc_info, script_filename = process.presubmit(folder)
+                execmanager.upload_calculation(node, transport, calc_info, folder)
+            raise Return((calc_info, script_filename))
 
     try:
         logger.info('scheduled request to upload CalcJob<{}>'.format(node.pk))
@@ -330,7 +334,7 @@ def execute(self):
 
         if command == UPLOAD_COMMAND:
             node.set_process_status(process_status)
-            calc_info, script_filename = yield self._launch_task(task_upload_job, node, transport_queue, *args)
+            calc_info, script_filename = yield self._launch_task(task_upload_job, self.process, transport_queue)
             raise Return(self.submit(calc_info, script_filename))
 
         elif command == SUBMIT_COMMAND:
@@ -390,7 +394,7 @@ def _launch_task(self, coro, *args, **kwargs):
     def upload(self, calc_info, script_filename):
         """Return the `Waiting` state that will `upload` the `CalcJob`."""
         msg = 'Waiting for calculation folder upload'
-        return self.create_state(ProcessState.WAITING, None, msg=msg, data=(UPLOAD_COMMAND, calc_info, script_filename))
+        return self.create_state(ProcessState.WAITING, None, msg=msg, data=UPLOAD_COMMAND)
 
     def submit(self, calc_info, script_filename):
         """Return the `Waiting` state that will `submit` the `CalcJob`."""
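
For context, the net upload flow after this change can be sketched as follows.
This is a simplified paraphrase of the new `task_upload_job` body above, not
code from the tree: the coroutine machinery, exponential backoff, logging and
cancellation handling are omitted, and `process` and `transport` stand for the
`CalcJob` process instance and the transport obtained from the queue.

    from aiida.common.folders import SandboxFolder
    from aiida.engine.daemon import execmanager

    def upload_flow(process, transport):
        """Sketch: the sandbox folder is now created right before the upload."""
        with SandboxFolder() as folder:
            # `presubmit` calls `prepare_for_submission`, which writes the input
            # files into `folder`, and returns the job metadata.
            calc_info, script_filename = process.presubmit(folder)
            # The folder itself is passed along instead of only `script_filename`;
            # `upload_calculation` stores its contents in the node repository and
            # copies them to the remote working directory.
            execmanager.upload_calculation(process.node, transport, calc_info, folder)
        return calc_info, script_filename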