From 48772f9bb59886b7b396471d4806118e97d25472 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Wed, 19 Oct 2022 22:18:08 +0000 Subject: [PATCH 1/3] Support custom resource with remote args in random_shuffle_each_window() Signed-off-by: jianoaix --- python/ray/data/dataset_pipeline.py | 5 +++- .../ray/data/tests/test_dataset_pipeline.py | 26 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py index 059c10f73cf0..272d3c060476 100644 --- a/python/ray/data/dataset_pipeline.py +++ b/python/ray/data/dataset_pipeline.py @@ -824,11 +824,14 @@ def random_shuffle_each_window( *, seed: Optional[int] = None, num_blocks: Optional[int] = None, + **ray_remote_args, ) -> "DatasetPipeline[U]": """Apply :py:meth:`Dataset.random_shuffle ` to each dataset/window in this pipeline.""" return self.foreach_window( - lambda ds: ds.random_shuffle(seed=seed, num_blocks=num_blocks) + lambda ds: ds.random_shuffle( + seed=seed, num_blocks=num_blocks, **ray_remote_args + ) ) def sort_each_window( diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py index c33c1602a691..5c02ea196579 100644 --- a/python/ray/data/tests/test_dataset_pipeline.py +++ b/python/ray/data/tests/test_dataset_pipeline.py @@ -634,6 +634,32 @@ def test_drop_columns(ray_start_regular_shared): assert pipe.drop_columns(["col2"]).take(1) == [{"col1": 1, "col3": 3}] +def test_random_shuffle_each_window_with_custom_resource(ray_start_cluster): + cluster = ray_start_cluster + # Create two nodes which have different custom resources. + cluster.add_node( + resources={"foo": 100}, + num_cpus=1, + ) + cluster.add_node(resources={"bar": 100}, num_cpus=1) + + ray.init(cluster.address) + + # Run dataset in "bar" nodes. + pipe = ray.data.read_datasource( + ray.data.datasource.RangeDatasource(), + parallelism=10, + n=1000, + block_format="list", + ray_remote_args={"resources": {"bar": 1}}, + ).repeat(3) + pipe = pipe.random_shuffle_each_window(resources={"bar": 1}) + for batch in pipe.iter_batches(): + pass + assert "1 nodes used" in pipe.stats() + assert "2 nodes used" not in pipe.stats() + + def test_in_place_transformation_doesnt_clear_objects(ray_start_regular_shared): ds = ray.data.from_items([1, 2, 3, 4, 5, 6]) From 5a96dc23bbbd9efb107f4cb0e17f242121bfc0e2 Mon Sep 17 00:00:00 2001 From: jianoaix Date: Wed, 19 Oct 2022 22:20:10 +0000 Subject: [PATCH 2/3] fix Signed-off-by: jianoaix --- python/ray/data/tests/test_dataset_pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py index 5c02ea196579..30f6283167bd 100644 --- a/python/ray/data/tests/test_dataset_pipeline.py +++ b/python/ray/data/tests/test_dataset_pipeline.py @@ -645,7 +645,7 @@ def test_random_shuffle_each_window_with_custom_resource(ray_start_cluster): ray.init(cluster.address) - # Run dataset in "bar" nodes. + # Run pipeline in "bar" nodes. pipe = ray.data.read_datasource( ray.data.datasource.RangeDatasource(), parallelism=10, From 1623bdc8c34286863ee4ccd7227d197a3883388e Mon Sep 17 00:00:00 2001 From: jianoaix Date: Thu, 20 Oct 2022 01:11:08 +0000 Subject: [PATCH 3/3] fix Signed-off-by: jianoaix --- python/ray/data/tests/test_dataset_pipeline.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/data/tests/test_dataset_pipeline.py b/python/ray/data/tests/test_dataset_pipeline.py index 30f6283167bd..202d599c90be 100644 --- a/python/ray/data/tests/test_dataset_pipeline.py +++ b/python/ray/data/tests/test_dataset_pipeline.py @@ -635,6 +635,7 @@ def test_drop_columns(ray_start_regular_shared): def test_random_shuffle_each_window_with_custom_resource(ray_start_cluster): + ray.shutdown() cluster = ray_start_cluster # Create two nodes which have different custom resources. cluster.add_node(