From 4a0e0d2f88c69e08bc53514fc2d3565174d97b76 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Mon, 10 Oct 2016 21:40:31 -0400 Subject: [PATCH 1/5] Refactor toward allowing extensions required in #93. --- cwltool/job.py | 104 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 69 insertions(+), 35 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index 52b344ae0..54a02ecf3 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -151,10 +151,6 @@ def run(self, dry_run=False, pull_image=True, rm_container=True, stageFiles(self.pathmapper, os.symlink) - stdin = None # type: Union[IO[Any], int] - stderr = None # type: IO[Any] - stdout = None # type: IO[Any] - scr, _ = get_feature(self, "ShellCommandRequirement") if scr: @@ -191,51 +187,34 @@ def linkoutdir(src, tgt): break stageFiles(generatemapper, linkoutdir) + stdin_path = None if self.stdin: - stdin = open(self.pathmapper.reversemap(self.stdin)[1], "rb") - else: - stdin = subprocess.PIPE + stdin_path = self.pathmapper.reversemap(self.stdin)[1] + stderr_path = None if self.stderr: abserr = os.path.join(self.outdir, self.stderr) dnerr = os.path.dirname(abserr) if dnerr and not os.path.exists(dnerr): os.makedirs(dnerr) - stderr = open(abserr, "wb") - else: - stderr = sys.stderr + stderr_path = abserr + stdout_path = None if self.stdout: absout = os.path.join(self.outdir, self.stdout) dn = os.path.dirname(absout) if dn and not os.path.exists(dn): os.makedirs(dn) - stdout = open(absout, "wb") - else: - stdout = sys.stderr - - sp = subprocess.Popen([Text(x).encode('utf-8') for x in runtime + self.command_line], - shell=False, - close_fds=True, - stdin=stdin, - stderr=stderr, - stdout=stdout, - env=env, - cwd=self.outdir) - - if sp.stdin: - sp.stdin.close() + stdout_path = absout - rcode = sp.wait() - - if isinstance(stdin, file): - stdin.close() - - if stderr is not sys.stderr: - stderr.close() - - if stdout is not sys.stderr: - stdout.close() + rcode = _job_popen( + [Text(x).encode('utf-8') for x in runtime + self.command_line], + stdin_path=stdin_path, + stdout_path=stdout_path, + stderr_path=stderr_path, + env=env, + cwd=self.outdir, + ) if self.successCodes and rcode in self.successCodes: processStatus = "success" @@ -294,3 +273,58 @@ def linkoutdir(src, tgt): if move_outputs == "move" and empty_subtree(self.outdir): _logger.debug(u"[job %s] Removing empty output directory %s", self.name, self.outdir) shutil.rmtree(self.outdir, True) + + +def _job_popen( + commands, + stdin_path, + stdout_path, + stderr_path, + env, + cwd, +): + # type: (List[str], Text, Text, Text, Union[MutableMapping[Text, Text], MutableMapping[str, str]], Text) -> int + + stdin = None # type: Union[IO[Any], int] + stderr = None # type: IO[Any] + stdout = None # type: IO[Any] + + if stdin_path is not None: + stdin = open(stdin_path, "rb") + else: + stdin = subprocess.PIPE + + if stdout_path is not None: + stdout = open(stdout_path, "wb") + else: + stdout = sys.stderr + + if stderr_path is not None: + stderr = open(stderr_path, "wb") + else: + stderr = sys.stderr + + sp = subprocess.Popen(commands, + shell=False, + close_fds=True, + stdin=stdin, + stdout=stdout, + stderr=stderr, + env=env, + cwd=cwd) + + if sp.stdin: + sp.stdin.close() + + rcode = sp.wait() + + if isinstance(stdin, file): + stdin.close() + + if stdout is not sys.stderr: + stdout.close() + + if stderr is not sys.stderr: + stderr.close() + + return rcode From a39e0cdb5ed3b6dbd5638a6d2a794b54d695b340 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Tue, 18 Oct 2016 17:06:29 -0400 Subject: [PATCH 2/5] Make hints available on builder object. For runtime modification based on hints. --- cwltool/builder.py | 1 + cwltool/process.py | 1 + 2 files changed, 2 insertions(+) diff --git a/cwltool/builder.py b/cwltool/builder.py index 5616699e5..94002b9f1 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -26,6 +26,7 @@ def __init__(self): # type: () -> None self.fs_access = None # type: StdFsAccess self.job = None # type: Dict[Text, Union[Dict[Text, Any], List, Text]] self.requirements = None # type: List[Dict[Text, Any]] + self.hints = None # type: List[Dict[Text, Any]] self.outdir = None # type: Text self.tmpdir = None # type: Text self.resources = None # type: Dict[Text, Union[int, Text]] diff --git a/cwltool/process.py b/cwltool/process.py index ac3418a3d..b15ff3778 100644 --- a/cwltool/process.py +++ b/cwltool/process.py @@ -442,6 +442,7 @@ def _init_job(self, joborder, **kwargs): builder.schemaDefs = self.schemaDefs builder.names = self.names builder.requirements = self.requirements + builder.hints = self.hints builder.resources = {} builder.timeout = kwargs.get("eval_timeout") From c628dea4ed95b00b0a0876a20c27582d6c8fa1a0 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Mon, 10 Oct 2016 22:55:48 -0400 Subject: [PATCH 3/5] Add an extension point allowing building scripts for cwltool jobs. --- cwltool/builder.py | 1 + cwltool/job.py | 176 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 143 insertions(+), 34 deletions(-) diff --git a/cwltool/builder.py b/cwltool/builder.py index 94002b9f1..8ddc7975b 100644 --- a/cwltool/builder.py +++ b/cwltool/builder.py @@ -35,6 +35,7 @@ def __init__(self): # type: () -> None self.pathmapper = None # type: PathMapper self.stagedir = None # type: Text self.make_fs_access = None # type: Type[StdFsAccess] + self.build_job_script = None # type: Callable[[List[str]], Text] def bind_input(self, schema, datum, lead_pos=[], tail_pos=[]): # type: (Dict[Text, Any], Any, Union[int, List[int]], List[int]) -> List[Dict[Text, Any]] diff --git a/cwltool/job.py b/cwltool/job.py index 54a02ecf3..4995a447b 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -14,6 +14,7 @@ import stat import re import shellescape +import string from .docker_uid import docker_vm_uid from .builder import Builder from typing import (Any, Callable, Union, Iterable, Mapping, MutableMapping, @@ -25,6 +26,58 @@ needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""") +FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "1") == "1" + +SHELL_COMMAND_TEMPLATE = """#!/bin/bash +python "run_job.py" "job.json" +""" + +PYTHON_RUN_SCRIPT = """ +import json +import sys +import subprocess + +with open(sys.argv[1], "r") as f: + popen_description = json.load(f) + commands = popen_description["commands"] + cwd = popen_description["cwd"] + env = popen_description["env"] + stdin_path = popen_description["stdin_path"] + stdout_path = popen_description["stdout_path"] + stderr_path = popen_description["stderr_path"] + if stdin_path is not None: + stdin = open(stdin_path, "rb") + else: + stdin = subprocess.PIPE + if stdout_path is not None: + stdout = open(stdout_path, "wb") + else: + stdout = sys.stderr + if stderr_path is not None: + stderr = open(stderr_path, "wb") + else: + stderr = sys.stderr + sp = subprocess.Popen(commands, + shell=False, + close_fds=True, + stdin=stdin, + stdout=stdout, + stderr=stderr, + env=env, + cwd=cwd) + if sp.stdin: + sp.stdin.close() + rcode = sp.wait() + if isinstance(stdin, file): + stdin.close() + if stdout is not sys.stderr: + stdout.close() + if stderr is not sys.stderr: + stderr.close() + sys.exit(rcode) +""" + + def deref_links(outputs): # type: (Any) -> None if isinstance(outputs, dict): if outputs.get("class") == "File": @@ -207,6 +260,7 @@ def linkoutdir(src, tgt): os.makedirs(dn) stdout_path = absout + build_job_script = self.builder.build_job_script # type: Callable[[List[str]], Text] rcode = _job_popen( [Text(x).encode('utf-8') for x in runtime + self.command_line], stdin_path=stdin_path, @@ -214,6 +268,7 @@ def linkoutdir(src, tgt): stderr_path=stderr_path, env=env, cwd=self.outdir, + build_job_script=build_job_script, ) if self.successCodes and rcode in self.successCodes: @@ -282,49 +337,102 @@ def _job_popen( stderr_path, env, cwd, + job_dir=None, + build_job_script=None, ): - # type: (List[str], Text, Text, Text, Union[MutableMapping[Text, Text], MutableMapping[str, str]], Text) -> int + # type: (List[str], Text, Text, Text, Union[MutableMapping[Text, Text], MutableMapping[str, str]], Text, Text, Callable[[List[str]], Text]) -> int - stdin = None # type: Union[IO[Any], int] - stderr = None # type: IO[Any] - stdout = None # type: IO[Any] + job_script_contents = None # type: Text + if build_job_script: + job_script_contents = build_job_script(commands) - if stdin_path is not None: - stdin = open(stdin_path, "rb") - else: - stdin = subprocess.PIPE + if not job_script_contents and not FORCE_SHELLED_POPEN: - if stdout_path is not None: - stdout = open(stdout_path, "wb") - else: - stdout = sys.stderr + stdin = None # type: Union[IO[Any], int] + stderr = None # type: IO[Any] + stdout = None # type: IO[Any] - if stderr_path is not None: - stderr = open(stderr_path, "wb") - else: - stderr = sys.stderr + if stdin_path is not None: + stdin = open(stdin_path, "rb") + else: + stdin = subprocess.PIPE - sp = subprocess.Popen(commands, - shell=False, - close_fds=True, - stdin=stdin, - stdout=stdout, - stderr=stderr, - env=env, - cwd=cwd) + if stdout_path is not None: + stdout = open(stdout_path, "wb") + else: + stdout = sys.stderr - if sp.stdin: - sp.stdin.close() + if stderr_path is not None: + stderr = open(stderr_path, "wb") + else: + stderr = sys.stderr - rcode = sp.wait() + sp = subprocess.Popen(commands, + shell=False, + close_fds=True, + stdin=stdin, + stdout=stdout, + stderr=stderr, + env=env, + cwd=cwd) - if isinstance(stdin, file): - stdin.close() + if sp.stdin: + sp.stdin.close() - if stdout is not sys.stderr: - stdout.close() + rcode = sp.wait() - if stderr is not sys.stderr: - stderr.close() + if isinstance(stdin, file): + stdin.close() + + if stdout is not sys.stderr: + stdout.close() + + if stderr is not sys.stderr: + stderr.close() + + return rcode + else: + if job_dir is None: + job_dir = tempfile.mkdtemp(prefix="cwltooljob") + + if not job_script_contents: + job_script_contents = SHELL_COMMAND_TEMPLATE + + env_copy = {} + for key in env: + key = key.encode("utf-8") + env_copy[key] = env[key] + + job_description = dict( + commands=commands, + cwd=cwd, + env=env_copy, + stdout_path=stdout_path, + stderr_path=stderr_path, + stdin_path=stdin_path, + ) + with open(os.path.join(job_dir, "job.json"), "w") as f: + json.dump(job_description, f) + try: + job_script = os.path.join(job_dir, "run_job.bash") + with open(job_script, "w") as f: + f.write(job_script_contents) + job_run = os.path.join(job_dir, "run_job.py") + with open(job_run, "w") as f: + f.write(PYTHON_RUN_SCRIPT) + sp = subprocess.Popen( + ["bash", job_script.encode("utf-8")], + shell=False, + cwd=job_dir, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + ) + if sp.stdin: + sp.stdin.close() + + rcode = sp.wait() - return rcode + return rcode + finally: + shutil.rmtree(job_dir) From 77520b676937bcf88997913bd77daf949ac91060 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Tue, 18 Oct 2016 17:13:00 -0400 Subject: [PATCH 4/5] Disable job script stuff by default to restore old behavior if not using job script extension. --- cwltool/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cwltool/job.py b/cwltool/job.py index 4995a447b..2a514193b 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -26,7 +26,7 @@ needs_shell_quoting_re = re.compile(r"""(^$|[\s|&;()<>\'"$@])""") -FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "1") == "1" +FORCE_SHELLED_POPEN = os.getenv("CWLTOOL_FORCE_SHELL_POPEN", "0") == "1" SHELL_COMMAND_TEMPLATE = """#!/bin/bash python "run_job.py" "job.json" From 879c254d711de2679bbe41a6e5fbe944dc52474e Mon Sep 17 00:00:00 2001 From: John Chilton Date: Thu, 20 Oct 2016 15:29:27 -0400 Subject: [PATCH 5/5] Convert new type signature to multi-line variant. Thanks to suggestion by @mr-c (https://github.com/common-workflow-language/cwltool/pull/212#pullrequestreview-5032666). --- cwltool/job.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cwltool/job.py b/cwltool/job.py index 2a514193b..3ab479d45 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -331,16 +331,16 @@ def linkoutdir(src, tgt): def _job_popen( - commands, - stdin_path, - stdout_path, - stderr_path, - env, - cwd, - job_dir=None, - build_job_script=None, + commands, # type: List[str] + stdin_path, # type: Text + stdout_path, # type: Text + stderr_path, # type: Text + env, # type: Union[MutableMapping[Text, Text], MutableMapping[str, str]] + cwd, # type: Text + job_dir=None, # type: Text + build_job_script=None, # type: Callable[[List[str]], Text] ): - # type: (List[str], Text, Text, Text, Union[MutableMapping[Text, Text], MutableMapping[str, str]], Text, Text, Callable[[List[str]], Text]) -> int + # type: (...) -> int job_script_contents = None # type: Text if build_job_script: