[AIR] Add experimental read_images #28256

Closed
80 commits
5e50b46
Add experimental `read_images`
bveeramani Sep 2, 2022
675ca6c
Merge branch 'master' into bveeramani/read-images
bveeramani Sep 6, 2022
b8d3974
Mark as experimental
bveeramani Sep 6, 2022
4f1d5d7
Rename `PathPartitionScheme` as `Partitioning`
bveeramani Sep 9, 2022
9afc041
Update input_output.rst
bveeramani Sep 9, 2022
d6b2667
Update partitioning.py
bveeramani Sep 9, 2022
517c390
Update partitioning.py
bveeramani Sep 9, 2022
d7a2ae3
Add CSV tests
bveeramani Sep 9, 2022
9416d3c
Merge remote-tracking branch 'upstream/master' into bveeramani/partition
bveeramani Sep 9, 2022
e9a9c5c
Merge remote-tracking branch 'upstream/master' into bveeramani/partition
bveeramani Sep 9, 2022
644878f
Support `None` field name
bveeramani Sep 9, 2022
9c65eb9
Update test_partitioning.py
bveeramani Sep 9, 2022
7372987
Merge branch 'bveeramani/dir-partitioning' into bveeramani/partition
bveeramani Sep 9, 2022
6980079
Merge stuff
bveeramani Sep 9, 2022
2253c47
Move code to `FileBasedDatasource`
bveeramani Sep 9, 2022
d34acc9
Delete tmp.csv
bveeramani Sep 9, 2022
0cfeb58
Merge remote-tracking branch 'upstream/master' into bveeramani/partition
bveeramani Sep 15, 2022
38ba956
Add files
bveeramani Sep 15, 2022
308bc68
Appease lint
bveeramani Sep 15, 2022
a8432e4
Update csv_datasource.py
bveeramani Sep 15, 2022
b5657a8
Delete test_csv_partitioning.py
bveeramani Sep 15, 2022
f96a498
Update file_based_datasource.py
bveeramani Sep 15, 2022
44ec745
Rename
bveeramani Sep 15, 2022
00aac7d
Make changes
bveeramani Sep 15, 2022
a2f2ab0
Appease lint
bveeramani Sep 15, 2022
3fd0aac
Update read_api.py
bveeramani Sep 15, 2022
e0cb06a
Add Numpy
bveeramani Sep 15, 2022
4f08b73
Update files
bveeramani Sep 15, 2022
a839514
Update read_api.py
bveeramani Sep 16, 2022
fc087f1
Update files
bveeramani Sep 16, 2022
bca3925
Merge remote-tracking branch 'upstream/master' into bveeramani/read-i…
bveeramani Sep 16, 2022
5f7ea9f
Merge branch 'bveeramani/partition' into bveeramani/read-images
bveeramani Sep 16, 2022
34b016f
Update read_api.py
bveeramani Sep 19, 2022
e4eb840
Update error messages
bveeramani Sep 19, 2022
3f1c361
Temp
bveeramani Sep 19, 2022
9924029
Merge branch 'bveeramani/partition' into bveeramani/read-images
bveeramani Sep 19, 2022
5d7b7fe
Update files
bveeramani Sep 19, 2022
e4a2cb9
Bug fix and lint
bveeramani Sep 19, 2022
0715fc8
Update files
bveeramani Sep 19, 2022
d7fccfa
Appease lint and fix install
bveeramani Sep 19, 2022
7f88436
Merge branch 'bveeramani/partition' into bveeramani/read-images
bveeramani Sep 19, 2022
edf1b9f
Fix parameter
bveeramani Sep 19, 2022
578edc2
Update creating-datasets.rst
bveeramani Sep 19, 2022
249bafc
Fix test
bveeramani Sep 20, 2022
27d9a59
Address review comments
bveeramani Sep 23, 2022
c993f2d
Update test_dataset_formats.py
bveeramani Sep 23, 2022
65dc78f
Merge branch 'master' into bveeramani/partition
bveeramani Sep 23, 2022
92d6af5
Update test_dataset_formats.py
bveeramani Sep 23, 2022
8dc0501
Update test_dataset_formats.py
bveeramani Sep 23, 2022
343c995
Merge branch 'master' into bveeramani/partition
bveeramani Sep 26, 2022
29ed734
Update test_dataset_formats.py
bveeramani Sep 26, 2022
0ef5585
Update python/ray/data/datasource/text_datasource.py
bveeramani Sep 28, 2022
2fb3451
Update python/ray/data/tests/test_dataset_formats.py
bveeramani Sep 28, 2022
baf096e
Address review comments
bveeramani Sep 28, 2022
a3d5729
Update test_partitioning.py
bveeramani Sep 28, 2022
ef2e79e
Address review comments
bveeramani Sep 28, 2022
fbf2bb1
Merge remote-tracking branch 'upstream/master' into bveeramani/partition
bveeramani Sep 28, 2022
01be922
Merge branch 'master' into bveeramani/read-images
bveeramani Sep 29, 2022
6f6855d
Update test_dataset_image.py
bveeramani Sep 29, 2022
c3cdf7b
Merge branch 'master' into bveeramani/partition
bveeramani Sep 29, 2022
5eaa52b
Tests
bveeramani Sep 29, 2022
0604d3a
Delete x.npy
bveeramani Sep 29, 2022
50f99ca
Appease lint
bveeramani Sep 29, 2022
b1d9b33
Merge branch 'bveeramani/partition' into bveeramani/read-images
bveeramani Sep 29, 2022
2f65750
Delete model
bveeramani Sep 29, 2022
2d23510
Update pytorch_training_e2e.py
bveeramani Sep 29, 2022
3138d7b
Merge branch 'master' into bveeramani/read-images
bveeramani Oct 4, 2022
2dfd0fd
Appease lint
bveeramani Oct 4, 2022
ad8f81c
Minor fixes
bveeramani Oct 4, 2022
151309b
Update documentation
bveeramani Oct 4, 2022
d827ccb
Remove references
bveeramani Oct 4, 2022
5d6af8b
Update creating-datasets.rst
bveeramani Oct 4, 2022
46f0292
Update read_benchmark.py
bveeramani Oct 4, 2022
9c1c277
Minor fixes
bveeramani Oct 4, 2022
208089b
Fix CI
bveeramani Oct 4, 2022
ddd342f
Update read_api.py
bveeramani Oct 4, 2022
0dc6dbe
Address review comments
bveeramani Oct 6, 2022
0bf9734
Merge branch 'master' into bveeramani/read-images
bveeramani Oct 6, 2022
bdae9f4
Merge branch 'master' into bveeramani/read-images
bveeramani Oct 6, 2022
4c98cf8
Update test_dataset_image.py
bveeramani Oct 6, 2022
7 changes: 6 additions & 1 deletion doc/source/data/api/input_output.rst
@@ -48,6 +48,11 @@ Text

.. autofunction:: ray.data.read_text

Images (experimental)
---------------------

.. autofunction:: ray.data.read_images

Binary
------

@@ -167,7 +172,7 @@ Built-in Datasources
.. autoclass:: ray.data.datasource.FileBasedDatasource
:members:

.. autoclass:: ray.data.datasource.ImageFolderDatasource
.. autoclass:: ray.data.datasource.ImageDatasource
:members:

.. autoclass:: ray.data.datasource.JSONDatasource
29 changes: 16 additions & 13 deletions doc/source/data/creating-datasets.rst
@@ -162,6 +162,22 @@ Supported File Formats

See the API docs for :func:`read_text() <ray.data.read_text>`.

.. tabbed:: Images (experimental)

Call :func:`~ray.data.read_images` to read images into a :class:`~ray.data.Dataset`.

This function stores image data in single-column
`Arrow Table <https://arrow.apache.org/docs/python/generated/pyarrow.Table.html>`__
blocks using the
:class:`tensor extension type <ray.data.extensions.tensor_extension.ArrowTensorType>`.
For more information on working with tensors in Datasets, read the
:ref:`tensor data guide <datasets_tensor_support>`.

.. literalinclude:: ./doc_code/creating_datasets.py
:language: python
:start-after: __read_images_begin__
:end-before: __read_images_end__

.. tabbed:: Binary

Read binary files into a ``Dataset``. Each binary file will be treated as a single row
@@ -533,19 +549,6 @@ converts it into a Ray Dataset directly.
ray_datasets["train"].take(2)
# [{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}]

.. _datasets_from_images:

-------------------------------
From Image Files (experimental)
-------------------------------

Load image data stored as individual files using :py:class:`~ray.data.datasource.ImageFolderDatasource`:

.. literalinclude:: ./doc_code/tensor.py
:language: python
:start-after: __create_images_begin__
:end-before: __create_images_end__

.. _datasets_custom_datasource:

------------------
2 changes: 1 addition & 1 deletion doc/source/data/dataset-tensor-support.rst
@@ -95,7 +95,7 @@ This section shows how to create single and multi-column tensor datasets.

.. tabbed:: Images (experimental)

Load image data stored as individual files using :class:`~ray.data.datasource.ImageFolderDatasource`:
Load image data stored as individual files using :func:`~ray.data.read_images`:

**Image and label columns**:

3 changes: 3 additions & 0 deletions doc/source/data/dataset.rst
@@ -201,6 +201,9 @@ Supported Input Formats
* - Text Files
- :func:`ray.data.read_text()`
- ✅
* - Image Files (experimental)
- :func:`ray.data.read_images()`
- 🚧
* - Binary Files
- :func:`ray.data.read_binary_files()`
- ✅
17 changes: 17 additions & 0 deletions doc/source/data/doc_code/creating_datasets.py
@@ -149,6 +149,23 @@
# __from_numpy_end__
# fmt: on

# fmt: off
# __read_images_begin__
ds = ray.data.read_images("example://image-datasets/simple")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={__value__: ArrowTensorType(shape=(32, 32, 3), dtype=uint8)})

ds.take(1)
# -> [array([[[ 88, 70, 68],
# [103, 88, 85],
# [112, 96, 97],
# ...,
# [168, 151, 81],
# [167, 149, 83],
# [166, 148, 82]]], dtype=uint8)]
# __read_images_end__
# fmt: on

# fmt: off
# __from_numpy_mult_begin__
import numpy as np
33 changes: 10 additions & 23 deletions doc/source/data/doc_code/tensor.py
@@ -194,31 +194,18 @@ def cast_udf(block: pa.Table) -> pa.Table:
ds.fully_executed()

# __create_images_begin__
from ray.data.datasource import ImageFolderDatasource

ds = ray.data.read_datasource(
ImageFolderDatasource(), root="example://image-folders/simple", size=(128, 128))
# -> Dataset(num_blocks=3, num_rows=3,
# schema={image: TensorDtype(shape=(128, 128, 3), dtype=uint8),
# label: object})
ds = ray.data.read_images("example://image-datasets/simple")
# -> Dataset(num_blocks=3, num_rows=3,
# schema={__value__: ArrowTensorType(shape=(32, 32, 3), dtype=uint8)})

ds.take(1)
# -> [{'image':
# array([[[ 92, 71, 57],
# [107, 87, 72],
# ...,
# [141, 161, 185],
# [139, 158, 184]],
#
# ...,
#
# [[135, 135, 109],
# [135, 135, 108],
# ...,
# [167, 150, 89],
# [165, 146, 90]]], dtype=uint8),
# 'label': 'cat',
# }]
# -> [array([[[ 88, 70, 68],
# [103, 88, 85],
# [112, 96, 97],
# ...,
# [168, 151, 81],
# [167, 149, 83],
# [166, 148, 82]]], dtype=uint8)]
# __create_images_end__


15 changes: 6 additions & 9 deletions doc/source/ray-air/examples/torch_image_batch_pretrained.py
@@ -1,4 +1,5 @@
import pandas as pd
import numpy as np

from torchvision import transforms
from torchvision.models import resnet18
@@ -7,10 +8,9 @@
from ray.train.torch import TorchCheckpoint, TorchPredictor
from ray.train.batch_predictor import BatchPredictor
from ray.data.preprocessors import BatchMapper
from ray.data.datasource import ImageFolderDatasource


def preprocess(df: pd.DataFrame) -> pd.DataFrame:
def preprocess(batch: np.ndarray) -> pd.DataFrame:
"""
User Pytorch code to transform user image. Note we still use pandas as
intermediate format to hold images as shorthand of python dictionary.
@@ -23,20 +23,17 @@ def preprocess(df: pd.DataFrame) -> pd.DataFrame:
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
]
)
df.loc[:, "image"] = [preprocess(x).numpy() for x in df["image"]]
return df
return pd.DataFrame({"image": [preprocess(image) for image in batch]})

Contributor

If we're showing off a NumPy-only UDF, we shouldn't return a pandas DataFrame. Instead, we can return a single ndarray (or a dict of ndarrays, if we want a human-readable column name), which Datasets will convert back into a tabular format. This is better UX for the UDF developer and should be more efficient under the hood: Datasets will represent the image tensor column in an Arrow Table rather than a pandas DataFrame, which is more reliably zero-copy and has a smaller wire footprint.

Suggested change
return pd.DataFrame({"image": [preprocess(image) for image in batch]})
return np.array([preprocess(image).numpy() for image in batch])

Contributor

In fact, could we match what @jiaodong is doing in their NumPy narrow waist for prediction PR, where the torchvision transform is vectorized over the input ndarray? That should be doable with the current API; we just need to do the same transpose as in that PR: https://github.com/ray-project/ray/pull/28917/files#diff-e2bccb297d421f0dcff1892c4f23993064f52b17710787c41c3a2ae9dbc84159

Contributor

I.e. basically this:

def preprocess(image_batch: np.ndarray) -> np.ndarray:
    """
    User Pytorch code to transform user image with outer dimension of batch size.
    """
    preprocess = transforms.Compose(
        [
            # Torchvision's ToTensor does not accept outer batch dimension
            transforms.CenterCrop(224),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
    )
    # Outer dimension is batch size such as (8, 256, 256, 3) -> (8, 3, 256, 256)
    image_batch = torch.as_tensor(image_batch.transpose(0, 3, 1, 2))
    return preprocess(image_batch).numpy()
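
For readers without torchvision at hand, the layout change and normalization described in the snippet above can be sketched in plain NumPy. This is an illustration only, not torchvision's implementation and not code from this PR. The names `preprocess_numpy` and `center_crop` are hypothetical, and the division by 255 is an added assumption (it mirrors what `ToTensor` would normally do before `Normalize`). The crop assumes every image is at least `crop_size` pixels on each side.

```python
import numpy as np

# ImageNet channel statistics, the same values used in the snippet above.
MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)


def center_crop(batch_nchw: np.ndarray, size: int) -> np.ndarray:
    """Crop the central size x size window from every image in an NCHW batch."""
    _, _, height, width = batch_nchw.shape
    top = (height - size) // 2
    left = (width - size) // 2
    return batch_nchw[:, :, top : top + size, left : left + size]


def preprocess_numpy(image_batch: np.ndarray, crop_size: int = 224) -> np.ndarray:
    """Vectorized preprocessing for a uint8 batch shaped (N, H, W, C)."""
    # Move channels ahead of the spatial axes: (N, H, W, C) -> (N, C, H, W).
    batch = image_batch.transpose(0, 3, 1, 2)
    # Assumption: scale uint8 pixels to [0, 1] floats before normalizing.
    batch = batch.astype(np.float32) / 255.0
    batch = center_crop(batch, crop_size)
    # Broadcast the per-channel statistics over the batch and spatial axes.
    return (batch - MEAN[None, :, None, None]) / STD[None, :, None, None]


batch = np.zeros((8, 256, 256, 3), dtype=np.uint8)
out = preprocess_numpy(batch)
print(out.shape, out.dtype)
```

The whole batch is processed with array operations and no per-image Python loop, which is the point of the "ndarray in, ndarray out" suggestion.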

Member Author

I can't return an ndarray, because then I get:

Traceback (most recent call last):
  File "/Users/balaji/Documents/GitHub/ray/doc/source/ray-air/examples/torch_image_batch_pretrained.py", line 39, in <module>
    predictor.predict(dataset)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/train/batch_predictor.py", line 228, in predict
    prediction_results = data.map_batches(
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/dataset.py", line 561, in map_batches
    return Dataset(plan, self._epoch, self._lazy)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/dataset.py", line 217, in __init__
    self._plan.execute(allow_clear_input_blocks=False)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/plan.py", line 308, in execute
    blocks, stage_info = stage(
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/plan.py", line 662, in __call__
    blocks = compute._apply(
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/compute.py", line 378, in _apply
    raise e
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/compute.py", line 366, in _apply
    new_metadata = ray.get(new_metadata)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/_private/worker.py", line 2279, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(AttributeError): ray::BlockWorker.map_block_nosplit() (pid=18412, ip=127.0.0.1, repr=<ray.data._internal.compute.BlockWorker object at 0x11fa37010>)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/compute.py", line 274, in map_block_nosplit
    return _map_block_nosplit(
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/_internal/compute.py", line 439, in _map_block_nosplit
    for new_block in block_fn(block, *fn_args, **fn_kwargs):
  File "/Users/balaji/Documents/GitHub/ray/python/ray/data/dataset.py", line 523, in transform
    applied = batch_fn(view, *fn_args, **fn_kwargs)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/train/batch_predictor.py", line 202, in __call__
    prediction_output = self._predictor.predict(
  File "/Users/balaji/Documents/GitHub/ray/python/ray/train/torch/torch_predictor.py", line 198, in predict
    return super(TorchPredictor, self).predict(data=data, dtype=dtype)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/train/predictor.py", line 158, in predict
    predictions_df = self._predict_pandas(data_df, **kwargs)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/train/_internal/dl_predictor.py", line 67, in _predict_pandas
    tensors = convert_pandas_to_batch_type(
  File "/Users/balaji/Documents/GitHub/ray/python/ray/air/util/data_batch_conversion.py", line 89, in convert_pandas_to_batch_type
    data = _cast_ndarray_columns_to_tensor_extension(data)
  File "/Users/balaji/Documents/GitHub/ray/python/ray/air/util/data_batch_conversion.py", line 217, in _cast_ndarray_columns_to_tensor_extension
    for col_name, col in df.items():
AttributeError: 'numpy.ndarray' object has no attribute 'items'

Could we address this in a follow-up PR? I can create an issue to track.
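
The failure at the bottom of the traceback can be reproduced in isolation. The sketch below does not exercise Ray's code path. It only shows that a bare ndarray has no `.items()` method, while a container keyed by column name does. The plain dict is a stand-in chosen for this illustration, not the pandas DataFrame the predictor actually expects.

```python
import numpy as np

batch = np.zeros((2, 4, 4, 3), dtype=np.uint8)

# A bare ndarray has no `.items()`, so a column loop such as
# `for col_name, col in df.items()` raises AttributeError.
try:
    for col_name, col in batch.items():
        pass
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(error_message)

# Wrapping the array under a column name yields something that can be
# iterated column by column (a dict here, standing in for a DataFrame).
named_batch = {"image": batch}
columns = [(name, col.shape) for name, col in named_batch.items()]
print(columns)
```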

Contributor

Ah, I forgot that the preprocessor is going to be applied within the predictor, which doesn't have the NumPy narrow waist merged yet.

Since read_images() now returns a tensor dataset, we need to convert NumPy ndarray batches to pandas DataFrame batches, so I suppose this is fine as-is. The expectation is that whichever PR is merged second will need to resolve merge conflicts and converge to what I gave above (ndarray in, ndarray out, vectorized torchvision transform).



data_url = "s3://anonymous@air-example-data-2/1G-image-data-synthetic-raw"
print(f"Running GPU batch prediction with 1GB data from {data_url}")
dataset = ray.data.read_datasource(
ImageFolderDatasource(), root=data_url, size=(256, 256)
)
dataset = ray.data.read_images(data_url, size=(256, 256)).limit(10)

model = resnet18(pretrained=True)

preprocessor = BatchMapper(preprocess)
preprocessor = BatchMapper(preprocess, batch_format="numpy")
ckpt = TorchCheckpoint.from_model(model=model, preprocessor=preprocessor)

predictor = BatchPredictor.from_checkpoint(ckpt, TorchPredictor)
predictor.predict(dataset, feature_columns=["image"], batch_size=80)
predictor.predict(dataset, batch_size=80)
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -24,6 +24,7 @@
read_binary_files,
read_csv,
read_datasource,
read_images,
read_json,
read_numpy,
read_parquet,
@@ -57,6 +58,7 @@
"read_binary_files",
"read_csv",
"read_datasource",
"read_images",
"read_json",
"read_numpy",
"read_parquet",
4 changes: 2 additions & 2 deletions python/ray/data/datasource/__init__.py
@@ -24,7 +24,7 @@
FileMetadataProvider,
ParquetMetadataProvider,
)
from ray.data.datasource.image_folder_datasource import ImageFolderDatasource
from ray.data.datasource.image_datasource import ImageDatasource
from ray.data.datasource.json_datasource import JSONDatasource
from ray.data.datasource.numpy_datasource import NumpyDatasource
from ray.data.datasource.parquet_base_datasource import ParquetBaseDatasource
@@ -55,7 +55,7 @@
"FileBasedDatasource",
"FileExtensionFilter",
"FileMetadataProvider",
"ImageFolderDatasource",
"ImageDatasource",
"JSONDatasource",
"NumpyDatasource",
"ParquetBaseDatasource",