[air] Fix test_tune_torch_get_device_gpu race condition (#35004)
The `test_tune_torch_get_device_gpu` test is flaky, and the flakiness has recently increased after switching to the new execution backend (presumably because experiments now start faster).

Due to the way the test is constructed, it keeps a Ray cluster alive. Later tests in the same test suite then fail when they try to re-initialize a Ray cluster.

This PR fixes the underlying cause of the race condition and implements a mitigation.
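
In outline, the mitigation looks roughly like the sketch below. This is a simplified illustration, not the verbatim test code: `start_test_cluster` and `make_trainer` are stand-ins for the test's `ray_start_2_node_cluster` helper and its `TorchTrainer` setup. Each trial gets a unique experiment name, and any failure is re-raised only after the test cluster has shut down.

import uuid

from ray.air.config import RunConfig

def run_trials(start_test_cluster, make_trainer, num_samples=2):
    exception = None
    with start_test_cluster():
        try:
            # Each trial gets a unique experiment name so concurrent trials
            # never share an experiment directory.
            trainers = [
                make_trainer(
                    run_config=RunConfig(name=f"test_run_{uuid.uuid4()}")
                )
                for _ in range(num_samples)
            ]
            for trainer in trainers:
                trainer.fit()
        except Exception as exc:
            # Remember the failure, but let the cluster shut down cleanly first.
            exception = exc

    # Re-raise only after the cluster context has exited, so a failing trial
    # does not leave a live Ray cluster behind for later tests.
    if exception:
        raise exception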

Signed-off-by: Kai Fricke <[email protected]>
krfricke authored May 3, 2023
1 parent 41ad9ca commit dc6fd82
Showing 2 changed files with 22 additions and 8 deletions.
25 changes: 18 additions & 7 deletions python/ray/train/tests/test_torch_trainer.py
@@ -1,4 +1,6 @@
import contextlib
import uuid

import pytest
import time
import torch
@@ -11,7 +13,7 @@
from ray.train.batch_predictor import BatchPredictor
from ray.train.constants import DISABLE_LAZY_CHECKPOINTING_ENV
from ray.train.torch import TorchPredictor, TorchTrainer
from ray.air.config import ScalingConfig
from ray.air.config import RunConfig, ScalingConfig
from ray.train.torch import TorchConfig
from ray.train.trainer import TrainingFailedError
import ray.train as train
@@ -258,7 +260,6 @@ def test_tune_torch_get_device_gpu(num_gpus_per_worker):
(for example when used with Tune).
"""
from ray.air.config import ScalingConfig
import time

num_samples = 2
num_workers = 2
@@ -269,6 +270,7 @@ def test_tune_torch_get_device_gpu(num_gpus_per_worker):
# Divide by two because of a 2 node cluster.
gpus_per_node = total_gpus_required // 2

exception = None
# Use the same number of cpus per node as gpus per node.
with ray_start_2_node_cluster(
num_cpus_per_node=gpus_per_node, num_gpus_per_node=gpus_per_node
@@ -290,12 +292,14 @@ def train_fn():
@ray.remote(num_cpus=0)
class TrialActor:
def __init__(self, warmup_steps):
# adding warmup_steps to the config
# to avoid the error of checkpoint name conflict
time.sleep(2 * warmup_steps)
self.trainer = TorchTrainer(
train_fn,
torch_config=TorchConfig(backend="gloo"),
run_config=RunConfig(
# Use a unique name to avoid using the same
# experiment directory
name=f"test_tune_torch_get_device_gpu_{uuid.uuid4()}"
),
scaling_config=ScalingConfig(
num_workers=num_workers,
use_gpu=True,
@@ -313,8 +317,15 @@ def __init__(self, warmup_steps):
def run(self):
return self.trainer.fit()

actors = [TrialActor.remote(1) for _ in range(num_samples)]
ray.get([actor.run.remote() for actor in actors])
try:
actors = [TrialActor.remote(1) for _ in range(num_samples)]
ray.get([actor.run.remote() for actor in actors])
except Exception as exc:
exception = exc

# Raise exception after Ray cluster has been shutdown to avoid corrupted state
if exception:
raise exception


def test_torch_auto_unwrap(ray_start_4_cpus):
5 changes: 4 additions & 1 deletion python/ray/tune/execution/trial_runner.py
@@ -1,3 +1,4 @@
import uuid
from typing import Any, Dict, List, Optional, Union, Tuple, Set

from datetime import datetime
@@ -364,7 +365,9 @@ def save_to_dir(self, experiment_dir: Optional[str] = None):
},
}

tmp_file_name = os.path.join(experiment_dir, ".tmp_experiment_state")
tmp_file_name = os.path.join(
experiment_dir, f".tmp_experiment_state_{uuid.uuid4()}"
)

with open(tmp_file_name, "w") as f:
json.dump(runner_state, f, indent=2, cls=TuneFunctionEncoder)
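For context on the second change: when several experiments save state concurrently under the same directory (as the two trials in the test above could before getting unique names, which is what the removed `time.sleep` workaround tried to avoid), a fixed `.tmp_experiment_state` file name lets one writer overwrite or rename another's half-written file. Making the temp name unique per save removes that collision. Below is a minimal sketch of the pattern, assuming the usual write-to-temp-then-rename scheme; the rename step and the final file name are not shown in the excerpt above and are placeholders here.

import json
import os
import uuid

def save_experiment_state(experiment_dir, runner_state):
    # Write to a per-save unique temp file so concurrent savers never
    # touch each other's in-progress file.
    tmp_file_name = os.path.join(
        experiment_dir, f".tmp_experiment_state_{uuid.uuid4()}"
    )
    with open(tmp_file_name, "w") as f:
        json.dump(runner_state, f, indent=2)
    # Atomically move the finished file into place (the final name is a
    # placeholder, not taken from the diff above).
    os.replace(
        tmp_file_name, os.path.join(experiment_dir, "experiment_state.json")
    )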
