From d15342bec3eb1fa2d19f3df7fa6fb2c6c3ab9d66 Mon Sep 17 00:00:00 2001
From: Justin Yu
Date: Wed, 7 Dec 2022 13:08:07 -0800
Subject: [PATCH] Revert "Revert "[AIR] Change ScalingConfig to be optional for
 `DataParallelTrainer`s if already in Tuner `param_space`"" (#30918) (#30920)

#30715 introduced a test that was failing due to a change in master, and
this PR wasn't fully up to date with master when the CI tests were all
passing.

Signed-off-by: Justin Yu
Signed-off-by: tmynn
---
 python/ray/train/data_parallel_trainer.py     | 48 ++++++++++++-------
 .../train/tests/test_data_parallel_trainer.py | 24 ++++++++++
 2 files changed, 55 insertions(+), 17 deletions(-)

diff --git a/python/ray/train/data_parallel_trainer.py b/python/ray/train/data_parallel_trainer.py
index 03b42fe76827..5bf4e75d3022 100644
--- a/python/ray/train/data_parallel_trainer.py
+++ b/python/ray/train/data_parallel_trainer.py
@@ -282,23 +282,6 @@ def __init__(
     def _validate_attributes(self):
         super()._validate_attributes()
 
-        if not self.scaling_config.use_gpu and "GPU" in ray.available_resources():
-            logger.info(
-                "GPUs are detected in your Ray cluster, but GPU "
-                "training is not enabled for this trainer. To enable "
-                "GPU training, make sure to set `use_gpu` to True "
-                "in your scaling config."
-            )
-
-        if self.scaling_config.num_workers is None:
-            raise ValueError("You must specify the 'num_workers' in scaling_config.")
-
-        if self.scaling_config.num_workers <= 0:
-            raise ValueError(
-                "'num_workers' in `scaling_config` must be a positive "
-                f"integer. Received {self.scaling_config.num_workers}"
-            )
-
         self._validate_train_loop_per_worker(
             self._train_loop_per_worker, "train_loop_per_worker"
         )
@@ -320,6 +303,37 @@ def _validate_train_loop_per_worker(
                 f"but it accepts {num_params} arguments instead."
             )
 
+    @classmethod
+    def _validate_scaling_config(cls, scaling_config: ScalingConfig) -> ScalingConfig:
+        scaling_config = super(DataParallelTrainer, cls)._validate_scaling_config(
+            scaling_config
+        )
+
+        # This validation happens after the scaling config is updated from
+        # its specification in the Tuner `param_space`
+        if not scaling_config.use_gpu and "GPU" in ray.available_resources():
+            logger.info(
+                "GPUs are detected in your Ray cluster, but GPU "
+                "training is not enabled for this trainer. To enable "
+                "GPU training, make sure to set `use_gpu` to True "
+                "in your scaling config."
+            )
+
+        if scaling_config.num_workers is None:
+            raise ValueError(
+                "You must specify the 'num_workers' in `scaling_config` as either an "
+                f"argument of `{cls.__name__}` or through the `param_space` of a "
+                "`Tuner` (if performing hyperparameter tuning)."
+            )
+
+        if scaling_config.num_workers <= 0:
+            raise ValueError(
+                "'num_workers' in `scaling_config` must be a positive "
+                f"integer. Received {scaling_config.num_workers}"
+            )
+
+        return scaling_config
+
     def _report(self, training_iterator: TrainingIterator) -> None:
         for results in training_iterator:
             # TODO(ml-team): add ability to report results from multiple workers.
diff --git a/python/ray/train/tests/test_data_parallel_trainer.py b/python/ray/train/tests/test_data_parallel_trainer.py
index a23c20ed02a4..4247bb32815d 100644
--- a/python/ray/train/tests/test_data_parallel_trainer.py
+++ b/python/ray/train/tests/test_data_parallel_trainer.py
@@ -258,6 +258,30 @@ def train_func(config):
     assert trainer._train_loop_config["x"] == 100
 
 
+def test_scaling_config_validation(ray_start_4_cpus):
+    def train_func(config):
+        session.report({"loss": config["x"]})
+
+    # Should be able to create a DataParallelTrainer w/o scaling_config,
+    # but it should fail on fit
+    trainer = DataParallelTrainer(
+        train_loop_per_worker=train_func,
+        train_loop_config={"x": 100},
+    )
+    with pytest.raises(ValueError):
+        trainer.fit()
+
+    # Scaling config must be passed in through Tuner param space if not
+    # included in the initial trainer
+    tuner = Tuner(trainer)
+    with pytest.raises(ValueError):
+        tuner.fit()
+
+    tuner = Tuner(trainer, param_space={"scaling_config": ScalingConfig(num_workers=1)})
+    results = tuner.fit()
+    assert not results.errors
+
+
 def test_fast_slow(ray_start_4_cpus):
     def train_func():
         for i in range(2):
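
Illustrative usage sketch (not part of the patch): the pattern this change enables,
mirroring the new test above. Imports and the `train_loop_per_worker` function below
are assumptions for illustration, based on the Ray AIR APIs exercised by this patch
(roughly Ray 2.2), not code added by the PR itself.

from ray.air import ScalingConfig, session
from ray.train.data_parallel_trainer import DataParallelTrainer
from ray.tune import Tuner


def train_loop_per_worker(config):
    # Report a single metric so the run produces a result.
    session.report({"loss": config["x"]})


# After this patch, the trainer can be constructed without a ScalingConfig;
# validation is deferred to `_validate_scaling_config`, which runs after any
# `scaling_config` supplied through the Tuner's `param_space` has been merged in.
trainer = DataParallelTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={"x": 100},
)

# Supplying the scaling config via the Tuner's param_space instead of the trainer.
tuner = Tuner(
    trainer,
    param_space={"scaling_config": ScalingConfig(num_workers=1)},
)
results = tuner.fit()

Calling trainer.fit() or Tuner(trainer).fit() without a scaling config anywhere still
raises a ValueError, as the new test asserts.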