[tune/release] Wait for final experiment checkpoint sync to finish (ray-project#31131)

This PR includes fixes to deflake the `tune_cloud_gcp_k8s_durable_upload` release test: (1) wait for the final experiment checkpoint sync to finish before exiting, and (2) fix the forced checkpointing frequency logic.

Signed-off-by: Justin Yu <[email protected]>
Signed-off-by: tmynn <[email protected]>
justinvyu authored and tamohannes committed Jan 25, 2023
1 parent 80ebf5e commit 5d81d01
Showing 4 changed files with 71 additions and 5 deletions.
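
As background for fix (1): the experiment checkpoint is uploaded by an asynchronous sync command, so if the driver exits right after launching the final upload, the remote directory can miss the last results. The toy sketch below is plain Python, not Ray code, and every name in it is made up for illustration; it only shows the race and why an explicit wait closes it.

import threading
import time

# Illustrative stand-ins for the experiment state and the remote bucket.
remote_dir = {}


def async_upload(snapshot):
    """Pretend upload that takes a while, like a real cloud sync."""
    time.sleep(0.5)
    remote_dir["experiment_state"] = snapshot


# The "final forced checkpoint" kicks off one last background upload.
upload_thread = threading.Thread(target=async_upload, args=("iteration-8",))
upload_thread.start()

# Without this join (the analogue of waiting on the syncer), the driver could
# exit here and leave remote_dir without the final experiment state.
upload_thread.join()
assert remote_dir["experiment_state"] == "iteration-8"
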
python/ray/tune/execution/trial_runner.py (2 changes: 1 addition & 1 deletion)
@@ -202,7 +202,7 @@ def on_trial_checkpoint(self, trial: Trial):
        self._trial_num_checkpoints_since_last_sync[trial] += 1
        if (
            self._trial_num_checkpoints_since_last_sync[trial]
-            > self._sync_every_n_trial_checkpoints
+            >= self._sync_every_n_trial_checkpoints
        ):
            self._should_force_cloud_sync = True

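Fix (2) is the `>` to `>=` change in the hunk above: with a strict `>`, the forced cloud sync only triggers one trial checkpoint later than configured. A minimal standalone sketch of the counter logic, simplified from the diff (the helper name and `sync_every_n` parameter are illustrative, not Ray identifiers):

def checkpoints_until_forced_sync(sync_every_n: int, strict: bool) -> int:
    """Count how many trial checkpoints happen before a forced sync triggers."""
    num_checkpoints_since_last_sync = 0
    while True:
        num_checkpoints_since_last_sync += 1
        triggered = (
            num_checkpoints_since_last_sync > sync_every_n
            if strict
            else num_checkpoints_since_last_sync >= sync_every_n
        )
        if triggered:
            return num_checkpoints_since_last_sync


# Old behavior (">"): the sync is forced only on the 3rd checkpoint.
assert checkpoints_until_forced_sync(sync_every_n=2, strict=True) == 3
# New behavior (">="): the sync is forced on every 2nd checkpoint, as configured.
assert checkpoints_until_forced_sync(sync_every_n=2, strict=False) == 2
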
python/ray/tune/tests/test_syncer.py (59 changes: 58 additions & 1 deletion)
@@ -13,10 +13,11 @@
import ray
import ray.cloudpickle as pickle
from ray import tune
-from ray.air import Checkpoint
+from ray.air import session, Checkpoint, RunConfig
from ray.tune import TuneError
from ray.tune.syncer import Syncer, _DefaultSyncer
from ray.tune.utils.file_transfer import _pack_dir, _unpack_dir
+from ray.air._internal.remote_storage import upload_to_uri, download_from_uri


@pytest.fixture
@@ -606,6 +607,62 @@ def test_syncer_serialize(temp_data_dirs):
    pickle.dumps(syncer)


+def test_final_experiment_checkpoint_sync(tmpdir):
+    class SlowSyncer(_DefaultSyncer):
+        def __init__(self, **kwargs):
+            super(_DefaultSyncer, self).__init__(**kwargs)
+            self._num_syncs = 0
+
+        def _sync_up_command(self, local_path, uri, exclude):
+            def slow_upload(local_path, uri, exclude):
+                # Sleep to check that experiment doesn't exit without waiting
+                time.sleep(2)
+                upload_to_uri(local_path, uri, exclude)
+                self._num_syncs += 1
+
+            return (
+                slow_upload,
+                dict(local_path=local_path, uri=uri, exclude=exclude),
+            )
+
+    # Long sync period so there will only be 2 experiment checkpoints:
+    # One at the beginning which always happens, then a forced checkpoint at the
+    # end of the experiment.
+    syncer = SlowSyncer(sync_period=60)
+
+    def train_func(config):
+        for i in range(8):
+            session.report({"score": i})
+            time.sleep(0.5)
+
+    tuner = tune.Tuner(
+        train_func,
+        run_config=RunConfig(
+            name="exp_name",
+            sync_config=tune.SyncConfig(
+                upload_dir="memory:///test_upload_dir", syncer=syncer
+            ),
+        ),
+    )
+    results = tuner.fit()
+    assert not results.errors
+
+    # Check the contents of the upload_dir immediately after the experiment
+    # This won't be up to date if we don't wait on the last sync
+    download_from_uri("memory:///test_upload_dir/exp_name", tmpdir)
+    cloud_results = tune.Tuner.restore(str(tmpdir)).get_results()
+    last_reported_iter = cloud_results[0].metrics.get("training_iteration", None)
+    assert last_reported_iter == 8, (
+        "Experiment did not wait to finish the final experiment sync before exiting. "
+        "The last reported training iteration synced to the remote dir was "
+        f"{last_reported_iter}. (None if no results are synced.)"
+    )
+    assert syncer._num_syncs == 2, (
+        "Should have seen 2 syncs, once at the beginning of the experiment, and one "
+        f"forced sync at the end. Got {syncer._num_syncs} syncs instead."
+    )


if __name__ == "__main__":
    import sys

python/ray/tune/tests/test_trial_runner_3.py (12 changes: 9 additions & 3 deletions)
@@ -953,6 +953,7 @@ def delete(self, remote_dir: str) -> bool:
            sync_config=SyncConfig(upload_dir="fake", syncer=syncer),
            remote_checkpoint_dir="fake",
            trial_checkpoint_config=checkpoint_config,
+            checkpoint_period=100,  # Only rely on forced syncing
            trial_executor=RayTrialExecutor(resource_manager=self._resourceManager()),
)

@@ -976,9 +977,14 @@ def should_checkpoint(self):
        runner.step()
        assert any("syncing has been triggered multiple" in x for x in buffer)

-        # we should sync 4 times - every 2 checkpoints, but the last sync will not
-        # happen as the experiment finishes before it is triggered
-        assert syncer.sync_up_counter == 4
+        # We should sync 6 times:
+        # The first checkpoint happens when the experiment starts,
+        # since no checkpoints have happened yet
+        # (This corresponds to the new_trial event in the runner loop)
+        # Then, every num_to_keep=2 checkpoints, we should perform a forced checkpoint
+        # which results in 5 more checkpoints (running for 10 iterations),
+        # giving a total of 6
+        assert syncer.sync_up_counter == 6

    def getHangingSyncer(self, sync_period: float, sync_timeout: float):
        def _hanging_sync_up_command(*args, **kwargs):
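The updated comment in the hunk above counts 6 syncs for that test. A small standalone check of the arithmetic under the fixed `>=` comparison, assuming (as the comment does) one sync when the experiment starts, a forced sync every 2 trial checkpoints, and 10 trial checkpoints in total; the helper below is illustrative, not Ray code:

def count_syncs(num_trial_checkpoints: int, sync_every_n: int) -> int:
    """Replay the forced-sync counter: 1 initial sync plus forced syncs."""
    syncs = 1  # experiment-start sync (the new_trial event in the runner loop)
    checkpoints_since_last_sync = 0
    for _ in range(num_trial_checkpoints):
        checkpoints_since_last_sync += 1
        if checkpoints_since_last_sync >= sync_every_n:  # the fixed comparison
            syncs += 1
            checkpoints_since_last_sync = 0
    return syncs


# 10 trial checkpoints with a forced sync every 2 checkpoints:
# 1 initial sync + 5 forced syncs = 6, matching the assertion above.
assert count_syncs(num_trial_checkpoints=10, sync_every_n=2) == 6
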
python/ray/tune/tune.py (3 changes: 3 additions & 0 deletions)
@@ -736,6 +736,9 @@ class and registered trainables.

    try:
        runner.checkpoint(force=True)
+        # Wait for the final remote directory sync to finish before exiting
+        if runner._syncer:
+            runner._syncer.wait()
    except Exception as e:
        logger.warning(f"Trial Runner checkpointing failed: {str(e)}")

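Taken together, the tune.py change above plus the `>=` fix mean that once a run with a cloud `upload_dir` finishes, the remote experiment directory already reflects the final results. A condensed, illustrative sketch of that setup, adapted from the new test in test_syncer.py (the `memory://` URI and experiment name are only examples, not a recommended configuration):

from ray import tune
from ray.air import session, RunConfig


def train_func(config):
    for i in range(8):
        session.report({"score": i})


tuner = tune.Tuner(
    train_func,
    run_config=RunConfig(
        name="exp_name",
        sync_config=tune.SyncConfig(upload_dir="memory:///test_upload_dir"),
    ),
)
results = tuner.fit()

# With the final wait in place, the contents under
# memory:///test_upload_dir/exp_name include the last reported iteration as
# soon as fit() returns, rather than whatever the previous periodic sync
# happened to capture.
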
