diff --git a/docs/data_structures/libE_specs.rst b/docs/data_structures/libE_specs.rst index 41cab5572..ee814e10e 100644 --- a/docs/data_structures/libE_specs.rst +++ b/docs/data_structures/libE_specs.rst @@ -77,6 +77,10 @@ the ``LibeSpecs`` class. When provided as a Python class, options are validated Whether to copy back directories within ``ensemble_dir_path`` back to launch location. Useful if ``ensemble_dir_path`` located on node-local storage. + "reuse_output_dir" [bool] = ``False``: + Whether to allow overwrites and access to previous ensemble and workflow directories in subsequent runs. + ``False`` by default to protect results. + "use_worker_dirs" [bool] = ``False``: Whether to organize calculation directories under worker-specific directories: diff --git a/libensemble/comms/logs.py b/libensemble/comms/logs.py index 8cd6acfbc..10acbae07 100644 --- a/libensemble/comms/logs.py +++ b/libensemble/comms/logs.py @@ -53,14 +53,11 @@ def set_directory(self, dirname: str) -> None: dirname = Path(dirname) if not dirname.exists(): dirname.mkdir(parents=True) - if self.logger_set: - logger = logging.getLogger(self.name) - logger.warning("Cannot set directory after loggers initialized") - else: - baselog = Path(self.filename).name - basestat = Path(self.stat_filename).name - self.filename = str(dirname / baselog) - self.stat_filename = str(dirname / basestat) + + baselog = Path(self.filename).name + basestat = Path(self.stat_filename).name + self.filename = str(dirname / baselog) + self.stat_filename = str(dirname / basestat) class CommLogHandler(logging.Handler): @@ -163,40 +160,43 @@ def manager_logging_config(specs={}): # Regular logging logconfig = LogConfig.config - if not logconfig.logger_set: - if specs.get("use_workflow_dir"): # placing logfiles in separate directory - logconfig.set_directory(specs.get("workflow_dir_path")) - - formatter = logging.Formatter(logconfig.fmt) - wfilter = WorkerIDFilter(0) - fh = logging.FileHandler(logconfig.filename, mode="w") - fh.addFilter(wfilter) - fh.setFormatter(formatter) - logger = logging.getLogger(logconfig.name) - logger.propagate = False - logger.setLevel(logconfig.log_level) # Formatter filters on top of this - logger.addHandler(fh) - logconfig.logger_set = True + if specs.get("use_workflow_dir"): # placing logfiles in separate directory + logconfig.set_directory(specs.get("workflow_dir_path")) - # Stats logging - # NB: Could add a specialized handler for immediate flushing - fhs = logging.FileHandler(logconfig.stat_filename, mode="w") - fhs.addFilter(wfilter) - fhs.setFormatter(logging.Formatter("%(prefix)s: %(message)s")) - stat_logger = logging.getLogger(logconfig.stats_name) - stat_logger.propagate = False - stat_logger.setLevel(logging.DEBUG) - stat_logger.addHandler(fhs) - - # Mirror error-logging to stderr of user-specified level - fhe = logging.StreamHandler(stream=sys.stderr) - fhe.addFilter(wfilter) - efilter = ErrorFilter(logconfig.stderr_level) - fhe.addFilter(efilter) - fhe.setFormatter(formatter) - logger.addHandler(fhe) - else: - stat_logger = logging.getLogger(logconfig.stats_name) + formatter = logging.Formatter(logconfig.fmt) + wfilter = WorkerIDFilter(0) + fh = logging.FileHandler(logconfig.filename, mode="a") + fh.addFilter(wfilter) + fh.setFormatter(formatter) + logger = logging.getLogger(logconfig.name) + if logconfig.logger_set: + remove_handlers(logger) + logger.propagate = False + logger.setLevel(logconfig.log_level) # Formatter filters on top of this + logger.addHandler(fh) + logconfig.logger_set = True + + # Stats logging + # NB: Could add a specialized handler for immediate flushing + fhs = logging.FileHandler(logconfig.stat_filename, mode="a") + fhs.addFilter(wfilter) + fhs.setFormatter(logging.Formatter("%(prefix)s: %(message)s")) + stat_logger = logging.getLogger(logconfig.stats_name) + if logconfig.logger_set: + remove_handlers(stat_logger) + stat_logger.propagate = False + stat_logger.setLevel(logging.DEBUG) + stat_logger.addHandler(fhs) + + # Mirror error-logging to stderr of user-specified level + fhe = logging.StreamHandler(stream=sys.stderr) + fhe.addFilter(wfilter) + efilter = ErrorFilter(logconfig.stderr_level) + fhe.addFilter(efilter) + fhe.setFormatter(formatter) + logger.addHandler(fhe) + # else: + # stat_logger = logging.getLogger(logconfig.stats_name) stat_logger.info(f"Starting ensemble at: {stat_timer.date_start}") diff --git a/libensemble/manager.py b/libensemble/manager.py index 3ad650ca6..e308b6d11 100644 --- a/libensemble/manager.py +++ b/libensemble/manager.py @@ -218,7 +218,7 @@ def __init__( try: temp_EnsembleDirectory.make_copyback() - except OSError as e: # Ensemble dir exists and isn't empty. + except AssertionError as e: # Ensemble dir exists and isn't empty. logger.manager_warning(_USER_CALC_DIR_WARNING.format(temp_EnsembleDirectory.ensemble_dir)) self._kill_workers() raise ManagerException( diff --git a/libensemble/specs.py b/libensemble/specs.py index 1b07f5db5..0b9060ad8 100644 --- a/libensemble/specs.py +++ b/libensemble/specs.py @@ -254,6 +254,12 @@ class LibeSpecs(BaseModel): in the workflow directory. """ + reuse_output_dir: Optional[bool] = False + """ + Whether to allow overwrites and access to previous ensemble and workflow directories in subsequent runs. + ``False`` by default to protect results. + """ + workflow_dir_path: Optional[Union[str, Path]] = "." """ Optional path to the workflow directory. Autogenerated in the current directory if `use_workflow_dir` diff --git a/libensemble/tests/functionality_tests/test_persistent_sampling_CUDA_variable_resources.py b/libensemble/tests/functionality_tests/test_persistent_sampling_CUDA_variable_resources.py index caebe8d54..327cff0d7 100644 --- a/libensemble/tests/functionality_tests/test_persistent_sampling_CUDA_variable_resources.py +++ b/libensemble/tests/functionality_tests/test_persistent_sampling_CUDA_variable_resources.py @@ -38,8 +38,9 @@ # libE_specs["zero_resource_workers"] = [1] # If first worker must be gen, use this instead libE_specs["sim_dirs_make"] = True - libE_specs["workflow_dir_path"] = "./CUDA_intermediate/workflow" + str(nworkers) + libE_specs["workflow_dir_path"] = "./ensemble_CUDA/workflow_" + libE_specs["comms"] + "_w" + str(nworkers) + "_N" libE_specs["sim_dir_copy_files"] = [".gitignore"] + libE_specs["reuse_output_dir"] = True if libE_specs["comms"] == "tcp": sys.exit("This test only runs with MPI or local -- aborting...") @@ -82,9 +83,13 @@ exit_criteria = {"sim_max": 40, "wallclock_max": 300} # Perform the run - H, persis_info, flag = libE( - sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs, alloc_specs=alloc_specs - ) + + for i in range(2): + persis_info = add_unique_random_streams({}, nworkers + 1) + libE_specs["workflow_dir_path"] = libE_specs["workflow_dir_path"][:-1] + str(i) + H, persis_info, flag = libE( + sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs, alloc_specs=alloc_specs + ) if is_manager: assert flag == 0 diff --git a/libensemble/tests/functionality_tests/test_sim_dirs_per_calc.py b/libensemble/tests/functionality_tests/test_sim_dirs_per_calc.py index a1655ad66..3f8380e6b 100644 --- a/libensemble/tests/functionality_tests/test_sim_dirs_per_calc.py +++ b/libensemble/tests/functionality_tests/test_sim_dirs_per_calc.py @@ -43,6 +43,7 @@ libE_specs["sim_dir_copy_files"] = [dir_to_copy] libE_specs["sim_dir_symlink_files"] = [dir_to_symlink] libE_specs["ensemble_copy_back"] = True + libE_specs["reuse_output_dir"] = True sim_specs = { "sim_f": sim_f, diff --git a/libensemble/tests/functionality_tests/test_sim_dirs_with_exception.py b/libensemble/tests/functionality_tests/test_sim_dirs_with_exception.py index 0048c95e5..448772531 100644 --- a/libensemble/tests/functionality_tests/test_sim_dirs_with_exception.py +++ b/libensemble/tests/functionality_tests/test_sim_dirs_with_exception.py @@ -31,7 +31,7 @@ e_ensemble = "./ensemble_ex_w" + str(nworkers) + "_" + libE_specs.get("comms") if not os.path.isdir(e_ensemble): - os.makedirs(os.path.join(e_ensemble, "sim0_worker0"), exist_ok=True) + os.makedirs(os.path.join(e_ensemble, "sim0"), exist_ok=True) libE_specs["sim_dirs_make"] = True libE_specs["ensemble_dir_path"] = e_ensemble diff --git a/libensemble/tests/functionality_tests/test_sim_dirs_with_gen_dirs.py b/libensemble/tests/functionality_tests/test_sim_dirs_with_gen_dirs.py index 5bb28d115..46e3f6266 100644 --- a/libensemble/tests/functionality_tests/test_sim_dirs_with_gen_dirs.py +++ b/libensemble/tests/functionality_tests/test_sim_dirs_with_gen_dirs.py @@ -74,7 +74,7 @@ persis_info = add_unique_random_streams({}, nworkers + 1) - exit_criteria = {"sim_max": 21} + exit_criteria = {"sim_max": 20} H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs) diff --git a/libensemble/tests/functionality_tests/test_workflow_dir.py b/libensemble/tests/functionality_tests/test_workflow_dir.py new file mode 100644 index 000000000..a0bc45413 --- /dev/null +++ b/libensemble/tests/functionality_tests/test_workflow_dir.py @@ -0,0 +1,93 @@ +""" +Runs libEnsemble with uniform random sampling and writes results into sim dirs. +Tests sim_input_dir capabilities + +Execute via one of the following commands (e.g. 3 workers): + mpiexec -np 4 python test_sim_input_dir_option.py + python test_sim_input_dir_option.py --nworkers 3 --comms local + python test_sim_input_dir_option.py --nworkers 3 --comms tcp + +The number of concurrent evaluations of the objective function will be 4-1=3. +""" + +# Do not change these lines - they are parsed by run-tests.sh +# TESTSUITE_COMMS: mpi local tcp +# TESTSUITE_NPROCS: 2 4 + +import os + +import numpy as np + +from libensemble.gen_funcs.sampling import uniform_random_sample as gen_f +from libensemble.libE import libE +from libensemble.tests.regression_tests.support import write_sim_func as sim_f +from libensemble.tools import add_unique_random_streams, parse_args + +nworkers, is_manager, libE_specs, _ = parse_args() + +# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows). +if __name__ == "__main__": + sim_input_dir = os.path.abspath("./sim_input_dir") + dir_to_copy = sim_input_dir + "/copy_this" + + for dire in [sim_input_dir, dir_to_copy]: + if not os.path.isdir(dire): + os.makedirs(dire, exist_ok=True) + + libE_specs["sim_input_dir"] = sim_input_dir + libE_specs["sim_dirs_make"] = False + libE_specs["sim_dir_symlink_files"] = [ + os.path.abspath("./test_sim_input_dir_option.py") + ] # to cover FileExistsError catch + libE_specs["ensemble_copy_back"] = True + libE_specs["use_workflow_dir"] = True + + sim_specs = { + "sim_f": sim_f, + "in": ["x"], + "out": [("f", float)], + } + + gen_specs = { + "gen_f": gen_f, + "out": [("x", float, (1,))], + "user": { + "gen_batch_size": 20, + "lb": np.array([-3]), + "ub": np.array([3]), + }, + } + + persis_info = add_unique_random_streams({}, nworkers + 1) + + exit_criteria = {"sim_max": 21} + + ensemble_lens = [] + stats_lens = [] + + for i in range(2): + + libE_specs["workflow_dir_path"] = ( + "./test_workflow" + str(i) + "_nworkers" + str(nworkers) + "_comms-" + libE_specs["comms"] + ) + + H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs) + + assert os.path.isdir(libE_specs["workflow_dir_path"]), "workflow_dir not created" + assert all( + [ + i in os.listdir(libE_specs["workflow_dir_path"]) + for i in ["ensemble.log", "libE_stats.txt", "ensemble", "ensemble_back"] + ] + ) + + with open(os.path.join(libE_specs["workflow_dir_path"], "ensemble.log"), "r") as f: + lines = f.readlines() + ensemble_lens.append(len(lines)) + + with open(os.path.join(libE_specs["workflow_dir_path"], "libE_stats.txt"), "r") as f: + lines = f.readlines() + stats_lens.append(len(lines)) + + assert ensemble_lens[0] == ensemble_lens[1], "ensemble.log's didn't have same length" + assert stats_lens[0] == stats_lens[1], "libE_stats.txt's didn't have same length" diff --git a/libensemble/tests/unit_tests_logger/test_logger.py b/libensemble/tests/unit_tests_logger/test_logger.py index 48db136e0..dc5bb7cd8 100644 --- a/libensemble/tests/unit_tests_logger/test_logger.py +++ b/libensemble/tests/unit_tests_logger/test_logger.py @@ -66,7 +66,6 @@ def test_set_filename(): def test_set_directory(tmp_path): - from libensemble.comms.logs import manager_logging_config logs = LogConfig.config logs.logger_set = False @@ -77,14 +76,6 @@ def test_set_directory(tmp_path): assert logs.filename == os.path.join(tmp_path, "ensemble.log") assert logs.stat_filename == os.path.join(tmp_path, "libE_stats.txt") - manager_logging_config() - logger.set_directory("toolate") - assert logs.filename == os.path.join(tmp_path, "ensemble.log") - assert logs.stat_filename == os.path.join(tmp_path, "libE_stats.txt") - - assert os.path.isfile(os.path.join(tmp_path, "ensemble.log")) - assert os.path.isfile(os.path.join(tmp_path, "libE_stats.txt")) - def test_set_stderr_level(): stderr_level = logger.get_stderr_level() diff --git a/libensemble/tools/tools.py b/libensemble/tools/tools.py index 96c1aa4fd..68cbcea6a 100644 --- a/libensemble/tools/tools.py +++ b/libensemble/tools/tools.py @@ -52,7 +52,8 @@ + "\n" + "libEnsemble attempted to reuse {} as a parent directory for calc dirs.\n" + "If allowed to continue, previous results may have been overwritten!\n" - + "Resolve this by ensuring libE_specs['ensemble_dir_path'] is unique for each run." + + "Resolve this either by ensuring libE_specs['ensemble_dir_path'] is unique for each run\n" + + "or by setting libE_specs['reuse_output_dir'] = True.\n" + "\n" + 79 * "*" + "\n\n" diff --git a/libensemble/utils/loc_stack.py b/libensemble/utils/loc_stack.py index 09fc4ec73..acfb08427 100644 --- a/libensemble/utils/loc_stack.py +++ b/libensemble/utils/loc_stack.py @@ -16,8 +16,12 @@ def __init__(self) -> None: self.dirs = {} self.stack = [] - def copy_or_symlink( - self, destdir: str, copy_files: List[Path] = [], symlink_files: List[Path] = [], ignore_FileExists: bool = False + def copy_file( + self, + destdir: Path, + copy_files: List[Path] = [], + ignore_FileExists: bool = False, + allow_overwrite: bool = False, ) -> None: """Inspired by https://stackoverflow.com/a/9793699. Determine paths, basenames, and conditions for copying/symlinking @@ -25,9 +29,14 @@ def copy_or_symlink( for file_path in copy_files: file_path = Path(file_path).absolute() dest_path = destdir / Path(file_path.name) + if allow_overwrite and dest_path.exists(): + if dest_path.is_dir(): + shutil.rmtree(dest_path) + else: + dest_path.unlink() try: if file_path.is_dir(): - shutil.copytree(file_path, dest_path) + shutil.copytree(file_path, dest_path, dirs_exist_ok=allow_overwrite) else: shutil.copy(file_path, dest_path) except FileExistsError: @@ -36,9 +45,19 @@ def copy_or_symlink( else: # Indicates problem with unique sim_dirs raise + def symlink_file( + self, + destdir: Path, + symlink_files: List[Path] = [], + ignore_FileExists: bool = False, + allow_overwrite: bool = False, + ) -> None: + for file_path in symlink_files: src_path = Path(file_path).absolute() dest_path = destdir / Path(file_path.name) + if allow_overwrite and dest_path.exists(): + dest_path.unlink(missing_ok=True) try: os.symlink(src_path, dest_path) except FileExistsError: @@ -55,6 +74,7 @@ def register_loc( copy_files: List[Path] = [], symlink_files: List[Path] = [], ignore_FileExists: bool = False, + allow_overwrite: bool = False, ) -> str: """Register a new location in the dictionary. @@ -84,8 +104,11 @@ def register_loc( dirname.mkdir(parents=True, exist_ok=True) self.dirs[key] = dirname - if len(copy_files) or len(symlink_files): - self.copy_or_symlink(dirname, copy_files, symlink_files, ignore_FileExists) + if len(copy_files): + self.copy_file(dirname, copy_files, ignore_FileExists, allow_overwrite) + + if len(symlink_files): + self.symlink_file(dirname, symlink_files, ignore_FileExists, allow_overwrite) return dirname diff --git a/libensemble/utils/output_directory.py b/libensemble/utils/output_directory.py index 6ce4265f1..b4499939d 100644 --- a/libensemble/utils/output_directory.py +++ b/libensemble/utils/output_directory.py @@ -48,6 +48,7 @@ def __init__(self, libE_specs: dict, loc_stack: Optional[LocationStack] = None): self.workflow_dir = Path(self.specs.get("workflow_dir_path", "")) self.use_worker_dirs = self.specs.get("use_worker_dirs", False) self.ensemble_copy_back = self.specs.get("ensemble_copy_back", False) + self.allow_overwrite = self.specs.get("reuse_output_dir", False) self.sim_use = any([self.specs.get(i) for i in libE_spec_sim_dir_keys + libE_spec_calc_dir_misc]) self.sim_input_dir = Path(self.specs.get("sim_input_dir")) if self.specs.get("sim_input_dir") else "" @@ -70,13 +71,17 @@ def __init__(self, libE_specs: dict, loc_stack: Optional[LocationStack] = None): def make_copyback(self) -> None: """Check for existing ensemble dir and copybackdir, make copyback if doesn't exist""" try: - self.ensemble_dir.rmdir() - except FileNotFoundError: - pass + assert not self.ensemble_dir.exists() + except AssertionError: + if not self.allow_overwrite: + raise except Exception: raise if self.ensemble_copy_back: - self.copybackdir.mkdir() + if not self.allow_overwrite: + self.copybackdir.mkdir() + else: + self.copybackdir.mkdir(exist_ok=True) def use_calc_dirs(self, typelabel: int) -> bool: """Determines calc_dirs enabling for each calc type""" @@ -128,25 +133,28 @@ def _make_calc_dir(self, workerID, H_rows, calc_str: str, locs: LocationStack): copy_files=copy_files, symlink_files=symlink_files, ignore_FileExists=True, + allow_overwrite=self.allow_overwrite, ) return key # All cases now should involve sim_dirs or gen_dirs # ensemble_dir/worker_dir registered here, set as parent dir for calc dirs + if self.use_worker_dirs: worker_dir = "worker" + str(workerID) worker_path = (self.ensemble_dir / Path(worker_dir)).absolute() - calc_dir = calc_str + str(H_rows) - locs.register_loc(workerID, Path(worker_dir), prefix=self.ensemble_dir) + locs.register_loc( + workerID, Path(worker_dir), prefix=self.ensemble_dir, allow_overwrite=self.allow_overwrite + ) calc_prefix = worker_path # Otherwise, ensemble_dir set as parent dir for sim dirs else: - calc_dir = f"{calc_str}{H_rows}_worker{workerID}" if not self.ensemble_dir.exists(): self.ensemble_dir.mkdir(exist_ok=True, parents=True) calc_prefix = self.ensemble_dir + calc_dir = calc_str + str(H_rows) # Register calc dir with adjusted parent dir and sourcefile location locs.register_loc( calc_dir, @@ -154,6 +162,7 @@ def _make_calc_dir(self, workerID, H_rows, calc_str: str, locs: LocationStack): prefix=calc_prefix, copy_files=copy_files, symlink_files=symlink_files, + allow_overwrite=self.allow_overwrite, ) return calc_dir @@ -176,7 +185,7 @@ def prep_calc_dir(self, Work: dict, calc_iter: dict, workerID: int, calc_type: i def copy_back(self) -> None: """Copy back all ensemble dir contents to launch location""" - if self.ensemble_dir.exists() and self.ensemble_copy_back: + if self.ensemble_dir.exists() and self.ensemble_copy_back and self.loc_stack: no_calc_dirs = not self.sim_dirs_make or not self.gen_dirs_make for dire in self.loc_stack.dirs.values(): @@ -185,16 +194,20 @@ def copy_back(self) -> None: if dire == self.ensemble_dir: # occurs when no_calc_dirs is True continue # otherwise, entire ensemble dir copied into copyback dir + if self.allow_overwrite: + shutil.rmtree(dest_path, ignore_errors=True) shutil.copytree(dire, dest_path, symlinks=True, dirs_exist_ok=True) if dire.stem.startswith("worker"): return # Worker dir (with all contents) has been copied. # If not using calc dirs, likely miscellaneous files to copy back if no_calc_dirs: - p = re.compile(r"((^sim)|(^gen))\d+_worker\d+") + p = re.compile(r"((^sim)|(^gen))\d") for filep in [i for i in os.listdir(self.ensemble_dir) if not p.match(i)]: # each noncalc_dir file source_path = os.path.join(self.ensemble_dir, filep) dest_path = os.path.join(self.copybackdir, filep) + if self.allow_overwrite: + shutil.rmtree(dest_path, ignore_errors=True) try: if os.path.isdir(source_path): shutil.copytree(source_path, dest_path, symlinks=True)