diff --git a/.buildkite/pipeline.ml.yml b/.buildkite/pipeline.ml.yml index b8ce8d4aa2ad..f5e91f551ead 100644 --- a/.buildkite/pipeline.ml.yml +++ b/.buildkite/pipeline.ml.yml @@ -233,7 +233,7 @@ instance_size: medium commands: - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT - - TUNE_TESTING=1 ./ci/env/install-dependencies.sh + - TUNE_TESTING=1 DATA_PROCESSING_TESTING=1 ./ci/env/install-dependencies.sh - ./ci/env/env_info.sh - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=medium_instance,-py37,-soft_imports,-gpu_only,-rllib,-multinode diff --git a/python/ray/tune/execution/experiment_state.py b/python/ray/tune/execution/experiment_state.py index b344d9654ecb..719d61b8576e 100644 --- a/python/ray/tune/execution/experiment_state.py +++ b/python/ray/tune/execution/experiment_state.py @@ -204,6 +204,11 @@ def checkpoint( # Checkpoint checkpoint_time_start = time.monotonic() + # NOTE: This context manager is for Ray Datasets captured in a trial config. + # This is the case when *tuning over datasets*. + # If the datasets have already been full executed, then serializing + # block refs means that this checkpoint is not usable in a new Ray cluster. + # This context will serialize the dataset execution plan instead, if available. with out_of_band_serialize_dataset(): save_fn() diff --git a/python/ray/tune/impl/out_of_band_serialize_dataset.py b/python/ray/tune/impl/out_of_band_serialize_dataset.py index 380869fa3d32..112cee4d8032 100644 --- a/python/ray/tune/impl/out_of_band_serialize_dataset.py +++ b/python/ray/tune/impl/out_of_band_serialize_dataset.py @@ -17,7 +17,7 @@ def _reduce(ds: ray.data.Dataset): if "serialize_lineage" in tb: _already_in_out_of_band_serialization = True break - if not _already_in_out_of_band_serialization: + if not _already_in_out_of_band_serialization and ds.has_serializable_lineage(): return _deserialize_and_fully_execute_if_needed, (ds.serialize_lineage(),) else: return ds.__reduce__() diff --git a/python/ray/tune/tests/test_trial_runner_3.py b/python/ray/tune/tests/test_trial_runner_3.py index 1ef23a938b3d..a7c817bf48af 100644 --- a/python/ray/tune/tests/test_trial_runner_3.py +++ b/python/ray/tune/tests/test_trial_runner_3.py @@ -2,6 +2,7 @@ from collections import Counter import logging import os +import pandas as pd import pickle import shutil import sys @@ -14,6 +15,7 @@ import ray from ray.air import CheckpointConfig from ray.air.execution import PlacementGroupResourceManager, FixedResourceManager +from ray.exceptions import OwnerDiedError from ray.rllib import _register_all from ray.rllib.algorithms.callbacks import DefaultCallbacks @@ -1155,6 +1157,74 @@ def testPeriodicCloudCheckpointSyncTimeout(self): assert any("did not finish running within the timeout" in x for x in buffer) assert syncer.sync_up_counter == 2 + def testExperimentCheckpointWithDatasets(self): + """Test trial runner checkpointing where trials contain Ray Datasets. + When possible, a dataset plan should be saved (for read_* APIs). + See `Dataset.serialize_lineage` for more information. + + If a dataset cannot be serialized, an experiment checkpoint + should still be created. Users can pass in the dataset again by + re-specifying the `param_space`. + """ + ray.init(num_cpus=2) + # Save some test data to load + data_filepath = os.path.join(self.tmpdir, "test.csv") + pd.DataFrame({"x": list(range(10))}).to_csv(data_filepath) + + def create_trial_config(): + return { + "datasets": { + "with_lineage": ray.data.read_csv(data_filepath), + "no_lineage": ray.data.from_items([{"x": i} for i in range(10)]), + } + } + + resolvers = create_resolvers_map() + config_with_placeholders = inject_placeholders(create_trial_config(), resolvers) + trial = Trial( + "__fake", + experiment_path=self.tmpdir, + config=config_with_placeholders, + ) + trial.init_local_path() + runner = TrialRunner( + experiment_path=self.tmpdir, placeholder_resolvers=resolvers + ) + runner.add_trial(trial) + # Req: TrialRunner checkpointing shouldn't error + runner.checkpoint(force=True) + + # Manually clear all block refs that may have been created + ray.shutdown() + ray.init(num_cpus=2) + + new_runner = TrialRunner(experiment_path=self.tmpdir) + new_runner.resume() + [loaded_trial] = new_runner.get_trials() + loaded_datasets = loaded_trial.config["datasets"] + + # Req: The deserialized dataset (w/ lineage) should be usable. + assert [el["x"] for el in loaded_datasets["with_lineage"].take()] == list( + range(10) + ) + # Req: The deserialized dataset (w/o lineage) should NOT be usable. + with self.assertRaises(OwnerDiedError): + loaded_datasets["no_lineage"].take() + + replaced_resolvers = create_resolvers_map() + inject_placeholders(create_trial_config(), replaced_resolvers) + + respecified_config_runner = TrialRunner( + placeholder_resolvers=replaced_resolvers, + local_checkpoint_dir=self.tmpdir, + ) + respecified_config_runner.resume() + [loaded_trial] = respecified_config_runner.get_trials() + ray_ds_no_lineage = loaded_trial.config["datasets"]["no_lineage"] + + # Req: The dataset (w/o lineage) can be re-specified and is usable after. + assert [el["x"] for el in ray_ds_no_lineage.take()] == list(range(10)) + class FixedResourceTrialRunnerTest3(TrialRunnerTest3): def _resourceManager(self):