[tune] Cleanup path-related properties in experiment classes #33370

Merged: 11 commits merged on Mar 17, 2023
9 changes: 9 additions & 0 deletions python/ray/_private/test_utils.py
@@ -1595,13 +1595,22 @@ def simulate_storage(
elif storage_type == "s3":
from moto.server import ThreadedMotoServer

old_env = os.environ
Contributor:
why do we need these?

Contributor (author):
The tests don't pass locally when no AWS credentials are set up. They are set up automatically in the Buildkite runners (as env vars). This just piggybacks a small fix onto the current CI.

os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"

root = root or uuid.uuid4().hex
s3_server = f"http://localhost:{port}"
server = ThreadedMotoServer(port=port)
server.start()
url = f"s3://{root}?region={region}&endpoint_override={s3_server}"
yield url
server.stop()

os.environ = old_env

else:
raise NotImplementedError(f"Unknown storage type: {storage_type}")

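For context on the exchange above: moto only needs placeholder credentials, so the usual pattern is to export dummy AWS values for the duration of the test and restore the environment afterward. Below is a minimal sketch of that pattern, assuming placeholder values are sufficient; the fake_aws_credentials helper is illustrative and not part of this PR, and note that os.environ has to be copied for the later restore to actually undo the changes.

import contextlib
import os

@contextlib.contextmanager
def fake_aws_credentials():
    # Snapshot the current environment; a plain reference would be mutated below.
    old_env = os.environ.copy()
    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
    os.environ["AWS_SECURITY_TOKEN"] = "testing"
    os.environ["AWS_SESSION_TOKEN"] = "testing"
    try:
        yield
    finally:
        # Restore the original environment after the test.
        os.environ.clear()
        os.environ.update(old_env)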
2 changes: 1 addition & 1 deletion python/ray/air/integrations/mlflow.py
@@ -310,7 +310,7 @@ def log_trial_end(self, trial: "Trial", failed: bool = False):

# Log the artifact if set_artifact is set to True.
if self.should_save_artifact:
self.mlflow_util.save_artifacts(run_id=run_id, dir=trial.logdir)
self.mlflow_util.save_artifacts(run_id=run_id, dir=trial.local_path)

# Stop the run once trial finishes.
status = "FINISHED" if not failed else "FAILED"
2 changes: 1 addition & 1 deletion python/ray/air/integrations/wandb.py
@@ -653,7 +653,7 @@ def _start_logging_actor(
actor_options={"num_cpus": 0, **_force_on_current_node()}
)
self._trial_logging_actors[trial] = self._remote_logger_class.remote(
logdir=trial.logdir,
logdir=trial.local_path,
queue=self._trial_queues[trial],
exclude=exclude_results,
to_config=self.AUTO_CONFIG_KEYS,
2 changes: 1 addition & 1 deletion python/ray/air/tests/mocked_wandb_integration.py
@@ -20,7 +20,7 @@ class Trial(
"trial_name",
"experiment_dir_name",
"placement_group_factory",
"logdir",
"local_path",
],
)
):
2 changes: 1 addition & 1 deletion python/ray/air/tests/test_integration_mlflow.py
@@ -19,7 +19,7 @@


class MockTrial(
namedtuple("MockTrial", ["config", "trial_name", "trial_id", "logdir"])
namedtuple("MockTrial", ["config", "trial_name", "trial_id", "local_path"])
):
def __hash__(self):
return hash(self.trial_id)
2 changes: 1 addition & 1 deletion python/ray/air/tests/test_integration_wandb.py
@@ -334,7 +334,7 @@ def test_wandb_logger_exclude_config(self):
trial_name="trial_0",
experiment_dir_name="trainable",
placement_group_factory=PlacementGroupFactory([{"CPU": 1}]),
logdir=tempfile.gettempdir(),
local_path=tempfile.gettempdir(),
)
logger = WandbTestExperimentLogger(
project="test_project",
23 changes: 13 additions & 10 deletions python/ray/tune/analysis/experiment_analysis.py
@@ -100,8 +100,7 @@ def __init__(
# If only a mode was passed, use anonymous metric
self.default_metric = DEFAULT_METRIC

self._local_base_dir = self._checkpoints_and_paths[0][1].parent

self._local_experiment_path = self._checkpoints_and_paths[0][1]
if not pd:
logger.warning(
"pandas not installed. Run `pip install pandas` for "
@@ -110,15 +109,19 @@
else:
self.fetch_trial_dataframes()

self._sync_config = sync_config
remote_storage_path = None
if sync_config and sync_config.upload_dir:
remote_storage_path = sync_config.upload_dir

self._remote_storage_path = remote_storage_path

def _parse_cloud_path(self, local_path: str):
"""Convert local path into cloud storage path"""
if not self._sync_config or not self._sync_config.upload_dir:
if not self._remote_storage_path:
return None

return local_path.replace(
str(self._local_base_dir), self._sync_config.upload_dir
str(Path(self._local_experiment_path).parent), self._remote_storage_path
)

def _load_checkpoints(self, experiment_checkpoint_path: str) -> List[str]:
@@ -686,7 +689,7 @@ def get_best_logdir(
based on `mode`, and compare trials based on `mode=[min,max]`.
"""
best_trial = self.get_best_trial(metric, mode, scope)
return best_trial.logdir if best_trial else None
return best_trial.local_path if best_trial else None

def get_last_checkpoint(self, trial=None, metric="training_iteration", mode="max"):
"""Gets the last persistent checkpoint path of the provided trial,
@@ -783,8 +786,8 @@ def runner_data(self) -> Dict:
def _get_trial_paths(self) -> List[str]:
if self.trials:
# We do not need to set the relative path here
# Maybe assert that t.logdir is in local_base_path?
_trial_paths = [str(t.logdir) for t in self.trials]
# Maybe assert that t.local_path is in local_base_path?
_trial_paths = [str(t.local_path) for t in self.trials]
else:
logger.info(
"No `self.trials`. Drawing logdirs from checkpoint "
@@ -795,7 +798,7 @@ def _get_trial_paths(self) -> List[str]:
for trial_json_state, path in self._checkpoints_and_paths:
try:
trial = Trial.from_json_state(trial_json_state, stub=True)
trial.local_dir = str(path)
trial.local_experiment_path = str(path)
except Exception:
logger.warning(
f"Could not load trials from experiment checkpoint. "
@@ -808,7 +811,7 @@
self.trials.append(trial)

self.trials.sort(key=lambda trial: trial.trial_id)
_trial_paths = [str(trial.logdir) for trial in self.trials]
_trial_paths = [str(trial.local_path) for trial in self.trials]

if not _trial_paths:
raise TuneError("No trials found.")
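To illustrate the path translation that _parse_cloud_path performs after this change, here is a hedged example with made-up paths (not taken from the PR): the local base directory, i.e. the parent of the experiment path, is swapped for the configured upload dir.

from pathlib import Path

local_experiment_path = "/home/user/ray_results/my_experiment"
remote_storage_path = "s3://my-bucket/tune"

# A trial directory under the local experiment path ...
trial_local_path = "/home/user/ray_results/my_experiment/trial_0"

# ... maps to cloud storage by replacing the local base dir with the upload dir.
cloud_path = trial_local_path.replace(
    str(Path(local_experiment_path).parent), remote_storage_path
)
print(cloud_path)  # s3://my-bucket/tune/my_experiment/trial_0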
2 changes: 1 addition & 1 deletion python/ray/tune/callback.py
@@ -108,7 +108,7 @@ def train(config):
"""

# File templates for any artifacts written by this callback
# These files should live in the `trial.logdir` for each trial.
# These files should live in the `trial.local_path` for each trial.
# TODO(ml-team): Make this more visible to users to override. Internal use for now.
_SAVED_FILE_TEMPLATES = []

8 changes: 4 additions & 4 deletions python/ray/tune/execution/ray_trial_executor.py
@@ -747,7 +747,7 @@ def reset_trial(
trainable.reset.remote(
extra_config,
logger_creator=logger_creator,
remote_checkpoint_dir=trial.remote_checkpoint_dir,
remote_checkpoint_dir=trial.remote_path,
),
timeout=DEFAULT_GET_TIMEOUT,
)
@@ -938,8 +938,8 @@ def save(
metrics=result,
local_to_remote_path_fn=partial(
TrainableUtil.get_remote_storage_path,
logdir=trial.logdir,
remote_checkpoint_dir=trial.remote_checkpoint_dir,
logdir=trial.local_path,
remote_checkpoint_dir=trial.remote_path,
)
if trial.uses_cloud_checkpointing
else None,
@@ -1070,7 +1070,7 @@ def _change_working_directory(self, trial):
if ray._private.worker._mode() == ray._private.worker.LOCAL_MODE:
old_dir = os.getcwd()
try:
os.chdir(trial.logdir)
os.chdir(trial.local_path)
yield
finally:
os.chdir(old_dir)