[air] Fix test_tune_torch_get_device_gpu race condition #35004

Merged · 2 commits · May 3, 2023
25 changes: 18 additions & 7 deletions python/ray/train/tests/test_torch_trainer.py
@@ -1,4 +1,6 @@
import contextlib
import uuid

import pytest
import time
import torch
@@ -11,7 +13,7 @@
from ray.train.batch_predictor import BatchPredictor
from ray.train.constants import DISABLE_LAZY_CHECKPOINTING_ENV
from ray.train.torch import TorchPredictor, TorchTrainer
from ray.air.config import ScalingConfig
from ray.air.config import RunConfig, ScalingConfig
from ray.train.torch import TorchConfig
from ray.train.trainer import TrainingFailedError
import ray.train as train
@@ -258,7 +260,6 @@ def test_tune_torch_get_device_gpu(num_gpus_per_worker):
(for example when used with Tune).
"""
from ray.air.config import ScalingConfig
import time

num_samples = 2
num_workers = 2
@@ -269,6 +270,7 @@ def test_tune_torch_get_device_gpu(num_gpus_per_worker):
# Divide by two because of a 2 node cluster.
gpus_per_node = total_gpus_required // 2

exception = None
# Use the same number of cpus per node as gpus per node.
with ray_start_2_node_cluster(
num_cpus_per_node=gpus_per_node, num_gpus_per_node=gpus_per_node
@@ -290,12 +292,14 @@ def train_fn():
@ray.remote(num_cpus=0)
class TrialActor:
def __init__(self, warmup_steps):
# Stagger actor start times (scaled by warmup_steps)
# to avoid checkpoint name conflicts
time.sleep(2 * warmup_steps)
self.trainer = TorchTrainer(
train_fn,
torch_config=TorchConfig(backend="gloo"),
run_config=RunConfig(
# Use a unique name to avoid using the same
# experiment directory
name=f"test_tune_torch_get_device_gpu_{uuid.uuid4()}"
Member:
maybe simply os.getpid() will do.

Contributor Author:
I'll leave it with the uuid for now, it's just a test anyway :-)

),
scaling_config=ScalingConfig(
num_workers=num_workers,
use_gpu=True,
@@ -313,8 +317,15 @@ def __init__(self, warmup_steps):
def run(self):
return self.trainer.fit()

actors = [TrialActor.remote(1) for _ in range(num_samples)]
ray.get([actor.run.remote() for actor in actors])
try:
actors = [TrialActor.remote(1) for _ in range(num_samples)]
ray.get([actor.run.remote() for actor in actors])
except Exception as exc:
exception = exc

# Raise the exception after the Ray cluster has been shut down to avoid a corrupted state
if exception:
raise exception


def test_torch_auto_unwrap(ray_start_4_cpus):
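The test fix combines two patterns: each trial gets a unique run name (uuid4 here; os.getpid() was floated in review as a simpler alternative), and any exception raised while the cluster is up is deferred and re-raised only after teardown. Below is a minimal, self-contained sketch of the defer-and-reraise part; dummy_cluster and run_trials are hypothetical stand-ins for the Ray fixtures used in the test, not part of the PR.

import contextlib
import uuid


@contextlib.contextmanager
def dummy_cluster():
    # Hypothetical stand-in for ray_start_2_node_cluster().
    try:
        yield
    finally:
        # Cluster teardown runs here; re-raising inside the block could
        # leave that teardown in a corrupted state.
        pass


def run_trials(trial_fns):
    exception = None
    with dummy_cluster():
        try:
            for fn in trial_fns:
                fn()
        except Exception as exc:
            exception = exc  # defer the failure
    # Re-raise only after the cluster has been shut down.
    if exception:
        raise exception


if __name__ == "__main__":
    # Each trial would also use a unique run name, e.g.
    # f"test_tune_torch_get_device_gpu_{uuid.uuid4()}" as in the diff
    # (or os.getpid(), as suggested in review).
    run_trials([lambda: None, lambda: None])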
5 changes: 4 additions & 1 deletion python/ray/tune/execution/trial_runner.py
@@ -1,3 +1,4 @@
import uuid
from typing import Any, Dict, List, Optional, Union, Tuple, Set

from datetime import datetime
@@ -364,7 +365,9 @@ def save_to_dir(self, experiment_dir: Optional[str] = None):
},
}

tmp_file_name = os.path.join(experiment_dir, ".tmp_experiment_state")
tmp_file_name = os.path.join(
experiment_dir, f".tmp_experiment_state_{uuid.uuid4()}"
)

with open(tmp_file_name, "w") as f:
json.dump(runner_state, f, indent=2, cls=TuneFunctionEncoder)
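The trial_runner.py change addresses the same race on the driver side: when several runners save state into one experiment directory, a fixed temp file name lets one writer clobber another's half-written file. A minimal sketch of the uniquely named temp file pattern follows, assuming a write-then-atomic-rename flow; save_state and the final experiment_state.json name are illustrative, not Ray's actual API.

import json
import os
import uuid


def save_state(experiment_dir: str, state: dict) -> str:
    # Write to a per-call temp file so concurrent savers in the same
    # directory never touch each other's in-progress file.
    tmp_file_name = os.path.join(
        experiment_dir, f".tmp_experiment_state_{uuid.uuid4()}"
    )
    with open(tmp_file_name, "w") as f:
        json.dump(state, f, indent=2)
    # Atomically move the finished file into place (illustrative final name).
    final_name = os.path.join(experiment_dir, "experiment_state.json")
    os.replace(tmp_file_name, final_name)
    return final_name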