Skip to content

Commit

Permalink
[tune] Cleanup path-related properties in experiment classes (ray-pro…
Browse files Browse the repository at this point in the history
…ject#33370)

The naming of different path-related components in Ray Tune is currently messy. For instance, `Experiment.local_dir` refers to the main results directory, e.g. `~/ray_results`, while `Trial.local_dir` refers to the experiment directory, e.g. `~/ray_results/experiment`. The same is true for properties, where it's unclear whether a property refers to the object's own subdirectory or to its parent directory.

To disentangle this information, this PR introduces a new naming convention.

- All entities receiving a "parent path" receive it in an unambiguous naming scheme.
- For instance, `Experiment(storage_path)`, `TrialRunner(experiment_path)`, `Trial(experiment_path)`
- Outputs are also normalized. E.g. `Trial.remote_experiment_path` and `Trial.local_experiment_path`
- We keep existing arguments and properties for backwards compatibility

Signed-off-by: Kai Fricke <[email protected]>
Signed-off-by: elliottower <[email protected]>
  • Loading branch information
krfricke authored and elliottower committed Apr 22, 2023
1 parent a90a0d9 commit ec730c0
Show file tree
Hide file tree
Showing 43 changed files with 443 additions and 246 deletions.
9 changes: 9 additions & 0 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1595,13 +1595,22 @@ def simulate_storage(
elif storage_type == "s3":
from moto.server import ThreadedMotoServer

old_env = os.environ
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"

root = root or uuid.uuid4().hex
s3_server = f"http://localhost:{port}"
server = ThreadedMotoServer(port=port)
server.start()
url = f"s3://{root}?region={region}&endpoint_override={s3_server}"
yield url
server.stop()

os.environ = old_env

else:
raise NotImplementedError(f"Unknown storage type: {storage_type}")

Expand Down
2 changes: 1 addition & 1 deletion python/ray/air/integrations/mlflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def log_trial_end(self, trial: "Trial", failed: bool = False):

# Log the artifact if set_artifact is set to True.
if self.should_save_artifact:
self.mlflow_util.save_artifacts(run_id=run_id, dir=trial.logdir)
self.mlflow_util.save_artifacts(run_id=run_id, dir=trial.local_path)

# Stop the run once trial finishes.
status = "FINISHED" if not failed else "FAILED"
Expand Down
2 changes: 1 addition & 1 deletion python/ray/air/integrations/wandb.py
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ def _start_logging_actor(
actor_options={"num_cpus": 0, **_force_on_current_node()}
)
self._trial_logging_actors[trial] = self._remote_logger_class.remote(
logdir=trial.logdir,
logdir=trial.local_path,
queue=self._trial_queues[trial],
exclude=exclude_results,
to_config=self.AUTO_CONFIG_KEYS,
Expand Down
2 changes: 1 addition & 1 deletion python/ray/air/tests/mocked_wandb_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Trial(
"trial_name",
"experiment_dir_name",
"placement_group_factory",
"logdir",
"local_path",
],
)
):
Expand Down
2 changes: 1 addition & 1 deletion python/ray/air/tests/test_integration_mlflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@


class MockTrial(
namedtuple("MockTrial", ["config", "trial_name", "trial_id", "logdir"])
namedtuple("MockTrial", ["config", "trial_name", "trial_id", "local_path"])
):
def __hash__(self):
return hash(self.trial_id)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/air/tests/test_integration_wandb.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def test_wandb_logger_exclude_config(self):
trial_name="trial_0",
experiment_dir_name="trainable",
placement_group_factory=PlacementGroupFactory([{"CPU": 1}]),
logdir=tempfile.gettempdir(),
local_path=tempfile.gettempdir(),
)
logger = WandbTestExperimentLogger(
project="test_project",
Expand Down
23 changes: 13 additions & 10 deletions python/ray/tune/analysis/experiment_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ def __init__(
# If only a mode was passed, use anonymous metric
self.default_metric = DEFAULT_METRIC

self._local_base_dir = self._checkpoints_and_paths[0][1].parent

self._local_experiment_path = self._checkpoints_and_paths[0][1]
if not pd:
logger.warning(
"pandas not installed. Run `pip install pandas` for "
Expand All @@ -110,15 +109,19 @@ def __init__(
else:
self.fetch_trial_dataframes()

self._sync_config = sync_config
remote_storage_path = None
if sync_config and sync_config.upload_dir:
remote_storage_path = sync_config.upload_dir

self._remote_storage_path = remote_storage_path

def _parse_cloud_path(self, local_path: str):
"""Convert local path into cloud storage path"""
if not self._sync_config or not self._sync_config.upload_dir:
if not self._remote_storage_path:
return None

return local_path.replace(
str(self._local_base_dir), self._sync_config.upload_dir
str(Path(self._local_experiment_path).parent), self._remote_storage_path
)

def _load_checkpoints(self, experiment_checkpoint_path: str) -> List[str]:
Expand Down Expand Up @@ -686,7 +689,7 @@ def get_best_logdir(
based on `mode`, and compare trials based on `mode=[min,max]`.
"""
best_trial = self.get_best_trial(metric, mode, scope)
return best_trial.logdir if best_trial else None
return best_trial.local_path if best_trial else None

def get_last_checkpoint(self, trial=None, metric="training_iteration", mode="max"):
"""Gets the last persistent checkpoint path of the provided trial,
Expand Down Expand Up @@ -783,8 +786,8 @@ def runner_data(self) -> Dict:
def _get_trial_paths(self) -> List[str]:
if self.trials:
# We do not need to set the relative path here
# Maybe assert that t.logdir is in local_base_path?
_trial_paths = [str(t.logdir) for t in self.trials]
# Maybe assert that t.local_path is in local_base_path?
_trial_paths = [str(t.local_path) for t in self.trials]
else:
logger.info(
"No `self.trials`. Drawing logdirs from checkpoint "
Expand All @@ -795,7 +798,7 @@ def _get_trial_paths(self) -> List[str]:
for trial_json_state, path in self._checkpoints_and_paths:
try:
trial = Trial.from_json_state(trial_json_state, stub=True)
trial.local_dir = str(path)
trial.local_experiment_path = str(path)
except Exception:
logger.warning(
f"Could not load trials from experiment checkpoint. "
Expand All @@ -808,7 +811,7 @@ def _get_trial_paths(self) -> List[str]:
self.trials.append(trial)

self.trials.sort(key=lambda trial: trial.trial_id)
_trial_paths = [str(trial.logdir) for trial in self.trials]
_trial_paths = [str(trial.local_path) for trial in self.trials]

if not _trial_paths:
raise TuneError("No trials found.")
Expand Down
2 changes: 1 addition & 1 deletion python/ray/tune/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def train(config):
"""

# File templates for any artifacts written by this callback
# These files should live in the `trial.logdir` for each trial.
# These files should live in the `trial.local_path` for each trial.
# TODO(ml-team): Make this more visible to users to override. Internal use for now.
_SAVED_FILE_TEMPLATES = []

Expand Down
8 changes: 4 additions & 4 deletions python/ray/tune/execution/ray_trial_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ def reset_trial(
trainable.reset.remote(
extra_config,
logger_creator=logger_creator,
remote_checkpoint_dir=trial.remote_checkpoint_dir,
remote_checkpoint_dir=trial.remote_path,
),
timeout=DEFAULT_GET_TIMEOUT,
)
Expand Down Expand Up @@ -938,8 +938,8 @@ def save(
metrics=result,
local_to_remote_path_fn=partial(
TrainableUtil.get_remote_storage_path,
logdir=trial.logdir,
remote_checkpoint_dir=trial.remote_checkpoint_dir,
logdir=trial.local_path,
remote_checkpoint_dir=trial.remote_path,
)
if trial.uses_cloud_checkpointing
else None,
Expand Down Expand Up @@ -1070,7 +1070,7 @@ def _change_working_directory(self, trial):
if ray._private.worker._mode() == ray._private.worker.LOCAL_MODE:
old_dir = os.getcwd()
try:
os.chdir(trial.logdir)
os.chdir(trial.local_path)
yield
finally:
os.chdir(old_dir)
Expand Down
Loading

0 comments on commit ec730c0

Please sign in to comment.