Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/allow ensemble dir overwrite #1041

Merged
merged 15 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ jobs:
conda env update --file install/gen_deps_environment.yml

pip install ax-platform==0.2.8

- name: Install surmise
if: matrix.os != 'macos-latest' && steps.cache.outputs.cache-hit != 'true'
run: |
pip install --upgrade git+https://github.com/surmising/surmise.git@develop

- name: Build ytopt and dependencies
Expand Down
4 changes: 4 additions & 0 deletions docs/data_structures/libE_specs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ the ``LibeSpecs`` class. When provided as a Python class, options are validated
Whether to copy directories within ``ensemble_dir_path`` back to the launch
location. Useful if ``ensemble_dir_path`` is located on node-local storage.

"reuse_ensemble_dir" [bool] = ``False``:
Whether to allow overwrites and access to previous ensemble directories in subsequent runs. ``False``
by default to protect results.

"use_worker_dirs" [bool] = ``False``:
Whether to organize calculation directories under worker-specific directories:

Expand Down
82 changes: 41 additions & 41 deletions libensemble/comms/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,11 @@ def set_directory(self, dirname: str) -> None:
dirname = Path(dirname)
if not dirname.exists():
dirname.mkdir(parents=True)
if self.logger_set:
logger = logging.getLogger(self.name)
logger.warning("Cannot set directory after loggers initialized")
else:
baselog = Path(self.filename).name
basestat = Path(self.stat_filename).name
self.filename = str(dirname / baselog)
self.stat_filename = str(dirname / basestat)

baselog = Path(self.filename).name
basestat = Path(self.stat_filename).name
self.filename = str(dirname / baselog)
self.stat_filename = str(dirname / basestat)


class CommLogHandler(logging.Handler):
Expand Down Expand Up @@ -163,40 +160,43 @@ def manager_logging_config(specs={}):
# Regular logging
logconfig = LogConfig.config

if not logconfig.logger_set:
if specs.get("use_workflow_dir"): # placing logfiles in separate directory
logconfig.set_directory(specs.get("workflow_dir_path"))

formatter = logging.Formatter(logconfig.fmt)
wfilter = WorkerIDFilter(0)
fh = logging.FileHandler(logconfig.filename, mode="w")
fh.addFilter(wfilter)
fh.setFormatter(formatter)
logger = logging.getLogger(logconfig.name)
logger.propagate = False
logger.setLevel(logconfig.log_level) # Formatter filters on top of this
logger.addHandler(fh)
logconfig.logger_set = True
if specs.get("use_workflow_dir"): # placing logfiles in separate directory
logconfig.set_directory(specs.get("workflow_dir_path"))

# Stats logging
# NB: Could add a specialized handler for immediate flushing
fhs = logging.FileHandler(logconfig.stat_filename, mode="w")
fhs.addFilter(wfilter)
fhs.setFormatter(logging.Formatter("%(prefix)s: %(message)s"))
stat_logger = logging.getLogger(logconfig.stats_name)
stat_logger.propagate = False
stat_logger.setLevel(logging.DEBUG)
stat_logger.addHandler(fhs)

# Mirror error-logging to stderr of user-specified level
fhe = logging.StreamHandler(stream=sys.stderr)
fhe.addFilter(wfilter)
efilter = ErrorFilter(logconfig.stderr_level)
fhe.addFilter(efilter)
fhe.setFormatter(formatter)
logger.addHandler(fhe)
else:
stat_logger = logging.getLogger(logconfig.stats_name)
formatter = logging.Formatter(logconfig.fmt)
wfilter = WorkerIDFilter(0)
fh = logging.FileHandler(logconfig.filename, mode="a")
fh.addFilter(wfilter)
fh.setFormatter(formatter)
logger = logging.getLogger(logconfig.name)
if logconfig.logger_set:
remove_handlers(logger)
logger.propagate = False
logger.setLevel(logconfig.log_level) # Formatter filters on top of this
logger.addHandler(fh)
logconfig.logger_set = True

# Stats logging
# NB: Could add a specialized handler for immediate flushing
fhs = logging.FileHandler(logconfig.stat_filename, mode="a")
fhs.addFilter(wfilter)
fhs.setFormatter(logging.Formatter("%(prefix)s: %(message)s"))
stat_logger = logging.getLogger(logconfig.stats_name)
if logconfig.logger_set:
remove_handlers(stat_logger)
stat_logger.propagate = False
stat_logger.setLevel(logging.DEBUG)
stat_logger.addHandler(fhs)

# Mirror error-logging to stderr of user-specified level
fhe = logging.StreamHandler(stream=sys.stderr)
fhe.addFilter(wfilter)
efilter = ErrorFilter(logconfig.stderr_level)
fhe.addFilter(efilter)
fhe.setFormatter(formatter)
logger.addHandler(fhe)
# else:
# stat_logger = logging.getLogger(logconfig.stats_name)

stat_logger.info(f"Starting ensemble at: {stat_timer.date_start}")

Expand Down
2 changes: 1 addition & 1 deletion libensemble/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def __init__(

try:
temp_EnsembleDirectory.make_copyback()
except OSError as e: # Ensemble dir exists and isn't empty.
except AssertionError as e: # Ensemble dir exists and isn't empty.
logger.manager_warning(_USER_CALC_DIR_WARNING.format(temp_EnsembleDirectory.ensemble_dir))
self._kill_workers()
raise ManagerException(
Expand Down
6 changes: 6 additions & 0 deletions libensemble/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,12 @@ class LibeSpecs(BaseModel):
in the workflow directory.
"""

reuse_ensemble_dir: Optional[bool] = False
"""
Whether to allow overwrites and access to previous ensemble directories in subsequent runs. ``False``
by default to protect results.
"""

workflow_dir_path: Optional[Union[str, Path]] = "."
"""
Optional path to the workflow directory. Autogenerated in the current directory if `use_workflow_dir`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
libE_specs["sim_dir_copy_files"] = [dir_to_copy]
libE_specs["sim_dir_symlink_files"] = [dir_to_symlink]
libE_specs["ensemble_copy_back"] = True
libE_specs["reuse_ensemble_dir"] = True

sim_specs = {
"sim_f": sim_f,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
e_ensemble = "./ensemble_ex_w" + str(nworkers) + "_" + libE_specs.get("comms")

if not os.path.isdir(e_ensemble):
os.makedirs(os.path.join(e_ensemble, "sim0_worker0"), exist_ok=True)
os.makedirs(os.path.join(e_ensemble, "sim0"), exist_ok=True)

libE_specs["sim_dirs_make"] = True
libE_specs["ensemble_dir_path"] = e_ensemble
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@

persis_info = add_unique_random_streams({}, nworkers + 1)

exit_criteria = {"sim_max": 21}
exit_criteria = {"sim_max": 20}

H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs)

Expand Down
93 changes: 93 additions & 0 deletions libensemble/tests/functionality_tests/test_workflow_dir.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
Runs libEnsemble with uniform random sampling and writes results into sim dirs.
Tests sim_input_dir capabilities

Execute via one of the following commands (e.g. 3 workers):
mpiexec -np 4 python test_sim_input_dir_option.py
python test_sim_input_dir_option.py --nworkers 3 --comms local
python test_sim_input_dir_option.py --nworkers 3 --comms tcp

The number of concurrent evaluations of the objective function will be 4-1=3.
"""

# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: mpi local tcp
# TESTSUITE_NPROCS: 2 4

import os

import numpy as np

from libensemble.gen_funcs.sampling import uniform_random_sample as gen_f
from libensemble.libE import libE
from libensemble.tests.regression_tests.support import write_sim_func as sim_f
from libensemble.tools import add_unique_random_streams, parse_args

nworkers, is_manager, libE_specs, _ = parse_args()

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":
sim_input_dir = os.path.abspath("./sim_input_dir")
dir_to_copy = sim_input_dir + "/copy_this"

for dire in [sim_input_dir, dir_to_copy]:
if not os.path.isdir(dire):
os.makedirs(dire, exist_ok=True)

libE_specs["sim_input_dir"] = sim_input_dir
libE_specs["sim_dirs_make"] = False
libE_specs["sim_dir_symlink_files"] = [
os.path.abspath("./test_sim_input_dir_option.py")
] # to cover FileExistsError catch
libE_specs["ensemble_copy_back"] = True
libE_specs["use_workflow_dir"] = True

sim_specs = {
"sim_f": sim_f,
"in": ["x"],
"out": [("f", float)],
}

gen_specs = {
"gen_f": gen_f,
"out": [("x", float, (1,))],
"user": {
"gen_batch_size": 20,
"lb": np.array([-3]),
"ub": np.array([3]),
},
}

persis_info = add_unique_random_streams({}, nworkers + 1)

exit_criteria = {"sim_max": 21}

ensemble_lens = []
stats_lens = []

for i in range(2):

libE_specs["workflow_dir_path"] = (
"./test_workflow" + str(i) + "_nworkers" + str(nworkers) + "_comms-" + libE_specs["comms"]
)

H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs)

assert os.path.isdir(libE_specs["workflow_dir_path"]), "workflow_dir not created"
assert all(
[
i in os.listdir(libE_specs["workflow_dir_path"])
for i in ["ensemble.log", "libE_stats.txt", "ensemble", "ensemble_back"]
]
)

with open(os.path.join(libE_specs["workflow_dir_path"], "ensemble.log"), "r") as f:
lines = f.readlines()
ensemble_lens.append(len(lines))

with open(os.path.join(libE_specs["workflow_dir_path"], "libE_stats.txt"), "r") as f:
lines = f.readlines()
stats_lens.append(len(lines))

assert ensemble_lens[0] == ensemble_lens[1], "ensemble.log's didn't have same length"
assert stats_lens[0] == stats_lens[1], "libE_stats.txt's didn't have same length"
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# TESTSUITE_COMMS: mpi local tcp
# TESTSUITE_NPROCS: 3 4
# TESTSUITE_EXTRA: true
# TESTSUITE_OS_SKIP: OSX

# Requires:
# Install Surmise package
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# TESTSUITE_COMMS: mpi local tcp
# TESTSUITE_NPROCS: 3 4
# TESTSUITE_EXTRA: true
# TESTSUITE_OS_SKIP: OSX

# Requires:
# Install Surmise package
Expand Down
9 changes: 0 additions & 9 deletions libensemble/tests/unit_tests_logger/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def test_set_filename():


def test_set_directory(tmp_path):
from libensemble.comms.logs import manager_logging_config

logs = LogConfig.config
logs.logger_set = False
Expand All @@ -77,14 +76,6 @@ def test_set_directory(tmp_path):
assert logs.filename == os.path.join(tmp_path, "ensemble.log")
assert logs.stat_filename == os.path.join(tmp_path, "libE_stats.txt")

manager_logging_config()
logger.set_directory("toolate")
assert logs.filename == os.path.join(tmp_path, "ensemble.log")
assert logs.stat_filename == os.path.join(tmp_path, "libE_stats.txt")

assert os.path.isfile(os.path.join(tmp_path, "ensemble.log"))
assert os.path.isfile(os.path.join(tmp_path, "libE_stats.txt"))


def test_set_stderr_level():
stderr_level = logger.get_stderr_level()
Expand Down
30 changes: 25 additions & 5 deletions libensemble/utils/loc_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@ def __init__(self) -> None:
self.dirs = {}
self.stack = []

def copy_or_symlink(
self, destdir: str, copy_files: List[Path] = [], symlink_files: List[Path] = [], ignore_FileExists: bool = False
def copy_file(
self,
destdir: Path,
copy_files: List[Path] = [],
ignore_FileExists: bool = False,
allow_overwrite: bool = False,
) -> None:
"""Inspired by https://stackoverflow.com/a/9793699.
Determine paths, basenames, and conditions for copying/symlinking
"""
for file_path in copy_files:
file_path = Path(file_path).absolute()
dest_path = destdir / Path(file_path.name)
if allow_overwrite and dest_path.exists():
shutil.rmtree(dest_path)
try:
if file_path.is_dir():
shutil.copytree(file_path, dest_path)
shutil.copytree(file_path, dest_path, dirs_exist_ok=allow_overwrite)
else:
shutil.copy(file_path, dest_path)
except FileExistsError:
Expand All @@ -36,9 +42,19 @@ def copy_or_symlink(
else: # Indicates problem with unique sim_dirs
raise

def symlink_file(
self,
destdir: Path,
symlink_files: List[Path] = [],
ignore_FileExists: bool = False,
allow_overwrite: bool = False,
) -> None:

for file_path in symlink_files:
src_path = Path(file_path).absolute()
dest_path = destdir / Path(file_path.name)
if allow_overwrite and dest_path.exists():
dest_path.unlink(missing_ok=True)
try:
os.symlink(src_path, dest_path)
except FileExistsError:
Expand All @@ -55,6 +71,7 @@ def register_loc(
copy_files: List[Path] = [],
symlink_files: List[Path] = [],
ignore_FileExists: bool = False,
allow_overwrite: bool = False,
) -> str:
"""Register a new location in the dictionary.

Expand Down Expand Up @@ -84,8 +101,11 @@ def register_loc(
dirname.mkdir(parents=True, exist_ok=True)

self.dirs[key] = dirname
if len(copy_files) or len(symlink_files):
self.copy_or_symlink(dirname, copy_files, symlink_files, ignore_FileExists)
if len(copy_files):
self.copy_file(dirname, copy_files, ignore_FileExists, allow_overwrite)

if len(symlink_files):
self.symlink_file(dirname, symlink_files, ignore_FileExists, allow_overwrite)

return dirname

Expand Down
Loading