From 074e9769bb9db8dce594893b088997b28a261673 Mon Sep 17 00:00:00 2001 From: Kai Fricke Date: Thu, 13 Apr 2023 21:25:08 +0200 Subject: [PATCH] [air] Move to new storage_path API in tests and examples (#34263) Following #33463, this PR updates our tests, examples, and docs to use the new `storage_path` API. The only locations where we continue to use the `local_dir` statement are tests where we specify both a local dir and a remote dir. For these tests, we can move to an environment-variable based wrapper in the future. Signed-off-by: Kai Fricke Signed-off-by: elliottower --- doc/source/ray-air/doc_code/tuner.py | 7 ++- .../ray-air/examples/batch_forecasting.ipynb | 2 +- .../ray-air/examples/batch_tuning.ipynb | 2 +- doc/source/train/doc_code/key_concepts.py | 10 ++-- .../lightning/lightning_mnist_example.ipynb | 2 +- .../pytorch/pytorch_resnet_finetune.ipynb | 4 +- doc/source/train/faq.rst | 6 +-- doc/source/tune/api/suggestion.rst | 2 +- doc/source/tune/doc_code/faq.py | 13 ++---- doc/source/tune/examples/pbt_guide.ipynb | 2 +- doc/source/tune/examples/tune-aim.ipynb | 2 +- .../tune/examples/tune_analyze_results.ipynb | 6 +-- doc/source/tune/faq.rst | 4 +- .../tune/tutorials/tune-distributed.rst | 18 ++++---- doc/source/tune/tutorials/tune-output.rst | 4 +- doc/source/tune/tutorials/tune-storage.rst | 44 ++++++++---------- .../tune/tutorials/tune-trial-checkpoints.rst | 2 +- python/ray/air/tests/test_errors.py | 4 +- python/ray/tune/execution/experiment_state.py | 4 +- python/ray/tune/execution/trial_runner.py | 6 +-- python/ray/tune/syncer.py | 12 ++++- .../tests/_test_cluster_interrupt_searcher.py | 2 +- python/ray/tune/tests/test_actor_reuse.py | 7 ++- python/ray/tune/tests/test_api.py | 17 ++++--- python/ray/tune/tests/test_cluster.py | 28 +++++------ python/ray/tune/tests/test_commands.py | 6 +-- python/ray/tune/tests/test_experiment.py | 10 ++-- .../tune/tests/test_experiment_analysis.py | 16 +++---- .../tests/test_experiment_analysis_mem.py | 11 +++-- .../test_integration_pytorch_lightning.py | 4 +- python/ray/tune/tests/test_multinode_sync.py | 4 +- python/ray/tune/tests/test_result_grid.py | 12 +++-- python/ray/tune/tests/test_syncer.py | 46 ++++++++----------- python/ray/tune/tests/test_trainable.py | 4 +- python/ray/tune/tests/test_trainable_util.py | 2 +- .../tune/tests/test_trial_relative_logdir.py | 14 +++--- python/ray/tune/tests/test_trial_runner_2.py | 2 +- python/ray/tune/tests/test_trial_runner_3.py | 26 ++++++----- python/ray/tune/tests/test_tune_restore.py | 18 ++++---- .../tests/test_tune_restore_warm_start.py | 4 +- .../ray/tune/tests/test_tune_save_restore.py | 2 +- python/ray/tune/tests/test_tuner_restore.py | 36 +++++++-------- python/ray/tune/utils/release_test_util.py | 6 +-- .../cloud_tests/workloads/_tune_script.py | 8 ++-- .../cloud_tests/workloads/run_cloud_test.py | 30 ++++++------ .../test_tune_worker_fault_tolerance.py | 11 ++--- .../workloads/test_durable_trainable.py | 5 +- 47 files changed, 236 insertions(+), 251 deletions(-) diff --git a/doc/source/ray-air/doc_code/tuner.py b/doc/source/ray-air/doc_code/tuner.py index 08bc7266d710..9957d51d4aaa 100644 --- a/doc/source/ray-air/doc_code/tuner.py +++ b/doc/source/ray-air/doc_code/tuner.py @@ -104,7 +104,7 @@ tuner = Tuner( trainable=trainer, - run_config=RunConfig(name="test_tuner", local_dir="~/ray_results"), + run_config=RunConfig(name="test_tuner", storage_path="~/ray_results"), param_space=param_space, tune_config=tune.TuneConfig( mode="min", metric="loss", num_samples=2, max_concurrent_trials=2 @@ 
-215,14 +215,13 @@ def get_another_dataset(): # __result_grid_inspection_end__ # __run_config_start__ -from ray import air, tune +from ray import air from ray.air.config import RunConfig run_config = RunConfig( name="MyExperiment", - local_dir="./your_log_directory/", + storage_path="s3://...", verbose=2, - sync_config=tune.SyncConfig(upload_dir="s3://..."), checkpoint_config=air.CheckpointConfig(checkpoint_frequency=2), ) # __run_config_end__ diff --git a/doc/source/ray-air/examples/batch_forecasting.ipynb b/doc/source/ray-air/examples/batch_forecasting.ipynb index 4fa33c5a7b4f..5b954a770caa 100644 --- a/doc/source/ray-air/examples/batch_forecasting.ipynb +++ b/doc/source/ray-air/examples/batch_forecasting.ipynb @@ -1411,7 +1411,7 @@ " ),\n", " run_config=air.RunConfig(\n", " # Redirect logs to relative path instead of default ~/ray_results/.\n", - " local_dir=\"my_Tune_logs\",\n", + " storage_path=\"my_Tune_logs\",\n", " # Specify name to make logs easier to find in log path.\n", " name=\"ptf_nyc\",\n", " ),\n", diff --git a/doc/source/ray-air/examples/batch_tuning.ipynb b/doc/source/ray-air/examples/batch_tuning.ipynb index cf1c8fcbb6fc..9194adc8529e 100644 --- a/doc/source/ray-air/examples/batch_tuning.ipynb +++ b/doc/source/ray-air/examples/batch_tuning.ipynb @@ -667,7 +667,7 @@ " param_space=search_space,\n", " run_config=air.RunConfig(\n", " # redirect logs to relative path instead of default ~/ray_results/\n", - " local_dir=\"my_Tune_logs\",\n", + " storage_path=\"my_Tune_logs\",\n", " name=\"batch_tuning\",\n", " # Set Ray Tune verbosity. Print summary table only with levels 2 or 3.\n", " verbose=2,\n", diff --git a/doc/source/train/doc_code/key_concepts.py b/doc/source/train/doc_code/key_concepts.py index fd1a225c0941..b7845f80d530 100644 --- a/doc/source/train/doc_code/key_concepts.py +++ b/doc/source/train/doc_code/key_concepts.py @@ -102,8 +102,8 @@ def train_fn(config): run_config = RunConfig( # Name of the training run (directory name). name="my_train_run", - # Directory to store results in (will be local_dir/name). - local_dir="~/ray_results", + # Directory to store results in (will be storage_path/name). + storage_path="~/ray_results", # Low training verbosity. verbose=1, ) @@ -125,10 +125,8 @@ def train_fn(config): from ray.tune import SyncConfig run_config = RunConfig( - sync_config=SyncConfig( - # This will store checkpoints on S3. - upload_dir="s3://remote-bucket/location" - ) + # This will store checkpoints on S3. 
+ storage_path="s3://remote-bucket/location" ) # __sync_config_end__ diff --git a/doc/source/train/examples/lightning/lightning_mnist_example.ipynb b/doc/source/train/examples/lightning/lightning_mnist_example.ipynb index 08f0a01ebd14..043dba3a08ac 100644 --- a/doc/source/train/examples/lightning/lightning_mnist_example.ipynb +++ b/doc/source/train/examples/lightning/lightning_mnist_example.ipynb @@ -290,7 +290,7 @@ "\n", "run_config = RunConfig(\n", " name=\"ptl-mnist-example\",\n", - " local_dir=\"/tmp/ray_results\",\n", + " storage_path=\"/tmp/ray_results\",\n", " checkpoint_config=CheckpointConfig(\n", " num_to_keep=3,\n", " checkpoint_score_attribute=\"val_accuracy\",\n", diff --git a/doc/source/train/examples/pytorch/pytorch_resnet_finetune.ipynb b/doc/source/train/examples/pytorch/pytorch_resnet_finetune.ipynb index 379fa5f789a1..6ecdb7db4504 100644 --- a/doc/source/train/examples/pytorch/pytorch_resnet_finetune.ipynb +++ b/doc/source/train/examples/pytorch/pytorch_resnet_finetune.ipynb @@ -355,7 +355,7 @@ "# Set experiment name and checkpoint configs\n", "run_config = RunConfig(\n", " name=\"finetune-resnet\",\n", - " local_dir=\"/tmp/ray_results\",\n", + " storage_path=\"/tmp/ray_results\",\n", " checkpoint_config=checkpoint_config,\n", ")\n" ] @@ -628,7 +628,7 @@ "## Load the checkpoint for prediction:\n", "\n", " \n", - " The metadata and checkpoints have already been saved in the `local_dir` specified in TorchTrainer:" + " The metadata and checkpoints have already been saved in the `storage_path` specified in TorchTrainer:" ] }, { diff --git a/doc/source/train/faq.rst b/doc/source/train/faq.rst index 0cf0b093a1bb..737cf16f81e0 100644 --- a/doc/source/train/faq.rst +++ b/doc/source/train/faq.rst @@ -44,8 +44,8 @@ Since this is applicable to all of Ray Train's built-in trainers, we'll use `FrameworkTrainer` to refer to a generic trainer for the remainder of this answer. To restore an experiment, first find the experiment directory that your previous -run was saved to. If you saved locally, this will look like ``{local_dir}/{name}``, -where ``local_dir`` may be ``~/ray_results``, and ``name`` is something +run was saved to. If you saved locally, this will look like ``{storage_path}/{name}``, +where ``storage_path`` may be ``~/ray_results``, and ``name`` is something like ``FrameworkTrainer_2023-xxx``. Note that these are the same parameters that you pass through :class:`~ray.air.RunConfig`. @@ -108,7 +108,7 @@ to determine the existence/validity of the given experiment directory. 
scaling_config=air.ScalingConfig(num_workers=2, use_gpu=False), run_config=air.RunConfig( name=experiment_name, - local_dir="~/ray_results", + storage_path="~/ray_results", failure_config=air.FailureConfig(max_failures=3), stop={"training_iteration": 10}, ), diff --git a/doc/source/tune/api/suggestion.rst b/doc/source/tune/api/suggestion.rst index b225b635f790..7c1c05a2187b 100644 --- a/doc/source/tune/api/suggestion.rst +++ b/doc/source/tune/api/suggestion.rst @@ -84,7 +84,7 @@ identifier: ), run_config=air.RunConfig( name="my-experiment-1", - local_dir="~/my_results", + storage_path="~/my_results", ) ) results = tuner_1.fit() diff --git a/doc/source/tune/doc_code/faq.py b/doc/source/tune/doc_code/faq.py index 27e5966aef0c..cc6ae423f8b1 100644 --- a/doc/source/tune/doc_code/faq.py +++ b/doc/source/tune/doc_code/faq.py @@ -223,7 +223,7 @@ def trainable(config): # __torch_seed_example_end__ # __large_data_start__ -from ray import tune, air +from ray import air, tune import numpy as np @@ -244,7 +244,7 @@ def f(config, data=None): # __log_1_start__ tuner = tune.Tuner( MyTrainableClass, - sync_config=tune.SyncConfig(upload_dir="s3://my-log-dir"), + run_config=air.RunConfig(storage_path="s3://my-log-dir"), ) tuner.fit() # __log_1_end__ @@ -268,9 +268,7 @@ def delete(self, remote_dir: str) -> bool: tuner = tune.Tuner( MyTrainableClass, - sync_config=tune.SyncConfig( - upload_dir="s3://my-log-dir", syncer=CustomSyncer() - ), + run_config=air.RunConfig(storage_path="s3://my-log-dir"), ) tuner.fit() # __log_2_end__ @@ -344,7 +342,6 @@ def wait(self): sync_down_template="aws s3 sync {source} {target}", delete_template="aws s3 rm {target} --recursive", ), - upload_dir="s3://bucket/path", ) # __custom_command_syncer_end__ @@ -356,7 +353,7 @@ def wait(self): tuner = tune.Tuner( train_fn, # ..., - sync_config=tune.SyncConfig(upload_dir="s3://your-s3-bucket/durable-trial/"), + run_config=air.RunConfig(storage_path="s3://your-s3-bucket/durable-trial/"), ) tuner.fit() # __s3_end__ @@ -367,7 +364,7 @@ def wait(self): tuner = tune.Tuner( train_fn, run_config=air.RunConfig( - local_dir="/path/to/shared/storage", + storage_path="/path/to/shared/storage", ), sync_config=tune.SyncConfig( # Do not sync because we are on shared storage diff --git a/doc/source/tune/examples/pbt_guide.ipynb b/doc/source/tune/examples/pbt_guide.ipynb index 5660242fb133..6c49b2cbd5e0 100644 --- a/doc/source/tune/examples/pbt_guide.ipynb +++ b/doc/source/tune/examples/pbt_guide.ipynb @@ -199,7 +199,7 @@ " checkpoint_score_attribute=\"mean_accuracy\",\n", " num_to_keep=4,\n", " ),\n", - " local_dir=\"/tmp/ray_results\",\n", + " storage_path=\"/tmp/ray_results\",\n", " ),\n", " tune_config=tune.TuneConfig(\n", " scheduler=scheduler,\n", diff --git a/doc/source/tune/examples/tune-aim.ipynb b/doc/source/tune/examples/tune-aim.ipynb index 1f6833b31077..ad180658faa8 100644 --- a/doc/source/tune/examples/tune-aim.ipynb +++ b/doc/source/tune/examples/tune-aim.ipynb @@ -262,7 +262,7 @@ " train_function,\n", " run_config=air.RunConfig(\n", " callbacks=[AimLoggerCallback()],\n", - " local_dir=\"/tmp/ray_results\",\n", + " storage_path=\"/tmp/ray_results\",\n", " name=\"aim_example\",\n", " ),\n", " param_space={\n", diff --git a/doc/source/tune/examples/tune_analyze_results.ipynb b/doc/source/tune/examples/tune_analyze_results.ipynb index 5a310b065a7b..55efc636e8dd 100644 --- a/doc/source/tune/examples/tune_analyze_results.ipynb +++ b/doc/source/tune/examples/tune_analyze_results.ipynb @@ -36,7 +36,7 @@ "from ray.tune.examples.mnist_pytorch 
import train_mnist\n", "from ray.tune import ResultGrid\n", "\n", - "local_dir = \"/tmp/ray_results\"\n", + "storage_path = \"/tmp/ray_results\"\n", "exp_name = \"tune_analyzing_results\"\n", "tuner = tune.Tuner(\n", " train_mnist,\n", @@ -52,7 +52,7 @@ " checkpoint_score_attribute=\"mean_accuracy\",\n", " num_to_keep=5,\n", " ),\n", - " local_dir=local_dir,\n", + " storage_path=storage_path,\n", " ),\n", " tune_config=tune.TuneConfig(mode=\"max\", metric=\"mean_accuracy\", num_samples=3),\n", ")\n", @@ -91,7 +91,7 @@ } ], "source": [ - "experiment_path = f\"{local_dir}/{exp_name}\"\n", + "experiment_path = f\"{storage_path}/{exp_name}\"\n", "print(f\"Loading results from {experiment_path}...\")\n", "\n", "restored_tuner = tune.Tuner.restore(experiment_path, trainable=train_mnist)\n", diff --git a/doc/source/tune/faq.rst b/doc/source/tune/faq.rst index 4e835408cf3b..034a77093327 100644 --- a/doc/source/tune/faq.rst +++ b/doc/source/tune/faq.rst @@ -476,7 +476,7 @@ not kept open by Ray Tune. logs and checkpoints will not be synced to the driver, so if you need to access them later, you will have to transfer them where you need them manually. -2. You can use :ref:`cloud checkpointing ` to save logs and checkpoints to a specified `upload_dir`. +2. You can use :ref:`cloud checkpointing ` to save logs and checkpoints to a specified `storage_path`. This is the preferred way to deal with this. All syncing will be taken care of automatically, as all nodes are able to access the cloud storage. Additionally, your results will be safe, so even when you're working on pre-emptible instances, you won't lose any of your data. @@ -592,7 +592,7 @@ be automatically fetched and passed to your trainable as a parameter. How can I upload my Tune results to cloud storage? ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -If an upload directory is provided, Tune will automatically sync results from the ``local_dir`` to the given directory, +If an upload directory is provided, Tune will automatically sync results from the ``RAY_AIR_LOCAL_CACHE_DIR`` to the given directory, natively supporting standard URIs for systems like S3, gsutil or HDFS. You can add more filesystems by installing `fs-spec `_-compatible filesystems e.g. using pip. diff --git a/doc/source/tune/tutorials/tune-distributed.rst b/doc/source/tune/tutorials/tune-distributed.rst index 369f3e8da57b..95d4baa91a12 100644 --- a/doc/source/tune/tutorials/tune-distributed.rst +++ b/doc/source/tune/tutorials/tune-distributed.rst @@ -52,7 +52,7 @@ Analyze your results on TensorBoard by starting TensorBoard on the remote head m ray exec tune-default.yaml 'tensorboard --logdir=~/ray_results/ --port 6006' --port-forward 6006 -Note that you can customize the directory of results by specifying: ``air.RunConfig(local_dir=..)``, taken in by ``Tuner``. You can then point TensorBoard to that directory to visualize results. You can also use `awless `_ for easy cluster management on AWS. +Note that you can customize the directory of results by specifying: ``air.RunConfig(storage_path=..)``, taken in by ``Tuner``. You can then point TensorBoard to that directory to visualize results. You can also use `awless `_ for easy cluster management on AWS. Running a Distributed Tune Experiment @@ -101,8 +101,8 @@ Storage Options in a Distributed Tune Run ----------------------------------------- In a distributed experiment, you should try to use :ref:`cloud checkpointing ` to -reduce synchronization overhead. 
For this, you just have to specify an ``upload_dir`` in the -:class:`tune.SyncConfig `. +reduce synchronization overhead. For this, you just have to specify a remote ``storage_path`` in the +:class:`air.RunConfig `. `my_trainable` is a user-defined :ref:`Tune Trainable ` in the following example: @@ -114,10 +114,8 @@ reduce synchronization overhead. For this, you just have to specify an ``upload_ tuner = tune.Tuner( my_trainable, run_config=air.RunConfig( - name="experiment_name" - sync_config=tune.SyncConfig( - upload_dir="s3://bucket-name/sub-path/" - ) + name="experiment_name", + storage_path="s3://bucket-name/sub-path/", ) ) tuner.fit() @@ -214,7 +212,7 @@ To summarize, here are the commands to run: You should see Tune eventually continue the trials on a different worker node. See the :ref:`Fault Tolerance ` section for more details. -You can also specify ``sync_config=tune.SyncConfig(upload_dir=...)``, as part of ``air.RunConfig``, which is taken in by ``Tuner``, to sync results with a cloud storage like S3, allowing you to persist results in case you want to start and stop your cluster automatically. +You can also specify ``storage_path=...``, as part of ``air.RunConfig``, which is taken in by ``Tuner``, to upload results to cloud storage like S3, allowing you to persist results in case you want to start and stop your cluster automatically. .. _tune-fault-tol: @@ -256,8 +254,8 @@ Below are some commonly used commands for submitting experiments. Please see the # Start a cluster and run an experiment in a detached tmux session, # and shut down the cluster as soon as the experiment completes. - # In `tune_experiment.py`, set `tune.SyncConfig(upload_dir="s3://...")` - # and pass it to `sync_config=...` to persist results + # In `tune_experiment.py`, set `air.RunConfig(storage_path="s3://...")` + # to persist results $ ray submit CLUSTER.YAML --tmux --start --stop tune_experiment.py -- --address=localhost:6379 # To start or update your cluster: diff --git a/doc/source/tune/tutorials/tune-output.rst b/doc/source/tune/tutorials/tune-output.rst index 9b9ca1ab4161..8091267a8ad4 100644 --- a/doc/source/tune/tutorials/tune-output.rst +++ b/doc/source/tune/tutorials/tune-output.rst @@ -21,7 +21,7 @@ Tune will log the results of each trial to a sub-folder under a specified local tuner = tune.Tuner(trainable, run_config=air.RunConfig(num_samples=2)) results = tuner.fit() -You can specify the ``local_dir`` and ``trainable_name``: +You can specify the ``storage_path`` and ``trainable_name``: .. code-block:: python @@ -30,7 +30,7 @@ You can specify the ``local_dir`` and ``trainable_name``: # Only trial_name is autogenerated. tuner = tune.Tuner(trainable, tune_config=tune.TuneConfig(num_samples=2), - run_config=air.RunConfig(local_dir="./results", name="test_experiment")) + run_config=air.RunConfig(storage_path="./results", name="test_experiment")) results = tuner.fit() diff --git a/doc/source/tune/tutorials/tune-storage.rst b/doc/source/tune/tutorials/tune-storage.rst index 6d3843fa724b..ddeb0dd66586 100644 --- a/doc/source/tune/tutorials/tune-storage.rst +++ b/doc/source/tune/tutorials/tune-storage.rst @@ -69,7 +69,7 @@ has finished saving a checkpoint. 
This can be configured by ``sync_on_checkpoint trainable, run_config=RunConfig( name="experiment_name", - local_dir="~/ray_results", + storage_path="~/ray_results", sync_config=tune.SyncConfig( syncer="auto", # Sync approximately every minute rather than on every checkpoint @@ -116,7 +116,7 @@ All we need to do is **set the shared network filesystem as the path to save res trainable, run_config=air.RunConfig( name="experiment_name", - local_dir="/path/to/shared/storage/", + storage_path="/path/to/shared/storage/", sync_config=tune.SyncConfig( syncer=None # Disable syncing ) @@ -134,7 +134,7 @@ Configuring Tune with cloud storage (AWS S3, Google Cloud Storage) If all nodes in a Ray cluster have access to cloud storage, e.g. AWS S3 or Google Cloud Storage (GCS), then all experiment outputs can be saved in a shared cloud bucket. -We can configure cloud storage by telling Ray Tune to **upload to a remote** ``upload_dir``: +We can configure cloud storage by telling Ray Tune to **upload to a remote** ``storage_path``: .. code-block:: python :emphasize-lines: 8, 9, 10, 11 @@ -146,22 +146,20 @@ We can configure cloud storage by telling Ray Tune to **upload to a remote** ``u trainable, run_config=RunConfig( name="experiment_name", - sync_config=tune.SyncConfig( - upload_dir="s3://bucket-name/sub-path/", - syncer="auto", - ) + storage_path="s3://bucket-name/sub-path/", ) ) tuner.fit() -``syncer="auto"`` automatically configures a default syncer that uses pyarrow to -perform syncing with the specified cloud ``upload_dir``. -The ``syncer`` config can also take in a custom :class:`Syncer ` +Ray AIR automatically configures a default syncer that uses pyarrow to +perform syncing with the specified cloud ``storage_path``. +You can also pass a custom :class:`Syncer ` object +to the :class:`tune.SyncConfig ` if you want to implement custom logic for uploading/downloading from the cloud. See :ref:`tune-cloud-syncing` and :ref:`tune-cloud-syncing-command-line-example` for more details and examples of custom syncing. -In this example, all experiment results can be found in the shared storage at ``s3://bucket-name/sub-path/experiment_name`` ``/path/to/shared/storage/experiment_name`` for further processing. +In this example, all experiment results can be found in the shared storage at ``s3://bucket-name/sub-path/experiment_name`` for further processing. .. note:: @@ -193,6 +191,7 @@ that implements saving and loading checkpoints. .. code-block:: python + import os import ray from ray import air, tune from your_module import my_trainable @@ -200,21 +199,20 @@ that implements saving and loading checkpoints. # Look for the existing cluster and connect to it ray.init() - # Configure how experiment data and checkpoints are sync'd - # We recommend cloud storage checkpointing as it survives the cluster when - # instances are terminated and has better performance - sync_config = tune.SyncConfig( - upload_dir="s3://my-checkpoints-bucket/path/", # requires AWS credentials - ) + # Set the local caching directory. Results will be stored here + # before they are synced to remote storage. This env variable is ignored + # if `storage_path` below is set to a local directory. + os.environ["RAY_AIR_LOCAL_CACHE_DIR"] = "/tmp/mypath" tuner = tune.Tuner( my_trainable, run_config=air.RunConfig( # Name of your experiment name="my-tune-exp", - # Directory where each node's results are stored before being - # sync'd to cloud storage - local_dir="/tmp/mypath", + # Configure how experiment data and checkpoints are persisted. 
+ # We recommend cloud storage checkpointing as it survives the cluster when + # instances are terminated and has better performance. + storage_path="s3://my-checkpoints-bucket/path/", # See above! we will sync our checkpoints to S3 directory sync_config=sync_config, checkpoint_config=air.CheckpointConfig( @@ -281,11 +279,7 @@ Again, we're running this example script from the Ray cluster's head node. my_trainable, run_config=air.RunConfig( name="my-tune-exp", - local_dir="/tmp/mypath", - # Use the default syncing behavior - # You don't have to pass an empty sync config - but we - # do it here for clarity and comparison - sync_config=tune.SyncConfig(), + storage_path="/tmp/mypath", checkpoint_config=air.CheckpointConfig( checkpoint_score_attribute="max-auc", checkpoint_score_order="max", diff --git a/doc/source/tune/tutorials/tune-trial-checkpoints.rst b/doc/source/tune/tutorials/tune-trial-checkpoints.rst index e271dc0351d2..976e1377e5b8 100644 --- a/doc/source/tune/tutorials/tune-trial-checkpoints.rst +++ b/doc/source/tune/tutorials/tune-trial-checkpoints.rst @@ -44,7 +44,7 @@ To create an AIR checkpoint, one can either use :meth:`~ray.air.checkpoint.Check In the above code snippet: - We implement *checkpoint saving* with :meth:`session.report(..., checkpoint=checkpoint) `. Note that every checkpoint must be reported alongside a set of metrics -- this way, checkpoints can be ordered with respect to a specified metric. -- The saved checkpoint during training iteration `epoch` is saved to the path ``///checkpoint_`` on the node on which training happens and can be further synced to a consolidated storage location depending on the :ref:`storage configuration `. +- The saved checkpoint during training iteration `epoch` is saved to the path ``///checkpoint_`` on the node on which training happens and can be further synced to a consolidated storage location depending on the :ref:`storage configuration `. - We implement *checkpoint loading* with :meth:`session.get_checkpoint() `. This will be populated with a trial's latest checkpoint whenever Tune restores a trial. This happens when (1) a trial is configured to retry after encountering a failure, (2) the experiment is being restored, and (3) the trial is being resumed after a pause (ex: :doc:`PBT `). .. TODO: for (1), link to tune fault tolerance guide. For (2), link to tune restore guide. 
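
As an illustration of the change the updated docs above describe, here is a minimal before/after sketch (not part of the diff): `my_trainable`, the parameter space, and the bucket path are placeholders, and the remote path assumes working AWS credentials.

    from ray import air, tune
    from ray.air import session


    def my_trainable(config):
        # Placeholder trainable; reports one metric so Tune has something to log.
        session.report({"score": config["x"] ** 2})


    # Old API: a local results dir plus a separate cloud upload dir.
    # run_config = air.RunConfig(
    #     name="experiment_name",
    #     local_dir="~/ray_results",
    #     sync_config=tune.SyncConfig(upload_dir="s3://bucket-name/sub-path/"),
    # )

    # New API (this PR): a single storage_path, which may be a local
    # directory such as "~/ray_results" or a remote URI such as S3.
    run_config = air.RunConfig(
        name="experiment_name",
        storage_path="s3://bucket-name/sub-path/",  # placeholder bucket
    )

    tuner = tune.Tuner(
        my_trainable,
        param_space={"x": tune.randint(0, 10)},
        run_config=run_config,
    )
    tuner.fit()

As the updated `tune-storage.rst` hunk notes, the default pyarrow-based syncer handles uploads to a remote `storage_path`; `tune.SyncConfig` remains only for custom syncers or options such as `sync_artifacts`.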
diff --git a/python/ray/air/tests/test_errors.py b/python/ray/air/tests/test_errors.py index 57c41089eada..6e731357299b 100644 --- a/python/ray/air/tests/test_errors.py +++ b/python/ray/air/tests/test_errors.py @@ -111,7 +111,7 @@ def test_trainable_error_with_trainer(ray_start_4_cpus, tmp_path, fail_fast): name = f"test_trainer_errors-fail_fast={fail_fast}" trainer = FailingTrainer( run_config=RunConfig( - local_dir=str(tmp_path), + storage_path=str(tmp_path), name=name, failure_config=FailureConfig(fail_fast=fail_fast), ), @@ -168,7 +168,7 @@ def test_driver_error_with_trainer(ray_start_4_cpus, tmp_path, error_on): train_loop_per_worker=passing_fn, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( - local_dir=str(tmp_path), + storage_path=str(tmp_path), name=name, callbacks=[FailingCallback(error_on=error_on)], ), diff --git a/python/ray/tune/execution/experiment_state.py b/python/ray/tune/execution/experiment_state.py index cf40037f1055..daf89075012f 100644 --- a/python/ray/tune/execution/experiment_state.py +++ b/python/ray/tune/execution/experiment_state.py @@ -234,7 +234,7 @@ def sync_up(self, force: bool = False, wait: bool = False) -> bool: if not self._syncer: # or not self._remote_checkpoint_dir: return False - if bool(self._sync_config.upload_dir): + if bool(self._remote_checkpoint_dir): # If an upload dir is given, trainable actors upload checkpoints # themselves. Then the driver does not need to sync checkpoints. exclude = ["*/checkpoint_*"] @@ -297,7 +297,7 @@ def sync_down(self, force: bool = False, wait: bool = False) -> bool: if not self._syncer or not self._remote_checkpoint_dir: return False - if bool(self._sync_config.upload_dir): + if bool(self._remote_checkpoint_dir): # If an upload dir is given, trainable actors upload checkpoints # themselves. Then the driver does not need to sync checkpoints. exclude = ["*/checkpoint_*"] diff --git a/python/ray/tune/execution/trial_runner.py b/python/ray/tune/execution/trial_runner.py index da927062de50..185ec391b3c0 100644 --- a/python/ray/tune/execution/trial_runner.py +++ b/python/ray/tune/execution/trial_runner.py @@ -87,11 +87,9 @@ class _TuneControllerBase: Trial objects. scheduler: Defaults to FIFOScheduler. experiment_path: Path where global experiment state checkpoints - are saved and restored from. + are saved and restored from. If this is a remote URI, + experiment checkpoints will be synced to this location. sync_config: See :class:`~ray.tune.syncer.SyncConfig`. - Within sync config, the `upload_dir` specifies cloud storage, and - experiment state checkpoints will be synced to the `remote_checkpoint_dir`: - `{sync_config.upload_dir}/{experiment_name}`. experiment_dir_name: Experiment directory name. See :class:`~ray.tune.experiment.Experiment`. stopper: Custom class for stopping whole experiments. See ``Stopper``. diff --git a/python/ray/tune/syncer.py b/python/ray/tune/syncer.py index 2a5efdefdb7f..402259258863 100644 --- a/python/ray/tune/syncer.py +++ b/python/ray/tune/syncer.py @@ -138,8 +138,6 @@ def __post_init__(self): "disables syncing. Either remove the `upload_dir`, " "or set `syncer` to 'auto' or a custom syncer." ) - if not self.upload_dir and isinstance(self.syncer, Syncer): - raise ValueError("Must specify an `upload_dir` to use a custom `syncer`.") def _repr_html_(self) -> str: """Generate an HTML representation of the SyncConfig. @@ -190,6 +188,16 @@ def validate_upload_dir(self, upload_dir: Optional[str] = None) -> bool: upload_dir: Path to validate. 
""" + upload_dir = upload_dir or self.upload_dir + if upload_dir and self.syncer is None: + raise ValueError( + "`upload_dir` enables syncing to cloud storage, but `syncer=None` " + "disables syncing. Either remove the `upload_dir`, " + "or set `syncer` to 'auto' or a custom syncer." + ) + if not upload_dir and isinstance(self.syncer, Syncer): + raise ValueError("Must specify an `upload_dir` to use a custom `syncer`.") + if isinstance(self.syncer, Syncer): return self.syncer.validate_upload_dir(upload_dir or self.upload_dir) else: diff --git a/python/ray/tune/tests/_test_cluster_interrupt_searcher.py b/python/ray/tune/tests/_test_cluster_interrupt_searcher.py index 2bd1df189b97..f6d979fd66f5 100644 --- a/python/ray/tune/tests/_test_cluster_interrupt_searcher.py +++ b/python/ray/tune/tests/_test_cluster_interrupt_searcher.py @@ -52,6 +52,6 @@ mode="max", config=space, stop={"training_iteration": 2}, - local_dir=args.local_dir, + storage_path=args.local_dir, name="experiment", ) diff --git a/python/ray/tune/tests/test_actor_reuse.py b/python/ray/tune/tests/test_actor_reuse.py index ac834fdc1e5d..b35d685ae895 100644 --- a/python/ray/tune/tests/test_actor_reuse.py +++ b/python/ray/tune/tests/test_actor_reuse.py @@ -465,7 +465,7 @@ def step(self): max_concurrent_trials=2, local_dir=str(tmp_path), name=exp_name, - sync_config=tune.SyncConfig(upload_dir=f"file://{tmp_target}"), + storage_path=f"file://{tmp_target}", trial_dirname_creator=lambda t: str(t.config.get("id")), checkpoint_freq=1, ) @@ -546,9 +546,8 @@ def load_checkpoint(self, *args, **kwargs): max_concurrent_trials=2, local_dir=str(local_dir), name=exp_name, - sync_config=tune.SyncConfig( - upload_dir=f"file://{tmp_target}", sync_artifacts=True - ), + storage_path=f"file://{tmp_target}", + sync_config=tune.SyncConfig(sync_artifacts=True), trial_dirname_creator=lambda t: str(t.config.get("id")), checkpoint_freq=1, ) diff --git a/python/ray/tune/tests/test_api.py b/python/ray/tune/tests/test_api.py index 15fde343c4e5..fd28f68b6023 100644 --- a/python/ray/tune/tests/test_api.py +++ b/python/ray/tune/tests/test_api.py @@ -667,7 +667,7 @@ def custom_trial_dir(trial): train, config={"t1": tune.grid_search([1, 2, 3])}, trial_dirname_creator=custom_trial_dir, - local_dir=self.tmpdir, + storage_path=self.tmpdir, ).trials logdirs = {t.local_path for t in trials} assert len(logdirs) == 3 @@ -679,7 +679,7 @@ def train(config, reporter): reporter(test=i) trials = tune.run( - train, config={"t1": tune.grid_search([1, 2, 3])}, local_dir=self.tmpdir + train, config={"t1": tune.grid_search([1, 2, 3])}, storage_path=self.tmpdir ).trials logdirs = {t.local_path for t in trials} for i in [1, 2, 3]: @@ -734,7 +734,7 @@ def test(config): "id": tune.grid_search(list(range(5))), }, verbose=1, - local_dir=tmpdir, + storage_path=tmpdir, ) trials = tune.run(test, raise_on_failed_trial=False, **config).trials self.assertEqual(Counter(t.status for t in trials)["ERROR"], 5) @@ -1066,9 +1066,8 @@ def _create_remote_actor(trainable_cls, sync_to_cloud): exp = Experiment( name="test_durable_sync", run=trainable_cls, - sync_config=tune.SyncConfig( - syncer=sync_to_cloud, upload_dir=upload_dir - ), + storage_path=upload_dir, + sync_config=tune.SyncConfig(syncer=sync_to_cloud), ) searchers = BasicVariantGenerator() @@ -1318,7 +1317,7 @@ def test_trial_dir(config): # Per default, the directory should be named `test_trial_dir_{date}` with tempfile.TemporaryDirectory() as tmp_dir: - tune.run(test_trial_dir, local_dir=tmp_dir) + tune.run(test_trial_dir, storage_path=tmp_dir) 
subdirs = list(os.listdir(tmp_dir)) self.assertNotIn("test_trial_dir", subdirs) @@ -1331,7 +1330,7 @@ def test_trial_dir(config): # If we set an explicit name, no date should be appended with tempfile.TemporaryDirectory() as tmp_dir: - tune.run(test_trial_dir, local_dir=tmp_dir, name="my_test_exp") + tune.run(test_trial_dir, storage_path=tmp_dir, name="my_test_exp") subdirs = list(os.listdir(tmp_dir)) self.assertIn("my_test_exp", subdirs) @@ -1345,7 +1344,7 @@ def test_trial_dir(config): # Don't append date if we set the env variable os.environ["TUNE_DISABLE_DATED_SUBDIR"] = "1" with tempfile.TemporaryDirectory() as tmp_dir: - tune.run(test_trial_dir, local_dir=tmp_dir) + tune.run(test_trial_dir, storage_path=tmp_dir) subdirs = list(os.listdir(tmp_dir)) self.assertIn("test_trial_dir", subdirs) diff --git a/python/ray/tune/tests/test_cluster.py b/python/ray/tune/tests/test_cluster.py index d5eeefb8c658..02c9f9d8f75d 100644 --- a/python/ray/tune/tests/test_cluster.py +++ b/python/ray/tune/tests/test_cluster.py @@ -15,7 +15,7 @@ from ray.tune.experiment import Experiment from ray.tune.error import TuneError from ray.tune.search import BasicVariantGenerator -from ray.tune.syncer import SyncerCallback, SyncConfig +from ray.tune.syncer import SyncerCallback from ray.tune.experiment import Trial from ray.tune.execution.trial_runner import TrialRunner @@ -202,10 +202,10 @@ def test_trial_migration(start_connected_emptyhead_cluster, tmpdir, durable): cluster.wait_for_nodes() if durable: - upload_dir = "file://" + str(tmpdir) + experiment_path = "file://" + str(tmpdir) + "/exp" syncer_callback = SyncerCallback() else: - upload_dir = None + experiment_path = None syncer_callback = custom_driver_logdir_callback(str(tmpdir)) runner = TrialRunner( @@ -214,7 +214,7 @@ def test_trial_migration(start_connected_emptyhead_cluster, tmpdir, durable): kwargs = { "stopping_criterion": {"training_iteration": 4}, "checkpoint_config": CheckpointConfig(checkpoint_frequency=2), - "sync_config": SyncConfig(upload_dir=upload_dir), + "experiment_path": experiment_path, "experiment_dir_name": "exp", "max_failures": 2, } @@ -260,7 +260,7 @@ def test_trial_migration(start_connected_emptyhead_cluster, tmpdir, durable): # Test recovery of trial that won't be checkpointed kwargs = { "stopping_criterion": {"training_iteration": 3}, - "sync_config": SyncConfig(upload_dir=upload_dir), + "experiment_path": experiment_path, "experiment_dir_name": "exp", } @@ -289,10 +289,10 @@ def test_trial_requeue(start_connected_emptyhead_cluster, tmpdir, durable): cluster.wait_for_nodes() if durable: - upload_dir = "file://" + str(tmpdir) + experiment_path = "file://" + str(tmpdir) + "/exp" syncer_callback = SyncerCallback() else: - upload_dir = None + experiment_path = None syncer_callback = custom_driver_logdir_callback(str(tmpdir)) runner = TrialRunner( @@ -301,7 +301,7 @@ def test_trial_requeue(start_connected_emptyhead_cluster, tmpdir, durable): kwargs = { "stopping_criterion": {"training_iteration": 5}, "checkpoint_config": CheckpointConfig(checkpoint_frequency=1), - "sync_config": SyncConfig(upload_dir=upload_dir), + "experiment_path": experiment_path, "experiment_dir_name": "exp", "max_failures": 1, } @@ -335,10 +335,10 @@ def test_migration_checkpoint_removal( cluster.wait_for_nodes() if durable: - upload_dir = "file://" + str(tmpdir) + experiment_path = "file://" + str(tmpdir) + "/exp" syncer_callback = SyncerCallback() else: - upload_dir = None + experiment_path = None syncer_callback = custom_driver_logdir_callback(str(tmpdir)) 
runner = TrialRunner( @@ -347,7 +347,7 @@ def test_migration_checkpoint_removal( kwargs = { "stopping_criterion": {"training_iteration": 4}, "checkpoint_config": CheckpointConfig(checkpoint_frequency=2), - "sync_config": SyncConfig(upload_dir=upload_dir), + "experiment_path": experiment_path, "experiment_dir_name": "exp", "max_failures": 2, } @@ -387,10 +387,10 @@ def test_cluster_down_full(start_connected_cluster, tmpdir, durable): dirpath = str(tmpdir) if durable: - upload_dir = "file://" + str(tmpdir) + storage_path = "file://" + str(tmpdir) syncer_callback = SyncerCallback() else: - upload_dir = None + storage_path = None syncer_callback = custom_driver_logdir_callback(str(tmpdir)) from ray.tune.result import DEFAULT_RESULTS_DIR @@ -401,7 +401,7 @@ def test_cluster_down_full(start_connected_cluster, tmpdir, durable): run="__fake", stop=dict(training_iteration=3), local_dir=local_dir, - sync_config=dict(upload_dir=upload_dir), + storage_path=storage_path, ) exp1_args = base_dict diff --git a/python/ray/tune/tests/test_commands.py b/python/ray/tune/tests/test_commands.py index efe8c6fc0077..4148aee1125c 100644 --- a/python/ray/tune/tests/test_commands.py +++ b/python/ray/tune/tests/test_commands.py @@ -71,7 +71,7 @@ def test_ls(start_ray, tmpdir): name=experiment_name, stop={"training_iteration": 1}, num_samples=num_samples, - local_dir=str(tmpdir), + storage_path=str(tmpdir), ) columns = ["episode_reward_mean", "training_iteration", "trial_id"] @@ -111,7 +111,7 @@ def test_ls_with_cfg(start_ray, tmpdir): name=experiment_name, stop={"training_iteration": 1}, config={"test_variable": tune.grid_search(list(range(5)))}, - local_dir=str(tmpdir), + storage_path=str(tmpdir), ) columns = [CONFIG_PREFIX + "/test_variable", "trial_id"] @@ -135,7 +135,7 @@ def test_lsx(start_ray, tmpdir): name=experiment_name, stop={"training_iteration": 1}, num_samples=1, - local_dir=project_path, + storage_path=project_path, ) limit = 2 diff --git a/python/ray/tune/tests/test_experiment.py b/python/ray/tune/tests/test_experiment.py index be3774b9ec2a..aa7c0ccb7ada 100644 --- a/python/ray/tune/tests/test_experiment.py +++ b/python/ray/tune/tests/test_experiment.py @@ -3,27 +3,23 @@ import ray from ray.air import CheckpointConfig -from ray.tune import register_trainable, SyncConfig +from ray.tune import register_trainable from ray.tune.experiment import Experiment, Trial, _convert_to_experiment_list from ray.tune.error import TuneError from ray.tune.utils import diagnose_serialization def test_remote_checkpoint_dir_with_query_string(tmp_path): - sync_config = SyncConfig(syncer="auto", upload_dir="s3://bucket?scheme=http") experiment = Experiment( - name="spam", - run=lambda config: config, - sync_config=sync_config, + name="spam", run=lambda config: config, storage_path="s3://bucket?scheme=http" ) assert experiment.remote_checkpoint_dir == "s3://bucket/spam?scheme=http" trial = Trial( "mock", stub=True, - sync_config=sync_config, + experiment_path="s3://bucket/spam?scheme=http", experiment_dir_name="spam", - local_dir=str(tmp_path), ) trial.relative_logdir = "trial_dirname" assert trial.remote_checkpoint_dir == "s3://bucket/spam/trial_dirname?scheme=http" diff --git a/python/ray/tune/tests/test_experiment_analysis.py b/python/ray/tune/tests/test_experiment_analysis.py index d14efe6290a5..dc17d26e5b03 100644 --- a/python/ray/tune/tests/test_experiment_analysis.py +++ b/python/ray/tune/tests/test_experiment_analysis.py @@ -41,7 +41,7 @@ def run_test_exp(self): self.ea = tune.run( MyTrainableClass, 
name=self.test_name, - local_dir=self.test_dir, + storage_path=self.test_dir, stop={"training_iteration": 1}, checkpoint_freq=1, num_samples=self.num_samples, @@ -55,7 +55,7 @@ def nan_test_exp(self): nan_ea = tune.run( lambda x: nan, name="testing_nan", - local_dir=self.test_dir, + storage_path=self.test_dir, stop={"training_iteration": 1}, num_samples=self.num_samples, config={ @@ -179,7 +179,7 @@ def train(config): pass tune.report(**result) - ea = tune.run(train, local_dir=self.test_dir, config={"steps": 3}) + ea = tune.run(train, storage_path=self.test_dir, config={"steps": 3}) best_trial = ea.get_best_trial(metric, mode="min") best_checkpoint = ea.get_best_checkpoint(best_trial, metric, mode="min") checkpoints_metrics = ea.get_trial_checkpoints_paths(best_trial, metric=metric) @@ -198,7 +198,7 @@ def testGetLastCheckpoint(self): new_ea = tune.run( MyTrainableClass, name=self.test_name, - local_dir=self.test_dir, + storage_path=self.test_dir, stop={"training_iteration": 2}, checkpoint_freq=1, config={ @@ -216,7 +216,7 @@ def testGetLastCheckpoint(self): tune.run( MyTrainableClass, name=self.test_name, - local_dir=self.test_dir, + storage_path=self.test_dir, restore=last_checkpoint, stop={"training_iteration": 3}, checkpoint_freq=1, @@ -238,7 +238,7 @@ def testIgnoreOtherExperiment(self): analysis = tune.run( MyTrainableClass, name="test_example", - local_dir=self.test_dir, + storage_path=self.test_dir, stop={"training_iteration": 1}, num_samples=1, config={ @@ -253,7 +253,7 @@ def testGetTrialCheckpointsPathsByPathWithSpecialCharacters(self): analysis = tune.run( MyTrainableClass, name="test_example", - local_dir=self.test_dir, + storage_path=self.test_dir, stop={"training_iteration": 1}, num_samples=1, config={"test": tune.grid_search([[1, 2], [3, 4]])}, @@ -269,7 +269,7 @@ def testGetTrialCheckpointsPathsWithTemporaryCheckpoints(self): analysis = tune.run( MyTrainableClass, name="test_example", - local_dir=self.test_dir, + storage_path=self.test_dir, stop={"training_iteration": 2}, num_samples=1, config={"test": tune.grid_search([[1, 2], [3, 4]])}, diff --git a/python/ray/tune/tests/test_experiment_analysis_mem.py b/python/ray/tune/tests/test_experiment_analysis_mem.py index 1033f7ff38af..336c29668f1c 100644 --- a/python/ray/tune/tests/test_experiment_analysis_mem.py +++ b/python/ray/tune/tests/test_experiment_analysis_mem.py @@ -57,7 +57,10 @@ def tearDown(self): def testInit(self): trial = Trial( - "MockTrainable", stub=True, trial_id="abcd1234", local_dir=self.test_dir + "MockTrainable", + stub=True, + trial_id="abcd1234", + experiment_path=self.test_dir, ) trial.status = Trial.TERMINATED trial.relative_logdir = "MockTrainable_0_id=3_2020-07-12" @@ -80,7 +83,7 @@ def testCompareTrials(self): ea = run( self.MockTrainable, name="analysis_exp", - local_dir=self.test_dir, + storage_path=self.test_dir, stop={"training_iteration": len(scores[0])}, num_samples=1, config={"id": grid_search(list(range(5)))}, @@ -131,7 +134,7 @@ def testRemoveMagicResults(self): [trial] = run( self.MockTrainable, name="analysis_remove_exp", - local_dir=self.test_dir, + storage_path=self.test_dir, stop={"training_iteration": 9}, num_samples=1, config={"id": 1}, @@ -170,7 +173,7 @@ def run_test_exp(self, test_name=None): run( MyTrainableClass, name=test_name, - local_dir=self.test_dir, + storage_path=self.test_dir, stop={"training_iteration": 1}, num_samples=self.num_samples, config={ diff --git a/python/ray/tune/tests/test_integration_pytorch_lightning.py 
b/python/ray/tune/tests/test_integration_pytorch_lightning.py index b6dcf59e1151..502d6693178c 100644 --- a/python/ray/tune/tests/test_integration_pytorch_lightning.py +++ b/python/ray/tune/tests/test_integration_pytorch_lightning.py @@ -118,7 +118,7 @@ def train(config): train, stop={TRAINING_ITERATION: 10}, keep_checkpoints_num=100, - local_dir=tmpdir, + storage_path=tmpdir, ) checkpoints = [ @@ -149,7 +149,7 @@ def train(config): train, stop={TRAINING_ITERATION: 10}, keep_checkpoints_num=100, - local_dir=tmpdir, + storage_path=tmpdir, ) checkpoints = [ diff --git a/python/ray/tune/tests/test_multinode_sync.py b/python/ray/tune/tests/test_multinode_sync.py index 5091028e1882..c9a990395ec9 100644 --- a/python/ray/tune/tests/test_multinode_sync.py +++ b/python/ray/tune/tests/test_multinode_sync.py @@ -267,7 +267,7 @@ def on_step_begin(self, iteration, trials, **info): num_samples=3, resources_per_trial={"cpu": 4}, max_failures=0, - local_dir="/cluster/node", + storage_path="/cluster/node", trial_name_creator=lambda trial: trial.trial_id, trial_dirname_creator=lambda trial: trial.trial_id, keep_checkpoints_num=2, @@ -324,7 +324,7 @@ def get_checkpoint_dirs(node_trial_path: str) -> List[str]: train, name="checkpoint_test", resources_per_trial={"cpu": 4}, - local_dir="/cluster/node", + storage_path="/cluster/node", keep_checkpoints_num=2, resume="AUTO", verbose=2, diff --git a/python/ray/tune/tests/test_result_grid.py b/python/ray/tune/tests/test_result_grid.py index 356fec7522d7..020a9ca127a0 100644 --- a/python/ray/tune/tests/test_result_grid.py +++ b/python/ray/tune/tests/test_result_grid.py @@ -334,7 +334,7 @@ def f(config): def test_num_errors_terminated(tmpdir): error_filename = "error.txt" - trials = [Trial("foo", local_dir=str(tmpdir), stub=True) for i in range(10)] + trials = [Trial("foo", experiment_path=str(tmpdir), stub=True) for i in range(10)] # Only create 1 shared trial logdir for this test trials[0].init_local_path() @@ -380,7 +380,7 @@ def train_func(config): ), run_config=air.RunConfig( name="exp_dir", - local_dir=str(tmpdir / "ray_results"), + storage_path=str(tmpdir / "ray_results"), stop={"it": total_iters}, checkpoint_config=air.CheckpointConfig( # Keep the latest checkpoints @@ -436,7 +436,7 @@ def test_result_grid_cloud_path(ray_start_2_cpus, tmpdir): # Test that checkpoints returned by ResultGrid point to URI # if upload_dir is specified in SyncConfig. 
local_dir = Path(tmpdir) / "local_dir" - sync_config = tune.SyncConfig(upload_dir="s3://bucket", syncer=MockSyncer()) + sync_config = tune.SyncConfig(syncer=MockSyncer()) def trainable(config): for i in range(5): @@ -445,7 +445,11 @@ def trainable(config): tuner = tune.Tuner( trainable, - run_config=air.RunConfig(sync_config=sync_config, local_dir=str(local_dir)), + run_config=air.RunConfig( + storage_path="s3://bucket", + sync_config=sync_config, + local_dir=str(local_dir), + ), tune_config=tune.TuneConfig( metric="metric", mode="max", diff --git a/python/ray/tune/tests/test_syncer.py b/python/ray/tune/tests/test_syncer.py index 3b54b8301bb8..853b512e211b 100644 --- a/python/ray/tune/tests/test_syncer.py +++ b/python/ray/tune/tests/test_syncer.py @@ -214,24 +214,24 @@ def wait(self): def test_sync_string_invalid_uri(): with pytest.raises(ValueError): - sync_config = tune.SyncConfig(upload_dir="invalid://some/url") - sync_config.validate_upload_dir() + sync_config = tune.SyncConfig() + sync_config.validate_upload_dir("invalid://some/url") def test_sync_string_invalid_local(): with pytest.raises(ValueError): - sync_config = tune.SyncConfig(upload_dir="/invalid/dir") - sync_config.validate_upload_dir() + sync_config = tune.SyncConfig() + sync_config.validate_upload_dir("/invalid/dir") def test_sync_string_valid_local(): - sync_config = tune.SyncConfig(upload_dir="file:///valid/dir") - sync_config.validate_upload_dir() + sync_config = tune.SyncConfig() + sync_config.validate_upload_dir("file:///valid/dir") def test_sync_string_valid_s3(): - sync_config = tune.SyncConfig(upload_dir="s3://valid/bucket") - sync_config.validate_upload_dir() + sync_config = tune.SyncConfig() + sync_config.validate_upload_dir("s3://valid/bucket") def test_sync_config_validate(): @@ -245,18 +245,20 @@ class CustomSyncer(_DefaultSyncer): def validate_upload_dir(cls, upload_dir: str) -> bool: return True - sync_config = tune.SyncConfig(upload_dir="/invalid/dir", syncer=CustomSyncer()) - sync_config.validate_upload_dir() + sync_config = tune.SyncConfig(syncer=CustomSyncer()) + sync_config.validate_upload_dir("/invalid/dir") def test_sync_config_upload_dir_custom_syncer_mismatch(): # Shouldn't be able to disable syncing if upload dir is specified with pytest.raises(ValueError): - tune.SyncConfig(upload_dir="s3://valid/bucket", syncer=None) + sync_config = tune.SyncConfig(syncer=None) + sync_config.validate_upload_dir("s3://valid/bucket") # Shouldn't be able to use a custom cloud syncer without specifying cloud dir with pytest.raises(ValueError): - tune.SyncConfig(upload_dir=None, syncer=_DefaultSyncer()) + sync_config = tune.SyncConfig(syncer=_DefaultSyncer()) + sync_config.validate_upload_dir(None) def test_syncer_sync_up_down(temp_data_dirs): @@ -671,7 +673,7 @@ def mock_error(x): trainable = ray.remote(TestTrainableRetry).remote( remote_checkpoint_dir=f"file://{tmp_target}", - sync_config=SyncConfig(upload_dir="not_used", syncer=syncer), + sync_config=SyncConfig(syncer=syncer), ) ray.get(trainable.save.remote()) @@ -681,11 +683,7 @@ def test_trainable_syncer_custom(ray_start_2_cpus, temp_data_dirs): """Check that Trainable.save() triggers syncing using custom syncer""" tmp_source, tmp_target = temp_data_dirs - sync_config = SyncConfig( - # upload_dir not actually used, but needed for SyncConfig validation - upload_dir="file://not_used", - syncer=CustomSyncer(), - ) + sync_config = SyncConfig(syncer=CustomSyncer()) trainable = ray.remote(TestTrainable).remote( remote_checkpoint_dir=f"file://{tmp_target}", 
sync_config=sync_config, @@ -707,7 +705,6 @@ def test_trainable_syncer_custom_command(ray_start_2_cpus, temp_data_dirs): tmp_source, tmp_target = temp_data_dirs sync_config = SyncConfig( - upload_dir="file://not_used", syncer=CustomCommandSyncer( sync_up_template="cp -rf {source} `echo '{target}' | cut -c 8-`", sync_down_template="cp -rf `echo '{source}' | cut -c 8-` {target}", @@ -781,9 +778,7 @@ def test_artifact_syncing_disabled(ray_start_2_cpus, temp_data_dirs, tmp_path): trainable = ray.remote(TestTrainable).remote( remote_checkpoint_dir=f"file://{tmp_target}", logdir=str(local_dir_1), - sync_config=SyncConfig( - upload_dir="file:///not_used", syncer="auto", sync_artifacts=False - ), + sync_config=SyncConfig(sync_artifacts=False), ) ray.get(trainable.train.remote()) @@ -901,9 +896,8 @@ def train_func(config): train_func, run_config=RunConfig( name="exp_name", - sync_config=tune.SyncConfig( - upload_dir="memory:///test_upload_dir", syncer=syncer - ), + storage_path="memory:///test_upload_dir", + sync_config=tune.SyncConfig(syncer=syncer), ), ) results = tuner.fit() @@ -973,8 +967,8 @@ def train_fn(config): param_space={"id": tune.grid_search([0, 1, 2, 3])}, run_config=RunConfig( name=exp_name, + storage_path=mock_s3_bucket_uri, local_dir=local_dir, - sync_config=tune.SyncConfig(upload_dir=mock_s3_bucket_uri), ), tune_config=tune.TuneConfig( trial_dirname_creator=lambda t: str(t.config.get("id")) diff --git a/python/ray/tune/tests/test_trainable.py b/python/ray/tune/tests/test_trainable.py index 872e56cca32a..5eda2a033767 100644 --- a/python/ray/tune/tests/test_trainable.py +++ b/python/ray/tune/tests/test_trainable.py @@ -225,9 +225,7 @@ def _sync_up_command(self, local_path: str, uri: str, exclude: list = None): trainable = SavingTrainable( "object", remote_checkpoint_dir=f"memory:///test/location_hanging_{hanging}", - sync_config=tune.SyncConfig( - upload_dir="memory:///test/", syncer=HangingSyncer(sync_timeout=0.5) - ), + sync_config=tune.SyncConfig(syncer=HangingSyncer(sync_timeout=0.5)), ) with patch("ray.air.checkpoint.upload_to_uri", _hanging_upload): diff --git a/python/ray/tune/tests/test_trainable_util.py b/python/ray/tune/tests/test_trainable_util.py index 8a510ff8d25f..f077938f1d75 100644 --- a/python/ray/tune/tests/test_trainable_util.py +++ b/python/ray/tune/tests/test_trainable_util.py @@ -53,7 +53,7 @@ def tune_one(config=None, checkpoint_dir=None): name = "AnalysisTest" ray.init(local_mode=True) - ray.tune.run(tune_one, local_dir=self.checkpoint_dir, name=name) + ray.tune.run(tune_one, storage_path=self.checkpoint_dir, name=name) a = ray.tune.ExperimentAnalysis( os.path.join(self.checkpoint_dir, name), diff --git a/python/ray/tune/tests/test_trial_relative_logdir.py b/python/ray/tune/tests/test_trial_relative_logdir.py index ef754108901c..d6bf194ddb02 100644 --- a/python/ray/tune/tests/test_trial_relative_logdir.py +++ b/python/ray/tune/tests/test_trial_relative_logdir.py @@ -46,7 +46,7 @@ def testDotsInLogdir(self): local_dir = str(local_dir_path) if local_dir_path.exists(): local_dir = tempfile.mkdtemp(prefix=str(local_dir_path) + "_") - trial = Trial(trainable_name="rel_logdir", local_dir=local_dir) + trial = Trial(trainable_name="rel_logdir", experiment_path=local_dir) with self.assertRaises(ValueError): trial.local_path = "/tmp/test_rel/../dots" @@ -70,7 +70,9 @@ def testRelativeLogdir(self): else: local_dir = str(local_dir_path) - tune.run("rel_logdir", config={"a": tune.randint(0, 10)}, local_dir=local_dir) + tune.run( + "rel_logdir", config={"a": 
tune.randint(0, 10)}, storage_path=local_dir + ) # Copy the folder local_dir_moved = local_dir + "_moved" @@ -129,7 +131,7 @@ def testRelativeLogdirWithNestedDir(self): tune.run( "rel_logdir", config={"a": tune.randint(0, 10)}, - local_dir=local_dir, + storage_path=local_dir, # Create a nested experiment directory. name="exp_dir/deep_exp_dir", ) @@ -196,7 +198,7 @@ def testRelativeLogdirWithJson(self): tune.run( "rel_logdir", config={"a": tune.randint(0, 10)}, - local_dir=local_dir, + storage_path=local_dir, ) # Copy the folder. @@ -260,7 +262,7 @@ def test_load_trial_from_json_state(tmpdir): and then creating a new trial using the `Trial.from_json_state` alternate constructor loads the trial with equivalent state.""" trial = Trial( - "MockTrainable", stub=True, trial_id="abcd1234", local_dir=str(tmpdir) + "MockTrainable", stub=True, trial_id="abcd1234", experiment_path=str(tmpdir) ) trial.create_placement_group_factory() trial.init_local_path() @@ -283,7 +285,7 @@ def test_load_trial_from_json_state(tmpdir): def test_change_trial_local_dir(tmpdir): trial = Trial( - "MockTrainable", stub=True, trial_id="abcd1234", local_dir=str(tmpdir) + "MockTrainable", stub=True, trial_id="abcd1234", experiment_path=str(tmpdir) ) trial.init_local_path() trial.status = Trial.TERMINATED diff --git a/python/ray/tune/tests/test_trial_runner_2.py b/python/ray/tune/tests/test_trial_runner_2.py index 7eaa36dd2ec3..6ed490ee50fb 100644 --- a/python/ray/tune/tests/test_trial_runner_2.py +++ b/python/ray/tune/tests/test_trial_runner_2.py @@ -369,7 +369,7 @@ def testPauseResumeCheckpointCount(self): # checkpoint_00000/ trial = Trial( "__fake", - local_dir=tempdir, + experiment_path=tempdir, checkpoint_config=CheckpointConfig(num_to_keep=2), ) trial.init_local_path() diff --git a/python/ray/tune/tests/test_trial_runner_3.py b/python/ray/tune/tests/test_trial_runner_3.py index 93552c00c4df..fb5da5758474 100644 --- a/python/ray/tune/tests/test_trial_runner_3.py +++ b/python/ray/tune/tests/test_trial_runner_3.py @@ -751,7 +751,7 @@ def count_checkpoints(cdir): # for more details. trial = Trial( "__fake", - local_dir=tmpdir, + experiment_path=tmpdir, checkpoint_config=CheckpointConfig(checkpoint_frequency=1), ) runner = TrialRunner( @@ -889,7 +889,11 @@ def testUserCheckpoint(self): # See `test_trial_runner2.TrialRunnerTest2.testPauseResumeCheckpointCount` # for more details. 
runner.add_trial( - Trial("__fake", local_dir=self.tmpdir, config={"user_checkpoint_freq": 2}) + Trial( + "__fake", + experiment_path=self.tmpdir, + config={"user_checkpoint_freq": 2}, + ) ) trials = runner.get_trials() @@ -990,11 +994,9 @@ def delete(self, remote_dir: str) -> bool: pass runner = TrialRunner( - local_checkpoint_dir=self.tmpdir, checkpoint_period="auto", - sync_config=SyncConfig( - upload_dir="fake://somewhere", syncer=CustomSyncer(), sync_period=0 - ), + experiment_path="fake://somewhere/exp", + sync_config=SyncConfig(syncer=CustomSyncer(), sync_period=0), trial_executor=RayTrialExecutor(resource_manager=self._resourceManager()), ) runner.add_trial(Trial("__fake", config={"user_checkpoint_freq": 1})) @@ -1038,8 +1040,8 @@ def delete(self, remote_dir: str) -> bool: syncer = CustomSyncer() runner = TrialRunner( - local_checkpoint_dir=self.tmpdir, - sync_config=SyncConfig(upload_dir="fake://somewhere", syncer=syncer), + experiment_path="fake://somewhere", + sync_config=SyncConfig(syncer=syncer), trial_checkpoint_config=checkpoint_config, checkpoint_period=100, # Only rely on forced syncing trial_executor=RayTrialExecutor(resource_manager=self._resourceManager()), @@ -1105,8 +1107,8 @@ def testForcedCloudCheckpointSyncTimeout(self): syncer = self.getHangingSyncer(sync_period=60, sync_timeout=0.5) runner = TrialRunner( - local_checkpoint_dir=self.tmpdir, - sync_config=SyncConfig(upload_dir="fake://somewhere", syncer=syncer), + experiment_path="fake://somewhere/exp", + sync_config=SyncConfig(syncer=syncer), ) # Checkpoint for the first time starts the first sync in the background runner.checkpoint(force=True) @@ -1132,8 +1134,8 @@ def testPeriodicCloudCheckpointSyncTimeout(self): sync_period = 60 syncer = self.getHangingSyncer(sync_period=sync_period, sync_timeout=0.5) runner = TrialRunner( - local_checkpoint_dir=self.tmpdir, - sync_config=SyncConfig(upload_dir="fake://somewhere", syncer=syncer), + experiment_path="fake://somewhere/exp", + sync_config=SyncConfig(syncer=syncer), ) with freeze_time() as frozen: diff --git a/python/ray/tune/tests/test_tune_restore.py b/python/ray/tune/tests/test_tune_restore.py index fdf03ca8ef73..e97162db3f4c 100644 --- a/python/ray/tune/tests/test_tune_restore.py +++ b/python/ray/tune/tests/test_tune_restore.py @@ -39,7 +39,7 @@ def setUp(self): name=test_name, stop={"training_iteration": 1}, checkpoint_freq=1, - local_dir=tmpdir, + storage_path=tmpdir, config={ "env": "CartPole-v0", "framework": "tf", @@ -106,7 +106,7 @@ def _train(config): tune.run( _train, - local_dir=local_dir, + storage_path=local_dir, name="interrupt", callbacks=[SteppingCallback(driver_semaphore, trainer_semaphore)], ) @@ -264,7 +264,7 @@ def testFailResumeGridSearch(self): "test2": tune.grid_search([1, 2, 3]), }, stop={"training_iteration": 2}, - local_dir=self.logdir, + storage_path=self.logdir, verbose=1, ) @@ -292,7 +292,7 @@ def testResourceUpdateInResume(self): "test2": tune.grid_search([1, 2, 3]), }, stop={"training_iteration": 2}, - local_dir=self.logdir, + storage_path=self.logdir, verbose=1, ) @@ -338,7 +338,7 @@ def __init__(self, name): ), }, stop={"training_iteration": 2}, - local_dir=self.logdir, + storage_path=self.logdir, verbose=1, ) @@ -392,7 +392,7 @@ def testFailResumeWithPreset(self): "test2": tune.grid_search([1, 2, 3]), }, stop={"training_iteration": 2}, - local_dir=self.logdir, + storage_path=self.logdir, verbose=1, ) with self.assertRaises(RuntimeError): @@ -435,7 +435,7 @@ def testFailResumeAfterPreset(self): "test2": tune.grid_search([1, 2, 3]), 
             },
             stop={"training_iteration": 2},
-            local_dir=self.logdir,
+            storage_path=self.logdir,
             verbose=1,
         )

@@ -478,7 +478,7 @@ def testMultiExperimentFail(self):
                     "test": tune.grid_search([1, 2, 3]),
                 },
                 stop={"training_iteration": 1},
-                local_dir=self.logdir,
+                storage_path=self.logdir,
             )
         )

@@ -509,7 +509,7 @@ def testWarningLargeGrid(self):
                 "test5": tune.grid_search(list(range(20))),
             },
             stop={"training_iteration": 2},
-            local_dir=self.logdir,
+            storage_path=self.logdir,
             verbose=1,
         )
         with self.assertWarnsRegex(UserWarning, "exceeds the serialization threshold"):
diff --git a/python/ray/tune/tests/test_tune_restore_warm_start.py b/python/ray/tune/tests/test_tune_restore_warm_start.py
index a67cf870e09d..9f9a402e1791 100644
--- a/python/ray/tune/tests/test_tune_restore_warm_start.py
+++ b/python/ray/tune/tests/test_tune_restore_warm_start.py
@@ -63,7 +63,7 @@ def run_part_from_scratch(self):
            scheduler=self.get_scheduler(),
             verbose=0,
             name=self.experiment_name,
-            local_dir=self.tmpdir,
+            storage_path=self.tmpdir,
             reuse_actors=True,
         )
         checkpoint_path = os.path.join(self.tmpdir, "warmStartTest.pkl")
@@ -82,7 +82,7 @@ def run_from_experiment_restore(self, random_state):
             scheduler=self.get_scheduler(),
             verbose=0,
             name=self.experiment_name,
-            local_dir=self.tmpdir,
+            storage_path=self.tmpdir,
             reuse_actors=True,
         )
         return results
diff --git a/python/ray/tune/tests/test_tune_save_restore.py b/python/ray/tune/tests/test_tune_save_restore.py
index 6a30fb8b057e..ecc54fdae601 100644
--- a/python/ray/tune/tests/test_tune_save_restore.py
+++ b/python/ray/tune/tests/test_tune_save_restore.py
@@ -74,7 +74,7 @@ def _train(self, exp_name, local_dir, absolute_local_dir):
             name=exp_name,
             stop={"training_iteration": 1},
             checkpoint_freq=1,
-            local_dir=local_dir,
+            storage_path=local_dir,
             config={"env": "CartPole-v0", "log_level": "DEBUG"},
         ).trials

diff --git a/python/ray/tune/tests/test_tuner_restore.py b/python/ray/tune/tests/test_tuner_restore.py
index f4e85e149a52..cf8fb80ce951 100644
--- a/python/ray/tune/tests/test_tuner_restore.py
+++ b/python/ray/tune/tests/test_tuner_restore.py
@@ -146,7 +146,7 @@ def test_tuner_restore_num_trials(ray_start_2_cpus, tmpdir):
         _dummy_train_fn,
         tune_config=TuneConfig(num_samples=4, metric="_metric", mode="max"),
         run_config=RunConfig(
-            name="test_tuner_restore_num_trials", local_dir=str(tmpdir)
+            name="test_tuner_restore_num_trials", storage_path=str(tmpdir)
         ),
     )
     results = tuner.fit()
@@ -179,7 +179,7 @@ def test_tuner_restore_resume_errored(ray_start_2_cpus, tmpdir):
             num_samples=1,
         ),
         run_config=RunConfig(
-            name="test_tuner_restore_resume_errored", local_dir=str(tmpdir)
+            name="test_tuner_restore_resume_errored", storage_path=str(tmpdir)
         ),
         param_space={
             # Second and third trial fail
@@ -231,7 +231,7 @@ def test_tuner_restore_restart_errored(ray_start_2_cpus, tmpdir):
         tune_config=TuneConfig(num_samples=1),
         run_config=RunConfig(
             name="test_tuner_restore_restart_errored",
-            local_dir=str(tmpdir),
+            storage_path=str(tmpdir),
         ),
         param_space={
             # Second and third trial fail
@@ -284,7 +284,7 @@ def test_tuner_resume_unfinished(ray_start_2_cpus, tmpdir):
         tune_config=TuneConfig(num_samples=1),
         run_config=RunConfig(
             name="test_tuner_resume_unfinished",
-            local_dir=str(tmpdir),
+            storage_path=str(tmpdir),
             failure_config=FailureConfig(fail_fast=False),
             callbacks=[_FailOnStats(num_trials=4, num_finished=2, delay=1)],
         ),
@@ -345,7 +345,7 @@ def test_tuner_resume_errored_only(ray_start_2_cpus, tmpdir):
         tune_config=TuneConfig(num_samples=1),
         run_config=RunConfig(
name="test_tuner_resume_errored_only", - local_dir=str(tmpdir), + storage_path=str(tmpdir), failure_config=FailureConfig(fail_fast=False), callbacks=[_FailOnStats(num_trials=4, num_finished=2, delay=1)], ), @@ -399,8 +399,8 @@ def test_tuner_restore_from_cloud(ray_start_2_cpus, tmpdir, clear_memory_filesys _dummy_train_fn, run_config=RunConfig( name="exp_dir", + storage_path="memory:///test/restore", local_dir=str(tmpdir / "ray_results"), - sync_config=tune.SyncConfig(upload_dir="memory:///test/restore"), ), ) tuner.fit() @@ -460,8 +460,8 @@ def test_tuner_restore_latest_available_checkpoint( ), run_config=RunConfig( name="test_tuner_restore_latest_available_checkpoint", + storage_path=upload_uri, local_dir=str(tmpdir), - sync_config=tune.SyncConfig(upload_dir=upload_uri), ), param_space={"failing_hanging": (fail_marker, None), "num_epochs": 4}, ) @@ -556,7 +556,7 @@ def load_checkpoint(self, checkpoint_path): run_config=RunConfig( name="tryout_restore", stop={"training_iteration": 5}, - local_dir=str(tmpdir), + storage_path=str(tmpdir), failure_config=FailureConfig(max_failures=1), checkpoint_config=CheckpointConfig(checkpoint_frequency=1), ), @@ -581,7 +581,7 @@ def train_func_1(config): tuner = Tuner( train_func_1, - run_config=RunConfig(name="overwrite_trainable", local_dir=str(tmpdir)), + run_config=RunConfig(name="overwrite_trainable", storage_path=str(tmpdir)), param_space={"data": 1}, ) tuner.fit() @@ -667,7 +667,7 @@ def create_trainable_with_params(): create_trainable_with_params(), run_config=RunConfig( name=exp_name, - local_dir=str(tmp_path), + storage_path=str(tmp_path), stop={"training_iteration": 3}, failure_config=FailureConfig(max_failures=0), checkpoint_config=CheckpointConfig( @@ -714,7 +714,7 @@ def test_tuner_restore_from_moved_experiment_path( ), run_config=RunConfig( name=old_exp_name, - local_dir=str(old_local_dir), + storage_path=str(old_local_dir), checkpoint_config=CheckpointConfig(num_to_keep=num_to_keep), ), param_space={ @@ -743,7 +743,7 @@ def test_tuner_restore_from_moved_experiment_path( analysis = tune.run( _train_fn_sometimes_failing, name=new_exp_name, - local_dir=str(new_local_dir), + storage_path=str(new_local_dir), resume="AUTO+ERRORED", ) results = ResultGrid(analysis) @@ -790,8 +790,8 @@ def failing_fn(config): failing_fn, run_config=RunConfig( name="exp_dir", + storage_path="memory:///original", local_dir=str(tmp_path / "ray_results"), - sync_config=tune.SyncConfig(upload_dir="memory:///original"), ), tune_config=TuneConfig(trial_dirname_creator=lambda _: "test"), ) @@ -842,7 +842,7 @@ def failing_fn(config): def test_restore_from_relative_path(ray_start_2_cpus, chdir_tmpdir): tuner = Tuner( _dummy_train_fn_with_report, - run_config=RunConfig(local_dir="relative_dir", name="exp_name"), + run_config=RunConfig(storage_path="relative_dir", name="exp_name"), ) tuner.fit() @@ -876,7 +876,7 @@ def on_trial_result(self, runner, trial, result): tuner = Tuner( _train_fn_sometimes_failing, - run_config=RunConfig(local_dir=str(tmpdir), name="exp_name"), + run_config=RunConfig(storage_path=str(tmpdir), name="exp_name"), tune_config=TuneConfig( search_alg=MockSearcher(), scheduler=MockScheduler(), @@ -959,7 +959,7 @@ def get_checkpoints(experiment_dir): tune_config=TuneConfig(num_samples=1), run_config=RunConfig( name="exp_name", - local_dir=str(tmp_path), + storage_path=str(tmp_path), checkpoint_config=CheckpointConfig(num_to_keep=num_to_keep), ), param_space=param_space, @@ -1008,8 +1008,8 @@ def test_tuner_can_restore(tmp_path, upload_dir): lambda config: 
         run_config=RunConfig(
             name=name,
+            storage_path=upload_dir,
             local_dir=str(tmp_path),
-            sync_config=tune.SyncConfig(upload_dir=upload_dir),
         ),
         tune_config=TuneConfig(trial_dirname_creator=lambda t: "trial_dir"),
     )
@@ -1065,7 +1065,7 @@ def train_fn(config):
         param_space=param_space,
         tune_config=TuneConfig(num_samples=1),
         run_config=RunConfig(
-            local_dir=str(tmp_path),
+            storage_path=str(tmp_path),
             name="param_space_overwrite",
             callbacks=[_FailOnStats(num_trials=4, num_finished=2)],
         ),
diff --git a/python/ray/tune/utils/release_test_util.py b/python/ray/tune/utils/release_test_util.py
index 08a9e746bf36..db7d848c12aa 100644
--- a/python/ray/tune/utils/release_test_util.py
+++ b/python/ray/tune/utils/release_test_util.py
@@ -107,9 +107,9 @@ def timed_tune_run(
     **tune_kwargs,
 ):
     durable = (
-        "sync_config" in tune_kwargs
-        and tune_kwargs["sync_config"].upload_dir
-        and tune_kwargs["sync_config"].upload_dir.startswith("s3://")
+        "storage_path" in tune_kwargs
+        and tune_kwargs["storage_path"]
+        and tune_kwargs["storage_path"].startswith("s3://")
     )

     sleep_time = 1.0 / results_per_second
diff --git a/release/tune_tests/cloud_tests/workloads/_tune_script.py b/release/tune_tests/cloud_tests/workloads/_tune_script.py
index 81c03a64b128..4b4ba903a6c0 100644
--- a/release/tune_tests/cloud_tests/workloads/_tune_script.py
+++ b/release/tune_tests/cloud_tests/workloads/_tune_script.py
@@ -63,7 +63,7 @@ def on_step_begin(self, iteration, trials, **info):

 def run_tune(
     no_syncer: bool,
-    upload_dir: Optional[str] = None,
+    storage_path: Optional[str] = None,
     experiment_name: str = "cloud_test",
     indicator_file: str = "/tmp/tune_cloud_indicator",
     trainable: str = "function",
@@ -106,9 +106,9 @@ def run_tune(
         resume="AUTO",
         num_samples=1,  # 4 trials from the grid search
         config=config,
+        storage_path=storage_path,
         sync_config=tune.SyncConfig(
             syncer="auto" if not no_syncer else None,
-            upload_dir=upload_dir,
             sync_on_checkpoint=True,
             sync_period=0.5,
             sync_artifacts=True,
@@ -125,7 +125,7 @@ def run_tune(

     parser.add_argument("--no-syncer", action="store_true", default=False)

-    parser.add_argument("--upload-dir", required=False, default=None, type=str)
+    parser.add_argument("--storage-path", required=False, default=None, type=str)

     parser.add_argument("--experiment-name", required=False, default=None, type=str)

@@ -143,7 +143,7 @@ def run_tune(

     run_kwargs = dict(
         no_syncer=args.no_syncer or False,
-        upload_dir=args.upload_dir or None,
+        storage_path=args.storage_path or None,
         experiment_name=args.experiment_name or "cloud_test",
         indicator_file=args.indicator_file,
         trainable=trainable,
diff --git a/release/tune_tests/cloud_tests/workloads/run_cloud_test.py b/release/tune_tests/cloud_tests/workloads/run_cloud_test.py
index 682a940f10fb..6093007e4cfb 100644
--- a/release/tune_tests/cloud_tests/workloads/run_cloud_test.py
+++ b/release/tune_tests/cloud_tests/workloads/run_cloud_test.py
@@ -6,7 +6,7 @@
 following Tune properties:

 syncer ("auto" or None)
-upload_dir
+storage_path

 Generally the flow is as follows:

@@ -207,7 +207,7 @@ def wait_for_nodes(

 def start_run(
     no_syncer: bool,
-    upload_dir: Optional[str] = None,
+    storage_path: Optional[str] = None,
     experiment_name: str = "cloud_test",
     indicator_file: str = "/tmp/tune_cloud_indicator",
 ) -> subprocess.Popen:
@@ -215,8 +215,8 @@ def start_run(
     if no_syncer:
         args.append("--no-syncer")

-    if upload_dir:
-        args.extend(["--upload-dir", upload_dir])
+    if storage_path:
+        args.extend(["--storage-path", storage_path])

     if experiment_name:
         args.extend(["--experiment-name", experiment_name])
@@ -309,13 +309,13 @@ def run_tune_script_for_time(
     experiment_name: str,
     indicator_file: str,
     no_syncer: bool,
-    upload_dir: Optional[str],
+    storage_path: Optional[str],
     run_start_timeout: int = 30,
 ):
     # Start run
     process = start_run(
         no_syncer=no_syncer,
-        upload_dir=upload_dir,
+        storage_path=storage_path,
         experiment_name=experiment_name,
         indicator_file=indicator_file,
     )
@@ -339,7 +339,7 @@ def run_resume_flow(
     experiment_name: str,
     indicator_file: str,
     no_syncer: bool,
-    upload_dir: Optional[str],
+    storage_path: Optional[str],
     first_run_time: int = 33,
     second_run_time: int = 33,
     run_start_timeout: int = 30,
@@ -377,7 +377,7 @@ def run_resume_flow(
         experiment_name=experiment_name,
         indicator_file=indicator_file,
         no_syncer=no_syncer,
-        upload_dir=upload_dir,
+        storage_path=storage_path,
         run_start_timeout=run_start_timeout,
     )

@@ -397,7 +397,7 @@ def run_resume_flow(
         experiment_name=experiment_name,
         indicator_file=indicator_file,
         no_syncer=no_syncer,
-        upload_dir=upload_dir,
+        storage_path=storage_path,
     )

     if after_experiments_callback:
@@ -883,7 +883,7 @@ def test_no_sync_down():
     No down syncing, so:

         syncer=None
-        upload_dir=None
+        storage_path=None

     Expected results after first checkpoint:

@@ -1014,7 +1014,7 @@ def after_experiments():
         experiment_name=experiment_name,
         indicator_file=indicator_file,
         no_syncer=True,
-        upload_dir=None,
+        storage_path=None,
         first_run_time=run_time,
         second_run_time=run_time,
         between_experiments_callback=between_experiments,
@@ -1027,7 +1027,7 @@ def test_ssh_sync():
     SSH syncing, so:

         syncer="auto"
-        upload_dir=None
+        storage_path=None

     Expected results after first checkpoint:

@@ -1145,7 +1145,7 @@ def after_experiments():
         experiment_name=experiment_name,
         indicator_file=indicator_file,
         no_syncer=False,
-        upload_dir=None,
+        storage_path=None,
         first_run_time=run_time + 10,  # More time because of SSH syncing
         second_run_time=run_time + 10,
         between_experiments_callback=between_experiments,
@@ -1158,7 +1158,7 @@ def test_durable_upload(bucket: str):
     Sync trial and experiment checkpoints to cloud, so:

         syncer="auto"
-        upload_dir="s3://"
+        storage_path="s3://"

     Expected results after first checkpoint:

@@ -1320,7 +1320,7 @@ def after_experiments():
         experiment_name=experiment_name,
         indicator_file=indicator_file,
         no_syncer=False,
-        upload_dir=bucket,
+        storage_path=bucket,
         first_run_time=run_time,
         second_run_time=run_time,
         run_start_timeout=run_start_timeout,
diff --git a/release/tune_tests/fault_tolerance_tests/workloads/test_tune_worker_fault_tolerance.py b/release/tune_tests/fault_tolerance_tests/workloads/test_tune_worker_fault_tolerance.py
index a5565a777de5..01467088eae7 100644
--- a/release/tune_tests/fault_tolerance_tests/workloads/test_tune_worker_fault_tolerance.py
+++ b/release/tune_tests/fault_tolerance_tests/workloads/test_tune_worker_fault_tolerance.py
@@ -26,7 +26,6 @@
 import gc

 import ray
-from ray import tune
 from ray.air import session, Checkpoint
 from ray.air.config import RunConfig, FailureConfig, CheckpointConfig
 from ray.tune.tune_config import TuneConfig
@@ -34,10 +33,6 @@

 from terminate_node_aws import create_instance_killer

-import faulthandler
-
-faulthandler.enable()
-
 MAX_ITERS = 40
 ITER_TIME_BOUNDS = (60, 90)

@@ -71,7 +66,7 @@ def main(bucket_uri: str):
         run_config=RunConfig(
             verbose=2,
             failure_config=FailureConfig(max_failures=-1),
-            sync_config=tune.SyncConfig(upload_dir=bucket_uri),
+            storage_path=bucket_uri,
             checkpoint_config=CheckpointConfig(num_to_keep=2),
         ),
     )
@@ -80,8 +75,11 @@ def main(bucket_uri: str):
         probability=0.03, time_between_checks_s=10, warmup_time_s=WARMUP_TIME_S
     )
     results = tuner.fit()
+    print("Fitted:", results)
     del instance_killer
+    print("Deleted instance killer")
     gc.collect()
+    print("Collected garbage")

     for result in results:
         checkpoint_dict = result.checkpoint.to_dict()
@@ -97,3 +95,4 @@ def main(bucket_uri: str):
     args, _ = parser.parse_known_args()

     main(args.bucket or "s3://tune-cloud-tests/worker_fault_tolerance")
+    print("Finished test.")
diff --git a/release/tune_tests/scalability_tests/workloads/test_durable_trainable.py b/release/tune_tests/scalability_tests/workloads/test_durable_trainable.py
index 9dd077397623..1b293a3fddd0 100644
--- a/release/tune_tests/scalability_tests/workloads/test_durable_trainable.py
+++ b/release/tune_tests/scalability_tests/workloads/test_durable_trainable.py
@@ -16,7 +16,6 @@
 import os

 import ray
-from ray import tune
 from ray.tune.utils.release_test_util import timed_tune_run


@@ -77,9 +76,7 @@ def main(bucket):
         checkpoint_size_b=int(10 * 1000**2),  # 10 MB
         keep_checkpoints_num=2,
         resources_per_trial={"cpu": 2},
-        sync_config=tune.SyncConfig(
-            upload_dir=f"s3://{bucket}/durable/",
-        ),
+        storage_path=f"s3://{bucket}/durable/",
     )
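
For reference, a minimal sketch of the pattern these diffs apply across the tests and release workloads. The experiment name and bucket URI below are placeholders, not values taken from this patch:

    from ray import tune
    from ray.air import RunConfig

    # Old pattern: the remote location was configured via SyncConfig.upload_dir.
    # run_config = RunConfig(
    #     name="my_experiment",
    #     sync_config=tune.SyncConfig(upload_dir="s3://my-bucket/results"),
    # )

    # New pattern: pass the storage location directly as storage_path.
    run_config = RunConfig(
        name="my_experiment",
        storage_path="s3://my-bucket/results",  # or a local path like "~/ray_results"
    )

    # The tests use trivial trainables such as `lambda config: None`.
    tuner = tune.Tuner(lambda config: None, run_config=run_config)
    tuner.fit()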