diff --git a/doc/source/data/dataset.rst b/doc/source/data/dataset.rst
index cfddc6e472cb..06f07bb19ba6 100644
--- a/doc/source/data/dataset.rst
+++ b/doc/source/data/dataset.rst
@@ -289,8 +289,8 @@ Supported Output Formats
    * - PyTorch Tensor Iterator
      - :meth:`ds.iter_torch_batches() <ray.data.Dataset.iter_torch_batches>`
      - ✅
-   * - TensorFlow Iterable Dataset
-     - :meth:`ds.to_tf() <ray.data.Dataset.to_tf>`
+   * - TensorFlow Tensor Iterator
+     - :meth:`ds.iter_tf_batches() <ray.data.Dataset.iter_tf_batches>`
      - ✅
    * - Random Access Dataset
      - :meth:`ds.to_random_access_dataset() <ray.data.Dataset.to_random_access_dataset>`
diff --git a/doc/source/data/doc_code/accessing_datasets.py b/doc/source/data/doc_code/accessing_datasets.py
index 303a6148c4f1..bea47ea825b5 100644
--- a/doc/source/data/doc_code/accessing_datasets.py
+++ b/doc/source/data/doc_code/accessing_datasets.py
@@ -142,13 +142,12 @@ def consume(data: ray.data.Dataset[int]) -> int:
 
 ds = ray.data.range(10000)
 
-tf_ds: tf.data.Dataset = ds.to_tf(
+tf_batches = ds.iter_tf_batches(
     batch_size=2,
-    output_signature=tf.TensorSpec(shape=(None, 1), dtype=tf.int64),
 )
 
 num_batches = 0
-for batch in tf_ds:
+for batch in tf_batches:
     assert isinstance(batch, tf.Tensor)
     assert batch.shape[0] == 2, batch.shape
     num_batches += 1
@@ -171,26 +170,21 @@ def consume(data: ray.data.Dataset[int]) -> int:
 })
 ds = ray.data.from_pandas(df)
 
-# Specify the label column; all other columns will be treated as feature columns and
-# will be concatenated into the same TensorFlow tensor.
-tf_ds: tf.data.Dataset = ds.to_tf(
-    label_column="label",
+tf_batches = ds.iter_tf_batches(
     batch_size=2,
-    output_signature=(
-        tf.TensorSpec(shape=(None, 2), dtype=tf.int64),
-        tf.TensorSpec(shape=(None,), dtype=tf.int64),
-    ),
 )
 
 num_batches = 0
-for feature, label in tf_ds:
-    assert isinstance(feature, tf.Tensor)
+for batch in tf_batches:
+    feature1 = batch["feature1"]
+    feature2 = batch["feature2"]
+    label = batch["label"]
+    assert isinstance(feature1, tf.Tensor)
+    assert isinstance(feature2, tf.Tensor)
     assert isinstance(label, tf.Tensor)
     # Batch dimension.
-    assert feature.shape[0] == 2
-    # Column dimension.
-    assert feature.shape[1] == 2
-    # Batch dimension.
+    assert feature1.shape[0] == 2
+    assert feature2.shape[0] == 2
     assert label.shape[0] == 2
     num_batches += 1
diff --git a/doc/source/ray-air/doc_code/tf_starter.py b/doc/source/ray-air/doc_code/tf_starter.py
index 8fd69bcd1081..4ae15573d948 100644
--- a/doc/source/ray-air/doc_code/tf_starter.py
+++ b/doc/source/ray-air/doc_code/tf_starter.py
@@ -52,18 +52,25 @@ def train_func(config: dict):
 
     dataset = session.get_dataset_shard("train")
 
+    def to_tf_dataset(dataset, batch_size):
+        def to_tensor_iterator():
+            for batch in dataset.iter_tf_batches(
+                batch_size=batch_size, dtypes=tf.float32
+            ):
+                yield batch["x"], batch["y"]
+
+        output_signature = (
+            tf.TensorSpec(shape=(None), dtype=tf.float32),
+            tf.TensorSpec(shape=(None), dtype=tf.float32),
+        )
+        tf_dataset = tf.data.Dataset.from_generator(
+            to_tensor_iterator, output_signature=output_signature
+        )
+        return prepare_dataset_shard(tf_dataset)
+
     results = []
     for _ in range(epochs):
-        tf_dataset = prepare_dataset_shard(
-            dataset.to_tf(
-                label_column="y",
-                output_signature=(
-                    tf.TensorSpec(shape=(None, 1), dtype=tf.float32),
-                    tf.TensorSpec(shape=(None), dtype=tf.float32),
-                ),
-                batch_size=batch_size,
-            )
-        )
+        tf_dataset = to_tf_dataset(dataset=dataset, batch_size=batch_size)
         history = multi_worker_model.fit(tf_dataset, callbacks=[Callback()])
         results.append(history.history)
     return results
diff --git a/doc/source/ray-air/examples/tfx_tabular_train_to_serve.ipynb b/doc/source/ray-air/examples/tfx_tabular_train_to_serve.ipynb
index 700362a30e1e..d62b2bfbe052 100644
--- a/doc/source/ray-air/examples/tfx_tabular_train_to_serve.ipynb
+++ b/doc/source/ray-air/examples/tfx_tabular_train_to_serve.ipynb
@@ -206,6 +206,7 @@
    "source": [
     "import pandas as pd\n",
     "\n",
+    "INPUT = \"input\"\n",
     "LABEL = \"is_big_tip\"\n",
     "\n",
     "def get_data() -> pd.DataFrame:\n",
@@ -574,7 +575,7 @@
     "    from ray.data.extensions import TensorArray\n",
     "    result = {}\n",
     "    feature_cols = [col for col in dataframe.columns if col != LABEL]\n",
-    "    result[\"input\"] = TensorArray(dataframe[feature_cols].to_numpy(dtype=np.float32))\n",
+    "    result[INPUT] = TensorArray(dataframe[feature_cols].to_numpy(dtype=np.float32))\n",
     "    if LABEL in dataframe.columns:\n",
     "        result[LABEL] = dataframe[LABEL]\n",
     "    return pd.DataFrame(result)\n",
@@ -694,20 +695,29 @@
     "        metrics=[\"accuracy\"],\n",
     "    )\n",
     "\n",
+    "    def to_tf_dataset(dataset, batch_size):\n",
+    "        def to_tensor_iterator():\n",
+    "            for batch in dataset.iter_tf_batches(\n",
+    "                batch_size=batch_size, dtypes=tf.float32, drop_last=True,\n",
+    "            ):\n",
+    "                yield batch[INPUT], batch[LABEL]\n",
+    "\n",
+    "        output_signature = (\n",
+    "            tf.TensorSpec(shape=(BATCH_SIZE, INPUT_SIZE), dtype=tf.float32),\n",
+    "            tf.TensorSpec(shape=(BATCH_SIZE,), dtype=tf.int64),\n",
+    "        )\n",
+    "        tf_dataset = tf.data.Dataset.from_generator(\n",
+    "            to_tensor_iterator, output_signature=output_signature\n",
+    "        )\n",
+    "        return prepare_dataset_shard(tf_dataset)\n",
+    "\n",
     "    for epoch in range(EPOCH): \n",
     "        # This will make sure that the training workers will get their own\n",
     "        # share of batch to work on.\n",
     "        # See `ray.train.tensorflow.prepare_dataset_shard` for more information.\n",
-    "        tf_dataset = prepare_dataset_shard(\n",
-    "            dataset_shard.to_tf(\n",
-    "                label_column=LABEL,\n",
-    "                output_signature=(\n",
-    "                    tf.TensorSpec(shape=(BATCH_SIZE, INPUT_SIZE), dtype=tf.float32),\n",
-    "                    tf.TensorSpec(shape=(BATCH_SIZE,), dtype=tf.int64),\n",
-    "                ),\n",
-    "                batch_size=BATCH_SIZE,\n",
-    "                drop_last=True,\n",
-    "            )\n",
+    "        tf_dataset = to_tf_dataset(\n",
+    "            dataset=dataset_shard,\n",
+    "            batch_size=BATCH_SIZE,\n",
     "        )\n",
     "\n",
     "        model.fit(tf_dataset, verbose=0)\n",
diff --git a/python/ray/air/examples/tf/tensorflow_autoencoder_example.py b/python/ray/air/examples/tf/tensorflow_autoencoder_example.py
index 76b70240b91b..54da25d72718 100644
--- a/python/ray/air/examples/tf/tensorflow_autoencoder_example.py
+++ b/python/ray/air/examples/tf/tensorflow_autoencoder_example.py
@@ -90,18 +90,27 @@ def train_func(config: dict):
         ],
     )
 
+    def to_tf_dataset(dataset, batch_size):
+        def to_tensor_iterator():
+            for batch in dataset.iter_tf_batches(
+                batch_size=batch_size, dtypes=tf.float32
+            ):
+                yield batch["image"], batch["label"]
+
+        output_signature = (
+            tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
+            tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
+        )
+        tf_dataset = tf.data.Dataset.from_generator(
+            to_tensor_iterator, output_signature=output_signature
+        )
+        return prepare_dataset_shard(tf_dataset)
+
     results = []
     for epoch in range(epochs):
-        tf_dataset = prepare_dataset_shard(
-            dataset_shard.to_tf(
-                feature_columns=["image"],
-                label_column="label",
-                output_signature=(
-                    tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
-                    tf.TensorSpec(shape=(None, 784), dtype=tf.float32),
-                ),
-                batch_size=per_worker_batch_size,
-            )
+        tf_dataset = to_tf_dataset(
+            dataset=dataset_shard,
+            batch_size=per_worker_batch_size,
         )
         history = multi_worker_model.fit(
             tf_dataset, callbacks=[TrainCheckpointReportCallback()]
diff --git a/python/ray/air/examples/tf/tensorflow_regression_example.py b/python/ray/air/examples/tf/tensorflow_regression_example.py
index d2cbaf7ff01d..6b6157f1502a 100644
--- a/python/ray/air/examples/tf/tensorflow_regression_example.py
+++ b/python/ray/air/examples/tf/tensorflow_regression_example.py
@@ -62,18 +62,25 @@ def train_func(config: dict):
 
     dataset = session.get_dataset_shard("train")
 
+    def to_tf_dataset(dataset, batch_size):
+        def to_tensor_iterator():
+            for batch in dataset.iter_tf_batches(
+                batch_size=batch_size, dtypes=tf.float32
+            ):
+                yield batch["x"], batch["y"]
+
+        output_signature = (
+            tf.TensorSpec(shape=(None, 100), dtype=tf.float32),
+            tf.TensorSpec(shape=(None), dtype=tf.float32),
+        )
+        tf_dataset = tf.data.Dataset.from_generator(
+            to_tensor_iterator, output_signature=output_signature
+        )
+        return prepare_dataset_shard(tf_dataset)
+
     results = []
     for _ in range(epochs):
-        tf_dataset = prepare_dataset_shard(
-            dataset.to_tf(
-                label_column="y",
-                output_signature=(
-                    tf.TensorSpec(shape=(None, 100), dtype=tf.float32),
-                    tf.TensorSpec(shape=(None), dtype=tf.float32),
-                ),
-                batch_size=batch_size,
-            )
-        )
+        tf_dataset = to_tf_dataset(dataset=dataset, batch_size=batch_size)
         history = multi_worker_model.fit(
             tf_dataset, callbacks=[TrainCheckpointReportCallback()]
         )
diff --git a/python/ray/air/tests/test_keras_callback.py b/python/ray/air/tests/test_keras_callback.py
index c97887f48413..41629b05a918 100644
--- a/python/ray/air/tests/test_keras_callback.py
+++ b/python/ray/air/tests/test_keras_callback.py
@@ -46,17 +46,24 @@ def train_func(config: dict):
 
     dataset = session.get_dataset_shard("train")
 
-    for _ in range(config.get("epoch", 3)):
-        tf_dataset = prepare_dataset_shard(
-            dataset.to_tf(
-                label_column="y",
-                output_signature=(
-                    tf.TensorSpec(shape=(None, 1), dtype=tf.float32),
-                    tf.TensorSpec(shape=(None), dtype=tf.float32),
-                ),
-                batch_size=32,
-            )
+    def to_tf_dataset(dataset, batch_size):
+        def to_tensor_iterator():
+            for batch in dataset.iter_tf_batches(
+                batch_size=batch_size, dtypes=tf.float32
+            ):
batch["x"], batch["y"] + + output_signature = ( + tf.TensorSpec(shape=(None), dtype=tf.float32), + tf.TensorSpec(shape=(None), dtype=tf.float32), + ) + tf_dataset = tf.data.Dataset.from_generator( + to_tensor_iterator, output_signature=output_signature ) + return prepare_dataset_shard(tf_dataset) + + for _ in range(config.get("epoch", 3)): + tf_dataset = to_tf_dataset(dataset=dataset, batch_size=32) multi_worker_model.fit(tf_dataset, callbacks=[Callback()]) diff --git a/python/ray/train/tensorflow/tensorflow_trainer.py b/python/ray/train/tensorflow/tensorflow_trainer.py index 43b6c2263585..092fcf5c9d76 100644 --- a/python/ray/train/tensorflow/tensorflow_trainer.py +++ b/python/ray/train/tensorflow/tensorflow_trainer.py @@ -75,7 +75,7 @@ def train_loop_per_worker(): def train_loop_per_worker(): # Turns off autosharding for a dataset. # You should use this if you are doing - # `session.get_dataset_shard(...).to_tf(...)` + # `session.get_dataset_shard(...).iter_tf_batches(...)` # as the data will be already sharded. train.tensorflow.prepare_dataset_shard(...) @@ -113,17 +113,24 @@ def train_loop_for_worker(config): model.compile( optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) - for epoch in range(config["num_epochs"]): - tf_dataset = prepare_dataset_shard( - dataset_shard.to_tf( - label_column="y", - output_signature=( - tf.TensorSpec(shape=(None, 1), dtype=tf.float32), - tf.TensorSpec(shape=(None), dtype=tf.float32), - ), - batch_size=1, - ) + def to_tf_dataset(dataset, batch_size): + def to_tensor_iterator(): + for batch in dataset.iter_tf_batches( + batch_size=batch_size, dtypes=tf.float32 + ): + yield tf.expand_dims(batch["x"], 1), batch["y"] + + output_signature = ( + tf.TensorSpec(shape=(None, 1), dtype=tf.float32), + tf.TensorSpec(shape=(None), dtype=tf.float32), + ) + tf_dataset = tf.data.Dataset.from_generator( + to_tensor_iterator, output_signature=output_signature ) + return prepare_dataset_shard(tf_dataset) + + for epoch in range(config["num_epochs"]): + tf_dataset = to_tf_dataset(dataset=dataset_shard, batch_size=1) model.fit(tf_dataset) # You can also use ray.air.callbacks.keras.Callback # for reporting and checkpointing instead of reporting manually. diff --git a/python/ray/train/tensorflow/train_loop_utils.py b/python/ray/train/tensorflow/train_loop_utils.py index 4f1f32b3d892..c21144b4bdbc 100644 --- a/python/ray/train/tensorflow/train_loop_utils.py +++ b/python/ray/train/tensorflow/train_loop_utils.py @@ -7,9 +7,10 @@ def prepare_dataset_shard(tf_dataset_shard: tf.data.Dataset): """A utility function that disables Tensorflow autosharding. - This should be used on a TensorFlow ``Dataset`` created by calling ``to_tf()`` - on a ``ray.data.Dataset`` returned by ``ray.train.get_dataset_shard()`` since - the dataset has already been sharded across the workers. + This should be used on a TensorFlow ``Dataset`` created by calling + ``iter_tf_batches()`` on a ``ray.data.Dataset`` returned by + ``ray.train.get_dataset_shard()`` since the dataset has already been sharded across + the workers. Args: tf_dataset_shard (tf.data.Dataset): A TensorFlow Dataset.