[Data] Extend API to enable passing sample weights via ray.dataset.to_tf (#45701) (#46784)

`tensorflow.keras.Model.fit` lets callers pass sample weights along with the
input dataset
(https://www.tensorflow.org/versions/r2.8/api_docs/python/tf/keras/Model#fit).
However, `ray.data.Dataset.to_tf()` does not support this feature: callers
can currently only generate data instances consisting of features and labels.
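
For context, when a `tf.data.Dataset` yields three-element tuples,
`tf.keras.Model.fit` interprets them as `(inputs, targets, sample_weights)`.
A minimal illustration of that convention, independent of Ray (shapes and
values here are made up for the example):

```python
import tensorflow as tf

# Model.fit treats a 3-tuple yielded by a tf.data.Dataset as
# (inputs, targets, sample_weights).
weighted_ds = tf.data.Dataset.from_tensor_slices((
    tf.random.uniform((8, 4)),                          # features
    tf.random.uniform((8,), maxval=2, dtype=tf.int32),  # labels
    tf.ones((8,)),                                      # per-sample weights
)).batch(4)
```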

This PR extends `ray.data.Dataset.to_tf()` to allow callers to specify
additional metadata associated with data samples. Specifically,
`additional_columns` is introduced as a keyword-only argument to
`ray.data.Dataset.to_tf()` and `ray.data.DataIterator.to_tf()`. When
`additional_columns` is not specified, the API behavior is unchanged. When
`additional_columns` is provided, the APIs yield `additional_metadata` along
with `features` and `labels`. To pass sample weights along with sample
features and labels in TensorFlow, create a `tf.data.Dataset` via
`ray.data.Dataset.to_tf()` and specify `additional_columns="weight"`, where
`weight` is the column storing sample weights in the `ray.data.Dataset`.
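
Putting this together, a minimal end-to-end sketch (assuming this change is
available; the dataset contents, column names, and model are made up for
illustration):

```python
import ray
import tensorflow as tf

# Hypothetical dataset with feature, label, and per-sample weight columns.
ds = ray.data.from_items(
    [{"features": [float(i)] * 4, "target": i % 2, "weight": 1.0} for i in range(8)]
)

# With additional_columns="weight", the resulting tf.data.Dataset yields
# (features, labels, weights), which Model.fit reads as (x, y, sample_weight).
tf_ds = ds.to_tf(
    feature_columns="features",
    label_columns="target",
    additional_columns="weight",
    batch_size=4,
)

model = tf.keras.Sequential([
    tf.keras.Input(shape=(4,)),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
model.compile(optimizer="adam", loss="binary_crossentropy")
model.fit(tf_ds, epochs=1)
```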

We deliberately do not name the new argument `sample_weight` or `weight`,
and do not limit its type to `str`, because there may be other metadata
associated with each sample that callers want to yield while iterating
through the dataset.

`additional_type_spec` is inferred with the same heuristic used for
`feature_type_spec` and `label_type_spec` (see the sketch below for an
explicit override). Existing tests are extended to cover invocations both
with and without the `additional_columns` argument, ensuring existing
callers of `ray.data.Dataset.to_tf()` and `ray.data.DataIterator.to_tf()`
won't break as this change rolls out.
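
A minimal sketch of the explicit override, reusing `ds` and the hypothetical
column names from the example above:

```python
# Override the inferred spec for the additional column, mirroring how
# feature_type_spec and label_type_spec are used.
tf_ds = ds.to_tf(
    feature_columns="features",
    label_columns="target",
    additional_columns="weight",
    additional_type_spec=tf.TensorSpec(
        shape=(None,), dtype=tf.float32, name="weight"
    ),
)
```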

## Related issue number
Resolves #45701 
---------

Signed-off-by: jeffreyjeffreywang <[email protected]>
Co-authored-by: jeffreyjeffreywang <[email protected]>
Co-authored-by: Scott Lee <[email protected]>
3 people authored Aug 5, 2024
1 parent 1a02e3c commit 06fb5fc
Showing 4 changed files with 388 additions and 79 deletions.
42 changes: 41 additions & 1 deletion python/ray/data/dataset.py
@@ -4063,13 +4063,15 @@ def to_tf(
feature_columns: Union[str, List[str]],
label_columns: Union[str, List[str]],
*,
additional_columns: Union[str, List[str]] = None,
prefetch_batches: int = 1,
batch_size: int = 1,
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
feature_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None,
label_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None,
additional_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None,
) -> "tf.data.Dataset":
"""Return a `TensorFlow Dataset <https://www.tensorflow.org/api_docs/python/tf/data/Dataset/>`_
over this :class:`~ray.data.Dataset`.
@@ -4125,13 +4127,45 @@ def to_tf(
>>> ds.to_tf("features", "target")
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
If your model accepts different types, shapes, or names of tensors as input, specify the type spec.
If type specs are not specified, they are automatically inferred from the schema of the dataset.
>>> import tensorflow as tf
>>> ds.to_tf(
... feature_columns="features",
... label_columns="target",
... feature_type_spec=tf.TensorSpec(shape=(None, 4), dtype=tf.float32, name="features"),
... label_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="label")
... )
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float32, name='features'), TensorSpec(shape=(None,), dtype=tf.float32, name='label'))>
If your model accepts additional metadata aside from features and label, specify a single additional column or a list of additional columns.
A common use case is to include sample weights in the data samples and train a ``tf.keras.Model`` with ``tf.keras.Model.fit``.
>>> ds = ds.add_column("sample weights", lambda df: 1)
>>> ds.to_tf(feature_columns="features", label_columns="target", additional_columns="sample weights")
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.int64, name='sample weights'))>
If your model accepts different types, shapes, or names for the additional metadata, specify the type spec of the additional column.
>>> ds.to_tf(
... feature_columns="features",
... label_columns="target",
... additional_columns="sample weights",
... additional_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="weight")
... )
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.float32, name='weight'))>
Args:
feature_columns: Columns that correspond to model inputs. If this is a
string, the input data is a tensor. If this is a list, the input data
is a ``dict`` that maps column names to their tensor representation.
label_column: Columns that correspond to model targets. If this is a
label_columns: Columns that correspond to model targets. If this is a
string, the target data is a tensor. If this is a list, the target data
is a ``dict`` that maps column names to their tensor representation.
additional_columns: Columns that correspond to sample weights or other metadata.
If this is a string, the weight data is a tensor. If this is a list, the
weight data is a ``dict`` that maps column names to their tensor representation.
prefetch_batches: The number of batches to fetch ahead of the current batch
to fetch. If set to greater than 0, a separate threadpool is used
to fetch the objects to the local node, format the batches, and apply
@@ -4158,6 +4192,10 @@ def to_tf(
only one column, specify a `tf.TypeSpec`. If there are multiple columns,
specify a ``dict`` that maps column names to their `tf.TypeSpec`.
Default is `None` to automatically infer the type of each column.
additional_type_spec: The `tf.TypeSpec` of `additional_columns`. If there
is only one column, specify a `tf.TypeSpec`. If there are multiple
columns, specify a ``dict`` that maps column names to their `tf.TypeSpec`.
Default is `None` to automatically infer the type of each column.
Returns:
A `TensorFlow Dataset`_ that yields inputs and targets.
@@ -4171,13 +4209,15 @@ def to_tf(
return self.iterator().to_tf(
feature_columns=feature_columns,
label_columns=label_columns,
additional_columns=additional_columns,
prefetch_batches=prefetch_batches,
drop_last=drop_last,
batch_size=batch_size,
local_shuffle_buffer_size=local_shuffle_buffer_size,
local_shuffle_seed=local_shuffle_seed,
feature_type_spec=feature_type_spec,
label_type_spec=label_type_spec,
additional_type_spec=additional_type_spec,
)

@ConsumptionAPI(pattern="Time complexity:")
89 changes: 78 additions & 11 deletions python/ray/data/iterator.py
@@ -676,13 +676,17 @@ def to_tf(
feature_columns: Union[str, List[str]],
label_columns: Union[str, List[str]],
*,
additional_columns: Union[Optional[str], Optional[List[str]]] = None,
prefetch_batches: int = 1,
batch_size: int = 1,
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
feature_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None,
label_type_spec: Union["tf.TypeSpec", Dict[str, "tf.TypeSpec"]] = None,
additional_type_spec: Union[
Optional["tf.TypeSpec"], Optional[Dict[str, "tf.TypeSpec"]]
] = None,
) -> "tf.data.Dataset":
"""Return a TF Dataset over this dataset.
@@ -709,12 +713,12 @@ def to_tf(
If your model accepts a single tensor as input, specify a single feature column.
>>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target") # doctest: +SKIP
>>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target")
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
If your model accepts a dictionary as input, specify a list of feature columns.
>>> it.to_tf(["sepal length (cm)", "sepal width (cm)"], "target") # doctest: +SKIP
>>> it.to_tf(["sepal length (cm)", "sepal width (cm)"], "target")
<_OptionsDataset element_spec=({'sepal length (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), 'sepal width (cm)': TensorSpec(shape=(None,), dtype=tf.float64, name='sepal width (cm)')}, TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
If your dataset contains multiple features but your model accepts a single
@@ -736,16 +740,49 @@
target: int64
}
))
>>> it.to_tf("features", "target") # doctest: +SKIP
>>> it.to_tf("features", "target")
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float64, name='features'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'))>
If your model accepts different types, shapes, or names of tensors as input, specify the type spec.
If type specs are not specified, they are automatically inferred from the schema of the iterator.
>>> import tensorflow as tf
>>> it.to_tf(
... feature_columns="features",
... label_columns="target",
... feature_type_spec=tf.TensorSpec(shape=(None, 4), dtype=tf.float32, name="features"),
... label_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="label")
... )
<_OptionsDataset element_spec=(TensorSpec(shape=(None, 4), dtype=tf.float32, name='features'), TensorSpec(shape=(None,), dtype=tf.float32, name='label'))>
If your model accepts additional metadata aside from features and label, specify a single additional column or a list of additional columns.
A common use case is to include sample weights in the data samples and train a ``tf.keras.Model`` with ``tf.keras.Model.fit``.
>>> ds = ds.add_column("sample weights", lambda df: 1)
>>> it = ds.iterator()
>>> it.to_tf(feature_columns="sepal length (cm)", label_columns="target", additional_columns="sample weights")
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.int64, name='sample weights'))>
If your model accepts different types, shapes, or names for the additional metadata, specify the type spec of the additional column.
>>> it.to_tf(
... feature_columns="sepal length (cm)",
... label_columns="target",
... additional_columns="sample weights",
... additional_type_spec=tf.TensorSpec(shape=(None,), dtype=tf.float32, name="weight")
... )
<_OptionsDataset element_spec=(TensorSpec(shape=(None,), dtype=tf.float64, name='sepal length (cm)'), TensorSpec(shape=(None,), dtype=tf.int64, name='target'), TensorSpec(shape=(None,), dtype=tf.float32, name='weight'))>
Args:
feature_columns: Columns that correspond to model inputs. If this is a
string, the input data is a tensor. If this is a list, the input data
is a ``dict`` that maps column names to their tensor representation.
label_column: Columns that correspond to model targets. If this is a
label_columns: Columns that correspond to model targets. If this is a
string, the target data is a tensor. If this is a list, the target data
is a ``dict`` that maps column names to their tensor representation.
additional_columns: Columns that correspond to sample weights or other metadata.
If this is a string, the weight data is a tensor. If this is a list, the
weight data is a ``dict`` that maps column names to their tensor representation.
prefetch_batches: The number of batches to fetch ahead of the current batch
to fetch. If set to greater than 0, a separate threadpool will be used
to fetch the objects to the local node, format the batches, and apply
@@ -772,6 +809,10 @@ def to_tf(
only one column, specify a `tf.TypeSpec`. If there are multiple columns,
specify a ``dict`` that maps column names to their `tf.TypeSpec`.
Default is `None` to automatically infer the type of each column.
additional_type_spec: The `tf.TypeSpec` of `additional_columns`. If there
is only one column, specify a `tf.TypeSpec`. If there are multiple
columns, specify a ``dict`` that maps column names to their `tf.TypeSpec`.
Default is `None` to automatically infer the type of each column.
Returns:
A ``tf.data.Dataset`` that yields inputs and targets.
@@ -790,9 +831,10 @@ def to_tf(
def validate_column(column: str) -> None:
if column not in valid_columns:
raise ValueError(
f"You specified '{column}' in `feature_columns` or "
f"`label_columns`, but there's no column named '{column}' in the "
f"dataset. Valid column names are: {valid_columns}."
f"You specified '{column}' in `feature_columns`, "
f"`label_columns`, or `additional_columns`, but there's no "
f"column named '{column}' in the dataset. "
f"Valid column names are: {valid_columns}."
)

def validate_columns(columns: Union[str, List]) -> None:
@@ -832,7 +874,16 @@ def generator():
labels = convert_batch_to_tensors(
batch, columns=label_columns, type_spec=label_type_spec
)
yield features, labels

if additional_columns is None:
yield features, labels
else:
additional_metadata = convert_batch_to_tensors(
batch,
columns=additional_columns,
type_spec=additional_type_spec,
)
yield features, labels, additional_metadata

if feature_type_spec is None or label_type_spec is None:
schema = self.schema()
@@ -842,9 +893,25 @@ def generator():
feature_type_spec = get_type_spec(schema, columns=feature_columns)
label_type_spec = get_type_spec(schema, columns=label_columns)

dataset = tf.data.Dataset.from_generator(
generator, output_signature=(feature_type_spec, label_type_spec)
)
if additional_columns is not None and additional_type_spec is None:
schema = self.schema()
valid_columns = set(schema.names)
validate_columns(additional_columns)
additional_type_spec = get_type_spec(schema, columns=additional_columns)

if additional_columns is not None:
dataset = tf.data.Dataset.from_generator(
generator,
output_signature=(
feature_type_spec,
label_type_spec,
additional_type_spec,
),
)
else:
dataset = tf.data.Dataset.from_generator(
generator, output_signature=(feature_type_spec, label_type_spec)
)

options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = (
21 changes: 17 additions & 4 deletions python/ray/data/tests/test_iterator.py
@@ -90,22 +90,35 @@ def test_basic_dataset_iter_rows(ray_start_regular_shared):
# assert it.stats() == ds.stats()


def test_tf_conversion(ray_start_regular_shared):
@pytest.mark.parametrize("include_additional_columns", [False, True])
def test_tf_conversion(ray_start_regular_shared, include_additional_columns):
ds = ray.data.range(5)
it = ds.iterator()
tf_dataset = it.to_tf("id", "id")

if include_additional_columns:
tf_dataset = it.to_tf("id", "id", additional_columns="id")
else:
tf_dataset = it.to_tf("id", "id")

for i, row in enumerate(tf_dataset):
assert all(row[0] == i)
assert all(row[1] == i)
assert isinstance(row[0], tf.Tensor)
assert isinstance(row[1], tf.Tensor)
if include_additional_columns:
assert all(row[2] == i)
assert isinstance(row[2], tf.Tensor)


def test_tf_e2e(ray_start_regular_shared):
@pytest.mark.parametrize("include_additional_columns", [False, True])
def test_tf_e2e(ray_start_regular_shared, include_additional_columns):
ds = ray.data.range(5)
it = ds.iterator()
model = build_model()
model.fit(it.to_tf("id", "id"), epochs=3)
if include_additional_columns:
model.fit(it.to_tf("id", "id", additional_columns="id"), epochs=3)
else:
model.fit(it.to_tf("id", "id"), epochs=3)


def test_torch_conversion(ray_start_regular_shared):