
[AIR] Introduce better scoring API for BatchPredictor #26451

Merged · 10 commits · Jul 14, 2022
46 changes: 37 additions & 9 deletions python/ray/train/batch_predictor.py
@@ -1,5 +1,5 @@
import inspect
from typing import Any, Dict, Optional, Type, Union
from typing import Any, Dict, Optional, List, Type, Union

import ray
from ray.air import Checkpoint
@@ -47,6 +47,8 @@ def predict(
self,
data: Union[ray.data.Dataset, ray.data.DatasetPipeline],
*,
feature_columns: Optional[List[str]] = None,
keep_columns: Optional[List[str]] = None,
batch_size: int = 4096,
min_scoring_workers: int = 1,
max_scoring_workers: Optional[int] = None,
@@ -63,24 +65,40 @@
>>> import pandas as pd
>>> import ray
>>> from ray.air import Checkpoint
>>> from ray.train.predictor import Predictor
>>> from ray.train.batch_predictor import BatchPredictor
>>> # Create a dummy predictor that always returns `42` for each input.
>>> # Create a dummy predictor that returns its input as the predictions.
>>> class DummyPredictor(Predictor):
... @classmethod
... def from_checkpoint(cls, checkpoint, **kwargs):
... return DummyPredictor()
... def predict(self, data, **kwargs):
... return pd.DataFrame({"a": [42] * len(data)})
... def _predict_pandas(self, data_df, **kwargs):
... return data_df
>>> # Create a batch predictor for this dummy predictor.
>>> batch_pred = BatchPredictor( # doctest: +SKIP
... Checkpoint.from_dict({"x": 0}), DummyPredictor)
>>> # Create a dummy dataset.
>>> ds = ray.data.range_tensor(1000, parallelism=4) # doctest: +SKIP
>>> ds = ray.data.from_pandas(pd.DataFrame({ # doctest: +SKIP
... "feature_1": [1, 2, 3], "label": [1, 2, 3]}))
>>> # Execute batch prediction using this predictor.
>>> print(batch_pred.predict(ds)) # doctest: +SKIP
Dataset(num_blocks=4, num_rows=1000, schema={a: int64})
>>> predictions = batch_pred.predict(ds, # doctest: +SKIP
... feature_columns=["feature_1"], keep_columns=["label"])
>>> print(predictions) # doctest: +SKIP
Dataset(num_blocks=1, num_rows=3, schema={feature_1: int64, label: int64})
>>> # Calculate final accuracy.
>>> def calculate_accuracy(df):
...     return pd.DataFrame({"correct": df["feature_1"] == df["label"]})
>>> correct = predictions.map_batches(calculate_accuracy) # doctest: +SKIP
>>> print("Final accuracy:", correct.sum(on="correct") / correct.count()) # doctest: +SKIP
Final accuracy: 1.0

Args:
data: Ray dataset or pipeline to run batch prediction on.
feature_columns: List of columns in data to use for prediction. Columns not
specified will be dropped from `data` before being passed to the
predictor. If None, use all columns.
keep_columns: List of columns in `data` to include in the prediction result.
This is useful for calculating final accuracies/metrics on the result
dataset. If None, the columns in the output dataset will contain just
the prediction results.
batch_size: Split dataset into batches of this size for prediction.
min_scoring_workers: Minimum number of scoring actors.
max_scoring_workers: If set, specify the maximum number of scoring actors.
@@ -114,7 +132,15 @@ def __init__(self):
)

def __call__(self, batch):
prediction_output = self.predictor.predict(batch, **predict_kwargs)
if feature_columns:
prediction_batch = batch[feature_columns]
else:
prediction_batch = batch
prediction_output = self.predictor.predict(
prediction_batch, **predict_kwargs
)
if keep_columns:
prediction_output[keep_columns] = batch[keep_columns]
return convert_batch_type_to_pandas(prediction_output)

compute = ray.data.ActorPoolStrategy(
@@ -125,14 +151,16 @@ def __call__(self, batch):
ray_remote_args["num_cpus"] = num_cpus_per_worker
ray_remote_args["num_gpus"] = num_gpus_per_worker

return data.map_batches(
prediction_results = data.map_batches(
ScoringWrapper,
compute=compute,
batch_format="pandas",
batch_size=batch_size,
**ray_remote_args,
)

return prediction_results

def predict_pipelined(
self,
data: ray.data.Dataset,
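Stepping back from the diff: the column routing that the new `ScoringWrapper.__call__` performs can be illustrated in plain pandas, independent of Ray. A minimal sketch, assuming an identity predictor; `score_batch` and `predict_fn` are illustrative names, not part of the Ray API:

```python
import pandas as pd
from typing import Callable, List, Optional

def score_batch(
    batch: pd.DataFrame,
    predict_fn: Callable[[pd.DataFrame], pd.DataFrame],
    feature_columns: Optional[List[str]] = None,
    keep_columns: Optional[List[str]] = None,
) -> pd.DataFrame:
    # Restrict the batch to the feature columns before prediction, if given.
    prediction_batch = batch[feature_columns] if feature_columns else batch
    prediction_output = predict_fn(prediction_batch)
    # Copy the kept columns from the input batch into the prediction result.
    if keep_columns:
        prediction_output[keep_columns] = batch[keep_columns]
    return prediction_output

batch = pd.DataFrame({"feature_1": [1, 2, 3], "label": [1, 2, 3]})
out = score_batch(batch, lambda df: df.copy(), ["feature_1"], ["label"])
print(out.columns.tolist())  # ['feature_1', 'label']
```

Note that the kept columns bypass the predictor entirely, which is what makes downstream metric computation (like the accuracy example in the docstring) possible without a join.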
31 changes: 31 additions & 0 deletions python/ray/train/tests/test_batch_predictor.py
@@ -96,6 +96,37 @@ def test_batch_prediction_fs():
)


def test_batch_prediction_feature_cols():
batch_predictor = BatchPredictor.from_checkpoint(
Checkpoint.from_dict({"factor": 2.0}), DummyPredictor
)

test_dataset = ray.data.from_pandas(pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}))

assert batch_predictor.predict(
test_dataset, feature_columns=["a"]
).to_pandas().to_numpy().squeeze().tolist() == [4.0, 8.0, 12.0]
Member: If the dummy predictor returns `data * self.factor`, why does `[1, 2, 3]` map to a factor of 4 here?

Contributor Author: There's also a preprocessor, which multiplies by 2 again.
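
To make the 4x factor concrete, here is a sketch of how such a fixture could behave; `DoublingPreprocessor` and `DummyModel` are hypothetical stand-ins for the fixtures defined earlier in this test module:

```python
import pandas as pd

# Hypothetical stand-ins for the test fixtures.
class DoublingPreprocessor:
    def transform_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
        return df * 2

class DummyModel:
    factor = 2.0

    def predict(self, df: pd.DataFrame) -> pd.DataFrame:
        return df * self.factor

# The preprocessor doubles the input, then the predictor multiplies by
# factor=2.0, so [1, 2, 3] scores as [4.0, 8.0, 12.0] overall.
preprocessed = DoublingPreprocessor().transform_pandas(pd.DataFrame({"a": [1, 2, 3]}))
print(DummyModel().predict(preprocessed)["a"].tolist())  # [4.0, 8.0, 12.0]
```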



def test_batch_prediction_keep_cols():
batch_predictor = BatchPredictor.from_checkpoint(
Checkpoint.from_dict({"factor": 2.0}), DummyPredictor
)

test_dataset = ray.data.from_pandas(
pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
)

output_df = batch_predictor.predict(
test_dataset, feature_columns=["a"], keep_columns=["b"]
).to_pandas()

assert set(output_df.columns) == {"a", "b"}

assert output_df["a"].tolist() == [4.0, 8.0, 12.0]
assert output_df["b"].tolist() == [4, 5, 6]


def test_automatic_enable_gpu_from_num_gpus_per_worker():
"""
Test we automatically set underlying Predictor creation use_gpu to True if