Skip to content

Commit

Permalink
[AIR] Execute GPU inference in a separate stage in BatchPredictor (ra…
Browse files Browse the repository at this point in the history
…y-project#26616)

Signed-off-by: Rohan138 <[email protected]>
  • Loading branch information
ericl authored and Rohan138 committed Jul 28, 2022
1 parent 823a1da commit a8f305b
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 24 deletions.
36 changes: 32 additions & 4 deletions python/ray/train/batch_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ray.air import Checkpoint
from ray.air.util.data_batch_conversion import convert_batch_type_to_pandas
from ray.data import Preprocessor
from ray.data.preprocessors import BatchMapper
from ray.train.predictor import Predictor
from ray.util.annotations import PublicAPI

Expand Down Expand Up @@ -81,8 +82,9 @@ def predict(
batch_size: int = 4096,
min_scoring_workers: int = 1,
max_scoring_workers: Optional[int] = None,
num_cpus_per_worker: int = 1,
num_gpus_per_worker: int = 0,
num_cpus_per_worker: Optional[int] = None,
num_gpus_per_worker: Optional[int] = None,
separate_gpu_stage: bool = True,
ray_remote_args: Optional[Dict[str, Any]] = None,
**predict_kwargs,
) -> ray.data.Dataset:
Expand Down Expand Up @@ -127,6 +129,9 @@ def predict(
max_scoring_workers: If set, specify the maximum number of scoring actors.
num_cpus_per_worker: Number of CPUs to allocate per scoring worker.
num_gpus_per_worker: Number of GPUs to allocate per scoring worker.
separate_gpu_stage: If using GPUs, specifies whether to execute GPU
processing in a separate stage (enabled by default). This avoids
running expensive preprocessing steps on GPU workers.
ray_remote_args: Additional resource requirements to request from
ray.
predict_kwargs: Keyword arguments passed to the predictor's
Expand All @@ -136,6 +141,16 @@ def predict(
Dataset containing scoring results.
"""
if num_gpus_per_worker is None:
num_gpus_per_worker = 0
if num_cpus_per_worker is None:
if num_gpus_per_worker > 0:
# Don't request a CPU here, to avoid unnecessary contention. The GPU
# resource request suffices for scheduling.
num_cpus_per_worker = 0
else:
num_cpus_per_worker = 1

predictor_cls = self._predictor_cls
checkpoint_ref = self._checkpoint_ref
predictor_kwargs = self._predictor_kwargs
Expand Down Expand Up @@ -177,6 +192,14 @@ def __call__(self, batch):
ray_remote_args["num_cpus"] = num_cpus_per_worker
ray_remote_args["num_gpus"] = num_gpus_per_worker

if separate_gpu_stage and num_gpus_per_worker > 0:
preprocessor = self.get_preprocessor()
if preprocessor:
# Set the in-predictor preprocessing to a no-op when using a separate
# GPU stage. Otherwise, the preprocessing will be applied twice.
override_prep = BatchMapper(lambda x: x)
data = preprocessor.transform(data)

prediction_results = data.map_batches(
ScoringWrapper,
compute=compute,
Expand All @@ -199,8 +222,9 @@ def predict_pipelined(
batch_size: int = 4096,
min_scoring_workers: int = 1,
max_scoring_workers: Optional[int] = None,
num_cpus_per_worker: int = 1,
num_gpus_per_worker: int = 0,
num_cpus_per_worker: Optional[int] = None,
num_gpus_per_worker: Optional[int] = None,
separate_gpu_stage: bool = True,
ray_remote_args: Optional[Dict[str, Any]] = None,
**predict_kwargs,
) -> ray.data.DatasetPipeline:
Expand Down Expand Up @@ -251,6 +275,9 @@ def predict_pipelined(
max_scoring_workers: If set, specify the maximum number of scoring actors.
num_cpus_per_worker: Number of CPUs to allocate per scoring worker.
num_gpus_per_worker: Number of GPUs to allocate per scoring worker.
separate_gpu_stage: If using GPUs, specifies whether to execute GPU
processing in a separate stage (enabled by default). This avoids
running expensive preprocessing steps on GPU workers.
ray_remote_args: Additional resource requirements to request from
ray.
predict_kwargs: Keyword arguments passed to the predictor's
Expand Down Expand Up @@ -279,6 +306,7 @@ def predict_pipelined(
max_scoring_workers=max_scoring_workers,
num_cpus_per_worker=num_cpus_per_worker,
num_gpus_per_worker=num_gpus_per_worker,
separate_gpu_stage=separate_gpu_stage,
ray_remote_args=ray_remote_args,
**predict_kwargs,
)
73 changes: 53 additions & 20 deletions python/ray/train/tests/test_batch_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@
import ray
from ray.air.checkpoint import Checkpoint
from ray.data import Preprocessor
from ray.tests.conftest import * # noqa
from ray.train.batch_predictor import BatchPredictor
from ray.train.predictor import Predictor


# Test-only preprocessor that multiplies every incoming batch by `multiplier`.
class DummyPreprocessor(Preprocessor):
# presumably tells the Preprocessor base class that no fit() step is needed
# -- TODO confirm against the Preprocessor implementation.
_is_fittable = False

def __init__(self, multiplier=2):
self.multiplier = multiplier

# NOTE(review): this is a diff view with the +/- markers stripped. Judging by
# the hunk header line counts, `transform_batch` looks like the removed line
# and `_transform_pandas` its replacement -- confirm against the raw diff.
def transform_batch(self, df):
def _transform_pandas(self, df):
# Scale the whole batch by the configured multiplier.
return df * self.multiplier


Expand Down Expand Up @@ -44,7 +47,7 @@ def from_checkpoint(
def _predict_pandas(self, data: pd.DataFrame, **kwargs) -> pd.DataFrame:
# Need to throw exception here instead of constructor to surface the
# exception to pytest rather than ray worker.
if self.use_gpu:
if self.use_gpu and "allow_gpu" not in kwargs:
raise ValueError("DummyPredictor does not support GPU prediction.")
else:
return data * self.factor
Expand All @@ -61,6 +64,54 @@ def from_checkpoint(cls, checkpoint: Checkpoint, **kwargs) -> "DummyPredictor":
return cls(checkpoint_data["factor"], preprocessor=preprocessor)


# Runs GPU prediction with separate_gpu_stage enabled and then disabled, and
# checks that both runs produce the same result while reporting different
# stage layouts in ds.stats().
def test_separate_gpu_stage(shutdown_only):
ray.init(num_gpus=1)
batch_predictor = BatchPredictor.from_checkpoint(
Checkpoint.from_dict({"factor": 2.0, PREPROCESSOR_KEY: DummyPreprocessor()}),
DummyPredictor,
)
# allow_gpu=True is passed through as a predict kwarg; DummyPredictor raises
# on GPU use unless "allow_gpu" is present in its kwargs.
ds = batch_predictor.predict(
ray.data.range_table(10),
num_gpus_per_worker=1,
separate_gpu_stage=True,
allow_gpu=True,
)
stats = ds.stats()
# With a separate GPU stage, the first stage is expected to be reported as
# read->map_batches, followed by a second map_batches stage.
assert "Stage 1 read->map_batches:" in stats, stats
assert "Stage 2 map_batches:" in stats, stats
# 36.0 = 9 * 2 (DummyPreprocessor default multiplier) * 2.0 (checkpoint
# factor), assuming range_table(10) yields values 0..9 -- i.e. the
# preprocessor was applied exactly once, not twice.
assert ds.max("value") == 36.0, ds

# Same prediction with the separate stage disabled.
ds = batch_predictor.predict(
ray.data.range_table(10),
num_gpus_per_worker=1,
separate_gpu_stage=False,
allow_gpu=True,
)
stats = ds.stats()
# Without a separate stage, the first stage is expected to be a plain read.
assert "Stage 1 read:" in stats, stats
assert "Stage 2 map_batches:" in stats, stats
# The numeric result must match the separate-stage run above.
assert ds.max("value") == 36.0, ds


def test_automatic_enable_gpu_from_num_gpus_per_worker(shutdown_only):
"""
Test that the underlying Predictor is created with use_gpu=True when
num_gpus_per_worker > 0 is passed to BatchPredictor's predict() call.
"""
ray.init(num_gpus=1)

batch_predictor = BatchPredictor.from_checkpoint(
Checkpoint.from_dict({"factor": 2.0, PREPROCESSOR_KEY: DummyPreprocessor()}),
DummyPredictor,
)
test_dataset = ray.data.range_table(4)

# DummyPredictor raises this ValueError from _predict_pandas when use_gpu is
# set and no "allow_gpu" kwarg is passed, so seeing the error here shows that
# GPU mode was enabled for the predictor.
with pytest.raises(
ValueError, match="DummyPredictor does not support GPU prediction"
):
_ = batch_predictor.predict(test_dataset, num_gpus_per_worker=1)


def test_batch_prediction():
batch_predictor = BatchPredictor.from_checkpoint(
Checkpoint.from_dict({"factor": 2.0, PREPROCESSOR_KEY: DummyPreprocessor()}),
Expand Down Expand Up @@ -164,24 +215,6 @@ def check_truth(df, all_true=False):
assert output == [True, True, True]


def test_automatic_enable_gpu_from_num_gpus_per_worker():
"""
Test that the underlying Predictor is created with use_gpu=True when
num_gpus_per_worker > 0 is passed to BatchPredictor's predict() call.
"""

batch_predictor = BatchPredictor.from_checkpoint(
Checkpoint.from_dict({"factor": 2.0, PREPROCESSOR_KEY: DummyPreprocessor()}),
DummyPredictor,
)
test_dataset = ray.data.range(4)

# DummyPredictor raises this ValueError from _predict_pandas when use_gpu is
# set, so seeing the error here shows that GPU mode was enabled.
with pytest.raises(
ValueError, match="DummyPredictor does not support GPU prediction"
):
_ = batch_predictor.predict(test_dataset, num_gpus_per_worker=1)


def test_get_and_set_preprocessor():
"""Test preprocessor can be set and get."""

Expand Down

0 comments on commit a8f305b

Please sign in to comment.