[AIR] Fix reserved CPU warning if no CPUs are used (ray-project#30598)
If a ScalingConfig is configured to require no CPUs at all for a trial, TunerInternal._expected_utilization raises a divide-by-zero error. This can happen, for example, when using a GPU-enabled trainer on Google Colab. This PR fixes the issue.

Signed-off-by: Antoni Baum <[email protected]>
Signed-off-by: Weichen Xu <[email protected]>
Yard1 authored and WeichenXu123 committed Dec 19, 2022
1 parent 02bc78e commit b27f395
Showing 3 changed files with 97 additions and 79 deletions.
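
To see why the guard matters, here is a minimal, runnable sketch of the computation being fixed. The standalone expected_utilization helper below is illustrative only (the actual logic lives in the TunerInternal._expected_utilization method); it shows how falling back to a concurrency of 0 when cpus_per_trial is 0 avoids the ZeroDivisionError raised by the old floor division.

import math


def expected_utilization(cpus_per_trial, cpus_total, num_samples, concurrent_trials):
    # Illustrative sketch mirroring the shape of the fixed
    # TunerInternal._expected_utilization logic.
    actual_concurrency = min(
        # Guard: a zero-CPU trial contributes no CPU-based concurrency
        # instead of triggering a divide-by-zero.
        (cpus_total // cpus_per_trial) if cpus_per_trial else 0,
        num_samples,
        concurrent_trials,
    )
    return (actual_concurrency * cpus_per_trial) / (cpus_total + 0.001)


# A GPU-only trial that reserves no CPUs (as on Google Colab) now yields 0.0
# utilization instead of raising ZeroDivisionError:
print(
    expected_utilization(
        cpus_per_trial=0, cpus_total=1, num_samples=1, concurrent_trials=math.inf
    )
)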
2 changes: 1 addition & 1 deletion python/ray/train/BUILD
@@ -237,7 +237,7 @@ py_test(
     size = "medium",
     srcs = ["tests/test_base_trainer.py"],
     tags = ["team:ml", "exclusive", "ray_air"],
-    deps = [":train_lib"]
+    deps = [":train_lib", ":conftest"]
 )
 
 py_test(
168 changes: 91 additions & 77 deletions python/ray/train/tests/test_base_trainer.py
@@ -27,11 +27,28 @@
 
 
 @pytest.fixture
-def ray_start_4_cpus():
-    address_info = ray.init(num_cpus=4)
-    yield address_info
+def mock_tuner_internal_logger():
+    class MockLogger:
+        def __init__(self):
+            self.warnings = []
+
+        def warning(self, msg):
+            self.warnings.append(msg)
+
+        def warn(self, msg, **kwargs):
+            self.warnings.append(msg)
+
+        def info(self, msg):
+            print(msg)
+
+        def clear(self):
+            self.warnings = []
+
+    old = tuner_internal.warnings
+    tuner_internal.warnings = MockLogger()
+    yield tuner_internal.warnings
     # The code after the yield will run as teardown code.
-    ray.shutdown()
+    tuner_internal.warnings = old
 
 
 class DummyPreprocessor(Preprocessor):
@@ -218,90 +235,87 @@ def train_loop(self):
     tune.run(trainer.as_trainable(), num_samples=4)
 
 
-def test_reserved_cpu_warnings(ray_start_4_cpus):
+def test_reserved_cpu_warnings(ray_start_4_cpus, mock_tuner_internal_logger):
     def train_loop(config):
         pass
 
-    class MockLogger:
-        def __init__(self):
-            self.warnings = []
+    # Fraction correctly specified.
+    trainer = DummyTrainer(
+        train_loop,
+        scaling_config=ScalingConfig(num_workers=1, _max_cpu_fraction_per_node=0.9),
+        datasets={"train": ray.data.range(10)},
+    )
+    trainer.fit()
+    assert not mock_tuner_internal_logger.warnings
 
-        def warning(self, msg):
-            self.warnings.append(msg)
+    # No datasets, no fraction.
+    trainer = DummyTrainer(
+        train_loop,
+        scaling_config=ScalingConfig(num_workers=1),
+    )
+    trainer.fit()
+    assert not mock_tuner_internal_logger.warnings
 
-        def warn(self, msg, **kwargs):
-            self.warnings.append(msg)
+    # Should warn.
+    trainer = DummyTrainer(
+        train_loop,
+        scaling_config=ScalingConfig(num_workers=3),
+        datasets={"train": ray.data.range(10)},
+    )
+    trainer.fit()
+    assert (
+        len(mock_tuner_internal_logger.warnings) == 1
+    ), mock_tuner_internal_logger.warnings
+    assert "_max_cpu_fraction_per_node" in mock_tuner_internal_logger.warnings[0]
+    mock_tuner_internal_logger.clear()
 
-        def info(self, msg):
-            print(msg)
+    # Warn if num_samples is configured
+    trainer = DummyTrainer(
+        train_loop,
+        scaling_config=ScalingConfig(num_workers=1),
+        datasets={"train": ray.data.range(10)},
+    )
+    tuner = tune.Tuner(trainer, tune_config=tune.TuneConfig(num_samples=3))
+    tuner.fit()
+    assert (
+        len(mock_tuner_internal_logger.warnings) == 1
+    ), mock_tuner_internal_logger.warnings
+    assert "_max_cpu_fraction_per_node" in mock_tuner_internal_logger.warnings[0]
+    mock_tuner_internal_logger.clear()
+
+    # Don't warn if resources * samples < 0.8
+    trainer = DummyTrainer(
+        train_loop,
+        scaling_config=ScalingConfig(num_workers=1, trainer_resources={"CPU": 0}),
+        datasets={"train": ray.data.range(10)},
+    )
+    tuner = tune.Tuner(trainer, tune_config=tune.TuneConfig(num_samples=3))
+    tuner.fit()
+    assert not mock_tuner_internal_logger.warnings
 
-        def clear(self):
-            self.warnings = []
+    # Don't warn if Trainer is not used
+    tuner = tune.Tuner(train_loop, tune_config=tune.TuneConfig(num_samples=3))
+    tuner.fit()
+    assert not mock_tuner_internal_logger.warnings
 
-    try:
-        old = tuner_internal.warnings
-        tuner_internal.warnings = MockLogger()
 
-        # Fraction correctly specified.
-        trainer = DummyTrainer(
-            train_loop,
-            scaling_config=ScalingConfig(num_workers=1, _max_cpu_fraction_per_node=0.9),
-            datasets={"train": ray.data.range(10)},
-        )
-        trainer.fit()
-        assert not tuner_internal.warnings.warnings
+def test_reserved_cpu_warnings_no_cpu_usage(
+    ray_start_1_cpu_1_gpu, mock_tuner_internal_logger
+):
+    """Ensure there is no divide by zero error if trial requires no CPUs."""
 
-        # No datasets, no fraction.
-        trainer = DummyTrainer(
-            train_loop,
-            scaling_config=ScalingConfig(num_workers=1),
-        )
-        trainer.fit()
-        assert not tuner_internal.warnings.warnings
+    def train_loop(config):
+        pass
 
-        # Should warn.
-        trainer = DummyTrainer(
-            train_loop,
-            scaling_config=ScalingConfig(num_workers=3),
-            datasets={"train": ray.data.range(10)},
-        )
-        trainer.fit()
-        assert (
-            len(tuner_internal.warnings.warnings) == 1
-        ), tuner_internal.warnings.warnings
-        assert "_max_cpu_fraction_per_node" in tuner_internal.warnings.warnings[0]
-        tuner_internal.warnings.clear()
-
-        # Warn if num_samples is configured
-        trainer = DummyTrainer(
-            train_loop,
-            scaling_config=ScalingConfig(num_workers=1),
-            datasets={"train": ray.data.range(10)},
-        )
-        tuner = tune.Tuner(trainer, tune_config=tune.TuneConfig(num_samples=3))
-        tuner.fit()
-        assert (
-            len(tuner_internal.warnings.warnings) == 1
-        ), tuner_internal.warnings.warnings
-        assert "_max_cpu_fraction_per_node" in tuner_internal.warnings.warnings[0]
-        tuner_internal.warnings.clear()
-
-        # Don't warn if resources * samples < 0.8
-        trainer = DummyTrainer(
-            train_loop,
-            scaling_config=ScalingConfig(num_workers=1, trainer_resources={"CPU": 0}),
-            datasets={"train": ray.data.range(10)},
-        )
-        tuner = tune.Tuner(trainer, tune_config=tune.TuneConfig(num_samples=3))
-        tuner.fit()
-        assert not tuner_internal.warnings.warnings
-
-        # Don't warn if Trainer is not used
-        tuner = tune.Tuner(train_loop, tune_config=tune.TuneConfig(num_samples=3))
-        tuner.fit()
-        assert not tuner_internal.warnings.warnings
-    finally:
-        tuner_internal.warnings = old
+    trainer = DummyTrainer(
+        train_loop,
+        scaling_config=ScalingConfig(
+            num_workers=1, use_gpu=True, trainer_resources={"CPU": 0}
+        ),
+        datasets={"train": ray.data.range(10)},
+    )
+    trainer.fit()
+    assert not mock_tuner_internal_logger.warnings
 
 
 def test_setup(ray_start_4_cpus):
6 changes: 5 additions & 1 deletion python/ray/tune/impl/tuner_internal.py
@@ -150,7 +150,11 @@ def _expected_utilization(self, cpus_per_trial, cpus_total):
             concurrent_trials = math.inf
 
         actual_concurrency = min(
-            (cpus_total // cpus_per_trial, num_samples, concurrent_trials)
+            (
+                (cpus_total // cpus_per_trial) if cpus_per_trial else 0,
+                num_samples,
+                concurrent_trials,
+            )
         )
         return (actual_concurrency * cpus_per_trial) / (cpus_total + 0.001)
 