Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIR][Predictor] Enable numpy based predictor #28917

Merged
merged 82 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 79 commits
Commits
Show all changes
82 commits
Select commit Hold shift + click to select a range
dd9922f
initial commit
jiaodong Sep 30, 2022
18423ac
predictor.py adding numpy and use enum type rather than string
jiaodong Sep 30, 2022
f66c438
fix tests related to keep columns
jiaodong Sep 30, 2022
2053c9b
add arrow format for comp
jiaodong Sep 30, 2022
4183d28
move model outputs to cpu and convert nump
jiaodong Sep 30, 2022
43f067b
add batch_format arg
jiaodong Sep 30, 2022
c2ea0ea
default format to pandas if batch_format is missing, and remove pyarr…
jiaodong Sep 30, 2022
1a109b3
single column selection + output handling
jiaodong Oct 1, 2022
ef0d919
fix torch image batch prediction example
jiaodong Oct 2, 2022
e98ade8
fix training and inference preprocessor consistency + incremental lea…
jiaodong Oct 2, 2022
f9d00e4
convert batch type to numpy to adapt DL with pandas preprocessor, thu…
jiaodong Oct 2, 2022
72432ce
fix torch image example preprocessor
jiaodong Oct 3, 2022
e501d76
Merge branch 'master' into dl_predictor_np
jiaodong Oct 3, 2022
1704036
fix notebook
jiaodong Oct 3, 2022
9234923
batch data type
jiaodong Oct 3, 2022
f56eacd
Merge branch 'master' into dl_predictor_np
jiaodong Oct 6, 2022
0bbc159
address comments, remove predict with batch_format and delegate to pr…
jiaodong Oct 6, 2022
194a21a
Merge branch 'master' into dl_predictor_np
jiaodong Oct 6, 2022
7d5bba6
Merge branch 'master' of https://github.com/ray-project/ray into dl_p…
jiaodong Oct 11, 2022
2ce30a1
wip
jiaodong Oct 12, 2022
f05049f
Merge branch 'master' into dl_predictor_np
jiaodong Oct 21, 2022
f992a0c
Merge branch 'dl_predictor_np' of github.com:jiaodong/ray into dl_pre…
jiaodong Oct 24, 2022
6606555
torch_predictor passed after reemoving batch_format from BatchPredictor
jiaodong Oct 24, 2022
16c9f13
fix tensorflow tests
jiaodong Oct 24, 2022
5748ca1
fix batch predictor
jiaodong Oct 25, 2022
bfcbcdf
add fallback pandas path to base predictor if _predict_numpy does not…
jiaodong Oct 25, 2022
1f5ba7a
fix some tests
jiaodong Oct 25, 2022
c70e15c
fix torch image example
jiaodong Oct 25, 2022
03dbbbc
Merge branch 'dl_predictor_np' of https://github.com/jiaodong/ray int…
jiaodong Oct 25, 2022
a3e047f
fix xgboost docstring
jiaodong Oct 25, 2022
1d5d9cd
try fix serve air_integrations by handling pandas with raw ndarray
jiaodong Oct 26, 2022
5f76d30
fix notebooks and add pandas format path to predictor.py
jiaodong Oct 26, 2022
1ec5438
Merge branch 'master' into dl_predictor_np
jiaodong Oct 26, 2022
e5ba13a
fix last notebook
jiaodong Oct 26, 2022
035e673
enhance air_integration tests
jiaodong Oct 26, 2022
7b1dfbd
refactor test_predictor tests to cover all pandas + numpy preprocesso…
jiaodong Oct 27, 2022
4aad0c5
update all batch_predictor level test combinations, remove ScoreWrapp…
jiaodong Oct 27, 2022
d3d192c
fix docs and docstring
jiaodong Oct 27, 2022
59006f6
fix test_batch_predictor test to work with python cli
jiaodong Oct 27, 2022
4a85371
add keep_col tests with preserved single column prediction for numpy …
jiaodong Oct 28, 2022
a97b3df
fix tests for keep column and single column output
jiaodong Oct 28, 2022
e4ea455
Merge branch 'master' into dl_predictor_np
jiaodong Oct 28, 2022
d0ed91d
fix notebook
jiaodong Oct 28, 2022
f0790a8
Merge branch 'master' into dl_predictor_np
jiaodong Nov 3, 2022
07c8fd8
Update python/ray/serve/air_integrations.py
jiaodong Nov 3, 2022
3140673
Update python/ray/serve/air_integrations.py
jiaodong Nov 3, 2022
4dbb7de
Update python/ray/train/tests/test_predictor.py
jiaodong Nov 3, 2022
f682d5b
Update python/ray/train/batch_predictor.py
jiaodong Nov 3, 2022
7c5fe89
Update python/ray/train/_internal/dl_predictor.py
jiaodong Nov 3, 2022
3019220
Update python/ray/train/_internal/dl_predictor.py
jiaodong Nov 3, 2022
6decdfd
Merge branch 'dl_predictor_np' of https://github.com/jiaodong/ray int…
jiaodong Nov 3, 2022
d8e729b
address comments and make predictor more self contained
jiaodong Nov 3, 2022
0efda24
fix tests
jiaodong Nov 4, 2022
74412c2
revert not needed notebook changes
jiaodong Nov 4, 2022
6f920af
change BlockFormat
jiaodong Nov 4, 2022
8cdbe82
update dataset_format
jiaodong Nov 4, 2022
10b0972
Merge branch 'master' into dl_predictor_np
jiaodong Nov 7, 2022
c90030e
simply preferred batch format at Predictor class level
jiaodong Nov 8, 2022
58fa01e
Merge branch 'master' into dl_predictor_np
jiaodong Nov 8, 2022
1f9ba1b
Merge branch 'master' into dl_predictor_np
jiaodong Nov 8, 2022
71231c8
fix test_air_integration
jiaodong Nov 8, 2022
aa9d3d0
fix lint
jiaodong Nov 8, 2022
291ffc7
address comments
jiaodong Nov 10, 2022
e4d3af4
Merge branch 'master' into dl_predictor_np
jiaodong Nov 10, 2022
a79c4cf
remove casting flags
jiaodong Nov 10, 2022
e66ef51
address comment regarding preprocessor format and predictor format de…
jiaodong Nov 11, 2022
4bee5b5
Merge branch 'dl_predictor_np' of https://github.com/jiaodong/ray int…
jiaodong Nov 11, 2022
b5cd7f7
Merge branch 'master' into dl_predictor_np
jiaodong Nov 11, 2022
f912662
tensor cast column nit
jiaodong Nov 11, 2022
26e9320
fix torch predictor doc test
jiaodong Nov 11, 2022
1e291bb
support chain of preprocessors
jiaodong Nov 11, 2022
c14ca57
fix torch predictor doctest
jiaodong Nov 11, 2022
5c8cef1
nit
jiaodong Nov 11, 2022
329cce2
fix tests
jiaodong Nov 11, 2022
1e7a6e6
fix output
jiaodong Nov 11, 2022
3a8e7ae
fix testouput docstring
jiaodong Nov 12, 2022
3df9446
predictor changes
jiaodong Nov 15, 2022
551c7ac
address comments
jiaodong Nov 15, 2022
dbc3bd8
make determine_transform_to_use private
jiaodong Nov 15, 2022
1ee1ede
Apply suggestions from code review
jiaodong Nov 15, 2022
461eb1f
lint
jiaodong Nov 15, 2022
80546a4
fix test
jiaodong Nov 15, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@
"\n",
"sample_images = x_test[:3]\n",
"sample_labels = y_test[:3]\n",
"preds = predictor.predict(sample_images).argmax(1)\n",
"preds = predictor.predict(sample_images)[\"predictions\"].argmax(1)\n",
"for image, pred, label in zip(sample_images, preds, sample_labels):\n",
" plt.figure(figsize=(2, 2))\n",
" plt.title(f\"Prediction = {pred}, Label = {label}\")\n",
Expand Down
14 changes: 7 additions & 7 deletions doc/source/ray-air/examples/torch_image_batch_pretrained.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pandas as pd
import numpy as np

import torch
from torchvision import transforms
from torchvision.models import resnet18

Expand All @@ -10,20 +10,20 @@
from ray.data.preprocessors import BatchMapper


def preprocess(batch: np.ndarray) -> pd.DataFrame:
def preprocess(image_batch: np.ndarray) -> np.ndarray:
"""
User Pytorch code to transform user image. Note we still use pandas as
intermediate format to hold images as shorthand of python dictionary.
User Pytorch code to transform user image with outer dimension of batch size.
"""
preprocess = transforms.Compose(
[
transforms.ToTensor(),
jiaodong marked this conversation as resolved.
Show resolved Hide resolved
transforms.Resize(256),
# Torchvision's ToTensor does not accept outer batch dimension
transforms.CenterCrop(224),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
)
return pd.DataFrame({"image": [preprocess(image) for image in batch]})
# Outer dimension is batch size such as (10, 256, 256, 3) -> (10, 3, 256, 256)
transposed_torch_tensor = torch.Tensor(image_batch.transpose(0, 3, 1, 2))
return preprocess(transposed_torch_tensor).numpy()
jiaodong marked this conversation as resolved.
Show resolved Hide resolved


data_url = "s3://anonymous@air-example-data-2/1G-image-data-synthetic-raw"
Expand Down
30 changes: 16 additions & 14 deletions doc/source/ray-air/examples/torch_incremental_learning.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -517,24 +517,26 @@
},
"outputs": [],
"source": [
"from ray.data.preprocessors import BatchMapper\n",
"from typing import Dict\n",
"import numpy as np\n",
"\n",
"import torch\n",
"from torchvision import transforms\n",
"\n",
"def preprocess_images(df: pd.DataFrame) -> pd.DataFrame:\n",
" \"\"\"Preprocess images by scaling each channel in the image.\"\"\"\n",
"from ray.data.preprocessors import BatchMapper\n",
"\n",
"def preprocess_images(image_batch_dict: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:\n",
" \"\"\"Preprocess images by scaling each channel in the image.\"\"\"\n",
" torchvision_transforms = transforms.Compose(\n",
" [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]\n",
" [transforms.Normalize((0.1307,), (0.3081,))]\n",
" )\n",
" # Outer dimension is batch size such as (4096, 28, 28)\n",
" image_batch_dict[\"image\"] = torchvision_transforms(\n",
" torch.Tensor(image_batch_dict[\"image\"])\n",
" ).numpy()\n",
jiaodong marked this conversation as resolved.
Show resolved Hide resolved
" return image_batch_dict\n",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to do the transpose somewhere?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one should be fine, input is just (4096, 28, 28), not RGB images

"\n",
" df = df.copy()\n",
" df.loc[:, \"image\"] = [\n",
" torchvision_transforms(image).numpy() for image in df[\"image\"]\n",
" ]\n",
" return df\n",
"\n",
"mnist_normalize_preprocessor = BatchMapper(fn=preprocess_images)"
"mnist_normalize_preprocessor = BatchMapper(fn=preprocess_images, batch_format=\"numpy\")"
]
},
{
Expand Down Expand Up @@ -1404,7 +1406,7 @@
" # Have to specify trainer_resources as 0 so that the example works on Colab. \n",
" scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu, trainer_resources={\"CPU\": 0}),\n",
" datasets={\"train\": train_dataset},\n",
" preprocessor=BatchMapper(fn=preprocess_images),\n",
" preprocessor=mnist_normalize_preprocessor,\n",
" resume_from_checkpoint=latest_checkpoint,\n",
" )\n",
" result = trainer.fit()\n",
Expand Down Expand Up @@ -1715,7 +1717,7 @@
" # Have to specify trainer_resources as 0 so that the example works on Colab. \n",
" scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu, trainer_resources={\"CPU\": 0}),\n",
" datasets={\"train\": combined_training_dataset},\n",
" preprocessor=BatchMapper(fn=preprocess_images),\n",
" preprocessor=mnist_normalize_preprocessor,\n",
" )\n",
"result = trainer.fit()\n",
"full_training_checkpoint = result.checkpoint"
Expand Down Expand Up @@ -1841,7 +1843,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.6"
"version": "3.8.13"
},
"vscode": {
"interpreter": {
Expand Down
12 changes: 5 additions & 7 deletions doc/source/ray-air/predictors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Ray AIR Predictors are a class that loads models from `Checkpoint` to perform in

Predictors are used by `BatchPredictor` and `PredictorDeployment` to do large-scale scoring or online inference.

Let's walk through a basic usage of the Predictor. In the below example, we create `Checkpoint` object from a model definition.
Let's walk through a basic usage of the Predictor. In the below example, we create `Checkpoint` object from a model definition.
Checkpoints can be generated from a variety of different ways -- see the :ref:`Checkpoints <air-checkpoints-doc>` user guide for more details.

The checkpoint then is used to create a framework specific Predictor (in our example, a `TensorflowPredictor`), which then can be used for inference:
Expand All @@ -46,7 +46,7 @@ Batch Prediction

Ray AIR provides a ``BatchPredictor`` utility for large-scale batch inference.

The BatchPredictor takes in a checkpoint and a predictor class and executes
The BatchPredictor takes in a checkpoint and a predictor class and executes
large-scale batch prediction on a given dataset in a parallel/distributed fashion when calling ``predict()``.

.. note::
Expand Down Expand Up @@ -117,10 +117,10 @@ Coming soon!
Lazy/Pipelined Prediction (experimental)
----------------------------------------

If you have a large dataset but not a lot of available memory, you can use the
If you have a large dataset but not a lot of available memory, you can use the
:meth:`predict_pipelined <ray.train.batch_predictor.BatchPredictor.predict_pipelined>` method.

Unlike :py:meth:`predict` which will load the entire data into memory, ``predict_pipelined`` will create a
Unlike :py:meth:`predict` which will load the entire data into memory, ``predict_pipelined`` will create a
:class:`DatasetPipeline` object, which will *lazily* load the data and perform inference on a smaller batch of data at a time.

The lazy loading of the data will allow you to operate on datasets much greater than your available memory.
Expand All @@ -145,6 +145,4 @@ To implement a new Predictor for your particular framework, you should subclass

1. ``_predict_pandas``: Given a pandas.DataFrame input, return a pandas.DataFrame containing predictions.
2. ``from_checkpoint``: Logic for creating a Predictor from an :ref:`AIR Checkpoint <air-checkpoint-ref>`.
3. Optionally ``_predict_arrow`` for better performance when working with tensor data to avoid extra copies from Pandas conversions.


3. Optionally ``_predict_numpy`` for better performance when working with tensor data to avoid extra copies from Pandas conversions.
4 changes: 3 additions & 1 deletion doc/source/train/doc_code/xgboost_train_predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
# __train_predict_end__

# __batch_predict_start__
import pandas as pd
from ray.train.batch_predictor import BatchPredictor

batch_predictor = BatchPredictor.from_checkpoint(result.checkpoint, XGBoostPredictor)
predict_dataset = ray.data.from_pandas(pd.DataFrame({"x": np.arange(32)}))
predictions = batch_predictor.predict(
data=ray.data.from_items([{"x": x} for x in range(32)]),
data=predict_dataset,
batch_size=8,
min_scoring_workers=2,
)
Expand Down
Empty file added python/ray/air/data
Empty file.
5 changes: 1 addition & 4 deletions python/ray/air/data_batch_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,5 @@
if TYPE_CHECKING:
import numpy
import pandas
import pyarrow

DataBatchType = Union[
"numpy.ndarray", "pandas.DataFrame", "pyarrow.Table", Dict[str, "numpy.ndarray"]
]
DataBatchType = Union["numpy.ndarray", "pandas.DataFrame", Dict[str, "numpy.ndarray"]]
22 changes: 11 additions & 11 deletions python/ray/air/tests/test_data_batch_conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
convert_pandas_to_batch_type,
_convert_batch_type_to_numpy,
)
from ray.air.util.data_batch_conversion import DataType
from ray.air.util.data_batch_conversion import BatchFormat
from ray.air.util.tensor_extensions.pandas import TensorArray
from ray.air.util.tensor_extensions.arrow import ArrowTensorArray

Expand All @@ -22,7 +22,7 @@ def test_pandas_pandas():
actual_output = convert_batch_type_to_pandas(input_data)
pd.testing.assert_frame_equal(expected_output, actual_output)

actual_output = convert_pandas_to_batch_type(actual_output, type=DataType.PANDAS)
actual_output = convert_pandas_to_batch_type(actual_output, type=BatchFormat.PANDAS)
pd.testing.assert_frame_equal(actual_output, input_data)


Expand Down Expand Up @@ -144,7 +144,7 @@ def test_pandas_multi_dim_pandas(cast_tensor_columns, use_tensor_extension_for_i
pd.testing.assert_frame_equal(expected_output, actual_output)

actual_output = convert_pandas_to_batch_type(
actual_output, type=DataType.PANDAS, cast_tensor_columns=cast_tensor_columns
actual_output, type=BatchFormat.PANDAS, cast_tensor_columns=cast_tensor_columns
)
pd.testing.assert_frame_equal(actual_output, input_data)

Expand All @@ -157,7 +157,7 @@ def test_numpy_pandas(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

output_array = convert_pandas_to_batch_type(
actual_output, type=DataType.NUMPY, cast_tensor_columns=cast_tensor_columns
actual_output, type=BatchFormat.NUMPY, cast_tensor_columns=cast_tensor_columns
)
np.testing.assert_equal(output_array, input_data)

Expand All @@ -170,7 +170,7 @@ def test_numpy_multi_dim_pandas(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

output_array = convert_pandas_to_batch_type(
actual_output, type=DataType.NUMPY, cast_tensor_columns=cast_tensor_columns
actual_output, type=BatchFormat.NUMPY, cast_tensor_columns=cast_tensor_columns
)
np.testing.assert_array_equal(np.array(list(output_array)), input_data)

Expand All @@ -182,7 +182,7 @@ def test_numpy_object_pandas():
pd.testing.assert_frame_equal(expected_output, actual_output)

np.testing.assert_array_equal(
convert_pandas_to_batch_type(actual_output, type=DataType.NUMPY), input_data
convert_pandas_to_batch_type(actual_output, type=BatchFormat.NUMPY), input_data
)


Expand Down Expand Up @@ -211,7 +211,7 @@ def test_dict_pandas(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

output_array = convert_pandas_to_batch_type(
actual_output, type=DataType.NUMPY, cast_tensor_columns=cast_tensor_columns
actual_output, type=BatchFormat.NUMPY, cast_tensor_columns=cast_tensor_columns
)
np.testing.assert_array_equal(output_array, input_data["x"])

Expand All @@ -225,7 +225,7 @@ def test_dict_multi_dim_to_pandas(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

output_array = convert_pandas_to_batch_type(
actual_output, type=DataType.NUMPY, cast_tensor_columns=cast_tensor_columns
actual_output, type=BatchFormat.NUMPY, cast_tensor_columns=cast_tensor_columns
)
np.testing.assert_array_equal(np.array(list(output_array)), input_data["x"])

Expand All @@ -238,7 +238,7 @@ def test_dict_pandas_multi_column(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

output_dict = convert_pandas_to_batch_type(
actual_output, type=DataType.NUMPY, cast_tensor_columns=cast_tensor_columns
actual_output, type=BatchFormat.NUMPY, cast_tensor_columns=cast_tensor_columns
)
for k, v in output_dict.items():
np.testing.assert_array_equal(v, array_dict[k])
Expand All @@ -251,7 +251,7 @@ def test_arrow_pandas():
actual_output = convert_batch_type_to_pandas(input_data)
pd.testing.assert_frame_equal(expected_output, actual_output)

assert convert_pandas_to_batch_type(actual_output, type=DataType.ARROW).equals(
assert convert_pandas_to_batch_type(actual_output, type=BatchFormat.ARROW).equals(
input_data
)

Expand All @@ -270,7 +270,7 @@ def test_arrow_tensor_pandas(cast_tensor_columns):
pd.testing.assert_frame_equal(expected_output, actual_output)

arrow_output = convert_pandas_to_batch_type(
actual_output, type=DataType.ARROW, cast_tensor_columns=cast_tensor_columns
actual_output, type=BatchFormat.ARROW, cast_tensor_columns=cast_tensor_columns
)
assert arrow_output.equals(input_data)

Expand Down
34 changes: 22 additions & 12 deletions python/ray/air/util/data_batch_conversion.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from enum import Enum, auto
from enum import Enum
from typing import Dict, Union, List

import numpy as np
Expand All @@ -16,10 +16,20 @@


@DeveloperAPI
class DataType(Enum):
PANDAS = auto()
ARROW = auto()
NUMPY = auto() # Either a single numpy array or a Dict of numpy arrays.
class BatchFormat(str, Enum):
PANDAS = "pandas"
# TODO: Remove once Arrow is deprecated as user facing batch format
ARROW = "arrow"
NUMPY = "numpy" # Either a single numpy array or a Dict of numpy arrays.


@DeveloperAPI
class BlockFormat(str, Enum):
"""Internal Dataset block format enum."""

PANDAS = "pandas"
ARROW = "arrow"
SIMPLE = "simple"


@DeveloperAPI
Expand Down Expand Up @@ -65,14 +75,14 @@ def convert_batch_type_to_pandas(
@DeveloperAPI
def convert_pandas_to_batch_type(
data: pd.DataFrame,
type: DataType,
type: BatchFormat,
cast_tensor_columns: bool = False,
) -> DataBatchType:
"""Convert the provided Pandas dataframe to the provided ``type``.

Args:
data: A Pandas DataFrame
type: The specific ``DataBatchType`` to convert to.
type: The specific ``BatchFormat`` to convert to.
cast_tensor_columns: Whether tensor columns should be cast to our tensor
extension type.

Expand All @@ -81,10 +91,10 @@ def convert_pandas_to_batch_type(
"""
if cast_tensor_columns:
data = _cast_ndarray_columns_to_tensor_extension(data)
if type == DataType.PANDAS:
if type == BatchFormat.PANDAS:
return data

elif type == DataType.NUMPY:
elif type == BatchFormat.NUMPY:
if len(data.columns) == 1:
# If just a single column, return as a single numpy array.
return data.iloc[:, 0].to_numpy()
Expand All @@ -95,7 +105,7 @@ def convert_pandas_to_batch_type(
output_dict[column] = data[column].to_numpy()
return output_dict

elif type == DataType.ARROW:
elif type == BatchFormat.ARROW:
if not pyarrow:
raise ValueError(
"Attempted to convert data to Pyarrow Table but Pyarrow "
Expand All @@ -106,7 +116,7 @@ def convert_pandas_to_batch_type(

else:
raise ValueError(
f"Received type {type}, but expected it to be one of {DataType}"
f"Received type {type}, but expected it to be one of {DataBatchType}"
)


Expand Down Expand Up @@ -164,7 +174,7 @@ def _convert_batch_type_to_numpy(
output_dict[col_name] = col.to_numpy(zero_copy_only=False)
return output_dict
elif isinstance(data, pd.DataFrame):
return convert_pandas_to_batch_type(data, DataType.NUMPY)
return convert_pandas_to_batch_type(data, BatchFormat.NUMPY)
else:
raise ValueError(
f"Received data of type: {type(data)}, but expected it to be one "
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/fast_repartition.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def fast_repartition(blocks, num_blocks):
)
# Compute the (n-1) indices needed for an equal split of the data.
count = wrapped_ds.count()
dataset_format = wrapped_ds._dataset_format()
dataset_format = wrapped_ds.dataset_format()
indices = []
cur_idx = 0
for _ in range(num_blocks - 1):
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
def _validate_key_fn(ds: "Dataset", key: KeyFn) -> None:
"""Check the key function is valid on the given dataset."""
try:
fmt = ds._dataset_format()
fmt = ds.dataset_format()
except ValueError:
# Dataset is empty/cleared, validation not possible.
return
Expand Down
Loading