Skip to content

Commit

Permalink
Merge branch 'pk'
Browse the repository at this point in the history
  • Loading branch information
pbk0 committed Nov 24, 2023
2 parents c83fe2c + 36f6f7b commit a745ec8
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 27 deletions.
32 changes: 12 additions & 20 deletions toolcraft/job/__base__.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,18 +637,11 @@ def __init__(
]
)

def launch_as_subprocess(self, shell: bool = True, cli_command: t.List[str] = None):
def launch_as_subprocess(self, cli_command: t.List[str] = None, shell: bool = False):
# ------------------------------------------------------------- 01
# make cli command
# make cli command if None
if cli_command is None:
_cli_command = self.cli_command
if shell:
if 'WSL2' in settings.PLATFORM.release:
_cli_command = ["gnome-terminal", "--", "bash", "-c", ] + ['"' + ' '.join(_cli_command) + '"']
else:
_cli_command = ["start", "cmd", "/c", ] + _cli_command
else:
_cli_command = cli_command
cli_command = self.cli_command

# ------------------------------------------------------------- 02
# check health
Expand All @@ -667,16 +660,7 @@ def launch_as_subprocess(self, shell: bool = True, cli_command: t.List[str] = No

# ------------------------------------------------------------- 04
# run in subprocess
# do not tempt to use this as it adds dead lock
# todo: debug only possible on windows not on wsl linux
# if single_cpu:
# if _job.experiment is None:
# return _job.method()
# else:
# return _job.method(experiment=_job.experiment)
# else:
# _ret = subprocess.run(_cli_command, shell=True, env=os.environ.copy())
_ret = subprocess.run(_cli_command, shell=shell, env=os.environ.copy())
_ret = subprocess.run(cli_command, env=os.environ.copy(), shell=shell)

def wait_on(self, wait_on: t.Union['Job', 'SequentialJobGroup', 'ParallelJobGroup']) -> "Job":
self._wait_on_jobs.append(wait_on)
Expand Down Expand Up @@ -1420,6 +1404,14 @@ class Experiment(_Common, abc.ABC):
# runner
runner: "Runner"

@property
def is_on_single_cpu(self) -> bool:
    """
    Whether this process was started in single-cpu mode.

    Returns True when the `--single-cpu` flag appears in `sys.argv`.
    Note that both `run` and `launch` can take the `--single-cpu` arg.
    Also note that `launch` passes the `--single-cpu` arg to `run`
    automatically.
    """
    # NOTE(review): inspects raw sys.argv rather than parsed CLI options —
    # presumably so the flag is visible no matter which command path set it;
    # confirm against the `run`/`launch` typer commands.
    return '--single-cpu' in sys.argv

def init(self):
# ------------------------------------------------------------------ 01
# call super
Expand Down
8 changes: 8 additions & 0 deletions toolcraft/job/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def get_app(runner: Runner):
global _RUNNER, _APP
if _RUNNER is None:
_RUNNER = runner
# noinspection PyProtectedMember
assert cli_launch._RUNNER is None, "was expecting this to be None"
cli_launch._RUNNER = runner
else:
raise e.code.CodingError(
Expand Down Expand Up @@ -84,6 +86,12 @@ def run(
show_default=False,
)
],
single_cpu: Annotated[
bool, typer.Option(
help="This `run` job was launched by `launch` with single cpu mode on (good for debugging). "
"This will be set only by `launch` command automatically."
)
] = False
):
"""
Run a job in runner.
Expand Down
30 changes: 23 additions & 7 deletions toolcraft/job/cli_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
IMP: see docstring we can even have dearpygui client if we can submit jobs
and track jobs via ssh
"""

import datetime
import os
import time
import psutil
Expand Down Expand Up @@ -54,6 +54,11 @@ def lsf(
"""

# --------------------------------------------------------- 01
# validate
if not settings.IS_LSF:
raise e.validation.NotAllowed(
msgs=["This is not LSF environment so cannot launch lsf jobs ..."]
)
# get some vars
_rp = _RUNNER.richy_panel
_rp.update(f"launching jobs on LSF cluster ...")
Expand Down Expand Up @@ -105,13 +110,11 @@ def lsf(
" && ".join([f"done({_.job_id})" for _ in _wait_on_jobs])
_nxdi_prefix += ["-w", f"{_wait_on}"]
_cli_command = _nxdi_prefix + _job.cli_command
print(">> ", " ".join(_cli_command))
# _rp.log([" ".join(_cli_command)])

# ------------------------------------------------- 02.03
# run job
# for bsub shell should be False
_job.launch_as_subprocess(shell=False, cli_command=_cli_command)
_job.launch_as_subprocess(cli_command=_cli_command)


@_APP.command(help="Launches all the jobs in runner on local machine.")
Expand Down Expand Up @@ -157,6 +160,7 @@ def local(

# --------------------------------------------------------- 04
# loop infinitely until all jobs complete
_start_time = datetime.datetime.now()
while bool(_all_jobs):

# loop over _all_jobs
Expand All @@ -166,6 +170,16 @@ def local(
# get job
_job = _all_jobs[_job_flow_id]
_job_short_name = _job.short_name
# make cli command
_cli_command = _job.cli_command
if single_cpu:
_cli_command += ["--single-cpu"]
else:
if 'WSL2' in settings.PLATFORM.release:
_cli_command = ["gnome-terminal", "--", "bash", "-c", ] + [
'"' + ' '.join(_cli_command) + '"']
else:
_cli_command = ["start", "cmd", "/c", ] + _cli_command

# ------------------------------------------------- 04.02
# if finished skip
Expand Down Expand Up @@ -206,7 +220,9 @@ def local(
_all_finished = False
break
if not _all_finished:
_rp.update(f"⏰ {_job_short_name} :: postponed wait_on jobs not completed")
if (datetime.datetime.now() - _start_time).total_seconds() > 10:
_rp.update(f"⏰ {_job_short_name} :: postponed wait_on jobs not completed")
_start_time = datetime.datetime.now()
continue

# ------------------------------------------------- 04.06
Expand All @@ -229,7 +245,7 @@ def local(
# ------------------------------------------------- 04.07.01
# for first job no need to check anything just launch
if len(_jobs_running_in_parallel) == 0:
_job.launch_as_subprocess(shell=not single_cpu)
_job.launch_as_subprocess(cli_command=_cli_command, shell=not single_cpu)
_jobs_running_in_parallel[_job.job_id] = _job
_rp.log([f"🏁 {_job_short_name} :: launching"])
del _all_jobs[_job_flow_id]
Expand All @@ -247,7 +263,7 @@ def local(
_rp.update(f"⏰ {_job_short_name} :: postponed not enough memory")
continue
# all is well launch
_job.launch_as_subprocess(shell=not single_cpu)
_job.launch_as_subprocess(cli_command=_cli_command, shell=not single_cpu)
_jobs_running_in_parallel[_job.job_id] = _job
_rp.log([f"🏁 {_job_short_name} :: launching"])
del _all_jobs[_job_flow_id]
Expand Down

0 comments on commit a745ec8

Please sign in to comment.