diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py
index df5946f94b39..aa7817dee87e 100644
--- a/python/ray/tune/execution/trial_runner.py
+++ b/python/ray/tune/execution/trial_runner.py
@@ -202,7 +202,7 @@ def on_trial_checkpoint(self, trial: Trial):
         self._trial_num_checkpoints_since_last_sync[trial] += 1
 
         if (
             self._trial_num_checkpoints_since_last_sync[trial]
-            > self._sync_every_n_trial_checkpoints
+            >= self._sync_every_n_trial_checkpoints
         ):
             self._should_force_cloud_sync = True
diff --git a/python/ray/tune/tests/test_syncer.py b/python/ray/tune/tests/test_syncer.py
index 9627f140b032..6cd5496bd8dc 100644
--- a/python/ray/tune/tests/test_syncer.py
+++ b/python/ray/tune/tests/test_syncer.py
@@ -13,10 +13,11 @@
 import ray
 import ray.cloudpickle as pickle
 from ray import tune
-from ray.air import Checkpoint
+from ray.air import session, Checkpoint, RunConfig
 from ray.tune import TuneError
 from ray.tune.syncer import Syncer, _DefaultSyncer
 from ray.tune.utils.file_transfer import _pack_dir, _unpack_dir
+from ray.air._internal.remote_storage import upload_to_uri, download_from_uri
 
 
 @pytest.fixture
@@ -606,6 +607,62 @@ def test_syncer_serialize(temp_data_dirs):
     pickle.dumps(syncer)
 
 
+def test_final_experiment_checkpoint_sync(tmpdir):
+    class SlowSyncer(_DefaultSyncer):
+        def __init__(self, **kwargs):
+            super(_DefaultSyncer, self).__init__(**kwargs)
+            self._num_syncs = 0
+
+        def _sync_up_command(self, local_path, uri, exclude):
+            def slow_upload(local_path, uri, exclude):
+                # Sleep to check that experiment doesn't exit without waiting
+                time.sleep(2)
+                upload_to_uri(local_path, uri, exclude)
+                self._num_syncs += 1
+
+            return (
+                slow_upload,
+                dict(local_path=local_path, uri=uri, exclude=exclude),
+            )
+
+    # Long sync period so there will only be 2 experiment checkpoints:
+    # One at the beginning which always happens, then a forced checkpoint at the
+    # end of the experiment.
+    syncer = SlowSyncer(sync_period=60)
+
+    def train_func(config):
+        for i in range(8):
+            session.report({"score": i})
+            time.sleep(0.5)
+
+    tuner = tune.Tuner(
+        train_func,
+        run_config=RunConfig(
+            name="exp_name",
+            sync_config=tune.SyncConfig(
+                upload_dir="memory:///test_upload_dir", syncer=syncer
+            ),
+        ),
+    )
+    results = tuner.fit()
+    assert not results.errors
+
+    # Check the contents of the upload_dir immediately after the experiment
+    # This won't be up to date if we don't wait on the last sync
+    download_from_uri("memory:///test_upload_dir/exp_name", tmpdir)
+    cloud_results = tune.Tuner.restore(str(tmpdir)).get_results()
+    last_reported_iter = cloud_results[0].metrics.get("training_iteration", None)
+    assert last_reported_iter == 8, (
+        "Experiment did not wait to finish the final experiment sync before exiting. "
+        "The last reported training iteration synced to the remote dir was "
+        f"{last_reported_iter}. (None if no results are synced.)"
+    )
+    assert syncer._num_syncs == 2, (
+        "Should have seen 2 syncs, once at the beginning of the experiment, and one "
+        f"forced sync at the end. Got {syncer._num_syncs} syncs instead."
+    )
+
+
 if __name__ == "__main__":
     import sys
 
diff --git a/python/ray/tune/tests/test_trial_runner_3.py b/python/ray/tune/tests/test_trial_runner_3.py
index 19bcc7723695..9b0c10f6db4b 100644
--- a/python/ray/tune/tests/test_trial_runner_3.py
+++ b/python/ray/tune/tests/test_trial_runner_3.py
@@ -953,6 +953,7 @@ def delete(self, remote_dir: str) -> bool:
             sync_config=SyncConfig(upload_dir="fake", syncer=syncer),
             remote_checkpoint_dir="fake",
             trial_checkpoint_config=checkpoint_config,
+            checkpoint_period=100,  # Only rely on forced syncing
             trial_executor=RayTrialExecutor(resource_manager=self._resourceManager()),
         )
 
@@ -976,9 +977,14 @@ def should_checkpoint(self):
             runner.step()
 
         assert any("syncing has been triggered multiple" in x for x in buffer)
-        # we should sync 4 times - every 2 checkpoints, but the last sync will not
-        # happen as the experiment finishes before it is triggered
-        assert syncer.sync_up_counter == 4
+        # We should sync 6 times:
+        # The first checkpoint happens when the experiment starts,
+        # since no checkpoints have happened yet
+        # (This corresponds to the new_trial event in the runner loop)
+        # Then, every num_to_keep=2 checkpoints, we should perform a forced checkpoint
+        # which results in 5 more checkpoints (running for 10 iterations),
+        # giving a total of 6
+        assert syncer.sync_up_counter == 6
 
     def getHangingSyncer(self, sync_period: float, sync_timeout: float):
         def _hanging_sync_up_command(*args, **kwargs):
diff --git a/python/ray/tune/tune.py b/python/ray/tune/tune.py
index feb758411509..cdcaa6a7fdb3 100644
--- a/python/ray/tune/tune.py
+++ b/python/ray/tune/tune.py
@@ -736,6 +736,9 @@ class and registered trainables.
 
     try:
         runner.checkpoint(force=True)
+        # Wait for the final remote directory sync to finish before exiting
+        if runner._syncer:
+            runner._syncer.wait()
     except Exception as e:
         logger.warning(f"Trial Runner checkpointing failed: {str(e)}")
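
Note on the updated sync-count expectation in test_trial_runner_3.py: changing the comparison from ">" to ">=" means a forced cloud sync is requested as soon as a trial has accumulated sync_every_n_trial_checkpoints checkpoints since the last sync, rather than one checkpoint later. The standalone sketch below is illustrative only: count_syncs is not a Ray Tune API, and the counter reset that actually happens inside the experiment checkpoint manager is inlined here for brevity. Under those assumptions it reproduces the expected total of 6 syncs for the test above (10 trial checkpoints, num_to_keep=2).

# Illustrative sketch, not part of Ray Tune. Models the per-trial checkpoint
# counter from trial_runner.py with the new ">=" comparison, assuming the
# counter is reset whenever a forced experiment sync actually runs.
def count_syncs(num_trial_checkpoints: int, sync_every_n: int) -> int:
    checkpoints_since_last_sync = 0
    # One experiment checkpoint (and sync) always happens at the start of the run.
    total_syncs = 1
    for _ in range(num_trial_checkpoints):
        checkpoints_since_last_sync += 1
        if checkpoints_since_last_sync >= sync_every_n:
            total_syncs += 1
            checkpoints_since_last_sync = 0
    return total_syncs


# 10 trial checkpoints with num_to_keep=2 -> 1 initial sync + 5 forced syncs.
assert count_syncs(num_trial_checkpoints=10, sync_every_n=2) == 6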