Revert "[AIR][Predictor] Enable numpy based predictor (#28917)"
This reverts commit 326d84f.
fishbone authored Nov 16, 2022
1 parent 693b9ce commit f400b33
Showing 38 changed files with 273 additions and 799 deletions.
@@ -658,7 +658,7 @@
"\n",
"sample_images = x_test[:3]\n",
"sample_labels = y_test[:3]\n",
"preds = predictor.predict(sample_images)[\"predictions\"].argmax(1)\n",
"preds = predictor.predict(sample_images).argmax(1)\n",
"for image, pred, label in zip(sample_images, preds, sample_labels):\n",
" plt.figure(figsize=(2, 2))\n",
" plt.title(f\"Prediction = {pred}, Label = {label}\")\n",
14 changes: 7 additions & 7 deletions doc/source/ray-air/examples/torch_image_batch_pretrained.py
@@ -1,6 +1,6 @@
import pandas as pd
import numpy as np

import torch
from torchvision import transforms
from torchvision.models import resnet18

@@ -10,20 +10,20 @@
from ray.data.preprocessors import BatchMapper


def preprocess(image_batch: np.ndarray) -> np.ndarray:
def preprocess(batch: np.ndarray) -> pd.DataFrame:
"""
User Pytorch code to transform user image with outer dimension of batch size.
User Pytorch code to transform user image. Note we still use pandas as
intermediate format to hold images as shorthand of python dictionary.
"""
preprocess = transforms.Compose(
[
# Torchvision's ToTensor does not accept outer batch dimension
transforms.ToTensor(),
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
)
# Outer dimension is batch size such as (10, 256, 256, 3) -> (10, 3, 256, 256)
transposed_torch_tensor = torch.Tensor(image_batch.transpose(0, 3, 1, 2))
return preprocess(transposed_torch_tensor).numpy()
return pd.DataFrame({"image": [preprocess(image) for image in batch]})


data_url = "s3://anonymous@air-example-data-2/1G-image-data-synthetic-raw"
30 changes: 14 additions & 16 deletions doc/source/ray-air/examples/torch_incremental_learning.ipynb
@@ -517,26 +517,24 @@
},
"outputs": [],
"source": [
"from typing import Dict\n",
"import numpy as np\n",
"from ray.data.preprocessors import BatchMapper\n",
"\n",
"import torch\n",
"from torchvision import transforms\n",
"\n",
"from ray.data.preprocessors import BatchMapper\n",
"\n",
"def preprocess_images(image_batch_dict: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:\n",
"def preprocess_images(df: pd.DataFrame) -> pd.DataFrame:\n",
" \"\"\"Preprocess images by scaling each channel in the image.\"\"\"\n",
"\n",
" torchvision_transforms = transforms.Compose(\n",
" [transforms.Normalize((0.1307,), (0.3081,))]\n",
" [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]\n",
" )\n",
" # Outer dimension is batch size such as (4096, 28, 28)\n",
" image_batch_dict[\"image\"] = torchvision_transforms(\n",
" torch.Tensor(image_batch_dict[\"image\"])\n",
" ).numpy()\n",
" return image_batch_dict\n",
"\n",
"mnist_normalize_preprocessor = BatchMapper(fn=preprocess_images, batch_format=\"numpy\")"
" df = df.copy()\n",
" df.loc[:, \"image\"] = [\n",
" torchvision_transforms(image).numpy() for image in df[\"image\"]\n",
" ]\n",
" return df\n",
"\n",
"mnist_normalize_preprocessor = BatchMapper(fn=preprocess_images)"
]
},
{
@@ -1406,7 +1404,7 @@
" # Have to specify trainer_resources as 0 so that the example works on Colab. \n",
" scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu, trainer_resources={\"CPU\": 0}),\n",
" datasets={\"train\": train_dataset},\n",
" preprocessor=mnist_normalize_preprocessor,\n",
" preprocessor=BatchMapper(fn=preprocess_images),\n",
" resume_from_checkpoint=latest_checkpoint,\n",
" )\n",
" result = trainer.fit()\n",
@@ -1717,7 +1715,7 @@
" # Have to specify trainer_resources as 0 so that the example works on Colab. \n",
" scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu, trainer_resources={\"CPU\": 0}),\n",
" datasets={\"train\": combined_training_dataset},\n",
" preprocessor=mnist_normalize_preprocessor,\n",
" preprocessor=BatchMapper(fn=preprocess_images),\n",
" )\n",
"result = trainer.fit()\n",
"full_training_checkpoint = result.checkpoint"
@@ -1843,7 +1841,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.13"
"version": "3.10.6"
},
"vscode": {
"interpreter": {
12 changes: 7 additions & 5 deletions doc/source/ray-air/predictors.rst
@@ -20,7 +20,7 @@ Ray AIR Predictors are a class that loads models from `Checkpoint` to perform in

Predictors are used by `BatchPredictor` and `PredictorDeployment` to do large-scale scoring or online inference.

Let's walk through a basic usage of the Predictor. In the below example, we create `Checkpoint` object from a model definition.
Let's walk through a basic usage of the Predictor. In the below example, we create `Checkpoint` object from a model definition.
Checkpoints can be generated from a variety of different ways -- see the :ref:`Checkpoints <air-checkpoints-doc>` user guide for more details.

The checkpoint then is used to create a framework specific Predictor (in our example, a `TensorflowPredictor`), which then can be used for inference:
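(The collapsed hunk above cuts off before the doc's own code block; the following is a minimal sketch of the flow described, assuming the Ray AIR `TensorflowCheckpoint`/`TensorflowPredictor` API of this release rather than the doc's verbatim example. The one-layer Keras model is a hypothetical stand-in.)

    import numpy as np
    import tensorflow as tf

    from ray.train.tensorflow import TensorflowCheckpoint, TensorflowPredictor

    def build_model() -> tf.keras.Model:
        # Hypothetical model definition; any Keras model works here.
        return tf.keras.Sequential(
            [tf.keras.layers.InputLayer(input_shape=(1,)), tf.keras.layers.Dense(1)]
        )

    # Create a Checkpoint from the model, then a framework-specific Predictor from it.
    checkpoint = TensorflowCheckpoint.from_model(build_model())
    predictor = TensorflowPredictor.from_checkpoint(checkpoint, model_definition=build_model)

    # Inference on an in-memory batch (a numpy array of shape (4, 1)).
    predictions = predictor.predict(np.expand_dims(np.arange(4, dtype=np.float32), 1))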
@@ -46,7 +46,7 @@ Batch Prediction

Ray AIR provides a ``BatchPredictor`` utility for large-scale batch inference.

The BatchPredictor takes in a checkpoint and a predictor class and executes
The BatchPredictor takes in a checkpoint and a predictor class and executes
large-scale batch prediction on a given dataset in a parallel/distributed fashion when calling ``predict()``.
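A minimal sketch of that flow, mirroring the xgboost doc_code change further down in this commit; `result.checkpoint` is assumed to come from a previously run `XGBoostTrainer.fit()`:

    import ray
    from ray.train.batch_predictor import BatchPredictor
    from ray.train.xgboost import XGBoostPredictor

    # Build a BatchPredictor from a training checkpoint and a predictor class,
    # then run distributed inference over a Ray Dataset.
    batch_predictor = BatchPredictor.from_checkpoint(result.checkpoint, XGBoostPredictor)
    predictions = batch_predictor.predict(
        data=ray.data.from_items([{"x": x} for x in range(32)]),
        batch_size=8,
    )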

.. note::
@@ -117,10 +117,10 @@ Coming soon!
Lazy/Pipelined Prediction (experimental)
----------------------------------------

If you have a large dataset but not a lot of available memory, you can use the
If you have a large dataset but not a lot of available memory, you can use the
:meth:`predict_pipelined <ray.train.batch_predictor.BatchPredictor.predict_pipelined>` method.

Unlike :py:meth:`predict` which will load the entire data into memory, ``predict_pipelined`` will create a
Unlike :py:meth:`predict` which will load the entire data into memory, ``predict_pipelined`` will create a
:class:`DatasetPipeline` object, which will *lazily* load the data and perform inference on a smaller batch of data at a time.

The lazy loading of the data will allow you to operate on datasets much greater than your available memory.
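A hedged sketch of the pipelined variant, reusing the `batch_predictor` from the sketch above; the `bytes_per_window` keyword and window size are assumptions about this experimental API:

    predict_dataset = ray.data.from_items([{"x": x} for x in range(32)])
    # Stream through the dataset in fixed-size windows instead of loading it all.
    pipeline = batch_predictor.predict_pipelined(
        predict_dataset,
        bytes_per_window=1024 * 1024 * 1024,  # roughly 1 GiB of data per window
    )
    for batch in pipeline.iter_batches(batch_size=8):
        ...  # consume predictions incrementally instead of materializing them all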
@@ -145,4 +145,6 @@ To implement a new Predictor for your particular framework, you should subclass

1. ``_predict_pandas``: Given a pandas.DataFrame input, return a pandas.DataFrame containing predictions.
2. ``from_checkpoint``: Logic for creating a Predictor from an :ref:`AIR Checkpoint <air-checkpoint-ref>`.
3. Optionally ``_predict_numpy`` for better performance when working with tensor data to avoid extra copies from Pandas conversions.
3. Optionally ``_predict_arrow`` for better performance when working with tensor data to avoid extra copies from Pandas conversions.
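A hedged sketch of the interface listed above; the model object and the checkpoint layout (a `"model"` key in the checkpoint dict) are hypothetical stand-ins, not a prescribed structure:

    import pandas as pd

    from ray.air.checkpoint import Checkpoint
    from ray.train.predictor import Predictor

    class MyFrameworkPredictor(Predictor):
        def __init__(self, model):
            super().__init__()
            self._model = model

        @classmethod
        def from_checkpoint(cls, checkpoint: Checkpoint, **kwargs) -> "MyFrameworkPredictor":
            # Assumes the training side stored the model under a "model" key.
            return cls(checkpoint.to_dict()["model"])

        def _predict_pandas(self, data: pd.DataFrame, **kwargs) -> pd.DataFrame:
            # Framework-specific call; returns one prediction per input row.
            return pd.DataFrame({"predictions": self._model.predict(data)})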


4 changes: 1 addition & 3 deletions doc/source/train/doc_code/xgboost_train_predict.py
@@ -22,13 +22,11 @@
# __train_predict_end__

# __batch_predict_start__
import pandas as pd
from ray.train.batch_predictor import BatchPredictor

batch_predictor = BatchPredictor.from_checkpoint(result.checkpoint, XGBoostPredictor)
predict_dataset = ray.data.from_pandas(pd.DataFrame({"x": np.arange(32)}))
predictions = batch_predictor.predict(
data=predict_dataset,
data=ray.data.from_items([{"x": x} for x in range(32)]),
batch_size=8,
min_scoring_workers=2,
)
Empty file removed python/ray/air/data
5 changes: 4 additions & 1 deletion python/ray/air/data_batch_type.py
@@ -3,5 +3,8 @@
if TYPE_CHECKING:
import numpy
import pandas
import pyarrow

DataBatchType = Union["numpy.ndarray", "pandas.DataFrame", Dict[str, "numpy.ndarray"]]
DataBatchType = Union[
"numpy.ndarray", "pandas.DataFrame", "pyarrow.Table", Dict[str, "numpy.ndarray"]
]
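For reference, a hedged illustration of the batch formats the restored union covers (the shapes and column names are arbitrary examples):

    import numpy as np
    import pandas as pd
    import pyarrow

    # Any of these is a valid DataBatchType under the restored definition:
    array_batch = np.zeros((8, 28, 28))                 # a single ndarray
    dict_batch = {"image": np.zeros((8, 28, 28))}       # a dict of ndarrays
    pandas_batch = pd.DataFrame({"x": range(8)})        # a pandas DataFrame
    arrow_batch = pyarrow.table({"x": list(range(8))})  # a pyarrow Table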
22 changes: 11 additions & 11 deletions python/ray/air/tests/test_data_batch_conversion.py
@@ -14,7 +14,7 @@
_cast_ndarray_columns_to_tensor_extension,
_cast_tensor_columns_to_ndarrays,
)
from ray.air.util.data_batch_conversion import BatchFormat
from ray.air.util.data_batch_conversion import DataType
from ray.air.util.tensor_extensions.pandas import TensorArray
from ray.air.util.tensor_extensions.arrow import ArrowTensorArray

@@ -25,7 +25,7 @@ def test_pandas_pandas():
actual_output = convert_batch_type_to_pandas(input_data)
pd.testing.assert_frame_equal(expected_output, actual_output)

actual_output = convert_pandas_to_batch_type(actual_output, type=BatchFormat.PANDAS)
actual_output = convert_pandas_to_batch_type(actual_output, type=DataType.PANDAS)
pd.testing.assert_frame_equal(actual_output, input_data)


@@ -147,7 +147,7 @@ def test_pandas_multi_dim_pandas(cast_tensor_columns, use_tensor_extension_for_i
pd.testing.assert_frame_equal(expected_output, actual_output)

actual_output = convert_pandas_to_batch_type(
actual_output, type=BatchFormat.PANDAS, cast_tensor_columns=cast_tensor_columns
actual_output, type=DataType.PANDAS, cast_tensor_columns=cast_tensor_columns
)
pd.testing.assert_frame_equal(actual_output, input_data)

@@ -173,7 +173,7 @@ def test_numpy_pandas(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

output_array = convert_pandas_to_batch_type(
actual_output, type=BatchFormat.NUMPY, cast_tensor_columns=cast_tensor_columns
actual_output, type=DataType.NUMPY, cast_tensor_columns=cast_tensor_columns
)
np.testing.assert_equal(output_array, input_data)

@@ -186,7 +186,7 @@ def test_numpy_multi_dim_pandas(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

output_array = convert_pandas_to_batch_type(
actual_output, type=BatchFormat.NUMPY, cast_tensor_columns=cast_tensor_columns
actual_output, type=DataType.NUMPY, cast_tensor_columns=cast_tensor_columns
)
np.testing.assert_array_equal(np.array(list(output_array)), input_data)

@@ -198,7 +198,7 @@ def test_numpy_object_pandas():
pd.testing.assert_frame_equal(expected_output, actual_output)

np.testing.assert_array_equal(
convert_pandas_to_batch_type(actual_output, type=BatchFormat.NUMPY), input_data
convert_pandas_to_batch_type(actual_output, type=DataType.NUMPY), input_data
)


@@ -227,7 +227,7 @@ def test_dict_pandas(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

output_array = convert_pandas_to_batch_type(
actual_output, type=BatchFormat.NUMPY, cast_tensor_columns=cast_tensor_columns
actual_output, type=DataType.NUMPY, cast_tensor_columns=cast_tensor_columns
)
np.testing.assert_array_equal(output_array, input_data["x"])

@@ -241,7 +241,7 @@ def test_dict_multi_dim_to_pandas(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

output_array = convert_pandas_to_batch_type(
actual_output, type=BatchFormat.NUMPY, cast_tensor_columns=cast_tensor_columns
actual_output, type=DataType.NUMPY, cast_tensor_columns=cast_tensor_columns
)
np.testing.assert_array_equal(np.array(list(output_array)), input_data["x"])

@@ -254,7 +254,7 @@ def test_dict_pandas_multi_column(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

output_dict = convert_pandas_to_batch_type(
actual_output, type=BatchFormat.NUMPY, cast_tensor_columns=cast_tensor_columns
actual_output, type=DataType.NUMPY, cast_tensor_columns=cast_tensor_columns
)
for k, v in output_dict.items():
np.testing.assert_array_equal(v, array_dict[k])
@@ -267,7 +267,7 @@ def test_arrow_pandas():
actual_output = convert_batch_type_to_pandas(input_data)
pd.testing.assert_frame_equal(expected_output, actual_output)

assert convert_pandas_to_batch_type(actual_output, type=BatchFormat.ARROW).equals(
assert convert_pandas_to_batch_type(actual_output, type=DataType.ARROW).equals(
input_data
)

@@ -286,7 +286,7 @@ def test_arrow_tensor_pandas(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

arrow_output = convert_pandas_to_batch_type(
actual_output, type=BatchFormat.ARROW, cast_tensor_columns=cast_tensor_columns
actual_output, type=DataType.ARROW, cast_tensor_columns=cast_tensor_columns
)
assert arrow_output.equals(input_data)

34 changes: 12 additions & 22 deletions python/ray/air/util/data_batch_conversion.py
@@ -1,4 +1,4 @@
from enum import Enum
from enum import Enum, auto
from typing import Dict, Union, List
import warnings

@@ -17,20 +17,10 @@


@DeveloperAPI
class BatchFormat(str, Enum):
PANDAS = "pandas"
# TODO: Remove once Arrow is deprecated as user facing batch format
ARROW = "arrow"
NUMPY = "numpy" # Either a single numpy array or a Dict of numpy arrays.


@DeveloperAPI
class BlockFormat(str, Enum):
"""Internal Dataset block format enum."""

PANDAS = "pandas"
ARROW = "arrow"
SIMPLE = "simple"
class DataType(Enum):
PANDAS = auto()
ARROW = auto()
NUMPY = auto() # Either a single numpy array or a Dict of numpy arrays.


@DeveloperAPI
@@ -76,14 +66,14 @@ def convert_batch_type_to_pandas(
@DeveloperAPI
def convert_pandas_to_batch_type(
data: pd.DataFrame,
type: BatchFormat,
type: DataType,
cast_tensor_columns: bool = False,
) -> DataBatchType:
"""Convert the provided Pandas dataframe to the provided ``type``.
Args:
data: A Pandas DataFrame
type: The specific ``BatchFormat`` to convert to.
type: The specific ``DataBatchType`` to convert to.
cast_tensor_columns: Whether tensor columns should be cast to our tensor
extension type.
@@ -92,10 +82,10 @@ def convert_pandas_to_batch_type(
"""
if cast_tensor_columns:
data = _cast_ndarray_columns_to_tensor_extension(data)
if type == BatchFormat.PANDAS:
if type == DataType.PANDAS:
return data

elif type == BatchFormat.NUMPY:
elif type == DataType.NUMPY:
if len(data.columns) == 1:
# If just a single column, return as a single numpy array.
return data.iloc[:, 0].to_numpy()
@@ -106,7 +96,7 @@ def convert_pandas_to_batch_type(
output_dict[column] = data[column].to_numpy()
return output_dict

elif type == BatchFormat.ARROW:
elif type == DataType.ARROW:
if not pyarrow:
raise ValueError(
"Attempted to convert data to Pyarrow Table but Pyarrow "
@@ -117,7 +107,7 @@ def convert_pandas_to_batch_type(

else:
raise ValueError(
f"Received type {type}, but expected it to be one of {DataBatchType}"
f"Received type {type}, but expected it to be one of {DataType}"
)
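A short usage sketch of the restored helper, using the module path and enum names shown in the hunks above; the example frame is arbitrary:

    import pandas as pd
    from ray.air.util.data_batch_conversion import DataType, convert_pandas_to_batch_type

    df = pd.DataFrame({"x": [1, 2, 3], "y": [4.0, 5.0, 6.0]})
    # Multi-column frames convert to a dict of ndarrays; a single-column frame
    # would come back as a bare ndarray instead.
    numpy_batch = convert_pandas_to_batch_type(df, type=DataType.NUMPY)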


@@ -175,7 +165,7 @@ def _convert_batch_type_to_numpy(
output_dict[col_name] = col.to_numpy(zero_copy_only=False)
return output_dict
elif isinstance(data, pd.DataFrame):
return convert_pandas_to_batch_type(data, BatchFormat.NUMPY)
return convert_pandas_to_batch_type(data, DataType.NUMPY)
else:
raise ValueError(
f"Received data of type: {type(data)}, but expected it to be one "
2 changes: 1 addition & 1 deletion python/ray/data/_internal/fast_repartition.py
@@ -23,7 +23,7 @@ def fast_repartition(blocks, num_blocks):
)
# Compute the (n-1) indices needed for an equal split of the data.
count = wrapped_ds.count()
dataset_format = wrapped_ds.dataset_format()
dataset_format = wrapped_ds._dataset_format()
indices = []
cur_idx = 0
for _ in range(num_blocks - 1):
2 changes: 1 addition & 1 deletion python/ray/data/block.py
@@ -61,7 +61,7 @@
def _validate_key_fn(ds: "Dataset", key: KeyFn) -> None:
"""Check the key function is valid on the given dataset."""
try:
fmt = ds.dataset_format()
fmt = ds._dataset_format()
except ValueError:
# Dataset is empty/cleared, validation not possible.
return