
[Datasets] Add from_tf #29591

Merged · 9 commits · Nov 8, 2022
Changes from 4 commits
8 changes: 5 additions & 3 deletions doc/source/data/api/input_output.rst
@@ -142,6 +142,11 @@ HuggingFace

.. autofunction:: ray.data.from_huggingface

+TensorFlow
+----------
+
+.. autofunction:: ray.data.from_tf
+

.. _data_source_api:

@@ -190,9 +195,6 @@ Built-in Datasources
.. autoclass:: ray.data.datasource.RangeDatasource
   :members:

-.. autoclass:: ray.data.datasource.SimpleTensorFlowDatasource
-   :members:
-
.. autoclass:: ray.data.datasource.SimpleTorchDatasource
   :members:

23 changes: 8 additions & 15 deletions doc/source/data/creating-datasets.rst
@@ -499,29 +499,22 @@ From Torch and TensorFlow
.. tabbed:: TensorFlow

    If you already have a TensorFlow dataset available, you can create a Ray Dataset
-    using :py:class:`SimpleTensorFlowDatasource`.
+    using :class:`~ray.data.from_tf`.

    .. warning::
-        :py:class:`SimpleTensorFlowDatasource` doesn't support parallel reads. You
-        should only use this datasource for small datasets like MNIST or CIFAR.
+        :class:`~ray.data.from_tf` doesn't support parallel reads. You
+        should only use this function with small datasets like MNIST or CIFAR.

    .. code-block:: python

-        import ray.data
-        from ray.data.datasource import SimpleTensorFlowDatasource
+        import ray
        import tensorflow_datasets as tfds

-        def dataset_factory():
-            return tfds.load("cifar10", split=["train"], as_supervised=True)[0]
+        dataset, _ = tfds.load("cifar10", split=["train", "test"])
+        dataset = ray.data.from_tf(dataset)

-        dataset = ray.data.read_datasource(
-            SimpleTensorFlowDatasource(),
-            parallelism=1,
-            dataset_factory=dataset_factory
-        )
-        features, label = dataset.take(1)[0]
-        features.shape  # TensorShape([32, 32, 3])
-        label  # <tf.Tensor: shape=(), dtype=int64, numpy=7>
+        dataset
+        # -> Dataset(num_blocks=200, num_rows=50000, schema={id: binary, image: ArrowTensorType(shape=(32, 32, 3), dtype=uint8), label: int64})

.. _dataset_from_huggingface:

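As a quick sanity check of the new API outside tensorflow_datasets, here is a minimal sketch (assumes TensorFlow is installed and a Ray build that includes this PR; dict elements become rows because from_tf funnels through from_items):

    import ray
    import tensorflow as tf

    # Build a tiny in-memory tf.data.Dataset; each element is a dict of tensors.
    tf_dataset = tf.data.Dataset.from_tensor_slices(
        {"feature": [[1.0, 2.0], [3.0, 4.0]], "label": [0, 1]}
    )

    # Each element of the TensorFlow dataset becomes one row of the Ray Dataset.
    ray_dataset = ray.data.from_tf(tf_dataset)
    print(ray_dataset.take_all())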
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -22,6 +22,7 @@
    from_pandas,
    from_pandas_refs,
    from_spark,
+    from_tf,
    range,
    range_arrow,
    range_table,
@@ -64,6 +65,7 @@
    "from_pandas",
    "from_pandas_refs",
    "from_spark",
+    "from_tf",
    "from_huggingface",
    "range",
    "range_table",
2 changes: 0 additions & 2 deletions python/ray/data/datasource/__init__.py
@@ -37,7 +37,6 @@
    Partitioning,
)
from ray.data.datasource.tfrecords_datasource import TFRecordDatasource
-from ray.data.datasource.tensorflow_datasource import SimpleTensorFlowDatasource
from ray.data.datasource.text_datasource import TextDatasource
from ray.data.datasource.torch_datasource import SimpleTorchDatasource

@@ -70,7 +69,6 @@
    "RangeDatasource",
    "ReadTask",
    "Reader",
-    "SimpleTensorFlowDatasource",
    "SimpleTorchDatasource",
    "TextDatasource",
    "TFRecordDatasource",
73 changes: 0 additions & 73 deletions python/ray/data/datasource/tensorflow_datasource.py

This file was deleted.
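For users of the deleted datasource, a hedged migration sketch (assumes a Ray build with this PR; the tfds calls mirror the doc example and test elsewhere in this diff):

    # Before this PR (SimpleTensorFlowDatasource, now removed):
    #
    #     from ray.data.datasource import SimpleTensorFlowDatasource
    #
    #     def dataset_factory():
    #         return tfds.load("mnist", split=["train"], as_supervised=True)[0]
    #
    #     ds = ray.data.read_datasource(
    #         SimpleTensorFlowDatasource(), parallelism=1, dataset_factory=dataset_factory
    #     )

    # After this PR (from_tf takes the dataset object directly):
    import ray
    import tensorflow_datasets as tfds

    tf_dataset = tfds.load("mnist", split=["train"], as_supervised=True)[0]
    ds = ray.data.from_tf(tf_dataset)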

53 changes: 53 additions & 0 deletions python/ray/data/read_api.py
@@ -57,6 +57,7 @@
    import pandas
    import pyarrow
    import pyspark
+    import tensorflow as tf


T = TypeVar("T")
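These imports presumably sit under typing.TYPE_CHECKING alongside pandas/pyarrow/pyspark (an assumption based on the indentation and the quoted annotation below), so TensorFlow never becomes a hard runtime dependency. A sketch of the pattern:

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Only evaluated by type checkers, never at runtime.
        import tensorflow as tf

    def from_tf(dataset: "tf.data.Dataset"):
        # The quoted annotation defers evaluation, so TensorFlow is only
        # imported by users who actually pass a tf.data.Dataset.
        ...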
@@ -1326,6 +1327,58 @@ def convert(ds: "datasets.Dataset") -> Dataset[ArrowRow]:
    )


+@PublicAPI
+def from_tf(
Review thread on `def from_tf(`:

Contributor: Can we expand this?

    Suggested change:
    -def from_tf(
    +def from_tensorflow(

Member (author): We have a method called Dataset.to_tf. I prefer tensorflow over
tf, but we should be consistent. What do you think? We could always rename
Dataset.to_tf to Dataset.to_tensorflow.

Contributor: Ah, right... Maybe we can:

  1. Name this from_tensorflow.
  2. Create a to_tensorflow.
  3. Have to_tf redirect to to_tensorflow and eventually deprecate it.

But I'll leave it to folks like @clarkzinzow and @c21 to make the call here.

Contributor (@clarkzinzow, Nov 3, 2022): I'd actually vote for renaming these to
from_tf_dataset, to_tf_dataset, from_torch_dataset, to_torch_dataset, etc. This
is less ambiguous ("Am I getting a TensorFlow tensor or a dataset with this
API?"), more discoverable, and better matches what the rest of the ecosystem is
doing, e.g. HuggingFace and Petastorm.

I think tf is a common enough shorthand that we shouldn't expand it to
tensorflow, but I'm happy to hear others' opinions, and maybe we can do a quick
ecosystem check to verify this.
+    dataset: "tf.data.Dataset",
+) -> Dataset:
"""Create a dataset from a TensorFlow dataset.

This function is inefficient. Use it to read small datasets or prototype.

.. warning::
If your dataset is large, this function may execute slowly or raise an
out-of-memory error. To avoid issues, read the underyling data with a function
like :meth:`~ray.data.read_images`.

.. note::
This function isn't paralellized. It loads the entire dataset into the head
node's memory before moving the data to the distributed object store.

Examples:
>>> import ray
>>> import tensorflow_datasets as tfds
>>> dataset, _ = tfds.load('cifar10', split=["train", "test"])
>>> dataset = ray.data.from_tf(dataset)
>>> dataset
Dataset(num_blocks=200, num_rows=50000, schema={id: binary, image: ArrowTensorType(shape=(32, 32, 3), dtype=uint8), label: int64})
>>> dataset.take(1) # doctest: +SKIP
[{'id': b'train_16399', 'image': array([[[143, 96, 70],
[141, 96, 72],
[135, 93, 72],
...,
[ 96, 37, 19],
[105, 42, 18],
[104, 38, 20]],

...,

[[195, 161, 126],
[187, 153, 123],
[186, 151, 128],
...,
[212, 177, 147],
[219, 185, 155],
[221, 187, 157]]], dtype=uint8), 'label': 7}]

Args:
dataset: A TensorFlow dataset.

Returns:
A :class:`Dataset` that contains the samples stored in the TensorFlow dataset.
""" # noqa: E501
return from_items(list(dataset.as_numpy_iterator()))
Review comment (Contributor) on the line above:

    Misc. thing to note that I hit before in tests: .as_numpy_iterator() doesn't
    work with ragged tensors. tensorflow/tensorflow#53149
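A minimal reproduction of that limitation (hedged: the behavior depends on the TensorFlow version, and recent releases have added ragged support to as_numpy_iterator):

    import tensorflow as tf

    # A dataset whose elements are themselves ragged (variable-length rows).
    ragged = tf.ragged.constant([[[1, 2], [3]], [[4, 5, 6]]])
    ds = tf.data.Dataset.from_tensor_slices(ragged)

    try:
        # Older TF versions raise TypeError here because the element spec is a
        # RaggedTensorSpec rather than a plain TensorSpec.
        print(list(ds.as_numpy_iterator()))
    except TypeError as exc:
        print(f"as_numpy_iterator() rejected the ragged dataset: {exc}")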



def _df_to_block(df: "pandas.DataFrame") -> Block[ArrowRow]:
stats = BlockExecStats.builder()
import pyarrow as pa
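To make the docstring's warning concrete, a hedged sketch of the scalable alternative it points to (the bucket path is hypothetical; read_images is the Ray Data reader the docstring names):

    import ray

    # from_tf: single-process, whole dataset materialized on the driver.
    # ds = ray.data.from_tf(small_tf_dataset)

    # Parallel alternative for large data: read the underlying files directly.
    ds = ray.data.read_images("s3://example-bucket/cifar10/train/")  # hypothetical path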
14 changes: 4 additions & 10 deletions python/ray/data/tests/test_dataset_formats.py
@@ -16,7 +16,6 @@
from ray.data.datasource import (
    Datasource,
    DummyOutputDatasource,
-    SimpleTensorFlowDatasource,
    SimpleTorchDatasource,
    WriteResult,
)
@@ -192,23 +191,18 @@ def test_write_datasource(ray_start_regular_shared, pipelined):
    assert ray.get(output.data_sink.get_rows_written.remote()) == 10


-def test_tensorflow_datasource(ray_start_regular_shared):
+def test_from_tf(ray_start_regular_shared):
    import tensorflow as tf
    import tensorflow_datasets as tfds

+    tf_dataset = tfds.load("mnist", split=["train"], as_supervised=True)[0]
+    tf_dataset = tf_dataset.take(8)  # Use subset to make test run faster.

-    def dataset_factory():
-        return tfds.load("mnist", split=["train"], as_supervised=True)[0]
-
-    ray_dataset = ray.data.read_datasource(
-        SimpleTensorFlowDatasource(), parallelism=1, dataset_factory=dataset_factory
-    ).fully_executed()
-
-    assert ray_dataset.num_blocks() == 1
+    ray_dataset = ray.data.from_tf(tf_dataset)

    actual_data = ray_dataset.take_all()
    expected_data = list(tf_dataset)
    assert len(actual_data) == len(expected_data)
    for (expected_features, expected_label), (actual_features, actual_label) in zip(
        expected_data, actual_data
    ):
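The hunk is truncated here. As a hedged sketch (not necessarily the PR's exact loop body), a comparison like this would typically conclude with element-wise TensorFlow assertions:

    for (expected_features, expected_label), (actual_features, actual_label) in zip(
        expected_data, actual_data
    ):
        # tf.debugging.assert_equal raises InvalidArgumentError on any mismatch.
        tf.debugging.assert_equal(expected_features, actual_features)
        tf.debugging.assert_equal(expected_label, actual_label)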
Expand Down