diff --git a/python/ray/train/batch_predictor.py b/python/ray/train/batch_predictor.py
index 41836b234e6e..78b3e9d45085 100644
--- a/python/ray/train/batch_predictor.py
+++ b/python/ray/train/batch_predictor.py
@@ -6,6 +6,7 @@
 from ray.air import Checkpoint
 from ray.air.util.data_batch_conversion import convert_batch_type_to_pandas
 from ray.data import Preprocessor
+from ray.data.preprocessors import BatchMapper
 from ray.train.predictor import Predictor
 from ray.util.annotations import PublicAPI
 
@@ -81,8 +82,9 @@ def predict(
         batch_size: int = 4096,
         min_scoring_workers: int = 1,
         max_scoring_workers: Optional[int] = None,
-        num_cpus_per_worker: int = 1,
-        num_gpus_per_worker: int = 0,
+        num_cpus_per_worker: Optional[int] = None,
+        num_gpus_per_worker: Optional[int] = None,
+        separate_gpu_stage: bool = True,
         ray_remote_args: Optional[Dict[str, Any]] = None,
         **predict_kwargs,
     ) -> ray.data.Dataset:
@@ -127,6 +129,9 @@
             max_scoring_workers: If set, specify the maximum number of scoring actors.
             num_cpus_per_worker: Number of CPUs to allocate per scoring worker.
             num_gpus_per_worker: Number of GPUs to allocate per scoring worker.
+            separate_gpu_stage: If using GPUs, specifies whether to execute GPU
+                processing in a separate stage (enabled by default). This avoids
+                running expensive preprocessing steps on GPU workers.
             ray_remote_args: Additional resource requirements to request from
                 ray.
             predict_kwargs: Keyword arguments passed to the predictor's
@@ -136,6 +141,16 @@
             Dataset containing scoring results.
 
         """
+        if num_gpus_per_worker is None:
+            num_gpus_per_worker = 0
+        if num_cpus_per_worker is None:
+            if num_gpus_per_worker > 0:
+                # Don't request a CPU here, to avoid unnecessary contention. The GPU
+                # resource request suffices for scheduling.
+                num_cpus_per_worker = 0
+            else:
+                num_cpus_per_worker = 1
+
         predictor_cls = self._predictor_cls
         checkpoint_ref = self._checkpoint_ref
         predictor_kwargs = self._predictor_kwargs
@@ -177,6 +192,14 @@ def __call__(self, batch):
         ray_remote_args["num_cpus"] = num_cpus_per_worker
         ray_remote_args["num_gpus"] = num_gpus_per_worker
 
+        if separate_gpu_stage and num_gpus_per_worker > 0:
+            preprocessor = self.get_preprocessor()
+            if preprocessor:
+                # Set the in-predictor preprocessing to a no-op when using a separate
+                # GPU stage. Otherwise, the preprocessing will be applied twice.
+                override_prep = BatchMapper(lambda x: x)
+                data = preprocessor.transform(data)
+
         prediction_results = data.map_batches(
             ScoringWrapper,
             compute=compute,
@@ -199,8 +222,9 @@ def predict_pipelined(
         batch_size: int = 4096,
         min_scoring_workers: int = 1,
         max_scoring_workers: Optional[int] = None,
-        num_cpus_per_worker: int = 1,
-        num_gpus_per_worker: int = 0,
+        num_cpus_per_worker: Optional[int] = None,
+        num_gpus_per_worker: Optional[int] = None,
+        separate_gpu_stage: bool = True,
         ray_remote_args: Optional[Dict[str, Any]] = None,
         **predict_kwargs,
     ) -> ray.data.DatasetPipeline:
@@ -251,6 +275,9 @@
             max_scoring_workers: If set, specify the maximum number of scoring actors.
             num_cpus_per_worker: Number of CPUs to allocate per scoring worker.
             num_gpus_per_worker: Number of GPUs to allocate per scoring worker.
+            separate_gpu_stage: If using GPUs, specifies whether to execute GPU
+                processing in a separate stage (enabled by default). This avoids
+                running expensive preprocessing steps on GPU workers.
             ray_remote_args: Additional resource requirements to request from
                 ray.
             predict_kwargs: Keyword arguments passed to the predictor's
@@ -279,6 +306,7 @@ def predict_pipelined(
             max_scoring_workers=max_scoring_workers,
             num_cpus_per_worker=num_cpus_per_worker,
             num_gpus_per_worker=num_gpus_per_worker,
+            separate_gpu_stage=separate_gpu_stage,
             ray_remote_args=ray_remote_args,
             **predict_kwargs,
         )
diff --git a/python/ray/train/tests/test_batch_predictor.py b/python/ray/train/tests/test_batch_predictor.py
index dd1d5d080eba..3457a98aeedc 100644
--- a/python/ray/train/tests/test_batch_predictor.py
+++ b/python/ray/train/tests/test_batch_predictor.py
@@ -8,15 +8,18 @@
 import ray
 from ray.air.checkpoint import Checkpoint
 from ray.data import Preprocessor
+from ray.tests.conftest import *  # noqa
 from ray.train.batch_predictor import BatchPredictor
 from ray.train.predictor import Predictor
 
 
 class DummyPreprocessor(Preprocessor):
+    _is_fittable = False
+
     def __init__(self, multiplier=2):
         self.multiplier = multiplier
 
-    def transform_batch(self, df):
+    def _transform_pandas(self, df):
         return df * self.multiplier
 
 
@@ -44,7 +47,7 @@ def from_checkpoint(
     def _predict_pandas(self, data: pd.DataFrame, **kwargs) -> pd.DataFrame:
         # Need to throw exception here instead of constructor to surface the
         # exception to pytest rather than ray worker.
-        if self.use_gpu:
+        if self.use_gpu and "allow_gpu" not in kwargs:
             raise ValueError("DummyPredictor does not support GPU prediction.")
         else:
             return data * self.factor
@@ -61,6 +64,54 @@ def from_checkpoint(cls, checkpoint: Checkpoint, **kwargs) -> "DummyPredictor":
         return cls(checkpoint_data["factor"], preprocessor=preprocessor)
+
+
+def test_separate_gpu_stage(shutdown_only):
+    ray.init(num_gpus=1)
+    batch_predictor = BatchPredictor.from_checkpoint(
+        Checkpoint.from_dict({"factor": 2.0, PREPROCESSOR_KEY: DummyPreprocessor()}),
+        DummyPredictor,
+    )
+    ds = batch_predictor.predict(
+        ray.data.range_table(10),
+        num_gpus_per_worker=1,
+        separate_gpu_stage=True,
+        allow_gpu=True,
+    )
+    stats = ds.stats()
+    assert "Stage 1 read->map_batches:" in stats, stats
+    assert "Stage 2 map_batches:" in stats, stats
+    assert ds.max("value") == 36.0, ds
+
+    ds = batch_predictor.predict(
+        ray.data.range_table(10),
+        num_gpus_per_worker=1,
+        separate_gpu_stage=False,
+        allow_gpu=True,
+    )
+    stats = ds.stats()
+    assert "Stage 1 read:" in stats, stats
+    assert "Stage 2 map_batches:" in stats, stats
+    assert ds.max("value") == 36.0, ds
+
+
+def test_automatic_enable_gpu_from_num_gpus_per_worker(shutdown_only):
+    """
+    Test we automatically set underlying Predictor creation use_gpu to True if
+    we found num_gpus_per_worker > 0 in BatchPredictor's predict() call.
+    """
+    ray.init(num_gpus=1)
+
+    batch_predictor = BatchPredictor.from_checkpoint(
+        Checkpoint.from_dict({"factor": 2.0, PREPROCESSOR_KEY: DummyPreprocessor()}),
+        DummyPredictor,
+    )
+    test_dataset = ray.data.range_table(4)
+
+    with pytest.raises(
+        ValueError, match="DummyPredictor does not support GPU prediction"
+    ):
+        _ = batch_predictor.predict(test_dataset, num_gpus_per_worker=1)
+
+
 def test_batch_prediction():
     batch_predictor = BatchPredictor.from_checkpoint(
         Checkpoint.from_dict({"factor": 2.0, PREPROCESSOR_KEY: DummyPreprocessor()}),
         DummyPredictor,
     )
@@ -164,24 +215,6 @@ def check_truth(df, all_true=False):
     assert output == [True, True, True]
 
 
-def test_automatic_enable_gpu_from_num_gpus_per_worker():
-    """
-    Test we automatically set underlying Predictor creation use_gpu to True if
-    we found num_gpus_per_worker > 0 in BatchPredictor's predict() call.
-    """
-
-    batch_predictor = BatchPredictor.from_checkpoint(
-        Checkpoint.from_dict({"factor": 2.0, PREPROCESSOR_KEY: DummyPreprocessor()}),
-        DummyPredictor,
-    )
-    test_dataset = ray.data.range(4)
-
-    with pytest.raises(
-        ValueError, match="DummyPredictor does not support GPU prediction"
-    ):
-        _ = batch_predictor.predict(test_dataset, num_gpus_per_worker=1)
-
-
 def test_get_and_set_preprocessor():
     """Test preprocessor can be set and get."""