From a6799c04a1db9696125a7e45bbb2cc5ba6c17ec2 Mon Sep 17 00:00:00 2001
From: Justin Yu
Date: Thu, 23 Mar 2023 23:12:49 -0700
Subject: [PATCH 01/24] fix typo

Signed-off-by: Justin Yu
---
 python/ray/tune/callback.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/ray/tune/callback.py b/python/ray/tune/callback.py
index da071f1c7db3..20d57bac93a0 100644
--- a/python/ray/tune/callback.py
+++ b/python/ray/tune/callback.py
@@ -393,7 +393,7 @@ def get_state(self) -> Optional[Dict]:
     def set_state(self, state: Dict):
         """Sets the state for all callbacks contained within this list.

-        Skipps setting state for all stateless callbacks where `get_state`
+        Skips setting state for all stateless callbacks where `get_state`
         returned None."""
         for i, callback in enumerate(self._callbacks):
             callback_state = state.get(i, None)

From 2e7927b3a2d7b9c9bb20d9753f922bbfede1ad98 Mon Sep 17 00:00:00 2001
From: Justin Yu
Date: Fri, 24 Mar 2023 09:10:48 -0700
Subject: [PATCH 02/24] Add initial test

Signed-off-by: Justin Yu
---
 python/ray/tune/tests/_test_tuner_restore.py | 220 ++++++++++++++++++
 .../tests/test_tuner_restore_integration.py  | 189 +++++++++++++++
 2 files changed, 409 insertions(+)
 create mode 100644 python/ray/tune/tests/_test_tuner_restore.py
 create mode 100644 python/ray/tune/tests/test_tuner_restore_integration.py

diff --git a/python/ray/tune/tests/_test_tuner_restore.py b/python/ray/tune/tests/_test_tuner_restore.py
new file mode 100644
index 000000000000..526ffeb1f968
--- /dev/null
+++ b/python/ray/tune/tests/_test_tuner_restore.py
@@ -0,0 +1,220 @@
+import collections
+import json
+import os
+from pathlib import Path
+import random
+import time
+from typing import Dict, List, Optional
+
+from ray import air, tune
+from ray.air import Checkpoint, session
+from ray.tune.experiment import Trial
+
+FAIL_ON = os.environ.get("FAIL_ON", "")
+
+STORAGE_PATH = os.environ.get("STORAGE_PATH", "/tmp/ray_results")
+
+EXP_NAME = os.environ.get("EXP_NAME", "tuner_restore_integration_test")
+
+TIME_PER_ITER_S = float(os.environ.get("TIME_PER_ITER_S", "0.5"))
+
+CALLBACK_DUMP_DIR = os.environ.get("CALLBACK_DUMP_DIR", "/tmp/callback_dump_dir")
+
+# 4 iterations per failure --> 2 seconds per failure
+# 11 failure hooks + 1 "no failure" --> 12 total failures
+# ~24 seconds, 48 iterations at the last failure
+
+TOTAL_FAILURES = 12
+MAX_CONCURRENT_TRIALS = 1
+ITERATIONS_PER_FAILURE = 4
+FAILURES_PER_TRIAL = 2
+
+ITERATIONS_PER_TRIAL = ITERATIONS_PER_FAILURE * FAILURES_PER_TRIAL  # 8
+NUM_TRIALS = MAX_CONCURRENT_TRIALS * (TOTAL_FAILURES // FAILURES_PER_TRIAL)  # 6
+
+
+class StatefulCallback(tune.Callback):
+    def __init__(self):
+        self._trial_iterations = collections.defaultdict(list)
+
+    def on_trial_result(
+        self,
+        iteration: int,
+        trials: List["Trial"],
+        trial: "Trial",
+        result: Dict,
+        **info,
+    ):
+        self._trial_iterations[trial.trial_id].append(result["training_iteration"])
+
+    def on_experiment_end(self, trials: List["Trial"], **info):
+        # Save to directory
+        pass
+
+    def get_state(self) -> Optional[Dict]:
+        return {"trial_iters": self._trial_iterations.copy()}
+
+    def set_state(self, state: Dict):
+        self._trial_iterations = state["trial_iters"]
+
+
+class StatefulSearcher(tune.search.Searcher):
+    def __init__(
+        self,
+        metric: Optional[str] = None,
+        mode: Optional[str] = None,
+    ):
+        super().__init__(metric=metric, mode=mode)
+        self._trial_count = 0
+
+    def suggest(self, trial_id: str) -> Optional[Dict]:
+        self._trial_count += 1
+        # Have a few trials error occasionally.
+ should_error = self._trial_count % MAX_CONCURRENT_TRIALS == 0 + return {"id": self._trial_count, "should_error": should_error} + + def on_trial_complete( + self, trial_id: str, result: Optional[Dict] = None, error: bool = False + ) -> None: + pass + + def save(self, checkpoint_path: str): + with open(checkpoint_path, "w") as f: + json.dump({"trial_count": self._trial_count}, f) + + def restore(self, checkpoint_path: str): + with open(checkpoint_path, "r") as f: + state = json.load(f) + self._trial_count = state["trial_count"] + + +class FailingCallback(tune.Callback): + def __init__(self): + self._trial_iters_since_restore = {} + + @property + def fail_on(self) -> str: + return os.environ.get("FAIL_ON", "") + + def should_fail(self, trials: List["Trial"]) -> bool: + if len(self._trial_iters_since_restore) < MAX_CONCURRENT_TRIALS: + return False + + should_fail = all( + iter >= ITERATIONS_PER_FAILURE + for iter in self._trial_iters_since_restore.values() + ) + return should_fail + + def on_step_begin(self, iteration: int, trials: List["Trial"], **info): + if self.should_fail(trials) and self.fail_on == "on_step_begin": + raise RuntimeError("on_step_begin") + + def on_step_end(self, iteration: int, trials: List["Trial"], **info): + if self.should_fail(trials) and self.fail_on == "on_step_end": + raise RuntimeError("on_step_end") + + def on_trial_start( + self, iteration: int, trials: List["Trial"], trial: "Trial", **info + ): + if self.should_fail(trials) and self.fail_on == "on_trial_start": + raise RuntimeError("on_trial_start") + + def on_trial_restore( + self, iteration: int, trials: List["Trial"], trial: "Trial", **info + ): + if self.should_fail(trials) and self.fail_on == "on_trial_restore": + raise RuntimeError("on_trial_restore") + + def on_trial_save( + self, iteration: int, trials: List["Trial"], trial: "Trial", **info + ): + if self.should_fail(trials) and self.fail_on == "on_trial_save": + raise RuntimeError("on_trial_save") + + def on_trial_result( + self, + iteration: int, + trials: List["Trial"], + trial: "Trial", + result: Dict, + **info, + ): + if self.should_fail(trials) and self.fail_on == "on_trial_result": + raise RuntimeError("on_trial_result") + self._trial_iters_since_restore[trial.trial_id] = result.get( + "iterations_since_restore", 0 + ) + + def on_trial_complete( + self, iteration: int, trials: List["Trial"], trial: "Trial", **info + ): + if self.should_fail(trials) and self.fail_on == "on_trial_complete": + raise RuntimeError("on_trial_complete") + + def on_trial_error( + self, iteration: int, trials: List["Trial"], trial: "Trial", **info + ): + if self.should_fail(trials) and self.fail_on == "on_trial_error": + raise RuntimeError("on_trial_error") + + def on_checkpoint( + self, + iteration: int, + trials: List["Trial"], + trial: "Trial", + checkpoint, + **info, + ): + if self.should_fail(trials) and self.fail_on == "on_checkpoint": + raise RuntimeError("on_checkpoint") + + +def train_fn(config, data=None): + checkpoint = session.get_checkpoint() + start = checkpoint.to_dict()["iteration"] + 1 if checkpoint else 1 + + training_started_marker = Path(os.environ.get("RUN_STARTED_MARKER")) + + if training_started_marker.exists(): + training_started_marker.unlink() + else: + time.sleep(TIME_PER_ITER_S * 2) + + for iteration in range(start, ITERATIONS_PER_TRIAL + 1): + time.sleep(TIME_PER_ITER_S) + + session.report( + {"score": random.random()}, + checkpoint=Checkpoint.from_dict({"iteration": iteration}), + ) + + +trainable = tune.with_resources(train_fn, 
resources={"CPU": 1}) +trainable = tune.with_parameters(trainable, data={"dummy_data": [1, 2, 3]}) + + +experiment_path = os.path.join(STORAGE_PATH, EXP_NAME) + +if tune.Tuner.can_restore(experiment_path): + tuner = tune.Tuner.restore( + experiment_path, trainable=trainable, resume_errored=True + ) +else: + tuner = tune.Tuner( + trainable, + run_config=air.RunConfig( + local_dir=STORAGE_PATH, + name=EXP_NAME, + checkpoint_config=air.CheckpointConfig(num_to_keep=1), + failure_config=air.FailureConfig(max_failures=-1), + callbacks=[StatefulCallback(), FailingCallback()], + ), + tune_config=tune.TuneConfig( + num_samples=8, + max_concurrent_trials=2, + search_alg=StatefulSearcher(), + ), + ) + +result_grid = tuner.fit() diff --git a/python/ray/tune/tests/test_tuner_restore_integration.py b/python/ray/tune/tests/test_tuner_restore_integration.py new file mode 100644 index 000000000000..d6425c410446 --- /dev/null +++ b/python/ray/tune/tests/test_tuner_restore_integration.py @@ -0,0 +1,189 @@ +import numpy as np +from pathlib import Path +import pytest +import time +import signal +import subprocess +import sys + +import ray + + +@pytest.fixture +def ray_start_4_cpus(): + address_info = ray.init(num_cpus=4) + yield address_info + # The code after the yield will run as teardown code. + ray.shutdown() + + +def kill_process(process, timeout=10): + kill_timeout = time.monotonic() + 10 + while process.poll() is None and time.monotonic() < kill_timeout: + time.sleep(1) + if process.poll() is None: + process.terminate() + + +def test_tuner_restore_integration(ray_start_4_cpus, tmp_path): + script_path = Path(__file__).parent / "_test_tuner_restore.py" + + # 6 trials, each running one at a time + # Each trial runs for 8 iterations + # Each iteration takes 0.5 seconds + # 4 seconds per trial --> 24 seconds in total + + errors_to_test = [""] * 10 + + theoretical_runtime = 24.0 + passing_factor = 1.5 + passing_runtime = theoretical_runtime * passing_factor + print(f"Experiment should with a total runtime of less than {passing_runtime}.") + + return_code = None + runtime_so_far = 0 + run_iter = 0 + while runtime_so_far < passing_runtime: + run_started_marker = tmp_path / "run_started_marker" + # run_ended_marker = tmp_path / "run_started_marker" + + run_started_marker.write_text("", encoding="utf-8") + # run_ended_marker.write_text("", encoding="utf-8") + + time_per_iter_s = 0.5 + env = { + "STORAGE_PATH": str(tmp_path / "ray_results"), + "TIME_PER_ITER_S": str(time_per_iter_s), + "CALLBACK_DUMP_DIR": str(tmp_path / "callback_dump_dir"), + "RUN_STARTED_MARKER": str(run_started_marker), + } + run = subprocess.Popen( + [sys.executable, script_path], env=env # , stderr=subprocess.PIPE + ) + run_iter += 1 + print(f"Started run #{run_iter}:", run.pid) + + # Start the timer after the first trial has entered its training loop. + while run.poll() is None and run_started_marker.exists(): + time.sleep(0.05) + + # If the run finished before any trials started, then exit immediately. 
+ if run.poll() is not None: + return_code = run.poll() + break + + timeout = min( + np.random.uniform(3 * time_per_iter_s, 6 * time_per_iter_s), + passing_runtime - runtime_so_far, + ) + + print("\n") + print("=" * 40) + print("Training has started...") + print(f"Interrupting after {timeout:.2f} s") + print(f"Currently at {runtime_so_far:.2f}/{passing_runtime}") + print("=" * 40) + print("\n") + + time.sleep(timeout) + runtime_so_far += timeout + + if run.poll() is None: + # Send "SIGINT" to stop the run + print("Sending SIGUSR1 to process", run.pid) + run.send_signal(signal.SIGUSR1) + + # Make sure the process is stopped forcefully after a timeout. + kill_process(run) + else: + print("Run already terminated!") + return_code = run.poll() + assert return_code + break + + print("\nTotal number of runs:", run_iter) + print("Total runtime:", runtime_so_far) + print("Return code:", return_code) + + # errors_to_test = [ + # # All (non-trivial) Tune loop hooks + # # Crash on 4 + # "on_step_begin", + # # Resume from 4, trial failure, restore, crash on 8 + # "on_step_end", + # # Resume from 8 + # "on_trial_start", + # "on_trial_restore", + # "on_trial_save", + # "on_trial_result", + # "on_trial_complete", + # "on_trial_error", + # # Test w/ SIGINT randomly between other error locations. + # "", + # "", + # ] + + # for error_on in errors_to_test: + # print("\n\n") + # print("=" * 40) + # print("Testing error on: ", error_on) + # print("=" * 40) + # print("\n\n") + + # run_started_marker = tmp_path / "run_started_marker" + # run_ended_marker = tmp_path / "run_started_marker" + + # run_started_marker.write_text("", encoding="utf-8") + # run_ended_marker.write_text("", encoding="utf-8") + + # env = { + # "FAIL_ON": error_on, + # "STORAGE_PATH": str(tmp_path / "ray_results"), + # # "EXP_NAME": "", + # "TIME_PER_ITER_S": str(0.5), + # "CALLBACK_DUMP_DIR": str(tmp_path / "callback_dump_dir"), + # "RUN_STARTED_MARKER": str(run_started_marker), + # "RUN_ENDED_MARKER": str(run_ended_marker), + # } + # run = subprocess.Popen( + # [sys.executable, script_path], env=env # , stderr=subprocess.PIPE + # ) + # print("Started run:", run.pid) + + # while run_started_marker.exists(): + # time.sleep(0.05) + + # timeout = 4 * 0.5 + # if error_on != "": + # timeout *= 2 + + # start = time.time() + # time.sleep(timeout) + # while run.poll() is None and time.time() - start <= timeout: + # time.sleep(0.1) + + # elapsed = time.time() - start + # print(f"Ran for {time.time() - start} seconds!\n") + # total_runtime += elapsed + + # if run.poll() is None: + # # Send "SIGINT" to stop the run + # print("Sending SIGUSR1 to process", run.pid) + # run.send_signal(signal.SIGUSR1) + + # # Make sure the process is stopped forcefully after a timeout. 
+ # kill_timeout = time.monotonic() + 10 + # while run.poll() is None and time.monotonic() < kill_timeout: + # time.sleep(1) + # if run.poll() is None: + # run.terminate() + # else: + # print("Run already terminated!") + + # print("\nTotal runtime:", total_runtime) + + +if __name__ == "__main__": + import pytest + + sys.exit(pytest.main(["-v", __file__])) From 962612caae52aa7a1c127011a13b7ba6cd45fb7b Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 24 Mar 2023 12:29:44 -0700 Subject: [PATCH 03/24] draft 2 Signed-off-by: Justin Yu --- python/ray/tune/tests/_test_tuner_restore.py | 2 +- .../tests/test_tuner_restore_integration.py | 190 ++++++++---------- 2 files changed, 82 insertions(+), 110 deletions(-) diff --git a/python/ray/tune/tests/_test_tuner_restore.py b/python/ray/tune/tests/_test_tuner_restore.py index 526ffeb1f968..b03d5bc42ba3 100644 --- a/python/ray/tune/tests/_test_tuner_restore.py +++ b/python/ray/tune/tests/_test_tuner_restore.py @@ -174,7 +174,7 @@ def train_fn(config, data=None): checkpoint = session.get_checkpoint() start = checkpoint.to_dict()["iteration"] + 1 if checkpoint else 1 - training_started_marker = Path(os.environ.get("RUN_STARTED_MARKER")) + training_started_marker = Path(os.environ.get("RUN_STARTED_MARKER", "asdf.py")) if training_started_marker.exists(): training_started_marker.unlink() diff --git a/python/ray/tune/tests/test_tuner_restore_integration.py b/python/ray/tune/tests/test_tuner_restore_integration.py index d6425c410446..9cf57e1deee8 100644 --- a/python/ray/tune/tests/test_tuner_restore_integration.py +++ b/python/ray/tune/tests/test_tuner_restore_integration.py @@ -1,3 +1,21 @@ +""" +This test is meant to be an integration stress test for experiment restoration. + +Test setup: +- 8 trials, with a max of 2 running concurrently (--> 4 rounds of trials) +- Each iteration takes 0.5 seconds +- Each trial runs for 8 iterations --> 4 seconds +- Each round of 2 trials should take 4 seconds +- Without any interrupts/restoration: + - Minimum runtime: 4 rounds * 4 seconds / round = 16 seconds + - Actually running it w/o interrupts = ~24 seconds +- The test will stop the script with a SIGINT at a random time between + 4-8 iterations after restoring. + +Requirements: +- With interrupts, the experiment should finish within 1.5 * 24 = 36 seconds. + - 1.5x is the passing threshold. 
+""" import numpy as np from pathlib import Path import pytest @@ -8,6 +26,9 @@ import ray +from ray.tune.result_grid import ResultGrid +from ray.tune.analysis import ExperimentAnalysis + @pytest.fixture def ray_start_4_cpus(): @@ -17,42 +38,55 @@ def ray_start_4_cpus(): ray.shutdown() -def kill_process(process, timeout=10): - kill_timeout = time.monotonic() + 10 +def kill_process(process, timeout_s=10): + kill_timeout = time.monotonic() + timeout_s while process.poll() is None and time.monotonic() < kill_timeout: time.sleep(1) if process.poll() is None: process.terminate() +def print_message(message): + print("\n") + print("=" * 50) + print(message) + print("=" * 50) + print("\n") + + def test_tuner_restore_integration(ray_start_4_cpus, tmp_path): - script_path = Path(__file__).parent / "_test_tuner_restore.py" + np.random.seed(2023) - # 6 trials, each running one at a time - # Each trial runs for 8 iterations - # Each iteration takes 0.5 seconds - # 4 seconds per trial --> 24 seconds in total + script_path = Path(__file__).parent / "_test_tuner_restore.py" - errors_to_test = [""] * 10 + # Args to pass into the script as environment variables + exp_name = "tuner_restore_integration_test" + storage_path = tmp_path / "ray_results" + iters_per_trial = 8 + num_trials = 8 + total_iters = iters_per_trial * num_trials + max_concurrent = 2 - theoretical_runtime = 24.0 + # Pass criteria + no_interrupts_runtime = 24.0 passing_factor = 1.5 - passing_runtime = theoretical_runtime * passing_factor - print(f"Experiment should with a total runtime of less than {passing_runtime}.") + passing_runtime = no_interrupts_runtime * passing_factor + print(f"Experiment should finish with a total runtime <= {passing_runtime}.") + # Variables used in the loop return_code = None - runtime_so_far = 0 + total_runtime = 0 run_iter = 0 - while runtime_so_far < passing_runtime: - run_started_marker = tmp_path / "run_started_marker" - # run_ended_marker = tmp_path / "run_started_marker" + progress_history = [] + while total_runtime < passing_runtime: + run_started_marker = tmp_path / "run_started_marker" run_started_marker.write_text("", encoding="utf-8") - # run_ended_marker.write_text("", encoding="utf-8") time_per_iter_s = 0.5 env = { - "STORAGE_PATH": str(tmp_path / "ray_results"), + "STORAGE_PATH": str(storage_path), + "EXP_NAME": exp_name, "TIME_PER_ITER_S": str(time_per_iter_s), "CALLBACK_DUMP_DIR": str(tmp_path / "callback_dump_dir"), "RUN_STARTED_MARKER": str(run_started_marker), @@ -61,126 +95,64 @@ def test_tuner_restore_integration(ray_start_4_cpus, tmp_path): [sys.executable, script_path], env=env # , stderr=subprocess.PIPE ) run_iter += 1 - print(f"Started run #{run_iter}:", run.pid) + + print_message(f"Started run #{run_iter} w/ PID = {run.pid}") # Start the timer after the first trial has entered its training loop. while run.poll() is None and run_started_marker.exists(): time.sleep(0.05) - # If the run finished before any trials started, then exit immediately. + # If the run already finished, then exit immediately. 
if run.poll() is not None: return_code = run.poll() break timeout = min( - np.random.uniform(3 * time_per_iter_s, 6 * time_per_iter_s), - passing_runtime - runtime_so_far, + np.random.uniform(4 * time_per_iter_s, 8 * time_per_iter_s), + passing_runtime - total_runtime, ) - print("\n") - print("=" * 40) - print("Training has started...") - print(f"Interrupting after {timeout:.2f} s") - print(f"Currently at {runtime_so_far:.2f}/{passing_runtime}") - print("=" * 40) - print("\n") + print_message( + "Training has started...\n" + f"Interrupting after {timeout:.2f} s\n" + f"Currently at {total_runtime:.2f}/{passing_runtime} seconds" + ) time.sleep(timeout) - runtime_so_far += timeout + total_runtime += timeout if run.poll() is None: # Send "SIGINT" to stop the run - print("Sending SIGUSR1 to process", run.pid) + print_message(f"Sending SIGUSR1 to run #{run_iter} w/ PID = {run.pid}") run.send_signal(signal.SIGUSR1) # Make sure the process is stopped forcefully after a timeout. kill_process(run) else: - print("Run already terminated!") + print_message("Run has already terminated!") return_code = run.poll() assert return_code break - print("\nTotal number of runs:", run_iter) - print("Total runtime:", runtime_so_far) + # Check up on the results. + results = ResultGrid(ExperimentAnalysis(str(storage_path / exp_name))) + iters = [result.metrics.get("training_iteration", 0) for result in results] + progress = sum(iters) / total_iters + progress_history.append(progress) + print_message( + f"Number of trials = {len(results)}\n" + f"% completion = {progress} ({sum(iters)} iters / {total_iters})\n" + f"Currently at {total_runtime:.2f}/{passing_runtime} seconds" + ) + + print("Total number of restorations:", run_iter) + print("Total runtime:", total_runtime) print("Return code:", return_code) - # errors_to_test = [ - # # All (non-trivial) Tune loop hooks - # # Crash on 4 - # "on_step_begin", - # # Resume from 4, trial failure, restore, crash on 8 - # "on_step_end", - # # Resume from 8 - # "on_trial_start", - # "on_trial_restore", - # "on_trial_save", - # "on_trial_result", - # "on_trial_complete", - # "on_trial_error", - # # Test w/ SIGINT randomly between other error locations. 
- # "", - # "", - # ] - - # for error_on in errors_to_test: - # print("\n\n") - # print("=" * 40) - # print("Testing error on: ", error_on) - # print("=" * 40) - # print("\n\n") - - # run_started_marker = tmp_path / "run_started_marker" - # run_ended_marker = tmp_path / "run_started_marker" - - # run_started_marker.write_text("", encoding="utf-8") - # run_ended_marker.write_text("", encoding="utf-8") - - # env = { - # "FAIL_ON": error_on, - # "STORAGE_PATH": str(tmp_path / "ray_results"), - # # "EXP_NAME": "", - # "TIME_PER_ITER_S": str(0.5), - # "CALLBACK_DUMP_DIR": str(tmp_path / "callback_dump_dir"), - # "RUN_STARTED_MARKER": str(run_started_marker), - # "RUN_ENDED_MARKER": str(run_ended_marker), - # } - # run = subprocess.Popen( - # [sys.executable, script_path], env=env # , stderr=subprocess.PIPE - # ) - # print("Started run:", run.pid) - - # while run_started_marker.exists(): - # time.sleep(0.05) - - # timeout = 4 * 0.5 - # if error_on != "": - # timeout *= 2 - - # start = time.time() - # time.sleep(timeout) - # while run.poll() is None and time.time() - start <= timeout: - # time.sleep(0.1) - - # elapsed = time.time() - start - # print(f"Ran for {time.time() - start} seconds!\n") - # total_runtime += elapsed - - # if run.poll() is None: - # # Send "SIGINT" to stop the run - # print("Sending SIGUSR1 to process", run.pid) - # run.send_signal(signal.SIGUSR1) - - # # Make sure the process is stopped forcefully after a timeout. - # kill_timeout = time.monotonic() + 10 - # while run.poll() is None and time.monotonic() < kill_timeout: - # time.sleep(1) - # if run.poll() is None: - # run.terminate() - # else: - # print("Run already terminated!") - - # print("\nTotal runtime:", total_runtime) + assert total_runtime < passing_runtime + assert return_code == 0 + # Check that progress increases monotonically (we never go backwards/start from 0) + assert np.all(np.diff(progress_history)) if __name__ == "__main__": From db9af9cdc8f795374a5a9f83e2da5578b3b5164b Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 24 Mar 2023 14:35:59 -0700 Subject: [PATCH 04/24] working version of tuner restore stress test Signed-off-by: Justin Yu --- python/ray/tune/tests/_test_tuner_restore.py | 116 ++---------------- .../tests/test_tuner_restore_integration.py | 81 +++++++++--- 2 files changed, 78 insertions(+), 119 deletions(-) diff --git a/python/ray/tune/tests/_test_tuner_restore.py b/python/ray/tune/tests/_test_tuner_restore.py index b03d5bc42ba3..62429f4df533 100644 --- a/python/ray/tune/tests/_test_tuner_restore.py +++ b/python/ray/tune/tests/_test_tuner_restore.py @@ -10,27 +10,17 @@ from ray.air import Checkpoint, session from ray.tune.experiment import Trial -FAIL_ON = os.environ.get("FAIL_ON", "") STORAGE_PATH = os.environ.get("STORAGE_PATH", "/tmp/ray_results") - EXP_NAME = os.environ.get("EXP_NAME", "tuner_restore_integration_test") +CALLBACK_DUMP_FILE = os.environ.get( + "CALLBACK_DUMP_FILE", "/tmp/callback_dump_file.json" +) TIME_PER_ITER_S = float(os.environ.get("TIME_PER_ITER_S", "0.5")) - -CALLBACK_DUMP_DIR = os.environ.get("CALLBACK_DUMP_DIR", "/tmp/callback_dump_dir") - -# 4 iterations per failure --> 2 seconds per failure -# 11 failure hooks + 1 "no failure" --> 12 total failures -# ~24 seconds, 48 iterations at the last failure - -TOTAL_FAILURES = 12 -MAX_CONCURRENT_TRIALS = 1 -ITERATIONS_PER_FAILURE = 4 -FAILURES_PER_TRIAL = 2 - -ITERATIONS_PER_TRIAL = ITERATIONS_PER_FAILURE * FAILURES_PER_TRIAL # 8 -NUM_TRIALS = MAX_CONCURRENT_TRIALS * (TOTAL_FAILURES // FAILURES_PER_TRIAL) # 6 +NUM_TRIALS 
= int(os.environ.get("NUM_TRIALS", "8")) +MAX_CONCURRENT_TRIALS = int(os.environ.get("MAX_CONCURRENT_TRIALS", "2")) +ITERATIONS_PER_TRIAL = int(os.environ.get("ITERATIONS_PER_TRIAL", "8")) class StatefulCallback(tune.Callback): @@ -48,8 +38,9 @@ def on_trial_result( self._trial_iterations[trial.trial_id].append(result["training_iteration"]) def on_experiment_end(self, trials: List["Trial"], **info): - # Save to directory - pass + # Save callback contents to file + with open(CALLBACK_DUMP_FILE, "w") as f: + json.dump(self.get_state(), f, indent=2) def get_state(self) -> Optional[Dict]: return {"trial_iters": self._trial_iterations.copy()} @@ -69,9 +60,7 @@ def __init__( def suggest(self, trial_id: str) -> Optional[Dict]: self._trial_count += 1 - # Have a few trials error occasionally. - should_error = self._trial_count % MAX_CONCURRENT_TRIALS == 0 - return {"id": self._trial_count, "should_error": should_error} + return {"id": self._trial_count} def on_trial_complete( self, trial_id: str, result: Optional[Dict] = None, error: bool = False @@ -88,88 +77,6 @@ def restore(self, checkpoint_path: str): self._trial_count = state["trial_count"] -class FailingCallback(tune.Callback): - def __init__(self): - self._trial_iters_since_restore = {} - - @property - def fail_on(self) -> str: - return os.environ.get("FAIL_ON", "") - - def should_fail(self, trials: List["Trial"]) -> bool: - if len(self._trial_iters_since_restore) < MAX_CONCURRENT_TRIALS: - return False - - should_fail = all( - iter >= ITERATIONS_PER_FAILURE - for iter in self._trial_iters_since_restore.values() - ) - return should_fail - - def on_step_begin(self, iteration: int, trials: List["Trial"], **info): - if self.should_fail(trials) and self.fail_on == "on_step_begin": - raise RuntimeError("on_step_begin") - - def on_step_end(self, iteration: int, trials: List["Trial"], **info): - if self.should_fail(trials) and self.fail_on == "on_step_end": - raise RuntimeError("on_step_end") - - def on_trial_start( - self, iteration: int, trials: List["Trial"], trial: "Trial", **info - ): - if self.should_fail(trials) and self.fail_on == "on_trial_start": - raise RuntimeError("on_trial_start") - - def on_trial_restore( - self, iteration: int, trials: List["Trial"], trial: "Trial", **info - ): - if self.should_fail(trials) and self.fail_on == "on_trial_restore": - raise RuntimeError("on_trial_restore") - - def on_trial_save( - self, iteration: int, trials: List["Trial"], trial: "Trial", **info - ): - if self.should_fail(trials) and self.fail_on == "on_trial_save": - raise RuntimeError("on_trial_save") - - def on_trial_result( - self, - iteration: int, - trials: List["Trial"], - trial: "Trial", - result: Dict, - **info, - ): - if self.should_fail(trials) and self.fail_on == "on_trial_result": - raise RuntimeError("on_trial_result") - self._trial_iters_since_restore[trial.trial_id] = result.get( - "iterations_since_restore", 0 - ) - - def on_trial_complete( - self, iteration: int, trials: List["Trial"], trial: "Trial", **info - ): - if self.should_fail(trials) and self.fail_on == "on_trial_complete": - raise RuntimeError("on_trial_complete") - - def on_trial_error( - self, iteration: int, trials: List["Trial"], trial: "Trial", **info - ): - if self.should_fail(trials) and self.fail_on == "on_trial_error": - raise RuntimeError("on_trial_error") - - def on_checkpoint( - self, - iteration: int, - trials: List["Trial"], - trial: "Trial", - checkpoint, - **info, - ): - if self.should_fail(trials) and self.fail_on == "on_checkpoint": - raise 
RuntimeError("on_checkpoint") - - def train_fn(config, data=None): checkpoint = session.get_checkpoint() start = checkpoint.to_dict()["iteration"] + 1 if checkpoint else 1 @@ -207,8 +114,7 @@ def train_fn(config, data=None): local_dir=STORAGE_PATH, name=EXP_NAME, checkpoint_config=air.CheckpointConfig(num_to_keep=1), - failure_config=air.FailureConfig(max_failures=-1), - callbacks=[StatefulCallback(), FailingCallback()], + callbacks=[StatefulCallback()], ), tune_config=tune.TuneConfig( num_samples=8, diff --git a/python/ray/tune/tests/test_tuner_restore_integration.py b/python/ray/tune/tests/test_tuner_restore_integration.py index 9cf57e1deee8..fafae49ca011 100644 --- a/python/ray/tune/tests/test_tuner_restore_integration.py +++ b/python/ray/tune/tests/test_tuner_restore_integration.py @@ -8,14 +8,22 @@ - Each round of 2 trials should take 4 seconds - Without any interrupts/restoration: - Minimum runtime: 4 rounds * 4 seconds / round = 16 seconds - - Actually running it w/o interrupts = ~24 seconds + - Actually running it without any interrupts = ~24 seconds - The test will stop the script with a SIGINT at a random time between 4-8 iterations after restoring. Requirements: -- With interrupts, the experiment should finish within 1.5 * 24 = 36 seconds. +- Req 1: Reasonable runtime + - The experiment should finish within 1.5 * 24 = 36 seconds. - 1.5x is the passing threshold. +- Req 2: Training progress persisted + - The experiment should progress monotonically. + - The experiment shouldn't "go backward" at any point. + - Trials shouldn't start from scratch. +- Req 3: Searcher state saved/restored correctly +- Req 4: Callback state saved/restored correctly """ +import json import numpy as np from pathlib import Path import pytest @@ -38,7 +46,7 @@ def ray_start_4_cpus(): ray.shutdown() -def kill_process(process, timeout_s=10): +def kill_process_if_needed(process, timeout_s=10): kill_timeout = time.monotonic() + timeout_s while process.poll() is None and time.monotonic() < kill_timeout: time.sleep(1) @@ -54,7 +62,7 @@ def print_message(message): print("\n") -def test_tuner_restore_integration(ray_start_4_cpus, tmp_path): +def test_air_experiment_restore(ray_start_4_cpus, tmp_path): np.random.seed(2023) script_path = Path(__file__).parent / "_test_tuner_restore.py" @@ -65,8 +73,11 @@ def test_tuner_restore_integration(ray_start_4_cpus, tmp_path): iters_per_trial = 8 num_trials = 8 total_iters = iters_per_trial * num_trials + time_per_iter_s = 0.5 max_concurrent = 2 + callback_dump_file = tmp_path / "callback_dump_file.json" + # Pass criteria no_interrupts_runtime = 24.0 passing_factor = 1.5 @@ -83,12 +94,13 @@ def test_tuner_restore_integration(ray_start_4_cpus, tmp_path): run_started_marker = tmp_path / "run_started_marker" run_started_marker.write_text("", encoding="utf-8") - time_per_iter_s = 0.5 env = { "STORAGE_PATH": str(storage_path), "EXP_NAME": exp_name, + "CALLBACK_DUMP_FILE": str(callback_dump_file), "TIME_PER_ITER_S": str(time_per_iter_s), - "CALLBACK_DUMP_DIR": str(tmp_path / "callback_dump_dir"), + "ITERATIONS_PER_TRIAL": str(iters_per_trial), + "MAX_CONCURRENT_TRIALS": str(max_concurrent), "RUN_STARTED_MARKER": str(run_started_marker), } run = subprocess.Popen( @@ -114,10 +126,11 @@ def test_tuner_restore_integration(ray_start_4_cpus, tmp_path): print_message( "Training has started...\n" - f"Interrupting after {timeout:.2f} s\n" + f"Interrupting after {timeout:.2f} seconds\n" f"Currently at {total_runtime:.2f}/{passing_runtime} seconds" ) + # Sleep for a random amount of time, 
then stop the run. time.sleep(timeout) total_runtime += timeout @@ -127,7 +140,7 @@ def test_tuner_restore_integration(ray_start_4_cpus, tmp_path): run.send_signal(signal.SIGUSR1) # Make sure the process is stopped forcefully after a timeout. - kill_process(run) + kill_process_if_needed(run) else: print_message("Run has already terminated!") return_code = run.poll() @@ -145,14 +158,54 @@ def test_tuner_restore_integration(ray_start_4_cpus, tmp_path): f"Currently at {total_runtime:.2f}/{passing_runtime} seconds" ) - print("Total number of restorations:", run_iter) - print("Total runtime:", total_runtime) - print("Return code:", return_code) + print_message( + f"Total number of restorations = {run_iter}\n" + f"Total runtime = {total_runtime}\n" + f"Return code = {return_code}" + ) + + # The script shouldn't have errored. (It should have finished by this point.) + assert return_code == 0, ( + "The script errored with return code: {return_code}.\n" + "Check the `_test_tuner_restore_run.py` script for any issues." + ) + + # Req 1: runtime + assert ( + total_runtime <= passing_runtime + ), f"Expected runtime to be <= {passing_runtime}, but ran for: {total_runtime}" - assert total_runtime < passing_runtime - assert return_code == 0 + # Req 2: training progress persisted # Check that progress increases monotonically (we never go backwards/start from 0) - assert np.all(np.diff(progress_history)) + assert np.all(np.diff(progress_history) >= 0), ( + f"Expected progress to increase monotonically. Instead, got:\n" + "{progress_history}" + ) + + # Req 3: searcher state + results = ResultGrid(ExperimentAnalysis(str(storage_path / exp_name))) + # Check that all trials have unique ids assigned by the searcher + ids = [result.config["id"] for result in results] + assert sorted(ids) == list(range(1, num_trials + 1)), ( + "Expected the searcher to assign increasing id for each trial, but got:" + f"{ids}" + ) + + # Req 4: callback state + with open(callback_dump_file, "r") as f: + callback_state = json.load(f) + + trial_iters = callback_state["trial_iters"] + for iters in trial_iters.values(): + # Check that the callback has data for each trial, for all iters + # NOTE: There may be some duplicate data, due to the fact that + # the callback will be updated on every `on_trial_result` hook, + # but the trial may crash before the corresponding checkpoint gets processed. 
+ assert sorted(list(set(iters))) == list( + range(1, iters_per_trial + 1) + ), f"Expected data from all iterations, but got: {iters}" + + print_message("Success!") if __name__ == "__main__": From 64a915a9c2e94b18391b607c071736b1f9e0be59 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 24 Mar 2023 15:51:43 -0700 Subject: [PATCH 05/24] add case for trainer Signed-off-by: Justin Yu --- ...py => _test_air_experiment_restore_run.py} | 93 +++++++++++++------ .../tests/test_tuner_restore_integration.py | 57 ++++++++---- 2 files changed, 105 insertions(+), 45 deletions(-) rename python/ray/tune/tests/{_test_tuner_restore.py => _test_air_experiment_restore_run.py} (58%) diff --git a/python/ray/tune/tests/_test_tuner_restore.py b/python/ray/tune/tests/_test_air_experiment_restore_run.py similarity index 58% rename from python/ray/tune/tests/_test_tuner_restore.py rename to python/ray/tune/tests/_test_air_experiment_restore_run.py index 62429f4df533..713ca33f197b 100644 --- a/python/ray/tune/tests/_test_tuner_restore.py +++ b/python/ray/tune/tests/_test_air_experiment_restore_run.py @@ -6,11 +6,14 @@ import time from typing import Dict, List, Optional +import ray from ray import air, tune from ray.air import Checkpoint, session +from ray.train.data_parallel_trainer import DataParallelTrainer from ray.tune.experiment import Trial +RUNNER_TYPE = os.environ.get("RUNNER_TYPE", "tuner") STORAGE_PATH = os.environ.get("STORAGE_PATH", "/tmp/ray_results") EXP_NAME = os.environ.get("EXP_NAME", "tuner_restore_integration_test") CALLBACK_DUMP_FILE = os.environ.get( @@ -85,8 +88,6 @@ def train_fn(config, data=None): if training_started_marker.exists(): training_started_marker.unlink() - else: - time.sleep(TIME_PER_ITER_S * 2) for iteration in range(start, ITERATIONS_PER_TRIAL + 1): time.sleep(TIME_PER_ITER_S) @@ -97,30 +98,70 @@ def train_fn(config, data=None): ) -trainable = tune.with_resources(train_fn, resources={"CPU": 1}) -trainable = tune.with_parameters(trainable, data={"dummy_data": [1, 2, 3]}) +experiment_path = os.path.join(STORAGE_PATH, EXP_NAME) -experiment_path = os.path.join(STORAGE_PATH, EXP_NAME) +if RUNNER_TYPE == "tuner": + trainable = tune.with_resources(train_fn, resources={"CPU": 1}) + trainable = tune.with_parameters(trainable, data={"dummy_data": [1, 2, 3]}) + + if tune.Tuner.can_restore(experiment_path): + tuner = tune.Tuner.restore( + experiment_path, trainable=trainable, resume_errored=True + ) + else: + tuner = tune.Tuner( + trainable, + run_config=air.RunConfig( + local_dir=STORAGE_PATH, + name=EXP_NAME, + checkpoint_config=air.CheckpointConfig(num_to_keep=1), + callbacks=[StatefulCallback()], + ), + tune_config=tune.TuneConfig( + num_samples=8, + max_concurrent_trials=2, + search_alg=StatefulSearcher(), + ), + ) + + result_grid = tuner.fit() + +elif RUNNER_TYPE == "trainer": + dataset_size = 128 + num_workers = 4 + + def train_loop_per_worker(config): + # Wrap the other train_fn with a check for the dataset. 
+ assert session.get_dataset_shard("train").count() == dataset_size // num_workers + train_fn(config) + + datasets = {"train": ray.data.range(dataset_size)} + + if DataParallelTrainer.can_restore(experiment_path): + trainer = DataParallelTrainer.restore( + experiment_path, + datasets=datasets, + train_loop_per_worker=train_loop_per_worker, + ) + else: + trainer = DataParallelTrainer( + train_loop_per_worker, + datasets=datasets, + scaling_config=air.ScalingConfig( + num_workers=num_workers, trainer_resources={"CPU": 0} + ), + run_config=air.RunConfig( + local_dir=STORAGE_PATH, + name=EXP_NAME, + checkpoint_config=air.CheckpointConfig(num_to_keep=1), + callbacks=[StatefulCallback()], + ), + ) + + result = trainer.fit() + +ray.shutdown() +import gc -if tune.Tuner.can_restore(experiment_path): - tuner = tune.Tuner.restore( - experiment_path, trainable=trainable, resume_errored=True - ) -else: - tuner = tune.Tuner( - trainable, - run_config=air.RunConfig( - local_dir=STORAGE_PATH, - name=EXP_NAME, - checkpoint_config=air.CheckpointConfig(num_to_keep=1), - callbacks=[StatefulCallback()], - ), - tune_config=tune.TuneConfig( - num_samples=8, - max_concurrent_trials=2, - search_alg=StatefulSearcher(), - ), - ) - -result_grid = tuner.fit() +gc.collect() diff --git a/python/ray/tune/tests/test_tuner_restore_integration.py b/python/ray/tune/tests/test_tuner_restore_integration.py index fafae49ca011..0519d1005c59 100644 --- a/python/ray/tune/tests/test_tuner_restore_integration.py +++ b/python/ray/tune/tests/test_tuner_restore_integration.py @@ -38,6 +38,9 @@ from ray.tune.analysis import ExperimentAnalysis +_RUN_SCRIPT_FILENAME = "_test_air_experiment_restore_run.py" + + @pytest.fixture def ray_start_4_cpus(): address_info = ray.init(num_cpus=4) @@ -62,21 +65,47 @@ def print_message(message): print("\n") -def test_air_experiment_restore(ray_start_4_cpus, tmp_path): +# @pytest.mark.parametrize("runner_type", ["tuner", "trainer"]) +@pytest.mark.parametrize("runner_type", ["trainer"]) +def test_air_experiment_restore(ray_start_4_cpus, tmp_path, runner_type): np.random.seed(2023) - script_path = Path(__file__).parent / "_test_tuner_restore.py" + script_path = Path(__file__).parent / _RUN_SCRIPT_FILENAME # Args to pass into the script as environment variables - exp_name = "tuner_restore_integration_test" + exp_name = f"{runner_type}_restore_integration_test" + callback_dump_file = tmp_path / f"{runner_type}-callback_dump_file.json" storage_path = tmp_path / "ray_results" - iters_per_trial = 8 - num_trials = 8 - total_iters = iters_per_trial * num_trials + if storage_path.exists(): + import shutil + + shutil.rmtree(storage_path) + + run_started_marker = tmp_path / "run_started_marker" + time_per_iter_s = 0.5 max_concurrent = 2 - callback_dump_file = tmp_path / "callback_dump_file.json" + if runner_type == "tuner": + iters_per_trial = 8 + num_trials = 8 + elif runner_type == "trainer": + iters_per_trial = 64 + num_trials = 1 + + total_iters = iters_per_trial * num_trials + + env = { + "RUNNER_TYPE": runner_type, + "STORAGE_PATH": str(storage_path), + "EXP_NAME": exp_name, + "CALLBACK_DUMP_FILE": str(callback_dump_file), + "RUN_STARTED_MARKER": str(run_started_marker), + "TIME_PER_ITER_S": str(time_per_iter_s), + "ITERATIONS_PER_TRIAL": str(iters_per_trial), + "NUM_TRIALS": str(num_trials), + "MAX_CONCURRENT_TRIALS": str(max_concurrent), + } # Pass criteria no_interrupts_runtime = 24.0 @@ -91,18 +120,8 @@ def test_air_experiment_restore(ray_start_4_cpus, tmp_path): progress_history = [] while total_runtime < 
passing_runtime: - run_started_marker = tmp_path / "run_started_marker" run_started_marker.write_text("", encoding="utf-8") - env = { - "STORAGE_PATH": str(storage_path), - "EXP_NAME": exp_name, - "CALLBACK_DUMP_FILE": str(callback_dump_file), - "TIME_PER_ITER_S": str(time_per_iter_s), - "ITERATIONS_PER_TRIAL": str(iters_per_trial), - "MAX_CONCURRENT_TRIALS": str(max_concurrent), - "RUN_STARTED_MARKER": str(run_started_marker), - } run = subprocess.Popen( [sys.executable, script_path], env=env # , stderr=subprocess.PIPE ) @@ -166,8 +185,8 @@ def test_air_experiment_restore(ray_start_4_cpus, tmp_path): # The script shouldn't have errored. (It should have finished by this point.) assert return_code == 0, ( - "The script errored with return code: {return_code}.\n" - "Check the `_test_tuner_restore_run.py` script for any issues." + f"The script errored with return code: {return_code}.\n" + f"Check the `{_RUN_SCRIPT_FILENAME}` script for any issues." ) # Req 1: runtime From 057639fa7d9996f4179b25f81fef851455f68805 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 24 Mar 2023 17:16:43 -0700 Subject: [PATCH 06/24] Fix lint Signed-off-by: Justin Yu --- python/ray/tune/tests/test_tuner_restore_integration.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/tune/tests/test_tuner_restore_integration.py b/python/ray/tune/tests/test_tuner_restore_integration.py index 0519d1005c59..cce8bc8a43e3 100644 --- a/python/ray/tune/tests/test_tuner_restore_integration.py +++ b/python/ray/tune/tests/test_tuner_restore_integration.py @@ -197,7 +197,7 @@ def test_air_experiment_restore(ray_start_4_cpus, tmp_path, runner_type): # Req 2: training progress persisted # Check that progress increases monotonically (we never go backwards/start from 0) assert np.all(np.diff(progress_history) >= 0), ( - f"Expected progress to increase monotonically. Instead, got:\n" + "Expected progress to increase monotonically. Instead, got:\n" "{progress_history}" ) @@ -220,7 +220,7 @@ def test_air_experiment_restore(ray_start_4_cpus, tmp_path, runner_type): # NOTE: There may be some duplicate data, due to the fact that # the callback will be updated on every `on_trial_result` hook, # but the trial may crash before the corresponding checkpoint gets processed. 
- assert sorted(list(set(iters))) == list( + assert sorted(set(iters)) == list( range(1, iters_per_trial + 1) ), f"Expected data from all iterations, but got: {iters}" From 000edba25eb4654491b4585672936210ac0ecc27 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 24 Mar 2023 18:27:09 -0700 Subject: [PATCH 07/24] minor fixes (wrap in main method) Signed-off-by: Justin Yu --- .../tests/_test_air_experiment_restore_run.py | 142 +++++++++--------- .../tests/test_tuner_restore_integration.py | 2 +- 2 files changed, 72 insertions(+), 72 deletions(-) diff --git a/python/ray/tune/tests/_test_air_experiment_restore_run.py b/python/ray/tune/tests/_test_air_experiment_restore_run.py index 713ca33f197b..951526444afc 100644 --- a/python/ray/tune/tests/_test_air_experiment_restore_run.py +++ b/python/ray/tune/tests/_test_air_experiment_restore_run.py @@ -13,17 +13,17 @@ from ray.tune.experiment import Trial -RUNNER_TYPE = os.environ.get("RUNNER_TYPE", "tuner") +RUNNER_TYPE = os.environ.get("RUNNER_TYPE", "trainer") STORAGE_PATH = os.environ.get("STORAGE_PATH", "/tmp/ray_results") -EXP_NAME = os.environ.get("EXP_NAME", "tuner_restore_integration_test") +EXP_NAME = os.environ.get("EXP_NAME", "restore_integration_test") CALLBACK_DUMP_FILE = os.environ.get( "CALLBACK_DUMP_FILE", "/tmp/callback_dump_file.json" ) TIME_PER_ITER_S = float(os.environ.get("TIME_PER_ITER_S", "0.5")) -NUM_TRIALS = int(os.environ.get("NUM_TRIALS", "8")) +NUM_TRIALS = int(os.environ.get("NUM_TRIALS", "1")) MAX_CONCURRENT_TRIALS = int(os.environ.get("MAX_CONCURRENT_TRIALS", "2")) -ITERATIONS_PER_TRIAL = int(os.environ.get("ITERATIONS_PER_TRIAL", "8")) +ITERATIONS_PER_TRIAL = int(os.environ.get("ITERATIONS_PER_TRIAL", "64")) class StatefulCallback(tune.Callback): @@ -98,70 +98,70 @@ def train_fn(config, data=None): ) -experiment_path = os.path.join(STORAGE_PATH, EXP_NAME) - - -if RUNNER_TYPE == "tuner": - trainable = tune.with_resources(train_fn, resources={"CPU": 1}) - trainable = tune.with_parameters(trainable, data={"dummy_data": [1, 2, 3]}) - - if tune.Tuner.can_restore(experiment_path): - tuner = tune.Tuner.restore( - experiment_path, trainable=trainable, resume_errored=True - ) - else: - tuner = tune.Tuner( - trainable, - run_config=air.RunConfig( - local_dir=STORAGE_PATH, - name=EXP_NAME, - checkpoint_config=air.CheckpointConfig(num_to_keep=1), - callbacks=[StatefulCallback()], - ), - tune_config=tune.TuneConfig( - num_samples=8, - max_concurrent_trials=2, - search_alg=StatefulSearcher(), - ), - ) - - result_grid = tuner.fit() - -elif RUNNER_TYPE == "trainer": - dataset_size = 128 - num_workers = 4 - - def train_loop_per_worker(config): - # Wrap the other train_fn with a check for the dataset. 
- assert session.get_dataset_shard("train").count() == dataset_size // num_workers - train_fn(config) - - datasets = {"train": ray.data.range(dataset_size)} - - if DataParallelTrainer.can_restore(experiment_path): - trainer = DataParallelTrainer.restore( - experiment_path, - datasets=datasets, - train_loop_per_worker=train_loop_per_worker, - ) - else: - trainer = DataParallelTrainer( - train_loop_per_worker, - datasets=datasets, - scaling_config=air.ScalingConfig( - num_workers=num_workers, trainer_resources={"CPU": 0} - ), - run_config=air.RunConfig( - local_dir=STORAGE_PATH, - name=EXP_NAME, - checkpoint_config=air.CheckpointConfig(num_to_keep=1), - callbacks=[StatefulCallback()], - ), - ) - - result = trainer.fit() - -ray.shutdown() -import gc - -gc.collect() +if __name__ == "__main__": + experiment_path = os.path.join(STORAGE_PATH, EXP_NAME) + + ray.init() + + if RUNNER_TYPE == "tuner": + trainable = tune.with_resources(train_fn, resources={"CPU": 1}) + trainable = tune.with_parameters(trainable, data={"dummy_data": [1, 2, 3]}) + + if tune.Tuner.can_restore(experiment_path): + tuner = tune.Tuner.restore( + experiment_path, trainable=trainable, resume_errored=True + ) + else: + tuner = tune.Tuner( + trainable, + run_config=air.RunConfig( + local_dir=STORAGE_PATH, + name=EXP_NAME, + checkpoint_config=air.CheckpointConfig(num_to_keep=1), + callbacks=[StatefulCallback()], + ), + tune_config=tune.TuneConfig( + num_samples=8, + max_concurrent_trials=2, + search_alg=StatefulSearcher(), + ), + ) + + result_grid = tuner.fit() + + elif RUNNER_TYPE == "trainer": + dataset_size = 128 + num_workers = 4 + + def train_loop_per_worker(config): + # Wrap the other train_fn with a check for the dataset. + assert ( + session.get_dataset_shard("train").count() + == dataset_size // num_workers + ) + train_fn(config) + + datasets = {"train": ray.data.range(dataset_size)} + + if DataParallelTrainer.can_restore(experiment_path): + trainer = DataParallelTrainer.restore( + experiment_path, + datasets=datasets, + train_loop_per_worker=train_loop_per_worker, + ) + else: + trainer = DataParallelTrainer( + train_loop_per_worker, + datasets=datasets, + scaling_config=air.ScalingConfig( + num_workers=num_workers, trainer_resources={"CPU": 0} + ), + run_config=air.RunConfig( + local_dir=STORAGE_PATH, + name=EXP_NAME, + checkpoint_config=air.CheckpointConfig(num_to_keep=1), + callbacks=[StatefulCallback()], + ), + ) + + result = trainer.fit() diff --git a/python/ray/tune/tests/test_tuner_restore_integration.py b/python/ray/tune/tests/test_tuner_restore_integration.py index cce8bc8a43e3..f7cd66607e2c 100644 --- a/python/ray/tune/tests/test_tuner_restore_integration.py +++ b/python/ray/tune/tests/test_tuner_restore_integration.py @@ -67,7 +67,7 @@ def print_message(message): # @pytest.mark.parametrize("runner_type", ["tuner", "trainer"]) @pytest.mark.parametrize("runner_type", ["trainer"]) -def test_air_experiment_restore(ray_start_4_cpus, tmp_path, runner_type): +def test_air_experiment_restore(tmp_path, runner_type): np.random.seed(2023) script_path = Path(__file__).parent / _RUN_SCRIPT_FILENAME From 34653f0cf6cd27044164e95a4b01f6dcfb3f61cc Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 24 Mar 2023 18:32:32 -0700 Subject: [PATCH 08/24] move to air Signed-off-by: Justin Yu --- .../tests/_test_experiment_restore_run.py} | 0 .../tests/test_experiment_restore.py} | 6 +++--- 2 files changed, 3 insertions(+), 3 deletions(-) rename python/ray/{tune/tests/_test_air_experiment_restore_run.py => 
air/tests/_test_experiment_restore_run.py} (100%) rename python/ray/{tune/tests/test_tuner_restore_integration.py => air/tests/test_experiment_restore.py} (97%) diff --git a/python/ray/tune/tests/_test_air_experiment_restore_run.py b/python/ray/air/tests/_test_experiment_restore_run.py similarity index 100% rename from python/ray/tune/tests/_test_air_experiment_restore_run.py rename to python/ray/air/tests/_test_experiment_restore_run.py diff --git a/python/ray/tune/tests/test_tuner_restore_integration.py b/python/ray/air/tests/test_experiment_restore.py similarity index 97% rename from python/ray/tune/tests/test_tuner_restore_integration.py rename to python/ray/air/tests/test_experiment_restore.py index f7cd66607e2c..90f711c2cf4e 100644 --- a/python/ray/tune/tests/test_tuner_restore_integration.py +++ b/python/ray/air/tests/test_experiment_restore.py @@ -38,7 +38,7 @@ from ray.tune.analysis import ExperimentAnalysis -_RUN_SCRIPT_FILENAME = "_test_air_experiment_restore_run.py" +_RUN_SCRIPT_FILENAME = "_test_experiment_restore_run.py" @pytest.fixture @@ -65,8 +65,8 @@ def print_message(message): print("\n") -# @pytest.mark.parametrize("runner_type", ["tuner", "trainer"]) -@pytest.mark.parametrize("runner_type", ["trainer"]) +# TODO(ml-team): "trainer" doesn't work +@pytest.mark.parametrize("runner_type", ["tuner", "trainer"]) def test_air_experiment_restore(tmp_path, runner_type): np.random.seed(2023) From f9ad0a67031ee69eef4d77ce3d98a7bf2bdd4fcf Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 24 Mar 2023 18:38:16 -0700 Subject: [PATCH 09/24] add to build Signed-off-by: Justin Yu --- python/ray/air/BUILD | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python/ray/air/BUILD b/python/ray/air/BUILD index addb0574e457..89b38c00124c 100644 --- a/python/ray/air/BUILD +++ b/python/ray/air/BUILD @@ -74,6 +74,13 @@ py_test( deps = [":ml_lib"] ) +py_test( + name = "test_experiment_restore", + size = "large", + srcs = ["tests/test_experiment_restore.py"], + tags = ["team:ml", "exclusive"], + deps = [":ml_lib"] +) py_test( name = "test_integration_comet", From 3fd9affbbbbc5f8148ab70174fb1f286d2ab5b3f Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Fri, 24 Mar 2023 18:38:25 -0700 Subject: [PATCH 10/24] change some configs Signed-off-by: Justin Yu --- python/ray/air/tests/test_experiment_restore.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/python/ray/air/tests/test_experiment_restore.py b/python/ray/air/tests/test_experiment_restore.py index 90f711c2cf4e..e43ac09a07a1 100644 --- a/python/ray/air/tests/test_experiment_restore.py +++ b/python/ray/air/tests/test_experiment_restore.py @@ -2,6 +2,7 @@ This test is meant to be an integration stress test for experiment restoration. Test setup: +(For Tuner.restore) - 8 trials, with a max of 2 running concurrently (--> 4 rounds of trials) - Each iteration takes 0.5 seconds - Each trial runs for 8 iterations --> 4 seconds @@ -10,7 +11,15 @@ - Minimum runtime: 4 rounds * 4 seconds / round = 16 seconds - Actually running it without any interrupts = ~24 seconds - The test will stop the script with a SIGINT at a random time between - 4-8 iterations after restoring. + 4-8 iterations each restore. +(For Trainer.restore) +- 1 trial with 4 workers +- Each iteration takes 0.5 seconds +- Runs for 32 iterations --> Minimum runtime = 16 seconds + - Actually running it without any interrupts = ~24 seconds +- The test will stop the script with a SIGINT at a random time between + 4-8 iterations after each restore. 
+ Requirements: - Req 1: Reasonable runtime @@ -90,7 +99,7 @@ def test_air_experiment_restore(tmp_path, runner_type): iters_per_trial = 8 num_trials = 8 elif runner_type == "trainer": - iters_per_trial = 64 + iters_per_trial = 32 num_trials = 1 total_iters = iters_per_trial * num_trials From e1490319062fd2cc005db9c8ad8dbae6e236ba22 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Sat, 25 Mar 2023 00:02:36 -0700 Subject: [PATCH 11/24] add helper file to the test srcs Signed-off-by: Justin Yu --- python/ray/air/BUILD | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/ray/air/BUILD b/python/ray/air/BUILD index 89b38c00124c..d6d45c44ae05 100644 --- a/python/ray/air/BUILD +++ b/python/ray/air/BUILD @@ -76,8 +76,11 @@ py_test( py_test( name = "test_experiment_restore", - size = "large", - srcs = ["tests/test_experiment_restore.py"], + size = "medium", + srcs = [ + "tests/test_experiment_restore.py", + "tests/_test_experiment_restore_run.py" + ], tags = ["team:ml", "exclusive"], deps = [":ml_lib"] ) From 309a5df8bced0286763a63ed110e882baeabec77 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Mon, 10 Apr 2023 11:14:07 -0700 Subject: [PATCH 12/24] Fix test for trainer (don't serialize datasets) Signed-off-by: Justin Yu --- .../air/tests/_test_experiment_restore_run.py | 9 ++---- .../ray/air/tests/test_experiment_restore.py | 29 ++++++++++--------- python/ray/train/base_trainer.py | 28 ++++++++++++------ 3 files changed, 36 insertions(+), 30 deletions(-) diff --git a/python/ray/air/tests/_test_experiment_restore_run.py b/python/ray/air/tests/_test_experiment_restore_run.py index 951526444afc..99d94cbb97f5 100644 --- a/python/ray/air/tests/_test_experiment_restore_run.py +++ b/python/ray/air/tests/_test_experiment_restore_run.py @@ -85,9 +85,7 @@ def train_fn(config, data=None): start = checkpoint.to_dict()["iteration"] + 1 if checkpoint else 1 training_started_marker = Path(os.environ.get("RUN_STARTED_MARKER", "asdf.py")) - - if training_started_marker.exists(): - training_started_marker.unlink() + training_started_marker.unlink(missing_ok=True) for iteration in range(start, ITERATIONS_PER_TRIAL + 1): time.sleep(TIME_PER_ITER_S) @@ -135,10 +133,7 @@ def train_fn(config, data=None): def train_loop_per_worker(config): # Wrap the other train_fn with a check for the dataset. - assert ( - session.get_dataset_shard("train").count() - == dataset_size // num_workers - ) + assert session.get_dataset_shard("train") train_fn(config) datasets = {"train": ray.data.range(dataset_size)} diff --git a/python/ray/air/tests/test_experiment_restore.py b/python/ray/air/tests/test_experiment_restore.py index e43ac09a07a1..200332064739 100644 --- a/python/ray/air/tests/test_experiment_restore.py +++ b/python/ray/air/tests/test_experiment_restore.py @@ -9,21 +9,19 @@ - Each round of 2 trials should take 4 seconds - Without any interrupts/restoration: - Minimum runtime: 4 rounds * 4 seconds / round = 16 seconds - - Actually running it without any interrupts = ~24 seconds - The test will stop the script with a SIGINT at a random time between 4-8 iterations each restore. (For Trainer.restore) - 1 trial with 4 workers - Each iteration takes 0.5 seconds - Runs for 32 iterations --> Minimum runtime = 16 seconds - - Actually running it without any interrupts = ~24 seconds - The test will stop the script with a SIGINT at a random time between 4-8 iterations after each restore. Requirements: - Req 1: Reasonable runtime - - The experiment should finish within 1.5 * 24 = 36 seconds. 
+ - The experiment should finish within 1.5 * 16 = 24 seconds. - 1.5x is the passing threshold. - Req 2: Training progress persisted - The experiment should progress monotonically. @@ -37,6 +35,7 @@ from pathlib import Path import pytest import time +import shutil import signal import subprocess import sys @@ -74,7 +73,6 @@ def print_message(message): print("\n") -# TODO(ml-team): "trainer" doesn't work @pytest.mark.parametrize("runner_type", ["tuner", "trainer"]) def test_air_experiment_restore(tmp_path, runner_type): np.random.seed(2023) @@ -86,8 +84,6 @@ def test_air_experiment_restore(tmp_path, runner_type): callback_dump_file = tmp_path / f"{runner_type}-callback_dump_file.json" storage_path = tmp_path / "ray_results" if storage_path.exists(): - import shutil - shutil.rmtree(storage_path) run_started_marker = tmp_path / "run_started_marker" @@ -117,10 +113,13 @@ def test_air_experiment_restore(tmp_path, runner_type): } # Pass criteria - no_interrupts_runtime = 24.0 + no_interrupts_runtime = 16.0 passing_factor = 1.5 passing_runtime = no_interrupts_runtime * passing_factor - print(f"Experiment should finish with a total runtime <= {passing_runtime}.") + print( + "\n\nExperiment should finish with a total runtime <= " + f"{passing_runtime} seconds." + ) # Variables used in the loop return_code = None @@ -212,12 +211,14 @@ def test_air_experiment_restore(tmp_path, runner_type): # Req 3: searcher state results = ResultGrid(ExperimentAnalysis(str(storage_path / exp_name))) - # Check that all trials have unique ids assigned by the searcher - ids = [result.config["id"] for result in results] - assert sorted(ids) == list(range(1, num_trials + 1)), ( - "Expected the searcher to assign increasing id for each trial, but got:" - f"{ids}" - ) + # Check that all trials have unique ids assigned by the searcher (if applicable) + ids = [result.config.get("id", -1) for result in results] + ids = [id for id in ids if id >= 0] + if ids: + assert sorted(ids) == list(range(1, num_trials + 1)), ( + "Expected the searcher to assign increasing id for each trial, but got:" + f"{ids}" + ) # Req 4: callback state with open(callback_dump_file, "r") as f: diff --git a/python/ray/train/base_trainer.py b/python/ray/train/base_trainer.py index 40c89b64f27e..eb674ad0d336 100644 --- a/python/ray/train/base_trainer.py +++ b/python/ray/train/base_trainer.py @@ -293,19 +293,16 @@ def training_loop(self): assert trainer_state_path.exists() with open(trainer_state_path, "rb") as fp: - original_trainer = pickle.load(fp) - if type(original_trainer) is not cls: + trainer_cls, param_dict = pickle.load(fp) + if trainer_cls is not cls: warnings.warn( f"Invalid trainer type. You are attempting to restore a trainer of type" - f" {type(original_trainer)} with `{cls.__name__}.restore`, " + f" {trainer_cls} with `{cls.__name__}.restore`, " "which will most likely fail. " - f"Use `{type(original_trainer).__name__}.restore` instead." + f"Use `{trainer_cls.__name__}.restore` instead." ) - # Get the param dict used to initialize the original trainer - param_dict = original_trainer._param_dict - - original_datasets = original_trainer.datasets or {} + original_datasets = param_dict.pop("datasets", {}) if original_datasets and not datasets: raise ValueError( "The following datasets need to be provided again on restore: " @@ -624,9 +621,22 @@ def _save(self, experiment_path: Union[str, Path]): set of parameters can be passed in again), the argument will be loaded from this saved one. 
""" + param_dict = self._param_dict.copy() + datasets = param_dict.pop("datasets", {}) + + def raise_fn(): + raise RuntimeError + + if datasets: + param_dict["datasets"] = { + dataset_name: raise_fn for dataset_name in datasets + } + + cls_and_param_dict = (self.__class__, param_dict) + experiment_path = Path(experiment_path) with open(experiment_path / _TRAINER_PKL, "wb") as fp: - pickle.dump(self, fp) + pickle.dump(cls_and_param_dict, fp) def _extract_fields_for_tuner_param_space(self) -> Dict: """Extracts fields to be included in `Tuner.param_space`. From 0c4dd6e86359204fb9d53515b038a46d42028042 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Mon, 10 Apr 2023 11:23:17 -0700 Subject: [PATCH 13/24] Improve some docstrings Signed-off-by: Justin Yu --- .../air/tests/_test_experiment_restore_run.py | 6 +- .../ray/air/tests/test_experiment_restore.py | 56 +++++++++---------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/python/ray/air/tests/_test_experiment_restore_run.py b/python/ray/air/tests/_test_experiment_restore_run.py index 99d94cbb97f5..f251d80a50e7 100644 --- a/python/ray/air/tests/_test_experiment_restore_run.py +++ b/python/ray/air/tests/_test_experiment_restore_run.py @@ -80,11 +80,13 @@ def restore(self, checkpoint_path: str): self._trial_count = state["trial_count"] -def train_fn(config, data=None): +def train_fn(config: dict, data: Optional[dict] = None): checkpoint = session.get_checkpoint() start = checkpoint.to_dict()["iteration"] + 1 if checkpoint else 1 - training_started_marker = Path(os.environ.get("RUN_STARTED_MARKER", "asdf.py")) + training_started_marker = Path( + os.environ.get("RUN_STARTED_MARKER", "/tmp/does-not-exist") + ) training_started_marker.unlink(missing_ok=True) for iteration in range(start, ITERATIONS_PER_TRIAL + 1): diff --git a/python/ray/air/tests/test_experiment_restore.py b/python/ray/air/tests/test_experiment_restore.py index 200332064739..f79d4d9f145d 100644 --- a/python/ray/air/tests/test_experiment_restore.py +++ b/python/ray/air/tests/test_experiment_restore.py @@ -1,22 +1,25 @@ """ This test is meant to be an integration stress test for experiment restoration. + Test setup: -(For Tuner.restore) -- 8 trials, with a max of 2 running concurrently (--> 4 rounds of trials) -- Each iteration takes 0.5 seconds -- Each trial runs for 8 iterations --> 4 seconds -- Each round of 2 trials should take 4 seconds -- Without any interrupts/restoration: - - Minimum runtime: 4 rounds * 4 seconds / round = 16 seconds -- The test will stop the script with a SIGINT at a random time between - 4-8 iterations each restore. -(For Trainer.restore) -- 1 trial with 4 workers -- Each iteration takes 0.5 seconds -- Runs for 32 iterations --> Minimum runtime = 16 seconds -- The test will stop the script with a SIGINT at a random time between - 4-8 iterations after each restore. + +- For Tuner.restore: + - 8 trials, with a max of 2 running concurrently (--> 4 rounds of trials) + - Each iteration takes 0.5 seconds + - Each trial runs for 8 iterations --> 4 seconds + - Each round of 2 trials should take 4 seconds + - Without any interrupts/restoration: + - Minimum runtime: 4 rounds * 4 seconds / round = 16 seconds + - The test will stop the script with a SIGINT at a random time between + 4-8 iterations each restore. 
+ +- For Trainer.restore: + - 1 trial with 4 workers + - Each iteration takes 0.5 seconds + - Runs for 32 iterations --> Minimum runtime = 16 seconds + - The test will stop the script with a SIGINT at a random time between + 4-8 iterations after each restore. Requirements: @@ -25,11 +28,12 @@ - 1.5x is the passing threshold. - Req 2: Training progress persisted - The experiment should progress monotonically. - - The experiment shouldn't "go backward" at any point. + (The training iteration shouldn't go backward at any point) - Trials shouldn't start from scratch. - Req 3: Searcher state saved/restored correctly - Req 4: Callback state saved/restored correctly """ + import json import numpy as np from pathlib import Path @@ -40,8 +44,6 @@ import subprocess import sys -import ray - from ray.tune.result_grid import ResultGrid from ray.tune.analysis import ExperimentAnalysis @@ -49,18 +51,14 @@ _RUN_SCRIPT_FILENAME = "_test_experiment_restore_run.py" -@pytest.fixture -def ray_start_4_cpus(): - address_info = ray.init(num_cpus=4) - yield address_info - # The code after the yield will run as teardown code. - ray.shutdown() - - -def kill_process_if_needed(process, timeout_s=10): +def kill_process_if_needed( + process: subprocess.Popen, timeout_s: float = 10, poll_interval_s: float = 1.0 +): + """Kills a process if it hasn't finished in `timeout_s` seconds. + Polls every `poll_interval_s` seconds to check if the process is still running.""" kill_timeout = time.monotonic() + timeout_s while process.poll() is None and time.monotonic() < kill_timeout: - time.sleep(1) + time.sleep(poll_interval_s) if process.poll() is None: process.terminate() @@ -74,7 +72,7 @@ def print_message(message): @pytest.mark.parametrize("runner_type", ["tuner", "trainer"]) -def test_air_experiment_restore(tmp_path, runner_type): +def test_experiment_restore(tmp_path, runner_type): np.random.seed(2023) script_path = Path(__file__).parent / _RUN_SCRIPT_FILENAME From 132e6e4341cf2b3782f0b4915ed7cfa73d621eb6 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Mon, 10 Apr 2023 11:40:35 -0700 Subject: [PATCH 14/24] add csv datasource Signed-off-by: Justin Yu --- python/ray/air/tests/_test_experiment_restore_run.py | 6 +++++- python/ray/air/tests/test_experiment_restore.py | 8 +++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/python/ray/air/tests/_test_experiment_restore_run.py b/python/ray/air/tests/_test_experiment_restore_run.py index f251d80a50e7..40e22224274a 100644 --- a/python/ray/air/tests/_test_experiment_restore_run.py +++ b/python/ray/air/tests/_test_experiment_restore_run.py @@ -19,6 +19,7 @@ CALLBACK_DUMP_FILE = os.environ.get( "CALLBACK_DUMP_FILE", "/tmp/callback_dump_file.json" ) +CSV_DATA_FILE = os.environ.get("CSV_DATA_FILE", "/tmp/dummy.csv") TIME_PER_ITER_S = float(os.environ.get("TIME_PER_ITER_S", "0.5")) NUM_TRIALS = int(os.environ.get("NUM_TRIALS", "1")) @@ -138,7 +139,10 @@ def train_loop_per_worker(config): assert session.get_dataset_shard("train") train_fn(config) - datasets = {"train": ray.data.range(dataset_size)} + datasets = { + "train": ray.data.range(dataset_size), + "valid": ray.data.read_csv(CSV_DATA_FILE), + } if DataParallelTrainer.can_restore(experiment_path): trainer = DataParallelTrainer.restore( diff --git a/python/ray/air/tests/test_experiment_restore.py b/python/ray/air/tests/test_experiment_restore.py index f79d4d9f145d..1a3e910c2a27 100644 --- a/python/ray/air/tests/test_experiment_restore.py +++ b/python/ray/air/tests/test_experiment_restore.py @@ -36,6 +36,7 @@ import 
json import numpy as np +import pandas as pd from pathlib import Path import pytest import time @@ -84,6 +85,10 @@ def test_experiment_restore(tmp_path, runner_type): if storage_path.exists(): shutil.rmtree(storage_path) + csv_file = str(tmp_path / "dummy_data.csv") + dummy_df = pd.DataFrame({"x": np.arange(128), "y": 2 * np.arange(128)}) + dummy_df.to_csv(csv_file) + run_started_marker = tmp_path / "run_started_marker" time_per_iter_s = 0.5 @@ -108,13 +113,14 @@ def test_experiment_restore(tmp_path, runner_type): "ITERATIONS_PER_TRIAL": str(iters_per_trial), "NUM_TRIALS": str(num_trials), "MAX_CONCURRENT_TRIALS": str(max_concurrent), + "CSV_DATA_FILE": csv_file, } # Pass criteria no_interrupts_runtime = 16.0 passing_factor = 1.5 passing_runtime = no_interrupts_runtime * passing_factor - print( + print_message( "\n\nExperiment should finish with a total runtime <= " f"{passing_runtime} seconds." ) From cc38a5d7edba1bbcacc11c55714401cca687fc23 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Mon, 10 Apr 2023 11:40:52 -0700 Subject: [PATCH 15/24] fix total runtime calculation to account for early end Signed-off-by: Justin Yu --- .../ray/air/tests/test_experiment_restore.py | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/python/ray/air/tests/test_experiment_restore.py b/python/ray/air/tests/test_experiment_restore.py index 1a3e910c2a27..a83dae2cea90 100644 --- a/python/ray/air/tests/test_experiment_restore.py +++ b/python/ray/air/tests/test_experiment_restore.py @@ -150,22 +150,27 @@ def test_experiment_restore(tmp_path, runner_type): return_code = run.poll() break - timeout = min( + timeout_s = min( np.random.uniform(4 * time_per_iter_s, 8 * time_per_iter_s), passing_runtime - total_runtime, ) + polling_interval_s = 0.1 print_message( "Training has started...\n" - f"Interrupting after {timeout:.2f} seconds\n" + f"Interrupting after {timeout_s:.2f} seconds\n" f"Currently at {total_runtime:.2f}/{passing_runtime} seconds" ) # Sleep for a random amount of time, then stop the run. - time.sleep(timeout) - total_runtime += timeout - - if run.poll() is None: + start_time = time.monotonic() + stopping_time = start_time + timeout_s + while time.monotonic() < stopping_time: + time.sleep(polling_interval_s) + total_runtime += time.monotonic() - start_time + + return_code = run.poll() + if return_code is None: # Send "SIGINT" to stop the run print_message(f"Sending SIGUSR1 to run #{run_iter} w/ PID = {run.pid}") run.send_signal(signal.SIGUSR1) @@ -174,8 +179,6 @@ def test_experiment_restore(tmp_path, runner_type): kill_process_if_needed(run) else: print_message("Run has already terminated!") - return_code = run.poll() - assert return_code break # Check up on the results. 
@@ -191,7 +194,7 @@ def test_experiment_restore(tmp_path, runner_type): print_message( f"Total number of restorations = {run_iter}\n" - f"Total runtime = {total_runtime}\n" + f"Total runtime = {total_runtime:.2f}\n" f"Return code = {return_code}" ) From 83a174e3ab994e7ac8e5a9af9cc7c9fd6fffea41 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Mon, 10 Apr 2023 11:46:25 -0700 Subject: [PATCH 16/24] switch to using storage_path Signed-off-by: Justin Yu --- .../air/tests/_test_experiment_restore_run.py | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/python/ray/air/tests/_test_experiment_restore_run.py b/python/ray/air/tests/_test_experiment_restore_run.py index 40e22224274a..7b9dee8e59b3 100644 --- a/python/ray/air/tests/_test_experiment_restore_run.py +++ b/python/ray/air/tests/_test_experiment_restore_run.py @@ -104,6 +104,13 @@ def train_fn(config: dict, data: Optional[dict] = None): ray.init() + run_config = air.RunConfig( + storage_path=STORAGE_PATH, + name=EXP_NAME, + checkpoint_config=air.CheckpointConfig(num_to_keep=1), + callbacks=[StatefulCallback()], + ) + if RUNNER_TYPE == "tuner": trainable = tune.with_resources(train_fn, resources={"CPU": 1}) trainable = tune.with_parameters(trainable, data={"dummy_data": [1, 2, 3]}) @@ -115,12 +122,7 @@ def train_fn(config: dict, data: Optional[dict] = None): else: tuner = tune.Tuner( trainable, - run_config=air.RunConfig( - local_dir=STORAGE_PATH, - name=EXP_NAME, - checkpoint_config=air.CheckpointConfig(num_to_keep=1), - callbacks=[StatefulCallback()], - ), + run_config=run_config, tune_config=tune.TuneConfig( num_samples=8, max_concurrent_trials=2, @@ -157,12 +159,7 @@ def train_loop_per_worker(config): scaling_config=air.ScalingConfig( num_workers=num_workers, trainer_resources={"CPU": 0} ), - run_config=air.RunConfig( - local_dir=STORAGE_PATH, - name=EXP_NAME, - checkpoint_config=air.CheckpointConfig(num_to_keep=1), - callbacks=[StatefulCallback()], - ), + run_config=run_config, ) result = trainer.fit() From 1edfb528691b5c212366f89182c134052c8d1ef6 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Mon, 10 Apr 2023 15:39:42 -0700 Subject: [PATCH 17/24] fix for py37 Signed-off-by: Justin Yu --- python/ray/air/tests/_test_experiment_restore_run.py | 7 ++++++- python/ray/air/tests/test_experiment_restore.py | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/python/ray/air/tests/_test_experiment_restore_run.py b/python/ray/air/tests/_test_experiment_restore_run.py index 7b9dee8e59b3..c609b5fb8bac 100644 --- a/python/ray/air/tests/_test_experiment_restore_run.py +++ b/python/ray/air/tests/_test_experiment_restore_run.py @@ -88,7 +88,12 @@ def train_fn(config: dict, data: Optional[dict] = None): training_started_marker = Path( os.environ.get("RUN_STARTED_MARKER", "/tmp/does-not-exist") ) - training_started_marker.unlink(missing_ok=True) + if training_started_marker.exists(): + # Multiple workers may be trying to delete the same marker + try: + training_started_marker.unlink() + except: + pass for iteration in range(start, ITERATIONS_PER_TRIAL + 1): time.sleep(TIME_PER_ITER_S) diff --git a/python/ray/air/tests/test_experiment_restore.py b/python/ray/air/tests/test_experiment_restore.py index a83dae2cea90..55528af4b048 100644 --- a/python/ray/air/tests/test_experiment_restore.py +++ b/python/ray/air/tests/test_experiment_restore.py @@ -121,8 +121,8 @@ def test_experiment_restore(tmp_path, runner_type): passing_factor = 1.5 passing_runtime = no_interrupts_runtime * passing_factor print_message( - 
"\n\nExperiment should finish with a total runtime <= " - f"{passing_runtime} seconds." + "Experiment should finish with a total runtime of\n" + f"<= {passing_runtime} seconds." ) # Variables used in the loop From 4aea8449e2b8651e29520930e9b58f4fc6cd5952 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Mon, 10 Apr 2023 15:40:35 -0700 Subject: [PATCH 18/24] Fix lint Signed-off-by: Justin Yu --- python/ray/air/tests/_test_experiment_restore_run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/air/tests/_test_experiment_restore_run.py b/python/ray/air/tests/_test_experiment_restore_run.py index c609b5fb8bac..db998b86b0b5 100644 --- a/python/ray/air/tests/_test_experiment_restore_run.py +++ b/python/ray/air/tests/_test_experiment_restore_run.py @@ -92,7 +92,7 @@ def train_fn(config: dict, data: Optional[dict] = None): # Multiple workers may be trying to delete the same marker try: training_started_marker.unlink() - except: + except Exception: pass for iteration in range(start, ITERATIONS_PER_TRIAL + 1): From 238be73b1b6a0f6970f61e8258b0f36ee8c8b00d Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 11 Apr 2023 10:07:05 -0700 Subject: [PATCH 19/24] Address some style comments Signed-off-by: Justin Yu --- .../air/tests/_test_experiment_restore_run.py | 113 ++++++++++-------- .../ray/air/tests/test_experiment_restore.py | 7 +- 2 files changed, 65 insertions(+), 55 deletions(-) diff --git a/python/ray/air/tests/_test_experiment_restore_run.py b/python/ray/air/tests/_test_experiment_restore_run.py index db998b86b0b5..41dae20f59a7 100644 --- a/python/ray/air/tests/_test_experiment_restore_run.py +++ b/python/ray/air/tests/_test_experiment_restore_run.py @@ -104,6 +104,63 @@ def train_fn(config: dict, data: Optional[dict] = None): ) +def tune(experiment_path: str, run_config: air.RunConfig) -> tune.ResultGrid: + trainable = tune.with_resources(train_fn, resources={"CPU": 1}) + trainable = tune.with_parameters(trainable, data={"dummy_data": [1, 2, 3]}) + + if tune.Tuner.can_restore(experiment_path): + tuner = tune.Tuner.restore( + experiment_path, trainable=trainable, resume_errored=True + ) + else: + tuner = tune.Tuner( + trainable, + run_config=run_config, + tune_config=tune.TuneConfig( + num_samples=8, + max_concurrent_trials=2, + search_alg=StatefulSearcher(), + ), + ) + + result_grid = tuner.fit() + return result_grid + + +def train(experiment_path: str, run_config: air.RunConfig) -> air.Result: + dataset_size = 128 + num_workers = 4 + + def train_loop_per_worker(config): + # Wrap the other train_fn with a check for the dataset. 
+ assert session.get_dataset_shard("train") + train_fn(config) + + datasets = { + "train": ray.data.range(dataset_size), + "valid": ray.data.read_csv(CSV_DATA_FILE), + } + + if DataParallelTrainer.can_restore(experiment_path): + trainer = DataParallelTrainer.restore( + experiment_path, + datasets=datasets, + train_loop_per_worker=train_loop_per_worker, + ) + else: + trainer = DataParallelTrainer( + train_loop_per_worker, + datasets=datasets, + scaling_config=air.ScalingConfig( + num_workers=num_workers, trainer_resources={"CPU": 0} + ), + run_config=run_config, + ) + + result = trainer.fit() + return result + + if __name__ == "__main__": experiment_path = os.path.join(STORAGE_PATH, EXP_NAME) @@ -117,54 +174,10 @@ def train_fn(config: dict, data: Optional[dict] = None): ) if RUNNER_TYPE == "tuner": - trainable = tune.with_resources(train_fn, resources={"CPU": 1}) - trainable = tune.with_parameters(trainable, data={"dummy_data": [1, 2, 3]}) - - if tune.Tuner.can_restore(experiment_path): - tuner = tune.Tuner.restore( - experiment_path, trainable=trainable, resume_errored=True - ) - else: - tuner = tune.Tuner( - trainable, - run_config=run_config, - tune_config=tune.TuneConfig( - num_samples=8, - max_concurrent_trials=2, - search_alg=StatefulSearcher(), - ), - ) - - result_grid = tuner.fit() - + tune(experiment_path, run_config) elif RUNNER_TYPE == "trainer": - dataset_size = 128 - num_workers = 4 - - def train_loop_per_worker(config): - # Wrap the other train_fn with a check for the dataset. - assert session.get_dataset_shard("train") - train_fn(config) - - datasets = { - "train": ray.data.range(dataset_size), - "valid": ray.data.read_csv(CSV_DATA_FILE), - } - - if DataParallelTrainer.can_restore(experiment_path): - trainer = DataParallelTrainer.restore( - experiment_path, - datasets=datasets, - train_loop_per_worker=train_loop_per_worker, - ) - else: - trainer = DataParallelTrainer( - train_loop_per_worker, - datasets=datasets, - scaling_config=air.ScalingConfig( - num_workers=num_workers, trainer_resources={"CPU": 0} - ), - run_config=run_config, - ) - - result = trainer.fit() + train(experiment_path, run_config) + else: + raise NotImplementedError( + "`RUNNER_TYPE` environment var must be one of ['tuner', 'trainer']" + ) diff --git a/python/ray/air/tests/test_experiment_restore.py b/python/ray/air/tests/test_experiment_restore.py index 55528af4b048..18b475c19daf 100644 --- a/python/ray/air/tests/test_experiment_restore.py +++ b/python/ray/air/tests/test_experiment_restore.py @@ -65,11 +65,8 @@ def kill_process_if_needed( def print_message(message): - print("\n") - print("=" * 50) - print(message) - print("=" * 50) - print("\n") + sep = "=" * 50 + print(f"{sep}\n{message}\n{sep}\n") @pytest.mark.parametrize("runner_type", ["tuner", "trainer"]) From fae1affc2ae2e8847bb945c4e9c4882eaf8d189b Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 11 Apr 2023 10:22:15 -0700 Subject: [PATCH 20/24] Some cleanup Signed-off-by: Justin Yu --- .../ray/air/tests/test_experiment_restore.py | 108 +++++++++--------- 1 file changed, 55 insertions(+), 53 deletions(-) diff --git a/python/ray/air/tests/test_experiment_restore.py b/python/ray/air/tests/test_experiment_restore.py index 18b475c19daf..8050cc64b807 100644 --- a/python/ray/air/tests/test_experiment_restore.py +++ b/python/ray/air/tests/test_experiment_restore.py @@ -1,39 +1,3 @@ -""" -This test is meant to be an integration stress test for experiment restoration. 
- - -Test setup: - -- For Tuner.restore: - - 8 trials, with a max of 2 running concurrently (--> 4 rounds of trials) - - Each iteration takes 0.5 seconds - - Each trial runs for 8 iterations --> 4 seconds - - Each round of 2 trials should take 4 seconds - - Without any interrupts/restoration: - - Minimum runtime: 4 rounds * 4 seconds / round = 16 seconds - - The test will stop the script with a SIGINT at a random time between - 4-8 iterations each restore. - -- For Trainer.restore: - - 1 trial with 4 workers - - Each iteration takes 0.5 seconds - - Runs for 32 iterations --> Minimum runtime = 16 seconds - - The test will stop the script with a SIGINT at a random time between - 4-8 iterations after each restore. - - -Requirements: -- Req 1: Reasonable runtime - - The experiment should finish within 1.5 * 16 = 24 seconds. - - 1.5x is the passing threshold. -- Req 2: Training progress persisted - - The experiment should progress monotonically. - (The training iteration shouldn't go backward at any point) - - Trials shouldn't start from scratch. -- Req 3: Searcher state saved/restored correctly -- Req 4: Callback state saved/restored correctly -""" - import json import numpy as np import pandas as pd @@ -52,7 +16,7 @@ _RUN_SCRIPT_FILENAME = "_test_experiment_restore_run.py" -def kill_process_if_needed( +def _kill_process_if_needed( process: subprocess.Popen, timeout_s: float = 10, poll_interval_s: float = 1.0 ): """Kills a process if it hasn't finished in `timeout_s` seconds. @@ -64,13 +28,50 @@ def kill_process_if_needed( process.terminate() -def print_message(message): +def _print_message(message): sep = "=" * 50 print(f"{sep}\n{message}\n{sep}\n") @pytest.mark.parametrize("runner_type", ["tuner", "trainer"]) def test_experiment_restore(tmp_path, runner_type): + """ + This is an integration stress test for experiment restoration. + + + Test setup: + + - For Tuner.restore: + - 8 trials, with a max of 2 running concurrently (--> 4 rounds of trials) + - Each iteration takes 0.5 seconds + - Each trial runs for 8 iterations --> 4 seconds + - Each round of 2 trials should take 4 seconds + - Without any interrupts/restoration: + - Minimum runtime: 4 rounds * 4 seconds / round = 16 seconds + - The test will stop the script with a SIGINT at a random time between + 4-8 iterations each restore. + + - For Trainer.restore: + - 1 trial with 4 workers + - Each iteration takes 0.5 seconds + - Runs for 32 iterations --> Minimum runtime = 16 seconds + - The test will stop the script with a SIGINT at a random time between + 4-8 iterations after each restore. + + + Requirements: + - Req 1: Reasonable runtime + - The experiment should finish within 1.5 * 16 = 24 seconds. + - 1.5x is the passing threshold. + - 16 seconds is the minimum runtime. + - Req 2: Training progress persisted + - The experiment should progress monotonically. + (The training iteration shouldn't go backward at any point) + - Trials shouldn't start from scratch. + - Req 3: Searcher state saved/restored correctly + - Req 4: Callback state saved/restored correctly + """ + np.random.seed(2023) script_path = Path(__file__).parent / _RUN_SCRIPT_FILENAME @@ -117,7 +118,7 @@ def test_experiment_restore(tmp_path, runner_type): no_interrupts_runtime = 16.0 passing_factor = 1.5 passing_runtime = no_interrupts_runtime * passing_factor - print_message( + _print_message( "Experiment should finish with a total runtime of\n" f"<= {passing_runtime} seconds." 
) @@ -128,19 +129,20 @@ def test_experiment_restore(tmp_path, runner_type): run_iter = 0 progress_history = [] + poll_interval_s = 0.1 + test_start_time = time.monotonic() + while total_runtime < passing_runtime: run_started_marker.write_text("", encoding="utf-8") - run = subprocess.Popen( - [sys.executable, script_path], env=env # , stderr=subprocess.PIPE - ) + run = subprocess.Popen([sys.executable, script_path], env=env) run_iter += 1 - print_message(f"Started run #{run_iter} w/ PID = {run.pid}") + _print_message(f"Started run #{run_iter} w/ PID = {run.pid}") # Start the timer after the first trial has entered its training loop. while run.poll() is None and run_started_marker.exists(): - time.sleep(0.05) + time.sleep(poll_interval_s) # If the run already finished, then exit immediately. if run.poll() is not None: @@ -151,9 +153,8 @@ def test_experiment_restore(tmp_path, runner_type): np.random.uniform(4 * time_per_iter_s, 8 * time_per_iter_s), passing_runtime - total_runtime, ) - polling_interval_s = 0.1 - print_message( + _print_message( "Training has started...\n" f"Interrupting after {timeout_s:.2f} seconds\n" f"Currently at {total_runtime:.2f}/{passing_runtime} seconds" @@ -163,19 +164,19 @@ def test_experiment_restore(tmp_path, runner_type): start_time = time.monotonic() stopping_time = start_time + timeout_s while time.monotonic() < stopping_time: - time.sleep(polling_interval_s) + time.sleep(poll_interval_s) total_runtime += time.monotonic() - start_time return_code = run.poll() if return_code is None: # Send "SIGINT" to stop the run - print_message(f"Sending SIGUSR1 to run #{run_iter} w/ PID = {run.pid}") + _print_message(f"Sending SIGUSR1 to run #{run_iter} w/ PID = {run.pid}") run.send_signal(signal.SIGUSR1) # Make sure the process is stopped forcefully after a timeout. - kill_process_if_needed(run) + _kill_process_if_needed(run) else: - print_message("Run has already terminated!") + _print_message("Run has already terminated!") break # Check up on the results. @@ -183,17 +184,18 @@ def test_experiment_restore(tmp_path, runner_type): iters = [result.metrics.get("training_iteration", 0) for result in results] progress = sum(iters) / total_iters progress_history.append(progress) - print_message( + _print_message( f"Number of trials = {len(results)}\n" f"% completion = {progress} ({sum(iters)} iters / {total_iters})\n" f"Currently at {total_runtime:.2f}/{passing_runtime} seconds" ) - print_message( + _print_message( f"Total number of restorations = {run_iter}\n" f"Total runtime = {total_runtime:.2f}\n" f"Return code = {return_code}" ) + test_end_time = time.monotonic() # The script shouldn't have errored. (It should have finished by this point.) assert return_code == 0, ( @@ -238,7 +240,7 @@ def test_experiment_restore(tmp_path, runner_type): range(1, iters_per_trial + 1) ), f"Expected data from all iterations, but got: {iters}" - print_message("Success!") + _print_message(f"Success! 
Test took {test_end_time - test_start_time:.2f} seconds.") if __name__ == "__main__": From 66d53ccd2ca267f0cc40934b390655e09e2bdd41 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 11 Apr 2023 10:59:47 -0700 Subject: [PATCH 21/24] Fix test Signed-off-by: Justin Yu --- python/ray/air/tests/_test_experiment_restore_run.py | 8 ++++---- python/ray/air/tests/test_experiment_restore.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/python/ray/air/tests/_test_experiment_restore_run.py b/python/ray/air/tests/_test_experiment_restore_run.py index 41dae20f59a7..8895c90e7a81 100644 --- a/python/ray/air/tests/_test_experiment_restore_run.py +++ b/python/ray/air/tests/_test_experiment_restore_run.py @@ -104,7 +104,7 @@ def train_fn(config: dict, data: Optional[dict] = None): ) -def tune(experiment_path: str, run_config: air.RunConfig) -> tune.ResultGrid: +def tuner(experiment_path: str, run_config: air.RunConfig) -> tune.ResultGrid: trainable = tune.with_resources(train_fn, resources={"CPU": 1}) trainable = tune.with_parameters(trainable, data={"dummy_data": [1, 2, 3]}) @@ -127,7 +127,7 @@ def tune(experiment_path: str, run_config: air.RunConfig) -> tune.ResultGrid: return result_grid -def train(experiment_path: str, run_config: air.RunConfig) -> air.Result: +def trainer(experiment_path: str, run_config: air.RunConfig) -> air.Result: dataset_size = 128 num_workers = 4 @@ -174,9 +174,9 @@ def train_loop_per_worker(config): ) if RUNNER_TYPE == "tuner": - tune(experiment_path, run_config) + tuner(experiment_path, run_config) elif RUNNER_TYPE == "trainer": - train(experiment_path, run_config) + trainer(experiment_path, run_config) else: raise NotImplementedError( "`RUNNER_TYPE` environment var must be one of ['tuner', 'trainer']" diff --git a/python/ray/air/tests/test_experiment_restore.py b/python/ray/air/tests/test_experiment_restore.py index 8050cc64b807..30492df69fe2 100644 --- a/python/ray/air/tests/test_experiment_restore.py +++ b/python/ray/air/tests/test_experiment_restore.py @@ -30,7 +30,7 @@ def _kill_process_if_needed( def _print_message(message): sep = "=" * 50 - print(f"{sep}\n{message}\n{sep}\n") + print(f"\n{sep}\n{message}\n{sep}\n") @pytest.mark.parametrize("runner_type", ["tuner", "trainer"]) From a398eea669eb35440210d5d601f42b4b1ba6e07c Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 11 Apr 2023 11:12:03 -0700 Subject: [PATCH 22/24] Add some clarifying docstring Signed-off-by: Justin Yu --- python/ray/train/base_trainer.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/python/ray/train/base_trainer.py b/python/ray/train/base_trainer.py index eb674ad0d336..21b4a658575d 100644 --- a/python/ray/train/base_trainer.py +++ b/python/ray/train/base_trainer.py @@ -614,12 +614,20 @@ def fit(self) -> Result: return result def _save(self, experiment_path: Union[str, Path]): - """Saves the trainer to a directory. - - This is used to populate a newly constructed trainer on restore. - Unless a parameter is re-specified during restoration (only a limited - set of parameters can be passed in again), the argument will be loaded - from this saved one. + """Saves the current trainer's class along with the param_dict used to + initialize the Trainer. + + This is used to construct a new trainer on restore. + Unless a parameter is re-specified during restoration (only a subset + of parameters can be passed in again), that parameter will be loaded + from the saved copy. + + Ray Datasets should not be saved as part of the state. 
Instead, we save the + keys and replace the dataset values with dummy functions that will + raise an error if invoked. The error only serves as a guardrail for + misuse (e.g., manually unpickling and constructing the Trainer again) + and is not typically surfaced, since datasets must be re-specified + upon restoration. """ param_dict = self._param_dict.copy() datasets = param_dict.pop("datasets", {}) From f7eda3e843dfa412d9a647df46673b1d2d834b45 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Tue, 11 Apr 2023 11:13:30 -0700 Subject: [PATCH 23/24] more clarifications Signed-off-by: Justin Yu --- python/ray/train/base_trainer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/train/base_trainer.py b/python/ray/train/base_trainer.py index 21b4a658575d..1da90bfaf7e2 100644 --- a/python/ray/train/base_trainer.py +++ b/python/ray/train/base_trainer.py @@ -614,10 +614,10 @@ def fit(self) -> Result: return result def _save(self, experiment_path: Union[str, Path]): - """Saves the current trainer's class along with the param_dict used to - initialize the Trainer. + """Saves the current trainer's class along with the `param_dict` of + parameters passed to this trainer's constructor. - This is used to construct a new trainer on restore. + This is used to recreate the trainer on restore. Unless a parameter is re-specified during restoration (only a subset of parameters can be passed in again), that parameter will be loaded from the saved copy. From f236ef9bd56d0c693add668329717c3f100906c5 Mon Sep 17 00:00:00 2001 From: Justin Yu Date: Wed, 12 Apr 2023 17:47:25 -0700 Subject: [PATCH 24/24] address comments Signed-off-by: Justin Yu --- python/ray/air/tests/_test_experiment_restore_run.py | 2 +- python/ray/air/tests/test_experiment_restore.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/ray/air/tests/_test_experiment_restore_run.py b/python/ray/air/tests/_test_experiment_restore_run.py index 8895c90e7a81..4e6daa6ac1ff 100644 --- a/python/ray/air/tests/_test_experiment_restore_run.py +++ b/python/ray/air/tests/_test_experiment_restore_run.py @@ -92,7 +92,7 @@ def train_fn(config: dict, data: Optional[dict] = None): # Multiple workers may be trying to delete the same marker try: training_started_marker.unlink() - except Exception: + except FileNotFoundError: pass for iteration in range(start, ITERATIONS_PER_TRIAL + 1): diff --git a/python/ray/air/tests/test_experiment_restore.py b/python/ray/air/tests/test_experiment_restore.py index 30492df69fe2..54ef313c6ee2 100644 --- a/python/ray/air/tests/test_experiment_restore.py +++ b/python/ray/air/tests/test_experiment_restore.py @@ -61,8 +61,8 @@ def test_experiment_restore(tmp_path, runner_type): Requirements: - Req 1: Reasonable runtime - - The experiment should finish within 1.5 * 16 = 24 seconds. - - 1.5x is the passing threshold. + - The experiment should finish within 2 * 16 = 32 seconds. + - 2x is the passing threshold. - 16 seconds is the minimum runtime. - Req 2: Training progress persisted - The experiment should progress monotonically. @@ -116,7 +116,7 @@ def test_experiment_restore(tmp_path, runner_type): # Pass criteria no_interrupts_runtime = 16.0 - passing_factor = 1.5 + passing_factor = 2 passing_runtime = no_interrupts_runtime * passing_factor _print_message( "Experiment should finish with a total runtime of\n"
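The serialization scheme that patches 12, 22, and 23 converge on can be summarized in isolation: pickle the trainer's class together with the `param_dict` of constructor arguments, keep only the *keys* of any datasets (replacing the values with guard functions that raise if ever invoked), and require the datasets to be passed in again on restore. The sketch below is a minimal, self-contained illustration of that idea using only the standard library; the helper names `save_trainer_state`, `load_trainer_state`, `_missing_dataset`, and the "trainer.pkl" filename are stand-ins for this example and are not the actual Ray code paths.

import pickle
import warnings
from pathlib import Path


def _missing_dataset(*args, **kwargs):
    # Placeholder stored in place of a dataset value. It should never be
    # called, because datasets must be re-specified on restore.
    raise RuntimeError("Datasets are not serialized; re-specify them on restore.")


def save_trainer_state(trainer_cls, param_dict: dict, experiment_dir: Path) -> None:
    # Pickle (trainer class, constructor kwargs), keeping only dataset keys.
    param_dict = param_dict.copy()
    datasets = param_dict.pop("datasets", {})
    if datasets:
        param_dict["datasets"] = {name: _missing_dataset for name in datasets}
    with open(experiment_dir / "trainer.pkl", "wb") as f:
        pickle.dump((trainer_cls, param_dict), f)


def load_trainer_state(expected_cls, experiment_dir: Path, datasets: dict):
    # Load the saved (class, kwargs) pair and swap freshly provided datasets
    # back in for the placeholders.
    with open(experiment_dir / "trainer.pkl", "rb") as f:
        trainer_cls, param_dict = pickle.load(f)
    if trainer_cls is not expected_cls:
        # Mirrors the patch: restoring with the wrong class only warns,
        # since it will most likely (but not necessarily) fail later.
        warnings.warn(
            f"Restoring a {trainer_cls.__name__} via {expected_cls.__name__}."
        )
    saved_dataset_keys = set(param_dict.get("datasets", {}))
    missing = saved_dataset_keys - set(datasets)
    if missing:
        raise ValueError(f"These datasets need to be provided again: {missing}")
    if saved_dataset_keys:
        param_dict["datasets"] = {name: datasets[name] for name in saved_dataset_keys}
    return trainer_cls, param_dict

A caller would then do roughly `trainer_cls, kwargs = load_trainer_state(MyTrainer, exp_dir, datasets)` followed by `trainer_cls(**kwargs)`, which is the same shape as the restore path in the base_trainer.py diff: the pickled payload never carries dataset contents, only enough structure to rebuild the trainer once the datasets are supplied again.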