From c8892760126ae1fd169015671c2802b84605f40a Mon Sep 17 00:00:00 2001
From: Justin Yu
Date: Thu, 17 Nov 2022 14:11:40 -0800
Subject: [PATCH 1/2] Add batch_format to noop BatchMapper in batch predictor
 logic

Signed-off-by: Justin Yu
---
 python/ray/train/batch_predictor.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/python/ray/train/batch_predictor.py b/python/ray/train/batch_predictor.py
index abe133599187..e73eb04c0ad2 100644
--- a/python/ray/train/batch_predictor.py
+++ b/python/ray/train/batch_predictor.py
@@ -287,7 +287,9 @@ def __call__(self, input_batch: DataBatchType) -> DataBatchType:
         if preprocessor:
             # Set the in-predictor preprocessing to a no-op when using a separate
             # GPU stage. Otherwise, the preprocessing will be applied twice.
-            override_prep = BatchMapper(lambda x: x)
+            override_prep = BatchMapper(
+                lambda x: x, batch_format=predict_stage_batch_format
+            )
             # preprocessor.transform will break for DatasetPipeline due to
             # missing _dataset_format()
             batch_fn = preprocessor._transform_batch

From eec712e7aed9690bcb5715de8c7c4c7829f7599d Mon Sep 17 00:00:00 2001
From: Justin Yu
Date: Thu, 17 Nov 2022 14:13:22 -0800
Subject: [PATCH 2/2] Update all examples/tests that use BatchMapper to
 specify batch_format

Signed-off-by: Justin Yu
---
 doc/source/ray-air/doc_code/air_ingest.py           | 12 ++++++------
 doc/source/ray-air/doc_code/preprocessors.py        |  2 +-
 .../examples/huggingface_text_classification.ipynb  |  8 ++++----
 .../examples/tfx_tabular_train_to_serve.ipynb       | 10 +++++-----
 python/ray/air/tests/test_dataset_config.py         |  6 +++---
 python/ray/air/util/check_ingest.py                 |  4 ++--
 python/ray/train/tests/test_base_trainer.py         |  2 +-
 .../air_benchmarks/mlperf-train/resnet50_ray_air.py | 10 ++++++----
 .../air_benchmarks/workloads/data_benchmark.py      |  2 +-
 9 files changed, 29 insertions(+), 27 deletions(-)

diff --git a/doc/source/ray-air/doc_code/air_ingest.py b/doc/source/ray-air/doc_code/air_ingest.py
index 753484acae7e..f993a0b52404 100644
--- a/doc/source/ray-air/doc_code/air_ingest.py
+++ b/doc/source/ray-air/doc_code/air_ingest.py
@@ -14,8 +14,8 @@
 
 # An example preprocessor chain that just scales all values by 4.0 in two stages.
 preprocessor = Chain(
-    BatchMapper(lambda df: df * 2),
-    BatchMapper(lambda df: df * 2),
+    BatchMapper(lambda df: df * 2, batch_format="pandas"),
+    BatchMapper(lambda df: df * 2, batch_format="pandas"),
 )
 # __check_ingest_1_end__
 
@@ -93,7 +93,7 @@
 from ray.air.config import ScalingConfig
 
 # A simple preprocessor that just scales all values by 2.0.
-preprocessor = BatchMapper(lambda df: df * 2)
+preprocessor = BatchMapper(lambda df: df * 2, batch_format="pandas")
 
 
 def train_loop_per_worker():
@@ -128,7 +128,7 @@ def train_loop_per_worker():
 from ray.air.config import ScalingConfig, DatasetConfig
 
 # A simple preprocessor that just scales all values by 2.0.
-preprocessor = BatchMapper(lambda df: df * 2)
+preprocessor = BatchMapper(lambda df: df * 2, batch_format="pandas")
 
 
 def train_loop_per_worker():
@@ -253,7 +253,7 @@ def train_loop_per_worker():
 
 
 # A simple preprocessor that just scales all values by 2.0.
-preprocessor = BatchMapper(lambda df: df * 2)
+preprocessor = BatchMapper(lambda df: df * 2, batch_format="pandas")
 
 my_trainer = TorchTrainer(
     train_loop_per_worker,
@@ -290,7 +290,7 @@ def train_loop_per_worker():
 
 
 # A simple preprocessor that just scales all values by 2.0.
-preprocessor = BatchMapper(lambda df: df * 2)
+preprocessor = BatchMapper(lambda df: df * 2, batch_format="pandas")
 
 my_trainer = TorchTrainer(
     train_loop_per_worker,
diff --git a/doc/source/ray-air/doc_code/preprocessors.py b/doc/source/ray-air/doc_code/preprocessors.py
index 815a8c1e2fa7..3cdc2e4b7bc7 100644
--- a/doc/source/ray-air/doc_code/preprocessors.py
+++ b/doc/source/ray-air/doc_code/preprocessors.py
@@ -133,7 +133,7 @@
 # [{'value': 0}, {'value': 1}, {'value': 2}, {'value': 3}]
 
 # Create a stateless preprocess that multiplies values by 2.
-preprocessor = BatchMapper(lambda df: df * 2, batch_size=2)
+preprocessor = BatchMapper(lambda df: df * 2, batch_size=2, batch_format="pandas")
 dataset_transformed = preprocessor.transform(dataset)
 print(dataset_transformed.take())
 # [{'value': 0}, {'value': 2}, {'value': 4}, {'value': 6}]
diff --git a/doc/source/ray-air/examples/huggingface_text_classification.ipynb b/doc/source/ray-air/examples/huggingface_text_classification.ipynb
index 81fa1d226772..14a5a07d68f1 100644
--- a/doc/source/ray-air/examples/huggingface_text_classification.ipynb
+++ b/doc/source/ray-air/examples/huggingface_text_classification.ipynb
@@ -500,7 +500,7 @@
     "    ret = {**examples, **ret}\n",
     "    return pd.DataFrame.from_dict(ret)\n",
     "\n",
-    "batch_encoder = BatchMapper(preprocess_function)"
+    "batch_encoder = BatchMapper(preprocess_function, batch_format=\"pandas\")"
    ]
   },
   {
@@ -2255,7 +2255,7 @@
    "provenance": []
   },
   "kernelspec": {
-   "display_name": "Python 3.8.10 ('venv': venv)",
+   "display_name": "Python 3.8.9 64-bit",
    "language": "python",
    "name": "python3"
   },
@@ -2269,11 +2269,11 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.8.10"
+   "version": "3.8.9"
   },
   "vscode": {
    "interpreter": {
-    "hash": "3c0d54d489a08ae47a06eae2fd00ff032d6cddb527c382959b7b2575f6a8167f"
+    "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
   }
  }
 },
diff --git a/doc/source/ray-air/examples/tfx_tabular_train_to_serve.ipynb b/doc/source/ray-air/examples/tfx_tabular_train_to_serve.ipynb
index 36218f764aa7..1bc1fd344c4e 100644
--- a/doc/source/ray-air/examples/tfx_tabular_train_to_serve.ipynb
+++ b/doc/source/ray-air/examples/tfx_tabular_train_to_serve.ipynb
@@ -587,8 +587,8 @@
     "        imputer4,\n",
     "        imputer5,\n",
     "        ohe,\n",
-    "        BatchMapper(batch_mapper_fn),\n",
-    "        BatchMapper(concat_for_tensor)\n",
+    "        BatchMapper(batch_mapper_fn, batch_format=\"pandas\"),\n",
+    "        BatchMapper(concat_for_tensor, batch_format=\"pandas\")\n",
     "    )\n",
     "    return chained_pp\n"
    ]
@@ -1888,7 +1888,7 @@
    "provenance": []
   },
   "kernelspec": {
-   "display_name": "Python 3.10.8 ('.venv': venv)",
+   "display_name": "Python 3.8.9 64-bit",
    "language": "python",
    "name": "python3"
   },
@@ -1902,11 +1902,11 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.10.8"
+   "version": "3.8.9"
   },
   "vscode": {
    "interpreter": {
-    "hash": "c704e19737f24b51bc631dadcac7a7e356bb35d1c5cd7766248d8a6946059909"
+    "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6"
   }
  }
 },
diff --git a/python/ray/air/tests/test_dataset_config.py b/python/ray/air/tests/test_dataset_config.py
index 931d2ed34798..a7d4fc7f5118 100644
--- a/python/ray/air/tests/test_dataset_config.py
+++ b/python/ray/air/tests/test_dataset_config.py
@@ -120,7 +120,7 @@ def test_error(ray_start_4_cpus):
         {"train": 10},
         dataset_config={},
         datasets={"train": ds},
-        preprocessor=BatchMapper(lambda x: x),
+        preprocessor=BatchMapper(lambda x: x, batch_format="pandas"),
     )
     test.fit()
 
@@ -260,7 +260,7 @@ def checker(shard, results):
     def rand(x):
         return [random.random() for _ in range(len(x))]
 
-    prep = BatchMapper(rand)
+    prep = BatchMapper(rand, batch_format="pandas")
     ds = ray.data.range_table(5, parallelism=1)
     test = TestStream(
         checker,
@@ -275,7 +275,7 @@ def test_stream_finite_window_nocache_prep(ray_start_4_cpus):
     def rand(x):
         return [random.random() for _ in range(len(x))]
 
-    prep = BatchMapper(rand)
+    prep = BatchMapper(rand, batch_format="pandas")
     ds = ray.data.range_table(5)
 
     # Test the default 1GiB window size.
diff --git a/python/ray/air/util/check_ingest.py b/python/ray/air/util/check_ingest.py
index ddbdd4f748a9..d9b9a442501d 100755
--- a/python/ray/air/util/check_ingest.py
+++ b/python/ray/air/util/check_ingest.py
@@ -163,8 +163,8 @@ def generate_epochs(data: Union[Dataset, DatasetPipeline], epochs: int):
 
     # An example preprocessor chain that just scales all values by 4.0 in two stages.
     preprocessor = Chain(
-        BatchMapper(lambda df: df * 2),
-        BatchMapper(lambda df: df * 2),
+        BatchMapper(lambda df: df * 2, batch_format="pandas"),
+        BatchMapper(lambda df: df * 2, batch_format="pandas"),
     )
 
     # Setup the dummy trainer that prints ingest stats.
diff --git a/python/ray/train/tests/test_base_trainer.py b/python/ray/train/tests/test_base_trainer.py
index 18fd2fd1bf75..714f8884296a 100644
--- a/python/ray/train/tests/test_base_trainer.py
+++ b/python/ray/train/tests/test_base_trainer.py
@@ -407,7 +407,7 @@ def map_fn(batch):
         assert ctx.target_max_block_size == target_max_block_size
         return batch + 1
 
-    preprocessor = BatchMapper(map_fn)
+    preprocessor = BatchMapper(map_fn, batch_format="pandas")
 
     ctx = ray.data.context.DatasetContext.get_current()
     ctx.target_max_block_size = target_max_block_size
diff --git a/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py b/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py
index 9dde883a857d..1f818fba6e3c 100644
--- a/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py
+++ b/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py
@@ -221,7 +221,7 @@ def crop_and_flip_image_batch(image_batch):
     return image_batch
 
 
-def decode_tf_record_batch(tf_record_batch):
+def decode_tf_record_batch(tf_record_batch: pd.DataFrame) -> pd.DataFrame:
     def process_images():
         for image_buffer in tf_record_batch["image/encoded"]:
             image_buffer = tf.reshape(image_buffer, shape=[])
@@ -237,7 +237,7 @@ def process_images():
     return df
 
 
-def decode_crop_and_flip_tf_record_batch(tf_record_batch):
+def decode_crop_and_flip_tf_record_batch(tf_record_batch: pd.DataFrame) -> pd.DataFrame:
     """
     This version of the preprocessor fuses the load step with the crop and flip
     step, which should have better performance (at the cost of re-executing the
@@ -518,11 +518,13 @@ def append_to_test_output_json(path, metrics):
             batch_size = 32
         if args.online_processing:
             preprocessor = BatchMapper(
-                decode_tf_record_batch, batch_size=batch_size
+                decode_tf_record_batch, batch_size=batch_size, batch_format="pandas"
             )
         else:
             preprocessor = BatchMapper(
-                decode_crop_and_flip_tf_record_batch, batch_size=batch_size
+                decode_crop_and_flip_tf_record_batch,
+                batch_size=batch_size,
+                batch_format="pandas",
             )
 
         train_loop_config["data_loader"] = RAY_DATA
diff --git a/release/air_tests/air_benchmarks/workloads/data_benchmark.py b/release/air_tests/air_benchmarks/workloads/data_benchmark.py
index 0f65a8ff83ae..837704ac8090 100644
--- a/release/air_tests/air_benchmarks/workloads/data_benchmark.py
+++ b/release/air_tests/air_benchmarks/workloads/data_benchmark.py
@@ -23,7 +23,7 @@ def make_ds(size_gb: int):
 
 
 def run_ingest_bulk(dataset, num_workers, num_cpus_per_worker):
-    dummy_prep = BatchMapper(lambda df: df * 2)
+    dummy_prep = BatchMapper(lambda df: df * 2, batch_format="pandas")
     trainer = DummyTrainer(
         scaling_config=ScalingConfig(
             num_workers=num_workers,
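
For reference, the pattern these two patches thread through everywhere is the
batch_format argument of ray.data.preprocessors.BatchMapper. Below is a
minimal usage sketch, assuming a Ray 2.x AIR install; the toy dataset and the
variable names are made up for illustration and are not part of the diffs
above.

    # Sketch: pin the UDF's batch type explicitly instead of letting
    # BatchMapper infer it from the dataset's underlying block format.
    import ray
    from ray.data.preprocessors import BatchMapper

    ds = ray.data.from_items([{"value": i} for i in range(4)])

    # With batch_format="pandas", the lambda always receives a pandas.DataFrame.
    doubler = BatchMapper(lambda df: df * 2, batch_format="pandas")

    print(doubler.transform(ds).take())
    # [{'value': 0}, {'value': 2}, {'value': 4}, {'value': 6}]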