Skip to content

Commit

Permalink
Merge branch 'pk'
Browse files Browse the repository at this point in the history
  • Loading branch information
pbk0 committed Nov 8, 2023
2 parents 2c61940 + 50b7b23 commit dc3abf8
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 111 deletions.
20 changes: 20 additions & 0 deletions toolcraft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,41 @@
__version__ = "0.1.4a23"


# todo: make sure that dearpygui works and opens in windowed mode on ibm lsf
# todo: improve load time ... have settings module overriden properly
# todo: for each subproject have a prompt to override settings and save in config file only when updated
# while also allow to override in settings module of each subproject
# only when they override the settings with UI only then add to respective config file in project

# Import-time diagnostics: each sub-module's cumulative load time is printed
# relative to `_now`.  This supports the "improve load time" TODO above —
# NOTE(review): these prints run on every `import toolcraft`; presumably they
# are temporary instrumentation to be removed or gated once load time is
# acceptable — confirm before release.
import time
_now = time.time()
from . import settings
print("settings", time.time() - _now)
from . import logger
print("logger", time.time() - _now)
from . import error
print("error", time.time() - _now)
from . import util
print("util", time.time() - _now)
from . import marshalling
print("marshalling", time.time() - _now)
from . import parallel
print("parallel", time.time() - _now)
from . import storage
print("storage", time.time() - _now)
from . import server
print("server", time.time() - _now)
from . import job
print("job", time.time() - _now)
from . import richy
print("richy", time.time() - _now)
from . import texipy
print("texipy", time.time() - _now)

# The gui sub-package is optional: it requires dearpygui, which may not be
# installed (e.g. headless servers).  A missing dearpygui silently skips gui.
try:
    import dearpygui.dearpygui as _dpg
    from . import gui
    print("gui", time.time() - _now)
except ImportError:
    ...

Expand Down
42 changes: 26 additions & 16 deletions toolcraft/job/__base__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,17 @@
"""
import abc
import datetime
import enum
import os
import inspect
import pathlib
import shutil
import typing as t
import dataclasses
import subprocess
import itertools

import rich
import yaml
import sys
import asyncio
import hashlib
import types

from .. import logger
Expand All @@ -27,13 +23,15 @@
from .. import storage as s
from .. import richy
from .. import settings
from ..settings import Settings

_now = datetime.datetime.now

try:

if Settings.USE_NP_TF_KE_PA_MARSHALLING:
import tensorflow as tf
from tensorflow.python.training.tracking import util as tf_util
except ImportError:
else:
tf = None
tf_util = None

Expand Down Expand Up @@ -572,7 +570,7 @@ def path(self) -> s.Path:
todo: integrate this with storage with partition_columns ... (not important do only if necessary)
"""
_ret = self.runner.cwd
_ret = self.runner.results_dir
_ret /= self.method.__func__.__name__
if bool(self.experiment):
for _ in self.experiment.group_by:
Expand Down Expand Up @@ -1068,7 +1066,7 @@ class Monitor:
@property
@util.CacheResult
def path(self) -> s.Path:
_ret = self.runner.cwd / _MONITOR_FOLDER
_ret = self.runner.results_dir / _MONITOR_FOLDER
if not _ret.exists():
_ret.mkdir(create_parents=True)
return _ret
Expand Down Expand Up @@ -1396,9 +1394,9 @@ def init(self):
@dataclasses.dataclass(frozen=True)
@m.RuleChecker(
things_to_be_cached=[
'cwd', 'flow', 'monitor', 'registered_experiments',
'cwd', 'results_dir', 'flow', 'monitor', 'registered_experiments',
],
things_not_to_be_overridden=['cwd', 'py_script', 'monitor'],
things_not_to_be_overridden=['cwd', 'results_dir', 'py_script', 'monitor'],
# we do not want any fields for Runner class
restrict_dataclass_fields_to=[],
)
Expand Down Expand Up @@ -1459,18 +1457,31 @@ def copy_src_dst(self) -> t.Tuple[str, str]:
msgs=["Cannot use copy cli command", "Please implement property copy_src_dst to use copy cli command"]
)

@property
@util.CacheResult
def results_dir(self) -> s.Path:
    """
    Results dir where results will be stored for this runner.

    The folder name is derived from the runner script's file name (without
    its ``.py`` suffix) plus the first five characters of ``hex_hash``, so
    two runner scripts with the same file name still get distinct result
    folders on the ``RESULTS`` file system.  The directory is created on
    first access; the computed path is cached by ``util.CacheResult``.
    """
    _py_script = self.py_script
    # Strip only a trailing ".py" suffix; the previous
    # `name.replace(".py", "")` removed ".py" anywhere in the name and
    # would mangle names like "train.py.old.py".
    _name = _py_script.name
    if _name.endswith(".py"):
        _name = _name[: -len(".py")]
    _folder_name = f"{_name}_{self.hex_hash[:5]}"
    _ret = s.Path(suffix_path=_folder_name, fs_name='RESULTS')
    if not _ret.exists():
        _ret.mkdir(create_parents=True)
    return _ret

@property
@util.CacheResult
def cwd(self) -> s.Path:
"""
todo: adapt code so that the cwd can be on any other file system instead of CWD
"""
_py_script = self.py_script
_folder_name = _py_script.name.replace(".py", "")
_ret = s.Path(suffix_path=_folder_name, fs_name='CWD')
_ret = s.Path(suffix_path=".", fs_name='CWD')
e.code.AssertError(
value1=_ret.local_path.absolute().as_posix(),
value2=(_py_script.parent / _folder_name).absolute().as_posix(),
value2=_py_script.parent.absolute().as_posix(),
msgs=[
f"This is unexpected ... ",
f"The cwd for job runner is {_ret.local_path.absolute().as_posix()}",
Expand Down Expand Up @@ -1499,15 +1510,14 @@ def init(self):
# setup logger
import logging
# note that this should always be local ... dont use `self.cwd`
_log_file = self.py_script.parent / self.py_script.name.replace(".py", "") / "runner.log"
_log_file.parent.mkdir(parents=True, exist_ok=True)
_log_file = self.results_dir / "runner.log"
logger.setup_logging(
propagate=False,
level=logging.NOTSET,
handlers=[
# logger.get_rich_handler(),
# logger.get_stream_handler(),
logger.get_file_handler(_log_file),
logger.get_file_handler(_log_file.local_path),
],
)

Expand Down
119 changes: 118 additions & 1 deletion toolcraft/job/cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import enum
import os
import pathlib
import zipfile
import time

import psutil
import typer
from typing_extensions import Annotated
import sys
import os
import shutil
import dataclasses
import typing as t
import subprocess
Expand Down Expand Up @@ -72,7 +76,13 @@ def nxdi():

@_APP.command(help="Run the job")
def run(
job: str = typer.Argument(..., help="Job ID in format <runner-hex-hash:method-name> or <runner-hex-hash:experi-hex-hash:method-name>", show_default=False, ),
job: Annotated[
str,
typer.Argument(
help="Job ID in format <runner-hex-hash:method-name> or <runner-hex-hash:experi-hex-hash:method-name>",
show_default=False,
)
],
):
"""
Run a job in runner.
Expand Down Expand Up @@ -261,6 +271,113 @@ def _j_view(_j: Job) -> gui.widget.Widget:
gui.Engine.run(_dashboard)


@_APP.command(help="Archive/partition/upload the results folder")
def archive(
    part_size: Annotated[int, typer.Option(help="Max part size in MB to break the resulting archive file.")] = None,
    transmft: Annotated[bool, typer.Option(help="Upload resulting files to cloud drive and make script to download them.")] = False,
):
    """
    Zip the runner's results dir into ``<results>_archive/<name>.zip``,
    optionally split the zip into fixed-size parts, and optionally upload
    every produced file via the external ``transmft`` tool, emitting a
    ``get.ps1`` script that re-downloads the parts by key.

    Parts are plain byte-splits of the single zip file, so they must be
    concatenated back in order before unzipping.
    """

    # -------------------------------------------------------------- 01
    # start
    _rp = _RUNNER.richy_panel
    # validation
    if transmft:
        # transmft mode fixes the part size itself; a user-supplied value
        # would silently be overridden, so reject it explicitly.
        if part_size is not None:
            raise e.validation.NotAllowed(
                msgs=["When using transmft do not supply part_size as we hardcode it to 399MB"]
            )
        # NOTE(review): presumably 399MB sits under a 400MB per-file limit
        # of the transfer service — confirm.
        part_size = 399

    # -------------------------------------------------------------- 02
    # make archive
    _rp.update(
        f"archiving results dir {_RUNNER.results_dir.local_path.as_posix()} "
        f"{'' if part_size is None else 'and making parts '} ..."
    )
    _zip_base_name = _RUNNER.results_dir.name
    _cwd = _RUNNER.cwd.local_path.resolve().absolute()
    # Archive is written next to the results dir, never inside it, so the
    # rglob below does not pick up the zip being written.
    _archive_folder = _RUNNER.results_dir.local_path.parent / f"{_zip_base_name}_archive"
    # NOTE(review): mkdir() raises FileExistsError on a re-run; an
    # exist_ok/cleanup policy may be wanted — confirm intended behaviour.
    _archive_folder.mkdir()
    _big_zip_file = _archive_folder / f"{_zip_base_name}.zip"
    _src_dir = _RUNNER.results_dir.local_path.expanduser().resolve(strict=True)
    # First pass only counts entries so the progress task has a total.
    _files_and_folders_to_compress = 0
    for _file in _src_dir.rglob('*'):
        _files_and_folders_to_compress += 1
    _rp.update(f"zipping {_files_and_folders_to_compress} items")
    _zipping_track = _rp.add_task(task_name="zipping", total=_files_and_folders_to_compress)
    with zipfile.ZipFile(_big_zip_file, 'w', zipfile.ZIP_DEFLATED) as _zf:
        for _file in _src_dir.rglob('*'):
            _zipping_track.update(advance=1)
            # Store paths relative to the results dir's parent so the zip
            # unpacks into a single top-level folder named after the dir.
            __file = _file.relative_to(_src_dir.parent)
            _rp.update(f"zipping {__file} ...")
            _zf.write(_file, __file)
    _chapters = 1
    if part_size is not None:
        # Split the zip into sequentially numbered parts
        # (<name>.zip.001, .002, ...) of at most part_size MB each.
        _BUF = 10 * 1024 * 1024 * 1024  # 10GB - max memory buffer size to use for read
        # NOTE(review): with this _BUF each part is read in one chunk; a
        # smaller buffer (e.g. 64MB) would bound memory use — confirm.
        _part_size_in_bytes = part_size * 1024 *1024
        # _ugly_buf is a one-byte lookahead: after filling a part we read one
        # extra byte; if it is empty we hit EOF, otherwise the byte is carried
        # over and written at the start of the next part.
        # NOTE(review): initialised as str '' but reassigned bytes from
        # _src.read(1); only len() is taken before the first reassignment,
        # so it works — still worth normalising to b''.
        _ugly_buf = ''
        with open(_big_zip_file, 'rb') as _src:
            while True:
                _rp.update(f"splitting large zip in part {_chapters}")
                _part_file = _big_zip_file.parent / f"{_big_zip_file.name}.{_chapters:03d}"
                with open(_part_file, 'wb') as _tgt:
                    _written = 0
                    while _written < _part_size_in_bytes:
                        # flush the carried lookahead byte (if any) first
                        if len(_ugly_buf) > 0:
                            _tgt.write(_ugly_buf)
                        _tgt.write(_src.read(min(_BUF, _part_size_in_bytes - _written)))
                        _written += min(_BUF, _part_size_in_bytes - _written)
                        _ugly_buf = _src.read(1)
                        if len(_ugly_buf) == 0:
                            break
                if len(_ugly_buf) == 0:
                    # EOF: if everything fit in one part, keep the original
                    # zip and discard the redundant .001 copy.
                    if _chapters == 1:
                        _part_file.unlink()
                    break
                _chapters += 1
        if _chapters > 1:
            # parts fully replace the monolithic zip
            _rp.update(f"removing large zip file")
            _big_zip_file.unlink()

    # -------------------------------------------------------------- 03
    # look for archives and upload them
    if transmft:
        _rp.update(f"performing uploads to transcend")
        # stop the rich panel so transmft's own console output is visible
        _rp.stop()
        if _chapters == 1:
            print(f"Uploading file part {_big_zip_file.as_posix()}")
            _cmd_tokens = [
                "transmft", "-p", f"{_big_zip_file.as_posix()}",
            ]
            subprocess.run(_cmd_tokens, shell=False)
        elif _chapters > 1:
            for _f in _archive_folder.glob(f"{_zip_base_name}.zip.*"):
                print(f"Uploading file part {_f.as_posix()}")
                _cmd_tokens = [
                    "transmft", "-p", f"{_f.as_posix()}",
                ]
                subprocess.run(_cmd_tokens, shell=False)
        else:
            raise e.code.ShouldNeverHappen(msgs=[f"unknown value -- {_chapters}"])
        # NOTE(review): assumes transmft writes a trans.log in the cwd where
        # each line starts with the download key — confirm with the tool.
        _trans_log_file = _archive_folder / f"trans.log"
        shutil.move(_cwd / "trans.log", _trans_log_file)
        _trans_file_keys = [
            _.split(" ")[0] for _ in _trans_log_file.read_text().split("\n") if
            _ != ""
        ]
        # emit a PowerShell script that fetches every uploaded part by key
        _ps1_script_file = _archive_folder / f"get.ps1"
        _ps1_script_file.write_text(
            "\n".join(
                [f"transmft -g {_}" for _ in _trans_file_keys]
            )
        )
        print("*"*30)
        print(_ps1_script_file.read_text())
        print("*"*30)
        _rp.start()
        _rp.set_final_message(_ps1_script_file.read_text())


@_APP.command(help="Copies from server to cwd.")
def copy():
"""
Expand Down
9 changes: 5 additions & 4 deletions toolcraft/job/cli_launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import time
import psutil
import typer
from typing_extensions import Annotated
import typing as t
import subprocess

Expand All @@ -40,9 +41,9 @@

@_APP.command(help="Launches all the jobs in runner on lsf infrastructure.")
def lsf(
email: bool = typer.Option(default=False, help="Set this if you want to receive email after lsf job completion."),
cpus: int = typer.Option(default=None, help="Number of processors to use for lsf job."),
memory: int = typer.Option(default=None, help="Amount of memory to reserve for lsf job."),
email: Annotated[bool, typer.Option(help="Set this if you want to receive email after lsf job completion.")] = False,
cpus: Annotated[int, typer.Option(help="Number of processors to use for lsf job.")] = None,
memory: Annotated[int, typer.Option(help="Amount of memory to reserve for lsf job.")] = None,
):
"""
Expand Down Expand Up @@ -115,7 +116,7 @@ def lsf(

@_APP.command(help="Launches all the jobs in runner on local machine.")
def local(
single_cpu: bool = typer.Option(default=False, help="Launches on single CPU in sequence (good for debugging)")
single_cpu: Annotated[bool, typer.Option(help="Launches on single CPU in sequence (good for debugging)")] = False
):
"""
todo: remote linux instances via wsl via ssh https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/WSL.html
Expand Down
Loading

0 comments on commit dc3abf8

Please sign in to comment.