Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tune] TrialRunner checkpointing shouldn't fail if ray.data.Dataset w/o lineage captured in trial config #33565

Merged
merged 5 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .buildkite/pipeline.ml.yml
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@
instance_size: medium
commands:
- cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT
- TUNE_TESTING=1 ./ci/env/install-dependencies.sh
- TUNE_TESTING=1 DATA_PROCESSING_TESTING=1 ./ci/env/install-dependencies.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only
--test_tag_filters=medium_instance,-py37,-soft_imports,-gpu_only,-rllib,-multinode
Expand Down
5 changes: 5 additions & 0 deletions python/ray/tune/execution/experiment_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ def checkpoint(
# Checkpoint
checkpoint_time_start = time.monotonic()

# NOTE: This context manager is for Ray Datasets captured in a trial config.
# This is the case when *tuning over datasets*.
# If the datasets have already been fully executed, then serializing
# block refs means that this checkpoint is not usable in a new Ray cluster.
# This context will serialize the dataset execution plan instead, if available.
with out_of_band_serialize_dataset():
save_fn()

Expand Down
2 changes: 1 addition & 1 deletion python/ray/tune/impl/out_of_band_serialize_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def _reduce(ds: ray.data.Dataset):
if "serialize_lineage" in tb:
_already_in_out_of_band_serialization = True
break
if not _already_in_out_of_band_serialization:
if not _already_in_out_of_band_serialization and ds.has_serializable_lineage():
return _deserialize_and_fully_execute_if_needed, (ds.serialize_lineage(),)
else:
return ds.__reduce__()
Expand Down
70 changes: 70 additions & 0 deletions python/ray/tune/tests/test_trial_runner_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from collections import Counter
import logging
import os
import pandas as pd
import pickle
import shutil
import sys
Expand All @@ -14,6 +15,7 @@
import ray
from ray.air import CheckpointConfig
from ray.air.execution import PlacementGroupResourceManager, FixedResourceManager
from ray.exceptions import OwnerDiedError
from ray.rllib import _register_all
from ray.rllib.algorithms.callbacks import DefaultCallbacks

Expand Down Expand Up @@ -1155,6 +1157,74 @@ def testPeriodicCloudCheckpointSyncTimeout(self):
assert any("did not finish running within the timeout" in x for x in buffer)
assert syncer.sync_up_counter == 2

def testExperimentCheckpointWithDatasets(self):
"""Test trial runner checkpointing where trials contain Ray Datasets.
When possible, a dataset plan should be saved (for read_* APIs).
See `Dataset.serialize_lineage` for more information.

If a dataset cannot be serialized, an experiment checkpoint
should still be created. Users can pass in the dataset again by
re-specifying the `param_space`.
"""
ray.init(num_cpus=2)
# Save some test data to load
data_filepath = os.path.join(self.tmpdir, "test.csv")
pd.DataFrame({"x": list(range(10))}).to_csv(data_filepath)

def create_trial_config():
return {
"datasets": {
"with_lineage": ray.data.read_csv(data_filepath),
"no_lineage": ray.data.from_items([{"x": i} for i in range(10)]),
}
}

resolvers = create_resolvers_map()
config_with_placeholders = inject_placeholders(create_trial_config(), resolvers)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to call these manually in the test?
I hope users don't have to worry about this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, users will go through the Tuner path, which will do this automatically. This is just because I'm testing the trial runner by itself.

trial = Trial(
"__fake",
experiment_path=self.tmpdir,
config=config_with_placeholders,
)
trial.init_local_path()
runner = TrialRunner(
experiment_path=self.tmpdir, placeholder_resolvers=resolvers
)
runner.add_trial(trial)
# Req: TrialRunner checkpointing shouldn't error
runner.checkpoint(force=True)

# Manually clear all block refs that may have been created
ray.shutdown()
ray.init(num_cpus=2)

new_runner = TrialRunner(experiment_path=self.tmpdir)
new_runner.resume()
[loaded_trial] = new_runner.get_trials()
loaded_datasets = loaded_trial.config["datasets"]

# Req: The deserialized dataset (w/ lineage) should be usable.
assert [el["x"] for el in loaded_datasets["with_lineage"].take()] == list(
range(10)
)
# Req: The deserialized dataset (w/o lineage) should NOT be usable.
with self.assertRaises(OwnerDiedError):
loaded_datasets["no_lineage"].take()

replaced_resolvers = create_resolvers_map()
inject_placeholders(create_trial_config(), replaced_resolvers)

respecified_config_runner = TrialRunner(
placeholder_resolvers=replaced_resolvers,
local_checkpoint_dir=self.tmpdir,
)
respecified_config_runner.resume()
[loaded_trial] = respecified_config_runner.get_trials()
ray_ds_no_lineage = loaded_trial.config["datasets"]["no_lineage"]

# Req: The dataset (w/o lineage) can be re-specified and is usable after.
assert [el["x"] for el in ray_ds_no_lineage.take()] == list(range(10))


class FixedResourceTrialRunnerTest3(TrialRunnerTest3):
def _resourceManager(self):
Expand Down