From 9a36755821a18179275257a456d5f8c811822464 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Tue, 5 Mar 2024 13:20:25 -0800 Subject: [PATCH] [Data] Remove Ray Data read parallelism from all libraries and tests (#43692) This PR is to remove Ray Data read `parallelism`, replace with `override_num_blocks` from all libraries and unit tests. The PR change on Ray Data is https://github.com/ray-project/ray/pull/43113 . Signed-off-by: Cheng Su --- .../modules/data/tests/test_data_head.py | 2 +- .../ray/air/tests/test_new_dataset_config.py | 12 +-- python/ray/air/util/check_ingest.py | 4 +- python/ray/data/dataset.py | 6 +- python/ray/data/read_api.py | 10 +-- python/ray/data/tests/test_all_to_all.py | 62 +++++++-------- .../ray/data/tests/test_auto_parallelism.py | 8 +- .../ray/data/tests/test_backpressure_e2e.py | 10 +-- .../data/tests/test_backpressure_policies.py | 4 +- python/ray/data/tests/test_binary.py | 4 +- python/ray/data/tests/test_block_sizing.py | 24 ++++-- python/ray/data/tests/test_consumption.py | 75 ++++++++++--------- .../data/tests/test_context_propagation.py | 6 +- python/ray/data/tests/test_csv.py | 16 ++-- python/ray/data/tests/test_datasink.py | 4 +- .../data/tests/test_dynamic_block_split.py | 20 ++--- .../data/tests/test_execution_optimizer.py | 36 ++++----- python/ray/data/tests/test_file_datasink.py | 2 +- python/ray/data/tests/test_formats.py | 8 +- python/ray/data/tests/test_huggingface.py | 4 +- python/ray/data/tests/test_image.py | 6 +- python/ray/data/tests/test_json.py | 16 ++-- python/ray/data/tests/test_map.py | 50 +++++++------ python/ray/data/tests/test_mongo.py | 16 ++-- python/ray/data/tests/test_numpy.py | 14 ++-- python/ray/data/tests/test_numpy_support.py | 14 ++-- python/ray/data/tests/test_object_gc.py | 22 +++--- python/ray/data/tests/test_optimize.py | 4 +- python/ray/data/tests/test_parquet.py | 22 ++++-- python/ray/data/tests/test_random_access.py | 4 +- .../data/tests/test_randomize_block_order.py | 2 +- python/ray/data/tests/test_size_estimation.py | 18 ++--- python/ray/data/tests/test_sort.py | 28 ++++--- python/ray/data/tests/test_split.py | 16 ++-- python/ray/data/tests/test_splitblocks.py | 22 +++--- python/ray/data/tests/test_sql.py | 8 +- python/ray/data/tests/test_stats.py | 42 ++++++----- .../test_streaming_executor_errored_blocks.py | 2 +- .../data/tests/test_streaming_integration.py | 28 +++---- python/ray/data/tests/test_tensor.py | 22 +++--- python/ray/data/tests/test_text.py | 2 +- python/ray/data/tests/test_tfrecords.py | 22 +++--- python/ray/data/tests/test_webdataset.py | 18 +++-- python/ray/experimental/tqdm_ray.py | 2 +- python/ray/tune/tests/test_tuner.py | 2 +- .../mlperf-train/resnet50_ray_air.py | 2 +- .../dataset/data_ingest_benchmark.py | 2 +- .../dataset/dataset_random_access.py | 2 +- .../dataset/image_loader_microbenchmark.py | 4 +- .../dataset/map_batches_benchmark.py | 2 +- .../dataset/operator_fusion_benchmark.py | 2 +- .../dataset/read_parquet_benchmark.py | 6 +- release/nightly_tests/dataset/sort.py | 2 +- 53 files changed, 403 insertions(+), 338 deletions(-) diff --git a/dashboard/modules/data/tests/test_data_head.py b/dashboard/modules/data/tests/test_data_head.py index 6feb5f5bc8ee..5689a5940fed 100644 --- a/dashboard/modules/data/tests/test_data_head.py +++ b/dashboard/modules/data/tests/test_data_head.py @@ -37,7 +37,7 @@ ) def test_get_datasets(): ray.init() - ds = ray.data.range(100, parallelism=20).map_batches(lambda x: x) + ds = ray.data.range(100, override_num_blocks=20).map_batches(lambda x: x) 
ds._set_name("data_head_test") ds.materialize() diff --git a/python/ray/air/tests/test_new_dataset_config.py b/python/ray/air/tests/test_new_dataset_config.py index f475e2699e91..34a949f25caf 100644 --- a/python/ray/air/tests/test_new_dataset_config.py +++ b/python/ray/air/tests/test_new_dataset_config.py @@ -239,15 +239,15 @@ def train_loop_per_worker(): def test_per_epoch_preprocessing(ray_start_4_cpus): - ds = ray.data.range(100, parallelism=100).randomize_block_order() + ds = ray.data.range(100, override_num_blocks=100).randomize_block_order() test = TestRandom(2, True, datasets={"train": ds}) test.fit() - ds = ray.data.range(100, parallelism=100).random_shuffle() + ds = ray.data.range(100, override_num_blocks=100).random_shuffle() test = TestRandom(2, True, datasets={"train": ds}) test.fit() - ds = ray.data.range(100, parallelism=100).map( + ds = ray.data.range(100, override_num_blocks=100).map( lambda x: {"id": x["id"] * random.random()} ) test = TestRandom(2, True, datasets={"train": ds}) @@ -257,7 +257,7 @@ def test_per_epoch_preprocessing(ray_start_4_cpus): def test_materialized_preprocessing(ray_start_4_cpus): # TODO(ekl) we should test all these configs with splitting enabled, but this # requires implementing deterministic streaming split. - ds = ray.data.range(100, parallelism=100).randomize_block_order() + ds = ray.data.range(100, override_num_blocks=100).randomize_block_order() ds = ds.materialize() test = TestRandom( 2, @@ -267,7 +267,7 @@ def test_materialized_preprocessing(ray_start_4_cpus): ) test.fit() - ds = ray.data.range(100, parallelism=100).random_shuffle() + ds = ray.data.range(100, override_num_blocks=100).random_shuffle() ds = ds.materialize() test = TestRandom( 2, @@ -277,7 +277,7 @@ def test_materialized_preprocessing(ray_start_4_cpus): ) test.fit() - ds = ray.data.range(100, parallelism=100).map( + ds = ray.data.range(100, override_num_blocks=100).map( lambda x: {"id": x["id"] * random.random()} ) ds = ds.materialize() diff --git a/python/ray/air/util/check_ingest.py b/python/ray/air/util/check_ingest.py index bc8765d12b4d..44bb4d14379b 100755 --- a/python/ray/air/util/check_ingest.py +++ b/python/ray/air/util/check_ingest.py @@ -169,8 +169,8 @@ def make_local_dataset_iterator( args = parser.parse_args() # Generate a synthetic dataset of ~10GiB of float64 data. The dataset is sharded - # into 100 blocks (parallelism=100). - ds = ray.data.range_tensor(50000, shape=(80, 80, 4), parallelism=100) + # into 100 blocks (override_num_blocks=100). + ds = ray.data.range_tensor(50000, shape=(80, 80, 4), override_num_blocks=100) # An example preprocessing chain that just scales all values by 4.0 in two stages. 
ds = ds.map_batches(lambda df: df * 2, batch_format="pandas") diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index b3e156b28b99..a84987aec92b 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -4398,7 +4398,7 @@ def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]: Examples: >>> import ray - >>> ds = ray.data.range(10, parallelism=2) + >>> ds = ray.data.range(10, override_num_blocks=2) >>> refs = ds.to_pandas_refs() >>> len(refs) 2 @@ -4426,7 +4426,7 @@ def to_numpy_refs( Examples: >>> import ray - >>> ds = ray.data.range(10, parallelism=2) + >>> ds = ray.data.range(10, override_num_blocks=2) >>> refs = ds.to_numpy_refs() >>> len(refs) 2 @@ -4461,7 +4461,7 @@ def to_arrow_refs(self) -> List[ObjectRef["pyarrow.Table"]]: Examples: >>> import ray - >>> ds = ray.data.range(10, parallelism=2) + >>> ds = ray.data.range(10, override_num_blocks=2) >>> refs = ds.to_arrow_refs() >>> len(refs) 2 diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index f1c1c612ed24..57ae01e8657c 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -468,7 +468,7 @@ def read_mongo( ... collection="my_collection", ... pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": "sort_field"}], # noqa: E501 ... schema=Schema({"col1": pa.string(), "col2": pa.int64()}), - ... parallelism=10, + ... override_num_blocks=10, ... ) Args: @@ -1955,10 +1955,10 @@ def read_sql( ``LIMIT`` and ``OFFSET`` to fetch a subset of the rows. However, for many databases, ``OFFSET`` is slow. - As a workaround, set ``parallelism=1`` to directly fetch all rows in a single - task. Note that this approach requires all result rows to fit in the memory of - single task. If the rows don't fit, your program may raise an out of memory - error. + As a workaround, set ``override_num_blocks=1`` to directly fetch all rows in a + single task. Note that this approach requires all result rows to fit in the + memory of single task. If the rows don't fit, your program may raise an out of + memory error. 
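    A minimal sketch of that workaround, assuming a local SQLite file named
    ``example.db`` with a ``movie`` table (both are placeholder names):

        import sqlite3

        import ray

        def create_connection():
            # Any DB-API 2.0 compliant connection object works here.
            return sqlite3.connect("example.db")

        # Fetch every row in one read task instead of paginating with OFFSET.
        ds = ray.data.read_sql(
            "SELECT * FROM movie", create_connection, override_num_blocks=1
        )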
Examples: diff --git a/python/ray/data/tests/test_all_to_all.py b/python/ray/data/tests/test_all_to_all.py index 0ea5efcd8da3..fe240e2ca223 100644 --- a/python/ray/data/tests/test_all_to_all.py +++ b/python/ray/data/tests/test_all_to_all.py @@ -20,8 +20,10 @@ def test_zip(ray_start_regular_shared): - ds1 = ray.data.range(5, parallelism=5) - ds2 = ray.data.range(5, parallelism=5).map(column_udf("id", lambda x: x + 1)) + ds1 = ray.data.range(5, override_num_blocks=5) + ds2 = ray.data.range(5, override_num_blocks=5).map( + column_udf("id", lambda x: x + 1) + ) ds = ds1.zip(ds2) assert ds.schema().names == ["id", "id_1"] assert ds.take() == named_values( @@ -39,8 +41,8 @@ def test_zip_different_num_blocks_combinations( ray_start_regular_shared, num_blocks1, num_blocks2 ): n = 12 - ds1 = ray.data.range(n, parallelism=num_blocks1) - ds2 = ray.data.range(n, parallelism=num_blocks2).map( + ds1 = ray.data.range(n, override_num_blocks=num_blocks1) + ds2 = ray.data.range(n, override_num_blocks=num_blocks2).map( column_udf("id", lambda x: x + 1) ) ds = ds1.zip(ds2) @@ -70,11 +72,11 @@ def test_zip_different_num_blocks_split_smallest( num_blocks1 = 4 num_blocks2 = 2 ds1 = ray.data.from_items( - [{str(i): i for i in range(num_cols1)}] * n, parallelism=num_blocks1 + [{str(i): i for i in range(num_cols1)}] * n, override_num_blocks=num_blocks1 ) ds2 = ray.data.from_items( [{str(i): i for i in range(num_cols1, num_cols1 + num_cols2)}] * n, - parallelism=num_blocks2, + override_num_blocks=num_blocks2, ) ds = ds1.zip(ds2).materialize() num_blocks = ds._plan._snapshot_blocks.executed_num_blocks() @@ -129,8 +131,8 @@ def foo(x): num_items = 10 items = list(range(num_items)) - ds1 = ray.data.from_items(items, parallelism=num_items) - ds2 = ray.data.from_items(items, parallelism=num_items) + ds1 = ray.data.from_items(items, override_num_blocks=num_items) + ds2 = ray.data.from_items(items, override_num_blocks=num_items) ds2 = ds2.map_batches(foo, batch_size=1) result = ds1.zip(ds2).take_all() assert result == named_values( @@ -139,7 +141,7 @@ def foo(x): def test_empty_shuffle(ray_start_regular_shared): - ds = ray.data.range(100, parallelism=100) + ds = ray.data.range(100, override_num_blocks=100) ds = ds.filter(lambda x: x) ds = ds.map_batches(lambda x: x) ds = ds.random_shuffle() # Would prev. crash with AssertionError: pyarrow.Table. 
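# For reference, a minimal sketch of what the renamed argument does in the tests
# above and below: `override_num_blocks` takes over the role `parallelism` used to
# play and sets the number of blocks a read produces. The `num_blocks()` call on
# the materialized dataset is only used here to inspect the result.
import ray

ds = ray.data.range(20, override_num_blocks=10)
assert ds.materialize().num_blocks() == 10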
@@ -147,7 +149,7 @@ def test_empty_shuffle(ray_start_regular_shared): def test_repartition_shuffle(ray_start_regular_shared): - ds = ray.data.range(20, parallelism=10) + ds = ray.data.range(20, override_num_blocks=10) assert ds._plan.initial_num_blocks() == 10 assert ds.sum() == 190 assert ds._block_num_rows() == [2] * 10 @@ -162,13 +164,13 @@ def test_repartition_shuffle(ray_start_regular_shared): assert ds3.sum() == 190 assert ds3._block_num_rows() == [2] * 10 + [0] * 10 - large = ray.data.range(10000, parallelism=10) + large = ray.data.range(10000, override_num_blocks=10) large = large.repartition(20, shuffle=True) assert large._block_num_rows() == [500] * 20 def test_repartition_noshuffle(ray_start_regular_shared): - ds = ray.data.range(20, parallelism=10) + ds = ray.data.range(20, override_num_blocks=10) assert ds._plan.initial_num_blocks() == 10 assert ds.sum() == 190 assert ds._block_num_rows() == [2] * 10 @@ -194,13 +196,13 @@ def test_repartition_noshuffle(ray_start_regular_shared): assert ds5._plan.initial_num_blocks() == 4 assert ds5._block_num_rows() == [5, 6, 5, 6] - large = ray.data.range(10000, parallelism=10) + large = ray.data.range(10000, override_num_blocks=10) large = large.repartition(20) assert large._block_num_rows() == [500] * 20 def test_repartition_shuffle_arrow(ray_start_regular_shared): - ds = ray.data.range(20, parallelism=10) + ds = ray.data.range(20, override_num_blocks=10) assert ds._plan.initial_num_blocks() == 10 assert ds.count() == 20 assert ds._block_num_rows() == [2] * 10 @@ -215,7 +217,7 @@ def test_repartition_shuffle_arrow(ray_start_regular_shared): assert ds3.count() == 20 assert ds3._block_num_rows() == [2] * 10 + [0] * 10 - large = ray.data.range(10000, parallelism=10) + large = ray.data.range(10000, override_num_blocks=10) large = large.repartition(20, shuffle=True) assert large._block_num_rows() == [500] * 20 @@ -1132,7 +1134,7 @@ def test_random_block_order(ray_start_regular_shared, restore_data_context): assert results == expected # Test LazyBlockList.randomize_block_order. 
- lazy_blocklist_ds = ray.data.range(12, parallelism=4) + lazy_blocklist_ds = ray.data.range(12, override_num_blocks=4) lazy_blocklist_ds = lazy_blocklist_ds.randomize_block_order(seed=0) lazy_blocklist_results = lazy_blocklist_ds.take() lazy_blocklist_expected = named_values("id", [6, 7, 8, 0, 1, 2, 3, 4, 5, 9, 10, 11]) @@ -1148,8 +1150,8 @@ def test_random_shuffle(shutdown_only, use_push_based_shuffle): r2 = ray.data.range(100).random_shuffle().take(999) assert r1 != r2, (r1, r2) - r1 = ray.data.range(100, parallelism=1).random_shuffle().take(999) - r2 = ray.data.range(100, parallelism=1).random_shuffle().take(999) + r1 = ray.data.range(100, override_num_blocks=1).random_shuffle().take(999) + r2 = ray.data.range(100, override_num_blocks=1).random_shuffle().take(999) assert r1 != r2, (r1, r2) assert ( @@ -1160,22 +1162,22 @@ def test_random_shuffle(shutdown_only, use_push_based_shuffle): r2 = ray.data.range(100).random_shuffle().repartition(1).take(999) assert r1 != r2, (r1, r2) - r0 = ray.data.range(100, parallelism=5).take(999) - r1 = ray.data.range(100, parallelism=5).random_shuffle(seed=0).take(999) - r2 = ray.data.range(100, parallelism=5).random_shuffle(seed=0).take(999) - r3 = ray.data.range(100, parallelism=5).random_shuffle(seed=12345).take(999) + r0 = ray.data.range(100, override_num_blocks=5).take(999) + r1 = ray.data.range(100, override_num_blocks=5).random_shuffle(seed=0).take(999) + r2 = ray.data.range(100, override_num_blocks=5).random_shuffle(seed=0).take(999) + r3 = ray.data.range(100, override_num_blocks=5).random_shuffle(seed=12345).take(999) assert r1 == r2, (r1, r2) assert r1 != r0, (r1, r0) assert r1 != r3, (r1, r3) - r0 = ray.data.range(100, parallelism=5).take(999) - r1 = ray.data.range(100, parallelism=5).random_shuffle(seed=0).take(999) - r2 = ray.data.range(100, parallelism=5).random_shuffle(seed=0).take(999) + r0 = ray.data.range(100, override_num_blocks=5).take(999) + r1 = ray.data.range(100, override_num_blocks=5).random_shuffle(seed=0).take(999) + r2 = ray.data.range(100, override_num_blocks=5).random_shuffle(seed=0).take(999) assert r1 == r2, (r1, r2) assert r1 != r0, (r1, r0) # Test move. - ds = ray.data.range(100, parallelism=2) + ds = ray.data.range(100, override_num_blocks=2) r1 = ds.random_shuffle().take(999) ds = ds.map(lambda x: x).take(999) r2 = ray.data.range(100).random_shuffle().take(999) @@ -1193,7 +1195,7 @@ def test_random_shuffle_check_random(shutdown_only): num_files = 10 num_rows = 100 items = [i for i in range(num_files) for _ in range(num_rows)] - ds = ray.data.from_items(items, parallelism=num_files) + ds = ray.data.from_items(items, override_num_blocks=num_files) out = ds.random_shuffle().take(num_files * num_rows) for i in range(num_files): part = out[i * num_rows : (i + 1) * num_rows] @@ -1220,7 +1222,7 @@ def test_random_shuffle_check_random(shutdown_only): num_files = 10 num_rows = 100 items = [j for i in range(num_files) for j in range(num_rows)] - ds = ray.data.from_items(items, parallelism=num_files) + ds = ray.data.from_items(items, override_num_blocks=num_files) out = ds.random_shuffle().take(num_files * num_rows) for i in range(num_files): part = out[i * num_rows : (i + 1) * num_rows] @@ -1252,7 +1254,7 @@ def test_random_shuffle_with_custom_resource(ray_start_cluster, use_push_based_s # Run dataset in "bar" nodes. 
ds = ray.data.read_parquet( "example://parquet_images_mini", - parallelism=2, + override_num_blocks=2, ray_remote_args={"resources": {"bar": 1}}, ) ds = ds.random_shuffle(resources={"bar": 1}).materialize() @@ -1279,7 +1281,7 @@ def get_node_id(): node1_id = ray.get(get_node_id.options(resources={"bar:1": 1}).remote()) node2_id = ray.get(get_node_id.options(resources={"bar:2": 1}).remote()) - ds = ray.data.range(100, parallelism=2).random_shuffle() + ds = ray.data.range(100, override_num_blocks=2).random_shuffle() blocks = ds.get_internal_block_refs() ray.wait(blocks, num_returns=len(blocks), fetch_local=False) location_data = ray.experimental.get_object_locations(blocks) diff --git a/python/ray/data/tests/test_auto_parallelism.py b/python/ray/data/tests/test_auto_parallelism.py index 765fea8fbcb3..42466ba13a1d 100644 --- a/python/ray/data/tests/test_auto_parallelism.py +++ b/python/ray/data/tests/test_auto_parallelism.py @@ -128,14 +128,14 @@ def test_auto_parallelism_basic(shutdown_only): context = DataContext.get_current() context.min_parallelism = 1 # Datasource bound. - ds = ray.data.range_tensor(5, shape=(100,), parallelism=-1) + ds = ray.data.range_tensor(5, shape=(100,), override_num_blocks=-1) assert ds._plan.initial_num_blocks() == 5, ds # CPU bound. TODO(ekl) we should fix range datasource to respect parallelism more # properly, currently it can go a little over. - ds = ray.data.range_tensor(10000, shape=(100,), parallelism=-1) + ds = ray.data.range_tensor(10000, shape=(100,), override_num_blocks=-1) assert ds._plan.initial_num_blocks() == 16, ds # Block size bound. - ds = ray.data.range_tensor(100000000, shape=(100,), parallelism=-1) + ds = ray.data.range_tensor(100000000, shape=(100,), override_num_blocks=-1) assert ds._plan.initial_num_blocks() >= 590, ds assert ds._plan.initial_num_blocks() <= 600, ds @@ -147,7 +147,7 @@ def test_auto_parallelism_placement_group(shutdown_only): def run(): context = DataContext.get_current() context.min_parallelism = 1 - ds = ray.data.range_tensor(2000, shape=(100,), parallelism=-1) + ds = ray.data.range_tensor(2000, shape=(100,), override_num_blocks=-1) return ds._plan.initial_num_blocks() # 1/16 * 4 * 16 = 4 diff --git a/python/ray/data/tests/test_backpressure_e2e.py b/python/ray/data/tests/test_backpressure_e2e.py index 049b57dcc10d..c3506df196db 100644 --- a/python/ray/data/tests/test_backpressure_e2e.py +++ b/python/ray/data/tests/test_backpressure_e2e.py @@ -59,7 +59,7 @@ def consume(batch): last_snapshot = get_initial_core_execution_metrics_snapshot() - ds = ray.data.range(NUM_ROWS_TOTAL, parallelism=NUM_TASKS) + ds = ray.data.range(NUM_ROWS_TOTAL, override_num_blocks=NUM_TASKS) ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK) ds = ds.map_batches(consume, batch_size=None, num_cpus=0.9) # Check core execution metrics every 10 rows, because it's expensive. @@ -106,7 +106,7 @@ def consumer(batch): del batch["data"] return batch - ds = ray.data.range(1, parallelism=1).materialize() + ds = ray.data.range(1, override_num_blocks=1).materialize() ds = ds.map_batches(producer, batch_size=None, num_cpus=producer_num_cpus) # Add a limit op in the middle, to test that ReservationOpResourceAllocator # will account limit op's resource usage to the previous producer map op. 
@@ -210,7 +210,7 @@ def map_fn(batch): batch["data"] = [np.zeros(block_size, dtype=np.uint8)] return batch - ds = ray.data.range(num_blocks, parallelism=num_blocks) + ds = ray.data.range(num_blocks, override_num_blocks=num_blocks) ds = ds.map_batches(map_fn, batch_size=None, num_cpus=1) assert len(ds.take_all()) == num_blocks @@ -265,7 +265,7 @@ def range_(i): ctx.execution_options.resource_limits.object_store_memory = 10e6 # 10GiB dataset. - ds = ray.data.read_datasource(source, n=10000, parallelism=1000) + ds = ray.data.read_datasource(source, n=10000, override_num_blocks=1000) it = iter(ds.iter_batches(batch_size=None, prefetch_batches=0)) next(it) time.sleep(3) @@ -290,7 +290,7 @@ def __call__(self, df: np.ndarray): return {"id": np.random.randn(1, 20, 1024, 1024)} ctx = ray.init(object_store_memory=4e9) - ds = ray.data.range_tensor(20, shape=(3, 1024, 1024), parallelism=20) + ds = ray.data.range_tensor(20, shape=(3, 1024, 1024), override_num_blocks=20) pipe = ds.map_batches( TestFast, diff --git a/python/ray/data/tests/test_backpressure_policies.py b/python/ray/data/tests/test_backpressure_policies.py index 8ec63fe7637e..bb66ea84d60d 100644 --- a/python/ray/data/tests/test_backpressure_policies.py +++ b/python/ray/data/tests/test_backpressure_policies.py @@ -114,7 +114,7 @@ def test_e2e_normal(self): # Create a dataset with 2 map ops. Each map op has N tasks, where N is # the number of cluster CPUs. N = self.__class__._cluster_cpus - ds = ray.data.range(N, parallelism=N) + ds = ray.data.range(N, override_num_blocks=N) # Use different `num_cpus` to make sure they don't fuse. ds = ds.map_batches(map_func1, batch_size=None, num_cpus=1, concurrency=1) ds = ds.map_batches(map_func2, batch_size=None, num_cpus=1.1, concurrency=1) @@ -150,7 +150,7 @@ def test_e2e_time_backpressure(self): # Create a dataset with 2 map ops. Each map op has N tasks, where N is # the number of cluster CPUs. N = self.__class__._cluster_cpus - ds = ray.data.range(N, parallelism=N) + ds = ray.data.range(N, override_num_blocks=N) # Use different `num_cpus` to make sure they don't fuse. ds = ds.map_batches(map_func1, batch_size=None, num_cpus=1, concurrency=1) ds = ds.map_batches(map_func2, batch_size=None, num_cpus=1.1, concurrency=1) diff --git a/python/ray/data/tests/test_binary.py b/python/ray/data/tests/test_binary.py index 7928939017cb..4f5a3e788ffe 100644 --- a/python/ray/data/tests/test_binary.py +++ b/python/ray/data/tests/test_binary.py @@ -41,7 +41,7 @@ def test_read_binary_files_partitioning(ray_start_regular_shared, tmp_path): def test_read_binary_files(ray_start_regular_shared): with gen_bin_files(10) as (_, paths): - ds = ray.data.read_binary_files(paths, parallelism=10) + ds = ray.data.read_binary_files(paths, override_num_blocks=10) for i, item in enumerate(ds.iter_rows()): expected = open(paths[i], "rb").read() assert expected == item["bytes"] @@ -73,7 +73,7 @@ def test_read_binary_files_with_fs(ray_start_regular_shared): with gen_bin_files(10) as (tempdir, paths): # All the paths are absolute, so we want the root file system. 
fs, _ = pa.fs.FileSystem.from_uri("/") - ds = ray.data.read_binary_files(paths, filesystem=fs, parallelism=10) + ds = ray.data.read_binary_files(paths, filesystem=fs, override_num_blocks=10) for i, item in enumerate(ds.iter_rows()): expected = open(paths[i], "rb").read() assert expected == item["bytes"] diff --git a/python/ray/data/tests/test_block_sizing.py b/python/ray/data/tests/test_block_sizing.py index 891cd8025614..ff340fb6ab4a 100644 --- a/python/ray/data/tests/test_block_sizing.py +++ b/python/ray/data/tests/test_block_sizing.py @@ -27,7 +27,7 @@ def test_map(shutdown_only, restore_data_context): last_snapshot = get_initial_core_execution_metrics_snapshot() # Test read. - ds = ray.data.range(100_000, parallelism=1).materialize() + ds = ray.data.range(100_000, override_num_blocks=1).materialize() assert ( num_blocks_expected <= ds._plan.initial_num_blocks() <= num_blocks_expected + 1 ) @@ -40,7 +40,11 @@ def test_map(shutdown_only, restore_data_context): # Test read -> map. # NOTE(swang): For some reason BlockBuilder's estimated memory usage when a # map fn is used is 2x the actual memory usage. - ds = ray.data.range(100_000, parallelism=1).map(lambda row: row).materialize() + ds = ( + ray.data.range(100_000, override_num_blocks=1) + .map(lambda row: row) + .materialize() + ) assert ( num_blocks_expected * 2 <= ds._plan.initial_num_blocks() @@ -57,7 +61,7 @@ def test_map(shutdown_only, restore_data_context): num_blocks_expected //= 2 # Test read. - ds = ray.data.range(100_000, parallelism=1).materialize() + ds = ray.data.range(100_000, override_num_blocks=1).materialize() assert ( num_blocks_expected <= ds._plan.initial_num_blocks() <= num_blocks_expected + 1 ) @@ -68,7 +72,11 @@ def test_map(shutdown_only, restore_data_context): ) # Test read -> map. - ds = ray.data.range(100_000, parallelism=1).map(lambda row: row).materialize() + ds = ( + ray.data.range(100_000, override_num_blocks=1) + .map(lambda row: row) + .materialize() + ) assert ( num_blocks_expected * 2 <= ds._plan.initial_num_blocks() @@ -85,7 +93,7 @@ def test_map(shutdown_only, restore_data_context): ctx.target_shuffle_max_block_size = ctx.target_max_block_size / 2 # Test read. - ds = ray.data.range(100_000, parallelism=1).materialize() + ds = ray.data.range(100_000, override_num_blocks=1).materialize() assert ( num_blocks_expected <= ds._plan.initial_num_blocks() <= num_blocks_expected + 1 ) @@ -96,7 +104,11 @@ def test_map(shutdown_only, restore_data_context): ) # Test read -> map. 
- ds = ray.data.range(100_000, parallelism=1).map(lambda row: row).materialize() + ds = ( + ray.data.range(100_000, override_num_blocks=1) + .map(lambda row: row) + .materialize() + ) assert ( num_blocks_expected * 2 <= ds._plan.initial_num_blocks() diff --git a/python/ray/data/tests/test_consumption.py b/python/ray/data/tests/test_consumption.py index 12260016ff7c..feae23c0d8eb 100644 --- a/python/ray/data/tests/test_consumption.py +++ b/python/ray/data/tests/test_consumption.py @@ -36,7 +36,7 @@ def test_schema(ray_start_regular): last_snapshot = get_initial_core_execution_metrics_snapshot() - ds2 = ray.data.range(10, parallelism=10) + ds2 = ray.data.range(10, override_num_blocks=10) ds3 = ds2.repartition(5) ds3 = ds3.materialize() last_snapshot = assert_core_execution_metrics_equals( @@ -85,7 +85,7 @@ def test_schema(ray_start_regular): def test_schema_no_execution(ray_start_regular): last_snapshot = get_initial_core_execution_metrics_snapshot() - ds = ray.data.range(100, parallelism=10) + ds = ray.data.range(100, override_num_blocks=10) last_snapshot = assert_core_execution_metrics_equals( CoreExecutionMetrics(task_count={"_get_datasource_or_legacy_reader": 1}), last_snapshot, @@ -121,7 +121,7 @@ def check_schema_cached(ds, expected_task_count, last_snapshot): return last_snapshot last_snapshot = get_initial_core_execution_metrics_snapshot() - ds = ray.data.from_items([{"a": i} for i in range(100)], parallelism=10) + ds = ray.data.from_items([{"a": i} for i in range(100)], override_num_blocks=10) last_snapshot = check_schema_cached(ds, {}, last_snapshot) # Add a map_batches operator so that we are forced to compute the schema. @@ -137,7 +137,7 @@ def check_schema_cached(ds, expected_task_count, last_snapshot): def test_count(ray_start_regular): - ds = ray.data.range(100, parallelism=10) + ds = ray.data.range(100, override_num_blocks=10) # We do not kick off the read task by default. assert ds._plan._in_blocks._num_computed() == 0 assert ds.count() == 100 @@ -152,8 +152,8 @@ def test_count(ray_start_regular): def test_limit_execution(ray_start_regular): last_snapshot = get_initial_core_execution_metrics_snapshot() - parallelism = 20 - ds = ray.data.range(100, parallelism=parallelism) + override_num_blocks = 20 + ds = ray.data.range(100, override_num_blocks=override_num_blocks) # Add some delay to the output to prevent all tasks from finishing # immediately. @@ -178,7 +178,8 @@ def delay(row): last_snapshot = assert_core_execution_metrics_equals( CoreExecutionMetrics( task_count={ - "ReadRange->Map(delay)": lambda count: count < parallelism / 2, + "ReadRange->Map(delay)": lambda count: count + < override_num_blocks / 2, "slice_fn": lambda count: count <= 1, } ), @@ -186,7 +187,7 @@ def delay(row): ) # .materialize().limit() should only trigger execution once. 
- ds = ray.data.range(100, parallelism=20).materialize() + ds = ray.data.range(100, override_num_blocks=20).materialize() last_snapshot = assert_core_execution_metrics_equals( CoreExecutionMetrics( task_count={ @@ -311,12 +312,12 @@ def test_basic(ray_start_regular_shared): def test_range(ray_start_regular_shared): - ds = ray.data.range(10, parallelism=10) + ds = ray.data.range(10, override_num_blocks=10) assert ds._plan.initial_num_blocks() == 10 assert ds.count() == 10 assert ds.take() == [{"id": i} for i in range(10)] - ds = ray.data.range(10, parallelism=2) + ds = ray.data.range(10, override_num_blocks=2) assert ds._plan.initial_num_blocks() == 2 assert ds.count() == 10 assert ds.take() == [{"id": i} for i in range(10)] @@ -425,7 +426,7 @@ def _check_none_computed(ds): def test_lazy_loading_exponential_rampup(ray_start_regular_shared): - ds = ray.data.range(100, parallelism=20) + ds = ray.data.range(100, override_num_blocks=20) _check_none_computed(ds) assert extract_values("id", ds.take(10)) == list(range(10)) _check_none_computed(ds) @@ -440,7 +441,7 @@ def test_lazy_loading_exponential_rampup(ray_start_regular_shared): def test_dataset_repr(ray_start_regular_shared): - ds = ray.data.range(10, parallelism=10) + ds = ray.data.range(10, override_num_blocks=10) assert repr(ds) == "Dataset(num_rows=10, schema={id: int64})" ds = ds.map_batches(lambda x: x) assert repr(ds) == ( @@ -492,7 +493,7 @@ def test_dataset_repr(ray_start_regular_shared): def my_dummy_fn(x): return x - ds = ray.data.range(10, parallelism=10) + ds = ray.data.range(10, override_num_blocks=10) ds = ds.map_batches(my_dummy_fn) assert repr(ds) == ( "MapBatches(my_dummy_fn)\n" "+- Dataset(num_rows=10, schema={id: int64})" @@ -501,7 +502,7 @@ def my_dummy_fn(x): @pytest.mark.parametrize("lazy", [False, True]) def test_limit(ray_start_regular_shared, lazy): - ds = ray.data.range(100, parallelism=20) + ds = ray.data.range(100, override_num_blocks=20) if not lazy: ds = ds.materialize() for i in range(100): @@ -561,11 +562,11 @@ def range_(i): source = CountingRangeDatasource() total_rows = 1000 - parallelism = 100 + override_num_blocks = 100 ds = ray.data.read_datasource( source, - parallelism=parallelism, - n=total_rows // parallelism, + override_num_blocks=override_num_blocks, + n=total_rows // override_num_blocks, ) # Apply multiple limit ops. # Once the smallest limit is reached, the entire dataset should stop execution. @@ -579,8 +580,8 @@ def range_(i): # We may launch more tasks than this number, in order to to maximize throughput. # But the actual number of read tasks should be less than the parallelism. count = ray.get(source.counter.get.remote()) - min_read_tasks = limit // (total_rows // parallelism) - assert min_read_tasks <= count < parallelism + min_read_tasks = limit // (total_rows // override_num_blocks) + assert min_read_tasks <= count < override_num_blocks def test_limit_no_num_row_info(ray_start_regular_shared): @@ -601,7 +602,7 @@ def prepare_read(self, parallelism, n): ) ] - ds = ray.data.read_datasource(DumbOnesDatasource(), parallelism=10, n=10) + ds = ray.data.read_datasource(DumbOnesDatasource(), override_num_blocks=10, n=10) for i in range(1, 100): assert extract_values("id", ds.limit(i).take(100)) == [1] * i @@ -630,7 +631,7 @@ def test_from_items_parallelism(ray_start_regular_shared, parallelism): # Test that specifying parallelism yields the expected number of blocks. 
n = 20 records = [{"a": i} for i in range(n)] - ds = ray.data.from_items(records, parallelism=parallelism) + ds = ray.data.from_items(records, override_num_blocks=parallelism) out = ds.take_all() assert out == records assert ds._plan.initial_num_blocks() == parallelism @@ -642,21 +643,21 @@ def test_from_items_parallelism_truncated(ray_start_regular_shared): n = 10 parallelism = 20 records = [{"a": i} for i in range(n)] - ds = ray.data.from_items(records, parallelism=parallelism) + ds = ray.data.from_items(records, override_num_blocks=parallelism) out = ds.take_all() assert out == records assert ds._plan.initial_num_blocks() == n def test_take_batch(ray_start_regular_shared): - ds = ray.data.range(10, parallelism=2) + ds = ray.data.range(10, override_num_blocks=2) assert ds.take_batch(3)["id"].tolist() == [0, 1, 2] assert ds.take_batch(6)["id"].tolist() == [0, 1, 2, 3, 4, 5] assert ds.take_batch(100)["id"].tolist() == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] assert isinstance(ds.take_batch(3, batch_format="pandas"), pd.DataFrame) assert isinstance(ds.take_batch(3, batch_format="numpy"), dict) - ds = ray.data.range_tensor(10, parallelism=2) + ds = ray.data.range_tensor(10, override_num_blocks=2) assert np.all(ds.take_batch(3)["data"] == np.array([[0], [1], [2]])) assert isinstance(ds.take_batch(3, batch_format="pandas"), pd.DataFrame) assert isinstance(ds.take_batch(3, batch_format="numpy"), dict) @@ -883,9 +884,9 @@ def test_iter_batches_local_shuffle(shutdown_only, ds_format): def range(n, parallelism=200): if ds_format == "arrow": - ds = ray.data.range(n, parallelism=parallelism) + ds = ray.data.range(n, override_num_blocks=parallelism) elif ds_format == "pandas": - ds = ray.data.range(n, parallelism=parallelism).map_batches( + ds = ray.data.range(n, override_num_blocks=parallelism).map_batches( lambda df: df, batch_size=None, batch_format="pandas" ) return ds @@ -1158,7 +1159,7 @@ def test_iter_batches_grid(ray_start_regular_shared): def test_lazy_loading_iter_batches_exponential_rampup(ray_start_regular_shared): - ds = ray.data.range(32, parallelism=8) + ds = ray.data.range(32, override_num_blocks=8) expected_num_blocks = [1, 2, 4, 4, 8, 8, 8, 8] for _, expected in zip(ds.iter_batches(batch_size=None), expected_num_blocks): # In streaming execution of ds.iter_batches(), there is no partial @@ -1167,7 +1168,7 @@ def test_lazy_loading_iter_batches_exponential_rampup(ray_start_regular_shared): def test_union(ray_start_regular_shared): - ds = ray.data.range(20, parallelism=10) + ds = ray.data.range(20, override_num_blocks=10) # Test lazy union. ds = ds.union(ds, ds, ds, ds) @@ -1272,7 +1273,7 @@ def _to_pandas(ds): # Test empty dataset # Note: we explicitly set parallelism here to ensure there are no empty # input blocks. - ds = ray.data.range(10, parallelism=10) + ds = ray.data.range(10, override_num_blocks=10) if ds_format == "pandas": ds = _to_pandas(ds) assert ds.filter(lambda r: r["id"] > 10).min("id") is None @@ -1315,7 +1316,7 @@ def _to_pandas(ds): # Test empty dataset # Note: we explicitly set parallelism here to ensure there are no empty # input blocks. - ds = ray.data.range(10, parallelism=10) + ds = ray.data.range(10, override_num_blocks=10) if ds_format == "pandas": ds = _to_pandas(ds) assert ds.filter(lambda r: r["id"] > 10).max("id") is None @@ -1358,7 +1359,7 @@ def _to_pandas(ds): # Test empty dataset # Note: we explicitly set parallelism here to ensure there are no empty # input blocks. 
- ds = ray.data.range(10, parallelism=10) + ds = ray.data.range(10, override_num_blocks=10) if ds_format == "pandas": ds = _to_pandas(ds) assert ds.filter(lambda r: r["id"] > 10).mean("id") is None @@ -1533,7 +1534,7 @@ def test_read_write_local_node_ray_client(ray_start_cluster_enabled): def test_read_warning_large_parallelism(ray_start_regular, propagate_logs, caplog): with caplog.at_level(logging.WARNING, logger="ray.data.read_api"): - ray.data.range(5000, parallelism=5000).materialize() + ray.data.range(5000, override_num_blocks=5000).materialize() assert ( "The requested number of read blocks of 5000 is " "more than 4x the number of available CPU slots in the cluster" in caplog.text @@ -1589,7 +1590,11 @@ def check_dataset_is_local(ds): check_dataset_is_local(ds) # With fusion. - ds = ray.data.read_parquet(local_path, parallelism=1).map(lambda x: x).materialize() + ds = ( + ray.data.read_parquet(local_path, override_num_blocks=1) + .map(lambda x: x) + .materialize() + ) check_dataset_is_local(ds) # Write back to local scheme. @@ -1684,7 +1689,7 @@ def f(should_import_polars): return polars_imported == should_import_polars # We should not use polars for non-Arrow sort. - _ = ray.data.range(num_items, parallelism=parallelism).sort() + _ = ray.data.range(num_items, override_num_blocks=parallelism).sort() assert all(ray.get([f.remote(False) for _ in range(parallelism)])) a = range(100) @@ -1745,7 +1750,7 @@ def test_dataset_schema_after_read_stats(ray_start_cluster): def test_dataset_plan_as_string(ray_start_cluster): - ds = ray.data.read_parquet("example://iris.parquet", parallelism=8) + ds = ray.data.read_parquet("example://iris.parquet", override_num_blocks=8) assert ds._plan.get_plan_as_string(type(ds)) == ( "Dataset(\n" " num_rows=150,\n" diff --git a/python/ray/data/tests/test_context_propagation.py b/python/ray/data/tests/test_context_propagation.py index 377b357eb254..e71d0179b2e3 100644 --- a/python/ray/data/tests/test_context_propagation.py +++ b/python/ray/data/tests/test_context_propagation.py @@ -126,7 +126,9 @@ def f(x): return x num_splits = 2 - splits = ray.data.range(10, parallelism=10).map(f).streaming_split(num_splits) + splits = ( + ray.data.range(10, override_num_blocks=10).map(f).streaming_split(num_splits) + ) @ray.remote(num_cpus=0) def consume(split): @@ -156,7 +158,7 @@ def test_context_placement_group(): ) ray.get(placement_group.ready()) context.scheduling_strategy = PlacementGroupSchedulingStrategy(placement_group) -ds = ray.data.range(100, parallelism=2).map(lambda x: {"id": x["id"] + 1}) +ds = ray.data.range(100, override_num_blocks=2).map(lambda x: {"id": x["id"] + 1}) assert ds.take_all() == [{"id": x} for x in range(1, 101)] placement_group_assert_no_leak([placement_group]) ray.shutdown() diff --git a/python/ray/data/tests/test_csv.py b/python/ray/data/tests/test_csv.py index 10002180b560..5d207f8d4b18 100644 --- a/python/ray/data/tests/test_csv.py +++ b/python/ray/data/tests/test_csv.py @@ -83,12 +83,12 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url): assert ds.input_files() == [_unwrap_protocol(path1)] assert "{one: int64, two: string}" in str(ds), ds - # Two files, parallelism=2. + # Two files, override_num_blocks=2. 
df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) path2 = os.path.join(data_path, "test2.csv") df2.to_csv(path2, index=False, storage_options=storage_options) ds = ray.data.read_csv( - [path1, path2], parallelism=2, filesystem=fs, partitioning=None + [path1, path2], override_num_blocks=2, filesystem=fs, partitioning=None ) dsdf = ds.to_pandas() df = pd.concat([df1, df2], ignore_index=True) @@ -97,12 +97,12 @@ def test_csv_read(ray_start_regular_shared, fs, data_path, endpoint_url): for block, meta in ds._plan.execute().get_blocks_with_metadata(): BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes - # Three files, parallelism=2. + # Three files, override_num_blocks=2. df3 = pd.DataFrame({"one": [7, 8, 9], "two": ["h", "i", "j"]}) path3 = os.path.join(data_path, "test3.csv") df3.to_csv(path3, index=False, storage_options=storage_options) ds = ray.data.read_csv( - [path1, path2, path3], parallelism=2, filesystem=fs, partitioning=None + [path1, path2, path3], override_num_blocks=2, filesystem=fs, partitioning=None ) df = pd.concat([df1, df2, df3], ignore_index=True) dsdf = ds.to_pandas() @@ -367,7 +367,7 @@ def test_csv_read_many_files_partitioned( paths, filesystem=fs, partitioning=partition_path_encoder.scheme, - parallelism=num_files, + override_num_blocks=num_files, ) assert_base_partitioned_ds( @@ -659,7 +659,7 @@ def keep_expected_partitions(kv_dict): data_path, partition_filter=partition_path_filter, filesystem=fs, - parallelism=6, + override_num_blocks=6, ) assert_base_partitioned_ds(ds, num_input_files=6, num_computed=6) assert ray.get(kept_file_counter.get.remote()) == 6 @@ -738,7 +738,7 @@ def test_csv_roundtrip(ray_start_regular_shared, fs, data_path): ds = ray.data.from_pandas([df, df2]) ds._set_uuid("data") ds.write_csv(data_path, filesystem=fs) - ds2 = ray.data.read_csv(data_path, parallelism=2, filesystem=fs) + ds2 = ray.data.read_csv(data_path, override_num_blocks=2, filesystem=fs) ds2df = ds2.to_pandas() assert pd.concat([df, df2], ignore_index=True).equals(ds2df) # Test metadata ops. 
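# For reference, a minimal sketch of the CSV round trip exercised above and in the
# test below, assuming a writable local directory at /tmp/csv_out (a placeholder
# path). `num_rows_per_file` is a target per output file, not an exact guarantee.
import ray

ds = ray.data.range(100, override_num_blocks=20)
ds.write_csv("/tmp/csv_out", num_rows_per_file=5)
# Read the files back, asking the read to target 2 output blocks.
ds2 = ray.data.read_csv("/tmp/csv_out", override_num_blocks=2)
assert ds2.count() == 100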
@@ -863,7 +863,7 @@ def test_csv_invalid_file_handler(shutdown_only, tmp_path): @pytest.mark.parametrize("num_rows_per_file", [5, 10, 50]) def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file): - ray.data.range(100, parallelism=20).write_csv( + ray.data.range(100, override_num_blocks=20).write_csv( tmp_path, num_rows_per_file=num_rows_per_file ) diff --git a/python/ray/data/tests/test_datasink.py b/python/ray/data/tests/test_datasink.py index 100abc870e59..772078490601 100644 --- a/python/ray/data/tests/test_datasink.py +++ b/python/ray/data/tests/test_datasink.py @@ -21,7 +21,9 @@ def write(self, blocks: Iterable[Block], ctx: TaskContext) -> Any: def num_rows_per_write(self): return self._num_rows_per_write - ray.data.range(100, parallelism=20).write_datasink(MockDatasink(num_rows_per_write)) + ray.data.range(100, override_num_blocks=20).write_datasink( + MockDatasink(num_rows_per_write) + ) if __name__ == "__main__": diff --git a/python/ray/data/tests/test_dynamic_block_split.py b/python/ray/data/tests/test_dynamic_block_split.py index a14d67566816..47346cb8ee89 100644 --- a/python/ray/data/tests/test_dynamic_block_split.py +++ b/python/ray/data/tests/test_dynamic_block_split.py @@ -123,11 +123,13 @@ def test_bulk_lazy_eval_split_mode(shutdown_only, block_split, tmp_path): ray.init(num_cpus=8) ctx = ray.data.context.DataContext.get_current() - ray.data.range(8, parallelism=8).write_csv(str(tmp_path)) + ray.data.range(8, override_num_blocks=8).write_csv(str(tmp_path)) if not block_split: # Setting a huge block size effectively disables block splitting. ctx.target_max_block_size = 2**64 - ds = ray.data.read_datasource(SlowCSVDatasource(str(tmp_path)), parallelism=8) + ds = ray.data.read_datasource( + SlowCSVDatasource(str(tmp_path)), override_num_blocks=8 + ) start = time.time() ds.map(lambda x: x) @@ -194,7 +196,7 @@ def warmup(): last_snapshot = get_initial_core_execution_metrics_snapshot() ds = ray.data.read_datasource( RandomBytesDatasource(), - parallelism=num_tasks, + override_num_blocks=num_tasks, num_batches_per_task=num_blocks_per_task, row_size=ctx.target_max_block_size, ) @@ -293,7 +295,7 @@ def test_filter(ray_start_regular_shared, target_max_block_size): ds = ray.data.read_datasource( RandomBytesDatasource(), - parallelism=1, + override_num_blocks=1, num_batches_per_task=num_blocks_per_task, row_size=block_size, ) @@ -318,7 +320,7 @@ def test_lazy_block_list(shutdown_only, target_max_block_size): ds = ray.data.read_datasource( RandomBytesDatasource(), - parallelism=num_tasks, + override_num_blocks=num_tasks, num_batches_per_task=num_blocks_per_task, row_size=block_size, ) @@ -424,7 +426,7 @@ def foo(batch): ds = ray.data.read_datasource( RandomBytesDatasource(), - parallelism=1, + override_num_blocks=1, num_batches_per_task=num_blocks_per_task, row_size=block_size, ) @@ -442,7 +444,7 @@ def _test_write_large_data( ds = ray.data.read_datasource( RandomBytesDatasource(), - parallelism=1, + override_num_blocks=1, num_batches_per_task=num_blocks_per_task, row_size=block_size, use_bytes=use_bytes, @@ -588,7 +590,7 @@ def test_block_slicing( ds = ray.data.read_datasource( RandomBytesDatasource(), - parallelism=num_tasks, + override_num_blocks=num_tasks, num_batches_per_task=num_batches, num_rows_per_batch=num_rows_per_batch, row_size=row_size, @@ -625,7 +627,7 @@ def test_dynamic_block_split_deterministic( ctx.target_max_block_size = target_max_block_size # ~800 bytes per block - ds = ray.data.range(1000, parallelism=10).map_batches(lambda x: x) + 
ds = ray.data.range(1000, override_num_blocks=10).map_batches(lambda x: x) data = [ray.get(block) for block in ds.materialize()._plan._in_blocks._blocks] # Maps: first item of block -> block block_map = {block["id"][0]: block for block in data} diff --git a/python/ray/data/tests/test_execution_optimizer.py b/python/ray/data/tests/test_execution_optimizer.py index 45b8af4d2bf9..a7a8d3c3c23d 100644 --- a/python/ray/data/tests/test_execution_optimizer.py +++ b/python/ray/data/tests/test_execution_optimizer.py @@ -341,13 +341,13 @@ def ensure_sample_size_close(dataset, sample_percent=0.5): r1.count(), int(ds.count() * sample_percent), rel_tol=2, abs_tol=2 ) - ds = ray.data.range(10, parallelism=2) + ds = ray.data.range(10, override_num_blocks=2) ensure_sample_size_close(ds) - ds = ray.data.range(10, parallelism=2) + ds = ray.data.range(10, override_num_blocks=2) ensure_sample_size_close(ds) - ds = ray.data.range_tensor(5, parallelism=2, shape=(2, 2)) + ds = ray.data.range_tensor(5, override_num_blocks=2, shape=(2, 2)) ensure_sample_size_close(ds) _check_usage_record(["ReadRange", "MapBatches"]) @@ -374,7 +374,7 @@ def test_random_shuffle_operator(ray_start_regular_shared): def test_random_shuffle_e2e(ray_start_regular_shared, use_push_based_shuffle): - ds = ray.data.range(12, parallelism=4) + ds = ray.data.range(12, override_num_blocks=4) r1 = extract_values("id", ds.random_shuffle(seed=0).take_all()) r2 = extract_values("id", ds.random_shuffle(seed=1024).take_all()) assert r1 != r2, (r1, r2) @@ -426,14 +426,14 @@ def _check_repartition_usage_and_stats(ds): assert "RepartitionSplit" in ds_stats.metadata assert "RepartitionReduce" in ds_stats.metadata - ds = ray.data.range(10000, parallelism=10).repartition(20, shuffle=shuffle) + ds = ray.data.range(10000, override_num_blocks=10).repartition(20, shuffle=shuffle) assert ds._plan.initial_num_blocks() == 20, ds._plan.initial_num_blocks() assert ds.sum() == sum(range(10000)) assert ds._block_num_rows() == [500] * 20, ds._block_num_rows() _check_repartition_usage_and_stats(ds) # Test num_output_blocks > num_rows to trigger empty block handling. - ds = ray.data.range(20, parallelism=10).repartition(40, shuffle=shuffle) + ds = ray.data.range(20, override_num_blocks=10).repartition(40, shuffle=shuffle) assert ds._plan.initial_num_blocks() == 40, ds._plan.initial_num_blocks() assert ds.sum() == sum(range(20)) if shuffle: @@ -453,7 +453,7 @@ def _check_repartition_usage_and_stats(ds): _check_repartition_usage_and_stats(ds) # Test case where we do not split on repartitioning. - ds = ray.data.range(10, parallelism=1).repartition(1, shuffle=shuffle) + ds = ray.data.range(10, override_num_blocks=1).repartition(1, shuffle=shuffle) assert ds._plan.initial_num_blocks() == 1, ds._plan.initial_num_blocks() assert ds.sum() == sum(range(10)) assert ds._block_num_rows() == [10], ds._block_num_rows() @@ -492,7 +492,7 @@ def test_union_e2e(ray_start_regular_shared, preserve_order): ctx = ray.data.DataContext.get_current() ctx.execution_options = execution_options - ds = ray.data.range(20, parallelism=10) + ds = ray.data.range(20, override_num_blocks=10) # Test lazy union. 
ds = ds.union(ds, ds, ds, ds) @@ -935,7 +935,7 @@ def fn(batch): def test_read_map_chain_operator_fusion_e2e( ray_start_regular_shared, ): - ds = ray.data.range(10, parallelism=2) + ds = ray.data.range(10, override_num_blocks=2) ds = ds.filter(lambda x: x["id"] % 2 == 0) ds = ds.map(column_udf("id", lambda x: x + 1)) ds = ds.map_batches( @@ -963,7 +963,7 @@ def test_read_map_chain_operator_fusion_e2e( def test_write_fusion(ray_start_regular_shared, tmp_path): - ds = ray.data.range(10, parallelism=2) + ds = ray.data.range(10, override_num_blocks=2) ds.write_csv(tmp_path) assert "ReadRange->Write" in ds._write_ds.stats() _check_usage_record(["ReadRange", "WriteCSV"]) @@ -1012,7 +1012,7 @@ def test_sort_operator( def test_sort_e2e(ray_start_regular_shared, use_push_based_shuffle, tmp_path): - ds = ray.data.range(100, parallelism=4) + ds = ray.data.range(100, override_num_blocks=4) ds = ds.random_shuffle() ds = ds.sort("id") assert extract_values("id", ds.take_all()) == list(range(100)) @@ -1086,7 +1086,7 @@ def test_aggregate_operator(ray_start_regular_shared): def test_aggregate_e2e(ray_start_regular_shared, use_push_based_shuffle): - ds = ray.data.range(100, parallelism=4) + ds = ray.data.range(100, override_num_blocks=4) ds = ds.groupby("id").count() assert ds.count() == 100 for idx, row in enumerate(ds.sort("id").iter_rows()): @@ -1157,8 +1157,8 @@ def test_zip_operator(ray_start_regular_shared): ) def test_zip_e2e(ray_start_regular_shared, num_blocks1, num_blocks2): n = 12 - ds1 = ray.data.range(n, parallelism=num_blocks1) - ds2 = ray.data.range(n, parallelism=num_blocks2).map( + ds1 = ray.data.range(n, override_num_blocks=num_blocks1) + ds2 = ray.data.range(n, override_num_blocks=num_blocks2).map( column_udf("id", lambda x: x + 1) ) ds = ds1.zip(ds2) @@ -1405,7 +1405,7 @@ def f2(x): return x # Test basic limit pushdown past Map. - ds = ray.data.range(100, parallelism=100).map(f1).limit(1) + ds = ray.data.range(100, override_num_blocks=100).map(f1).limit(1) _check_valid_plan_and_result( ds, "Read[ReadRange] -> Limit[limit=1] -> MapRows[Map(f1)]", [{"id": 0}] ) @@ -1456,7 +1456,7 @@ def f2(x): [{"id": i} for i in range(5)], ) # Test limit pushdown between two Map operators. - ds5 = ray.data.range(100, parallelism=100).map(f1).limit(1).map(f2) + ds5 = ray.data.range(100, override_num_blocks=100).map(f1).limit(1).map(f2) # Limit operators get pushed down in the logical plan optimization, # then fused together. 
_check_valid_plan_and_result( @@ -1508,7 +1508,7 @@ def test_execute_to_legacy_block_iterator( def test_streaming_executor( ray_start_regular_shared, ): - ds = ray.data.range(100, parallelism=4) + ds = ray.data.range(100, override_num_blocks=4) ds = ds.map_batches(lambda x: x) ds = ds.filter(lambda x: x["id"] > 0) ds = ds.random_shuffle() @@ -1536,7 +1536,7 @@ def test_schema_partial_execution( ds = ray.data.read_parquet( "example://iris.parquet", schema=pa.schema(fields), - parallelism=2, + override_num_blocks=2, ).map_batches(lambda x: x) iris_schema = ds.schema() diff --git a/python/ray/data/tests/test_file_datasink.py b/python/ray/data/tests/test_file_datasink.py index 1b96a05cedd9..8b1be4f5e546 100644 --- a/python/ray/data/tests/test_file_datasink.py +++ b/python/ray/data/tests/test_file_datasink.py @@ -154,7 +154,7 @@ def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"): for _ in range(block.num_rows()): file.write(b"row\n") - ds = ray.data.range(100, parallelism=20) + ds = ray.data.range(100, override_num_blocks=20) ds.write_datasink( MockFileDatasink(path=tmp_path, num_rows_per_file=num_rows_per_file) diff --git a/python/ray/data/tests/test_formats.py b/python/ray/data/tests/test_formats.py index 401df1484455..8039c173e4b8 100644 --- a/python/ray/data/tests/test_formats.py +++ b/python/ray/data/tests/test_formats.py @@ -76,7 +76,7 @@ def test_to_arrow_refs(ray_start_regular_shared): def test_get_internal_block_refs(ray_start_regular_shared): - blocks = ray.data.range(10, parallelism=10).get_internal_block_refs() + blocks = ray.data.range(10, override_num_blocks=10).get_internal_block_refs() assert len(blocks) == 10 out = [] for b in ray.get(blocks): @@ -146,14 +146,14 @@ def test_read_example_data(ray_start_regular_shared, tmp_path): def test_write_datasink(ray_start_regular_shared): output = DummyOutputDatasink() - ds = ray.data.range(10, parallelism=2) + ds = ray.data.range(10, override_num_blocks=2) ds.write_datasink(output) assert output.num_ok == 1 assert output.num_failed == 0 assert ray.get(output.data_sink.get_rows_written.remote()) == 10 output.enabled = False - ds = ray.data.range(10, parallelism=2) + ds = ray.data.range(10, override_num_blocks=2) with pytest.raises(ValueError): ds.write_datasink(output, ray_remote_args={"max_retries": 0}) assert output.num_ok == 1 @@ -274,7 +274,7 @@ def get_node_id(): bar_node_id = ray.get(get_node_id.options(resources={"bar": 1}).remote()) output = NodeLoggerOutputDatasink() - ds = ray.data.range(100, parallelism=10) + ds = ray.data.range(100, override_num_blocks=10) # Pin write tasks to node with "bar" resource. 
ds.write_datasink(output, ray_remote_args={"resources": {"bar": 1}}) assert output.num_ok == 1 diff --git a/python/ray/data/tests/test_huggingface.py b/python/ray/data/tests/test_huggingface.py index a79bee24f211..2ea5b6fc2263 100644 --- a/python/ray/data/tests/test_huggingface.py +++ b/python/ray/data/tests/test_huggingface.py @@ -46,7 +46,9 @@ def test_from_huggingface(hf_dataset, ray_start_regular_shared, num_par): ray.data.from_huggingface(hf_dataset) ray_datasets = { - "train": ray.data.from_huggingface(hf_dataset["train"], parallelism=num_par), + "train": ray.data.from_huggingface( + hf_dataset["train"], override_num_blocks=num_par + ), } assert isinstance(ray_datasets["train"], ray.data.Dataset) diff --git a/python/ray/data/tests/test_image.py b/python/ray/data/tests/test_image.py index 889bd06f31f4..9b7de043b7ee 100644 --- a/python/ray/data/tests/test_image.py +++ b/python/ray/data/tests/test_image.py @@ -39,7 +39,7 @@ def test_multi_threading(self, ray_start_regular_shared, num_threads, monkeypatc ) ds = ray.data.read_images( "example://image-datasets/simple", - parallelism=1, + override_num_blocks=1, include_paths=True, ) paths = [item["path"][-len("image1.jpg") :] for item in ds.take_all()] @@ -227,7 +227,7 @@ def test_data_size_estimate( ): root = "example://image-datasets/different-sizes" ds = ray.data.read_images( - root, size=(image_size, image_size), mode=image_mode, parallelism=1 + root, size=(image_size, image_size), mode=image_mode, override_num_blocks=1 ) data_size = ds.size_bytes() @@ -258,7 +258,7 @@ def test_dynamic_block_split(ray_start_regular_shared): ctx.target_max_block_size = 1 try: root = "example://image-datasets/simple" - ds = ray.data.read_images(root, parallelism=1) + ds = ray.data.read_images(root, override_num_blocks=1) assert ds._plan.initial_num_blocks() == 1 ds = ds.materialize() # Verify dynamic block splitting taking effect to generate more blocks. diff --git a/python/ray/data/tests/test_json.py b/python/ray/data/tests/test_json.py index fb8b6778cb20..1a90069fc7bc 100644 --- a/python/ray/data/tests/test_json.py +++ b/python/ray/data/tests/test_json.py @@ -65,11 +65,11 @@ def test_json_read(ray_start_regular_shared, fs, data_path, endpoint_url): assert ds.input_files() == [_unwrap_protocol(path1)] assert "{one: int64, two: string}" in str(ds), ds - # Two files, parallelism=2. + # Two files, override_num_blocks=2. df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) path2 = os.path.join(data_path, "test2.json") df2.to_json(path2, orient="records", lines=True, storage_options=storage_options) - ds = ray.data.read_json([path1, path2], parallelism=2, filesystem=fs) + ds = ray.data.read_json([path1, path2], override_num_blocks=2, filesystem=fs) dsdf = ds.to_pandas() df = pd.concat([df1, df2], ignore_index=True) assert df.equals(dsdf) @@ -77,11 +77,11 @@ def test_json_read(ray_start_regular_shared, fs, data_path, endpoint_url): for block, meta in ds._plan.execute().get_blocks_with_metadata(): BlockAccessor.for_block(ray.get(block)).size_bytes() == meta.size_bytes - # Three files, parallelism=2. + # Three files, override_num_blocks=2. 
df3 = pd.DataFrame({"one": [7, 8, 9], "two": ["h", "i", "j"]}) path3 = os.path.join(data_path, "test3.json") df3.to_json(path3, orient="records", lines=True, storage_options=storage_options) - ds = ray.data.read_json([path1, path2, path3], parallelism=2, filesystem=fs) + ds = ray.data.read_json([path1, path2, path3], override_num_blocks=2, filesystem=fs) df = pd.concat([df1, df2, df3], ignore_index=True) dsdf = ds.to_pandas() assert df.equals(dsdf) @@ -229,11 +229,11 @@ def test_zipped_json_read(ray_start_regular_shared, tmp_path): assert ds.count() == 3 assert ds.input_files() == [path1] - # Two files, parallelism=2. + # Two files, override_num_blocks=2. df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) path2 = os.path.join(tmp_path, "test2.json.gz") df2.to_json(path2, compression="gzip", orient="records", lines=True) - ds = ray.data.read_json([path1, path2], parallelism=2) + ds = ray.data.read_json([path1, path2], override_num_blocks=2) dsdf = ds.to_pandas() assert pd.concat([df1, df2], ignore_index=True).equals(dsdf) # Test metadata ops. @@ -567,7 +567,7 @@ def test_json_roundtrip(ray_start_regular_shared, fs, data_path): new_file_path = old_file_path.replace(".json", ".jsonl") os.rename(old_file_path, new_file_path) else: - ds2 = ray.data.read_json(data_path, parallelism=2, filesystem=fs) + ds2 = ray.data.read_json(data_path, override_num_blocks=2, filesystem=fs) ds2df = ds2.to_pandas() assert pd.concat([df, df2], ignore_index=True).equals(ds2df) # Test metadata ops. @@ -643,7 +643,7 @@ def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoi @pytest.mark.parametrize("num_rows_per_file", [5, 10, 50]) def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file): - ray.data.range(100, parallelism=20).write_json( + ray.data.range(100, override_num_blocks=20).write_json( tmp_path, num_rows_per_file=num_rows_per_file ) diff --git a/python/ray/data/tests/test_map.py b/python/ray/data/tests/test_map.py index 3c9db36b1ac1..2dde2e16b7d8 100644 --- a/python/ray/data/tests/test_map.py +++ b/python/ray/data/tests/test_map.py @@ -46,7 +46,7 @@ def test_basic_actors(shutdown_only): ) == list(range(1, n + 1)) # Test setting custom max inflight tasks. 
- ds = ray.data.range(10, parallelism=5) + ds = ray.data.range(10, override_num_blocks=5) assert sorted( extract_values( "id", @@ -74,7 +74,7 @@ def test_basic_actors(shutdown_only): def test_callable_classes(shutdown_only): ray.init(num_cpus=2) - ds = ray.data.range(10, parallelism=10) + ds = ray.data.range(10, override_num_blocks=10) class StatefulFn: def __init__(self): @@ -179,7 +179,7 @@ def __call__(self, x, arg, kwarg): def test_concurrent_callable_classes(shutdown_only): """Test that concurrenct actor pool runs user UDF in a separate thread.""" ray.init(num_cpus=2) - ds = ray.data.range(10, parallelism=10) + ds = ray.data.range(10, override_num_blocks=10) class StatefulFn: def __call__(self, x): @@ -204,7 +204,7 @@ def __call__(self, x): def test_transform_failure(shutdown_only): ray.init(num_cpus=2) - ds = ray.data.from_items([0, 10], parallelism=2) + ds = ray.data.from_items([0, 10], override_num_blocks=2) def mapper(x): time.sleep(x) @@ -221,7 +221,7 @@ def test_actor_task_failure(shutdown_only, restore_data_context): ctx = DataContext.get_current() ctx.actor_task_retry_on_errors = [ValueError] - ds = ray.data.from_items([0, 10], parallelism=2) + ds = ray.data.from_items([0, 10], override_num_blocks=2) class Mapper: def __init__(self): @@ -238,7 +238,7 @@ def __call__(self, x): def test_concurrency(shutdown_only): ray.init(num_cpus=6) - ds = ray.data.range(10, parallelism=10) + ds = ray.data.range(10, override_num_blocks=10) def udf(x): return x @@ -656,7 +656,7 @@ def map_batches(batch): ray.get(concurrency_counter.decr.remote()) return batch - ds = ray.data.range(num_blocks, parallelism=num_blocks) + ds = ray.data.range(num_blocks, override_num_blocks=num_blocks) ds = ds.map_batches( map_batches, batch_size=None, @@ -707,7 +707,7 @@ def __call__(self, x): ray.shutdown() ray.init(num_cpus=2) # Test that actor compute model preserves block order. - ds = ray.data.range(10, parallelism=5) + ds = ray.data.range(10, override_num_blocks=5) assert extract_values("id", ds.map_batches(UDFClass, concurrency=1).take()) == list( range(10) ) @@ -733,7 +733,9 @@ def mutate(df): df["id"] += 1 return df - ds = ray.data.range(num_rows, parallelism=num_blocks).repartition(num_blocks) + ds = ray.data.range(num_rows, override_num_blocks=num_blocks).repartition( + num_blocks + ) # Convert to Pandas blocks. ds = ds.map_batches(lambda df: df, batch_format="pandas", batch_size=None) @@ -760,7 +762,9 @@ def mutate(df): df["id"] += 1 return df - ds = ray.data.range(num_rows, parallelism=num_blocks).repartition(num_blocks) + ds = ray.data.range(num_rows, override_num_blocks=num_blocks).repartition( + num_blocks + ) # Convert to Pandas blocks. ds = ds.map_batches(lambda df: df, batch_format="pandas", batch_size=None) ds = ds.materialize() @@ -787,7 +791,7 @@ def test_map_batches_block_bundling_auto( ): # Ensure that we test at least 2 batches worth of blocks. num_blocks = max(10, 2 * batch_size // block_size) - ds = ray.data.range(num_blocks * block_size, parallelism=num_blocks) + ds = ray.data.range(num_blocks * block_size, override_num_blocks=num_blocks) # Confirm that we have the expected number of initial blocks. assert ds._plan.initial_num_blocks() == num_blocks @@ -878,7 +882,7 @@ def good_fn(row): else: return {"b": "hello2", "a": "hello1"} - ds = ray.data.range(10, parallelism=1) + ds = ray.data.range(10, override_num_blocks=1) error_message = "Current row has different columns compared to previous rows." 
with pytest.raises(ValueError) as e: ds.map(bad_fn).materialize() @@ -888,7 +892,7 @@ def good_fn(row): def test_map_batches_preserve_empty_blocks(ray_start_regular_shared): - ds = ray.data.range(10, parallelism=10) + ds = ray.data.range(10, override_num_blocks=10) ds = ds.map_batches(lambda x: []) ds = ds.map_batches(lambda x: x) assert ds._plan.initial_num_blocks() == 10, ds @@ -946,24 +950,24 @@ def ensure_sample_size_close(dataset, sample_percent=0.5): r1.count(), int(ds.count() * sample_percent), rel_tol=2, abs_tol=2 ) - ds = ray.data.range(10, parallelism=2) + ds = ray.data.range(10, override_num_blocks=2) ensure_sample_size_close(ds) - ds = ray.data.range(10, parallelism=2) + ds = ray.data.range(10, override_num_blocks=2) ensure_sample_size_close(ds) - ds = ray.data.range_tensor(5, parallelism=2, shape=(2, 2)) + ds = ray.data.range_tensor(5, override_num_blocks=2, shape=(2, 2)) ensure_sample_size_close(ds) # imbalanced datasets - ds1 = ray.data.range(1, parallelism=1) - ds2 = ray.data.range(2, parallelism=1) - ds3 = ray.data.range(3, parallelism=1) + ds1 = ray.data.range(1, override_num_blocks=1) + ds2 = ray.data.range(2, override_num_blocks=1) + ds3 = ray.data.range(3, override_num_blocks=1) # noinspection PyTypeChecker ds = ds1.union(ds2).union(ds3) ensure_sample_size_close(ds) # Small datasets - ds1 = ray.data.range(5, parallelism=5) + ds1 = ray.data.range(5, override_num_blocks=5) ensure_sample_size_close(ds1) @@ -993,7 +997,7 @@ def __call__(self, x): ray.shutdown() ray.init(num_cpus=num_cpus) compute_strategy = ray.data.ActorPoolStrategy() - ray.data.range(10, parallelism=10).map_batches( + ray.data.range(10, override_num_blocks=10).map_batches( UDFClass, compute=compute_strategy, batch_size=1 ).materialize() @@ -1007,14 +1011,14 @@ def __call__(self, x): max_size = 2 ds = ( - ray.data.range(10, parallelism=10) + ray.data.range(10, override_num_blocks=10) .map_batches(UDFClass, batch_size=None, concurrency=max_size) .materialize() ) # Check batch size is still respected. 
ds = ( - ray.data.range(10, parallelism=10) + ray.data.range(10, override_num_blocks=10) .map_batches(UDFClass, batch_size=10, concurrency=max_size) .materialize() ) diff --git a/python/ray/data/tests/test_mongo.py b/python/ray/data/tests/test_mongo.py index 8c845df76621..97828aae6bea 100644 --- a/python/ray/data/tests/test_mongo.py +++ b/python/ray/data/tests/test_mongo.py @@ -76,7 +76,7 @@ def test_read_write_mongo(ray_start_regular_shared, start_mongo): database=foo_db, collection=foo_collection, schema=schema, - parallelism=2, + override_num_blocks=2, ) assert ds._block_num_rows() == [3, 2] assert str(ds) == ( @@ -90,7 +90,7 @@ def test_read_write_mongo(ray_start_regular_shared, start_mongo): uri=mongo_url, database=foo_db, collection=foo_collection, - parallelism=2, + override_num_blocks=2, ) assert ds._block_num_rows() == [3, 2] assert str(ds) == ( @@ -108,7 +108,7 @@ def test_read_write_mongo(ray_start_regular_shared, start_mongo): database=foo_db, collection=foo_collection, pipeline=[{"$match": {"int_field": {"$gte": 0, "$lt": 3}}}], - parallelism=2, + override_num_blocks=2, ) assert ds._block_num_rows() == [2, 1] assert str(ds) == ( @@ -140,7 +140,7 @@ def test_read_write_mongo(ray_start_regular_shared, start_mongo): uri=mongo_url, database=foo_db, collection=foo_collection, - parallelism=1000, + override_num_blocks=1000, ) assert str(ds) == ( "Dataset(\n" @@ -200,7 +200,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): database=foo_db, collection=foo_collection, schema=schema, - parallelism=2, + override_num_blocks=2, ).materialize() assert ds._block_num_rows() == [3, 2] assert str(ds) == ( @@ -218,7 +218,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): uri=mongo_url, database=foo_db, collection=foo_collection, - parallelism=2, + override_num_blocks=2, ).materialize() assert ds._block_num_rows() == [3, 2] assert str(ds) == ( @@ -252,7 +252,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): uri=mongo_url, database=foo_db, collection=foo_collection, - parallelism=1000, + override_num_blocks=1000, ) assert str(ds) == ( "Dataset(\n" @@ -269,7 +269,7 @@ def test_mongo_datasource(ray_start_regular_shared, start_mongo): database=foo_db, collection=foo_collection, pipeline=[{"$match": {"int_field": {"$gte": 0, "$lt": 3}}}], - parallelism=2, + override_num_blocks=2, ) assert ds._block_num_rows() == [2, 1] assert str(ds) == ( diff --git a/python/ray/data/tests/test_numpy.py b/python/ray/data/tests/test_numpy.py index 4105c015eb63..ce0cda3a91d3 100644 --- a/python/ray/data/tests/test_numpy.py +++ b/python/ray/data/tests/test_numpy.py @@ -74,7 +74,7 @@ def recursive_to_list(a): def test_to_numpy_refs(ray_start_regular_shared): # Tensor Dataset - ds = ray.data.range_tensor(10, parallelism=2) + ds = ray.data.range_tensor(10, override_num_blocks=2) arr = np.concatenate(extract_values("data", ray.get(ds.to_numpy_refs()))) np.testing.assert_equal(arr, np.expand_dims(np.arange(0, 10), 1)) @@ -111,7 +111,7 @@ def test_to_numpy_refs(ray_start_regular_shared): ], ) def test_numpy_roundtrip(ray_start_regular_shared, fs, data_path): - ds = ray.data.range_tensor(10, parallelism=2) + ds = ray.data.range_tensor(10, override_num_blocks=2) ds.write_numpy(data_path, filesystem=fs, column="data") ds = ray.data.read_numpy(data_path, filesystem=fs) assert str(ds) == ( @@ -126,7 +126,7 @@ def test_numpy_read(ray_start_regular_shared, tmp_path): path = os.path.join(tmp_path, "test_np_dir") os.mkdir(path) np.save(os.path.join(path, "test.npy"), 
np.expand_dims(np.arange(0, 10), 1)) - ds = ray.data.read_numpy(path, parallelism=1) + ds = ray.data.read_numpy(path, override_num_blocks=1) assert str(ds) == ( "Dataset(num_rows=10, schema={data: numpy.ndarray(shape=(1,), dtype=int64)})" ) @@ -138,7 +138,7 @@ def test_numpy_read(ray_start_regular_shared, tmp_path): with open(os.path.join(path, "foo.txt"), "w") as f: f.write("foobar") - ds = ray.data.read_numpy(path, parallelism=1) + ds = ray.data.read_numpy(path, override_num_blocks=1) assert ds._plan.initial_num_blocks() == 1 assert ds.count() == 10 assert str(ds) == ( @@ -175,7 +175,7 @@ def test_numpy_read_meta_provider(ray_start_regular_shared, tmp_path): path = os.path.join(path, "test.npy") np.save(path, np.expand_dims(np.arange(0, 10), 1)) ds = ray.data.read_numpy( - path, meta_provider=FastFileMetadataProvider(), parallelism=1 + path, meta_provider=FastFileMetadataProvider(), override_num_blocks=1 ) assert str(ds) == ( "Dataset(num_rows=10, schema={data: numpy.ndarray(shape=(1,), dtype=int64)})" @@ -257,7 +257,7 @@ def skip_unpartitioned(kv_dict): ], ) def test_numpy_write(ray_start_regular_shared, fs, data_path, endpoint_url): - ds = ray.data.range_tensor(10, parallelism=2) + ds = ray.data.range_tensor(10, override_num_blocks=2) ds._set_uuid("data") ds.write_numpy(data_path, filesystem=fs, column="data") file_path1 = os.path.join(data_path, "data_000000_000000.npy") @@ -281,7 +281,7 @@ def test_numpy_write(ray_start_regular_shared, fs, data_path, endpoint_url): @pytest.mark.parametrize("num_rows_per_file", [5, 10, 50]) def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file): - ray.data.range(100, parallelism=20).write_numpy( + ray.data.range(100, override_num_blocks=20).write_numpy( tmp_path, column="id", num_rows_per_file=num_rows_per_file ) diff --git a/python/ray/data/tests/test_numpy_support.py b/python/ray/data/tests/test_numpy_support.py index 9d48bba7f5b5..4a5b74d9bca2 100644 --- a/python/ray/data/tests/test_numpy_support.py +++ b/python/ray/data/tests/test_numpy_support.py @@ -96,7 +96,7 @@ def test_scalar_lists_not_converted(ray_start_regular_shared): def test_scalar_numpy(ray_start_regular_shared): data = np.int64(1) - ds = ray.data.range(2, parallelism=1) + ds = ray.data.range(2, override_num_blocks=1) ds = ds.map(lambda x: {"output": data}) output = ds.take_batch()["output"] assert_structure_equals(output, np.array([1, 1], dtype=np.int64)) @@ -104,7 +104,7 @@ def test_scalar_numpy(ray_start_regular_shared): def test_scalar_arrays(ray_start_regular_shared): data = np.array([1, 2, 3]) - ds = ray.data.range(2, parallelism=1) + ds = ray.data.range(2, override_num_blocks=1) ds = ds.map(lambda x: {"output": data}) output = ds.take_batch()["output"] assert_structure_equals(output, np.array([[1, 2, 3], [1, 2, 3]], dtype=np.int64)) @@ -113,7 +113,7 @@ def test_scalar_arrays(ray_start_regular_shared): def test_bytes(ray_start_regular_shared): """Tests that bytes are converted to object dtype instead of zero-terminated.""" data = b"\x1a\n\x00\n\x1a" - ds = ray.data.range(1, parallelism=1) + ds = ray.data.range(1, override_num_blocks=1) ds = ds.map(lambda x: {"output": data}) output = ds.take_batch()["output"] assert_structure_equals(output, np.array([b"\x1a\n\x00\n\x1a"], dtype=object)) @@ -121,7 +121,7 @@ def test_bytes(ray_start_regular_shared): def test_scalar_array_like(ray_start_regular_shared): data = torch.Tensor([1, 2, 3]) - ds = ray.data.range(2, parallelism=1) + ds = ray.data.range(2, override_num_blocks=1) ds = ds.map(lambda x: 
{"output": data}) output = ds.take_batch()["output"] assert_structure_equals(output, np.array([[1, 2, 3], [1, 2, 3]], dtype=np.float32)) @@ -129,7 +129,7 @@ def test_scalar_array_like(ray_start_regular_shared): def test_scalar_ragged_arrays(ray_start_regular_shared): data = [np.array([1, 2, 3]), np.array([1, 2])] - ds = ray.data.range(2, parallelism=1) + ds = ray.data.range(2, override_num_blocks=1) ds = ds.map(lambda x: {"output": data[x["id"]]}) output = ds.take_batch()["output"] assert_structure_equals( @@ -139,7 +139,7 @@ def test_scalar_ragged_arrays(ray_start_regular_shared): def test_scalar_ragged_array_like(ray_start_regular_shared): data = [torch.Tensor([1, 2, 3]), torch.Tensor([1, 2])] - ds = ray.data.range(2, parallelism=1) + ds = ray.data.range(2, override_num_blocks=1) ds = ds.map(lambda x: {"output": data[x["id"]]}) output = ds.take_batch()["output"] assert_structure_equals( @@ -147,7 +147,7 @@ def test_scalar_ragged_array_like(ray_start_regular_shared): ) data = [torch.zeros((3, 5, 10)), torch.zeros((3, 8, 8))] - ds = ray.data.range(2, parallelism=1) + ds = ray.data.range(2, override_num_blocks=1) ds = ds.map(lambda x: {"output": data[x["id"]]}) output = ds.take_batch()["output"] assert_structure_equals( diff --git a/python/ray/data/tests/test_object_gc.py b/python/ray/data/tests/test_object_gc.py index 5e2e87a4cfcd..d43953150f87 100644 --- a/python/ray/data/tests/test_object_gc.py +++ b/python/ray/data/tests/test_object_gc.py @@ -81,7 +81,7 @@ def test_iter_batches_no_spilling_upon_no_transformation(shutdown_only): # The object store is about 300MB. ctx = ray.init(num_cpus=1, object_store_memory=300e6) # The size of dataset is 500*(80*80*4)*8B, about 100MB. - ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) + ds = ray.data.range_tensor(500, shape=(80, 80, 4), override_num_blocks=100) check_no_spill(ctx, ds) @@ -89,7 +89,7 @@ def test_torch_iteration(shutdown_only): # The object store is about 400MB. ctx = ray.init(num_cpus=1, object_store_memory=400e6) # The size of dataset is 500*(80*80*4)*8B, about 100MB. - ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) + ds = ray.data.range_tensor(500, shape=(80, 80, 4), override_num_blocks=100) # to_torch check_to_torch_no_spill(ctx, ds) @@ -101,9 +101,9 @@ def test_tf_iteration(shutdown_only): # The object store is about 800MB. ctx = ray.init(num_cpus=1, object_store_memory=800e6) # The size of dataset is 500*(80*80*4)*8B, about 100MB. - ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100).add_column( - "label", lambda x: 1 - ) + ds = ray.data.range_tensor( + 500, shape=(80, 80, 4), override_num_blocks=100 + ).add_column("label", lambda x: 1) # to_tf check_to_tf_no_spill(ctx, ds.map(lambda x: x)) @@ -115,7 +115,7 @@ def test_iter_batches_no_spilling_upon_prior_transformation(shutdown_only): # The object store is about 500MB. ctx = ray.init(num_cpus=1, object_store_memory=500e6) # The size of dataset is 500*(80*80*4)*8B, about 100MB. - ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) + ds = ray.data.range_tensor(500, shape=(80, 80, 4), override_num_blocks=100) check_no_spill(ctx, ds.map_batches(lambda x: x)) @@ -124,7 +124,7 @@ def test_iter_batches_no_spilling_upon_post_transformation(shutdown_only): # The object store is about 500MB. ctx = ray.init(num_cpus=1, object_store_memory=500e6) # The size of dataset is 500*(80*80*4)*8B, about 100MB. 
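# A quick arithmetic check, as a minimal sketch, of the ~100MB estimate these GC tests
# rely on, assuming int64 tensor elements (8 bytes each), which range_tensor produces
# by default. The variable names here are illustrative only.
rows, elems_per_row, bytes_per_elem = 500, 80 * 80 * 4, 8
assert rows * elems_per_row * bytes_per_elem == 102_400_000  # about 100MB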
- ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) + ds = ray.data.range_tensor(500, shape=(80, 80, 4), override_num_blocks=100) check_no_spill(ctx, ds.map_batches(lambda x: x, batch_size=5)) @@ -133,7 +133,7 @@ def test_iter_batches_no_spilling_upon_transformations(shutdown_only): # The object store is about 700MB. ctx = ray.init(num_cpus=1, object_store_memory=700e6) # The size of dataset is 500*(80*80*4)*8B, about 100MB. - ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) + ds = ray.data.range_tensor(500, shape=(80, 80, 4), override_num_blocks=100) check_no_spill( ctx, @@ -146,7 +146,7 @@ def test_global_bytes_spilled(shutdown_only): ctx = ray.init(object_store_memory=90e6) # The size of dataset is 500*(80*80*4)*8B, about 100MB. ds = ( - ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100) + ray.data.range_tensor(500, shape=(80, 80, 4), override_num_blocks=100) .materialize() .map_batches(lambda x: x) .materialize() @@ -165,7 +165,9 @@ def test_no_global_bytes_spilled(shutdown_only): # The object store is about 200MB. ctx = ray.init(object_store_memory=200e6) # The size of dataset is 500*(80*80*4)*8B, about 100MB. - ds = ray.data.range_tensor(500, shape=(80, 80, 4), parallelism=100).materialize() + ds = ray.data.range_tensor( + 500, shape=(80, 80, 4), override_num_blocks=100 + ).materialize() check_no_spill(ctx, ds) assert ds._get_stats_summary().global_bytes_spilled == 0 diff --git a/python/ray/data/tests/test_optimize.py b/python/ray/data/tests/test_optimize.py index 3a986035c219..1a657ded9a0b 100644 --- a/python/ray/data/tests/test_optimize.py +++ b/python/ray/data/tests/test_optimize.py @@ -149,7 +149,7 @@ def inc(row): # Test that fan-out of a lazy dataset results in re-execution up to the datasource, # due to block move semantics. 
- ds = ray.data.read_datasource(source, parallelism=1) + ds = ray.data.read_datasource(source, override_num_blocks=1) ds1 = ds.map(inc) ds2 = ds1.map(inc) ds3 = ds1.map(inc) @@ -282,7 +282,7 @@ def test_optimize_lazy_reuse_base_data( df.to_csv(path, index=False) counter = Counter.remote() source = MySource(paths, counter) - ds = ray.data.read_datasource(source, parallelism=4) + ds = ray.data.read_datasource(source, override_num_blocks=4) num_reads = ray.get(counter.get.remote()) assert num_reads == 1, num_reads ds = ds.map(column_udf("id", lambda x: x)) diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py index faccc22392b3..8020896365ed 100644 --- a/python/ray/data/tests/test_parquet.py +++ b/python/ray/data/tests/test_parquet.py @@ -543,7 +543,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path # 2 partitions, 1 empty partition, 1 block/read task ds = ray.data.read_parquet( - str(tmp_path), parallelism=1, filter=(pa.dataset.field("two") == "a") + str(tmp_path), override_num_blocks=1, filter=(pa.dataset.field("two") == "a") ) values = [[s["one"], s["two"]] for s in ds.take()] @@ -554,7 +554,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path # 2 partitions, 1 empty partition, 2 block/read tasks, 1 empty block ds = ray.data.read_parquet( - str(tmp_path), parallelism=2, filter=(pa.dataset.field("two") == "a") + str(tmp_path), override_num_blocks=2, filter=(pa.dataset.field("two") == "a") ) values = [[s["one"], s["two"]] for s in ds.take()] @@ -725,7 +725,9 @@ def _block_udf(block: pa.Table): # 1 block/read task - ds = ray.data.read_parquet(str(tmp_path), parallelism=1, _block_udf=_block_udf) + ds = ray.data.read_parquet( + str(tmp_path), override_num_blocks=1, _block_udf=_block_udf + ) ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()]) check_num_computed(ds, 0) @@ -733,7 +735,9 @@ def _block_udf(block: pa.Table): # 2 blocks/read tasks - ds = ray.data.read_parquet(str(tmp_path), parallelism=2, _block_udf=_block_udf) + ds = ray.data.read_parquet( + str(tmp_path), override_num_blocks=2, _block_udf=_block_udf + ) ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()]) check_num_computed(ds, 0) @@ -743,7 +747,7 @@ def _block_udf(block: pa.Table): ds = ray.data.read_parquet( str(tmp_path), - parallelism=2, + override_num_blocks=2, filter=(pa.dataset.field("two") == "a"), _block_udf=_block_udf, ) @@ -776,7 +780,9 @@ def test_parquet_read_parallel_meta_fetch(ray_start_regular_shared, fs, data_pat pq.write_table(table, path, filesystem=fs) parallelism = 8 - ds = ray.data.read_parquet(data_path, filesystem=fs, parallelism=parallelism) + ds = ray.data.read_parquet( + data_path, filesystem=fs, override_num_blocks=parallelism + ) # Test metadata-only parquet ops. check_num_computed(ds, 0) @@ -1058,7 +1064,7 @@ def test_parquet_roundtrip(ray_start_regular_shared, fs, data_path): else: fs.create_dir(_unwrap_protocol(path)) ds.write_parquet(path, filesystem=fs) - ds2 = ray.data.read_parquet(path, parallelism=2, filesystem=fs) + ds2 = ray.data.read_parquet(path, override_num_blocks=2, filesystem=fs) ds2df = ds2.to_pandas() assert pd.concat([df1, df2], ignore_index=True).equals(ds2df) # Test metadata ops. 
@@ -1190,7 +1196,7 @@ def test_parquet_bulk_columns(ray_start_regular_shared): def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file): import pyarrow.parquet as pq - ray.data.range(100, parallelism=20).write_parquet( + ray.data.range(100, override_num_blocks=20).write_parquet( tmp_path, num_rows_per_file=num_rows_per_file ) diff --git a/python/ray/data/tests/test_random_access.py b/python/ray/data/tests/test_random_access.py index 8e1771f9bf40..ec396eae4837 100644 --- a/python/ray/data/tests/test_random_access.py +++ b/python/ray/data/tests/test_random_access.py @@ -7,7 +7,7 @@ @pytest.mark.parametrize("pandas", [False, True]) def test_basic(ray_start_regular_shared, pandas): - ds = ray.data.range(100, parallelism=10) + ds = ray.data.range(100, override_num_blocks=10) ds = ds.add_column("embedding", lambda b: b["id"] ** 2) if not pandas: ds = ds.map_batches( @@ -45,7 +45,7 @@ def test_errors(ray_start_regular_shared): def test_stats(ray_start_regular_shared): - ds = ray.data.range(100, parallelism=10) + ds = ray.data.range(100, override_num_blocks=10) rad = ds.to_random_access_dataset("id", num_workers=1) stats = rad.stats() assert "Accesses per worker: 0 min, 0 max, 0 mean" in stats, stats diff --git a/python/ray/data/tests/test_randomize_block_order.py b/python/ray/data/tests/test_randomize_block_order.py index 7be374fe7c5a..1ccf95eb2ffb 100644 --- a/python/ray/data/tests/test_randomize_block_order.py +++ b/python/ray/data/tests/test_randomize_block_order.py @@ -113,7 +113,7 @@ def test_randomize_block_order_after_repartition(): def test_randomize_blocks_e2e(ray_start_regular_shared): - ds = ray.data.range(12, parallelism=4) + ds = ray.data.range(12, override_num_blocks=4) ds = ds.randomize_block_order(seed=0) assert extract_values("id", ds.take_all()) == [ 6, diff --git a/python/ray/data/tests/test_size_estimation.py b/python/ray/data/tests/test_size_estimation.py index 06795caa8305..8e24d7ea8cda 100644 --- a/python/ray/data/tests/test_size_estimation.py +++ b/python/ray/data/tests/test_size_estimation.py @@ -82,10 +82,10 @@ def test_split_read_csv(ray_start_regular_shared, tmp_path): def gen(name): path = os.path.join(tmp_path, name) - ray.data.range(1000, parallelism=1).map( + ray.data.range(1000, override_num_blocks=1).map( lambda _: {"out": LARGE_VALUE} ).write_csv(path) - return ray.data.read_csv(path, parallelism=1) + return ray.data.read_csv(path, override_num_blocks=1) # 20MiB ctx.target_max_block_size = 20_000_000 @@ -121,16 +121,16 @@ def test_split_read_parquet(ray_start_regular_shared, tmp_path): def gen(name): path = os.path.join(tmp_path, name) ds = ( - ray.data.range(200000, parallelism=1) + ray.data.range(200000, override_num_blocks=1) .map(lambda _: {"out": uuid.uuid4().hex}) .materialize() ) # Fully execute the operations prior to write, because with - # parallelism=1, there is only one task; so the write operator + # override_num_blocks=1, there is only one task; so the write operator # will only write to one file, even though there are multiple # blocks created by block splitting. 
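# A minimal sketch of the block-splitting behavior described in the comment above,
# assuming a deliberately small target_max_block_size (the sizes and row counts below
# are illustrative only): even with override_num_blocks=1 there is a single read task,
# but materializing can still produce many blocks once each block exceeds the target.
import ray

ctx = ray.data.context.DataContext.get_current()
ctx.target_max_block_size = 1_000_000  # 1 MB, small on purpose

ds = ray.data.range_tensor(50_000, shape=(128,), override_num_blocks=1)
materialized = ds.materialize()
# One read task, but several output blocks after size-based splitting
# (initial_num_blocks() is the internal accessor these tests use).
assert materialized._plan.initial_num_blocks() > 1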
ds.write_parquet(path) - return ray.data.read_parquet(path, parallelism=1) + return ray.data.read_parquet(path, override_num_blocks=1) # 20MiB ctx.target_max_block_size = 20_000_000 @@ -186,7 +186,7 @@ def __call__(self, x): ctx = ray.data.context.DataContext.get_current() ctx.target_max_block_size = 20_000_000 ctx.target_max_block_size = 20_000_000 - ds2 = ray.data.range(1000, parallelism=1).map(arrow_fn, **kwargs) + ds2 = ray.data.range(1000, override_num_blocks=1).map(arrow_fn, **kwargs) nblocks = len(ds2.map(identity_fn, **kwargs).get_internal_block_refs()) assert nblocks == 1, nblocks ctx.target_max_block_size = 2_000_000 @@ -196,7 +196,7 @@ def __call__(self, x): # Disabled. # Setting a huge block size effectively disables block splitting. ctx.target_max_block_size = 2**64 - ds3 = ray.data.range(1000, parallelism=1).map(arrow_fn, **kwargs) + ds3 = ray.data.range(1000, override_num_blocks=1).map(arrow_fn, **kwargs) nblocks = len(ds3.map(identity_fn, **kwargs).get_internal_block_refs()) assert nblocks == 1, nblocks @@ -206,7 +206,7 @@ def test_split_flat_map(ray_start_regular_shared): ctx.target_max_block_size = 20_000_000 # Arrow block ctx.target_max_block_size = 20_000_000 - ds2 = ray.data.range(1000, parallelism=1).map(lambda _: ARROW_LARGE_VALUE) + ds2 = ray.data.range(1000, override_num_blocks=1).map(lambda _: ARROW_LARGE_VALUE) nblocks = len(ds2.flat_map(lambda x: [x]).get_internal_block_refs()) assert nblocks == 1, nblocks ctx.target_max_block_size = 2_000_000 @@ -219,7 +219,7 @@ def test_split_map_batches(ray_start_regular_shared): ctx.target_max_block_size = 20_000_000 # Arrow block ctx.target_max_block_size = 20_000_000 - ds2 = ray.data.range(1000, parallelism=1).map(lambda _: ARROW_LARGE_VALUE) + ds2 = ray.data.range(1000, override_num_blocks=1).map(lambda _: ARROW_LARGE_VALUE) nblocks = len(ds2.map_batches(lambda x: x, batch_size=1).get_internal_block_refs()) assert nblocks == 1, nblocks ctx.target_max_block_size = 2_000_000 diff --git a/python/ray/data/tests/test_sort.py b/python/ray/data/tests/test_sort.py index 564a31a38d99..a0d0eb1f5c67 100644 --- a/python/ray/data/tests/test_sort.py +++ b/python/ray/data/tests/test_sort.py @@ -56,7 +56,7 @@ def test_sort_simple(ray_start_regular, use_push_based_shuffle): parallelism = 4 xs = list(range(num_items)) random.shuffle(xs) - ds = ray.data.from_items(xs, parallelism=parallelism) + ds = ray.data.from_items(xs, override_num_blocks=parallelism) assert extract_values("item", ds.sort("item").take(num_items)) == list( range(num_items) ) @@ -171,7 +171,7 @@ def test_sort_arrow_with_empty_blocks( ) ds = ray.data.from_items( - [{"A": (x % 3), "B": x} for x in range(3)], parallelism=3 + [{"A": (x % 3), "B": x} for x in range(3)], override_num_blocks=3 ) ds = ds.filter(lambda r: r["A"] == 0) assert list(ds.sort("A").iter_rows()) == [{"A": 0, "B": 0}] @@ -271,7 +271,9 @@ def test_sort_pandas_with_empty_blocks(ray_start_regular, use_push_based_shuffle == 0 ) - ds = ray.data.from_items([{"A": (x % 3), "B": x} for x in range(3)], parallelism=3) + ds = ray.data.from_items( + [{"A": (x % 3), "B": x} for x in range(3)], override_num_blocks=3 + ) ds = ds.filter(lambda r: r["A"] == 0) assert list(ds.sort("A").iter_rows()) == [{"A": 0, "B": 0}] @@ -435,7 +437,7 @@ def test_push_based_shuffle_stats(ray_start_cluster): ray.init(cluster.address) parallelism = 100 - ds = ray.data.range(1000, parallelism=parallelism).random_shuffle() + ds = ray.data.range(1000, override_num_blocks=parallelism).random_shuffle() ds = ds.materialize() assert 
"RandomShuffleMerge" in ds.stats() # Check all nodes used. @@ -470,7 +472,11 @@ def test_sort_multinode(ray_start_cluster, use_push_based_shuffle): ray.init(cluster.address) parallelism = 100 - ds = ray.data.range(1000, parallelism=parallelism).random_shuffle().sort("id") + ds = ( + ray.data.range(1000, override_num_blocks=parallelism) + .random_shuffle() + .sort("id") + ) for i, row in enumerate(ds.iter_rows()): assert row["id"] == i @@ -576,7 +582,9 @@ def check_pipelined(refs): ray.init(cluster.address) - ds = ray.data.range(1000, parallelism=num_output_blocks).random_shuffle() + ds = ray.data.range( + 1000, override_num_blocks=num_output_blocks + ).random_shuffle() # Only the last round should have fewer tasks in flight. assert task_context["num_instances_below_parallelism"] <= 1 task_context["num_instances_below_parallelism"] = 0 @@ -613,7 +621,7 @@ def test_debug_limit_shuffle_execution_to_num_blocks( shuffle_fn = shuffle_op parallelism = 100 - ds = ray.data.range(1000, parallelism=parallelism) + ds = ray.data.range(1000, override_num_blocks=parallelism) shuffled_ds = shuffle_fn(ds).materialize() shuffled_ds = shuffled_ds.materialize() assert shuffled_ds._plan.initial_num_blocks() == parallelism @@ -631,7 +639,7 @@ def test_memory_usage(ray_start_regular, restore_data_context, use_push_based_sh DataContext.get_current().use_push_based_shuffle = use_push_based_shuffle parallelism = 2 - ds = ray.data.range(int(1e8), parallelism=parallelism) + ds = ray.data.range(int(1e8), override_num_blocks=parallelism) ds = ds.random_shuffle().materialize() stats = ds._get_stats_summary() @@ -664,7 +672,7 @@ def test_sort_object_ref_warnings( if not under_threshold: DataContext.get_current().warn_on_driver_memory_usage_bytes = 10_000 - ds = ray.data.range(int(1e8), parallelism=10) + ds = ray.data.range(int(1e8), override_num_blocks=10) with caplog.at_level(logging.WARNING, logger="ray.data.dataset"): ds = ds.random_shuffle().materialize() @@ -703,7 +711,7 @@ def test_sort_inlined_objects_warnings( if not under_threshold: DataContext.get_current().warn_on_driver_memory_usage_bytes = 3_000_000 - ds = ray.data.range(int(1e6), parallelism=10) + ds = ray.data.range(int(1e6), override_num_blocks=10) with caplog.at_level(logging.WARNING, logger="ray.data.dataset"): ds = ds.random_shuffle().materialize() diff --git a/python/ray/data/tests/test_split.py b/python/ray/data/tests/test_split.py index 196a00bcf221..f4e3ec9330c4 100644 --- a/python/ray/data/tests/test_split.py +++ b/python/ray/data/tests/test_split.py @@ -161,7 +161,7 @@ def take(s): for locality_hints in [None, x[:n]]: for equal in [True, False]: print("Testing", m, n, equal, locality_hints) - ds = ray.data.from_items(data, parallelism=m) + ds = ray.data.from_items(data, override_num_blocks=m) splits = ds.split(n, equal=equal, locality_hints=locality_hints) assert len(splits) == n outs = ray.get([take.remote(s) for s in splits]) @@ -192,7 +192,7 @@ def take(s): def test_split_at_indices_simple(ray_start_regular_shared): - ds = ray.data.range(10, parallelism=3) + ds = ray.data.range(10, override_num_blocks=3) with pytest.raises(ValueError): ds.split_at_indices([]) @@ -252,7 +252,7 @@ def test_split_at_indices_simple(ray_start_regular_shared): def test_split_at_indices_coverage(ray_start_regular_shared, num_blocks, indices): # Test that split_at_indices() creates the expected splits on a set of partition and # indices configurations. 
- ds = ray.data.range(20, parallelism=num_blocks) + ds = ray.data.range(20, override_num_blocks=num_blocks) splits = ds.split_at_indices(indices) r = [extract_values("id", s.take_all()) for s in splits] # Use np.array_split() semantics as our correctness ground-truth. @@ -290,7 +290,7 @@ def test_split_at_indices_coverage_complete( ): # Test that split_at_indices() creates the expected splits on a set of partition and # indices configurations. - ds = ray.data.range(5, parallelism=num_blocks) + ds = ray.data.range(5, override_num_blocks=num_blocks) splits = ds.split_at_indices(indices) r = [extract_values("id", s.take_all()) for s in splits] # Use np.array_split() semantics as our correctness ground-truth. @@ -298,7 +298,7 @@ def test_split_at_indices_coverage_complete( def test_split_proportionately(ray_start_regular_shared): - ds = ray.data.range(10, parallelism=3) + ds = ray.data.range(10, override_num_blocks=3) with pytest.raises(ValueError): ds.split_proportionately([]) @@ -336,7 +336,7 @@ def test_split_proportionately(ray_start_regular_shared): def test_split(ray_start_regular_shared): - ds = ray.data.range(20, parallelism=10) + ds = ray.data.range(20, override_num_blocks=10) assert ds._plan.initial_num_blocks() == 10 assert ds.sum() == 190 assert ds._block_num_rows() == [2] * 10 @@ -395,7 +395,7 @@ def assert_split_assignment(block_node_ids, actor_node_ids, expected_split_resul datasets[1] contains block 2. """ num_blocks = len(block_node_ids) - ds = ray.data.range(num_blocks, parallelism=num_blocks) + ds = ray.data.range(num_blocks, override_num_blocks=num_blocks) blocks = ds.get_internal_block_refs() assert len(block_node_ids) == len(blocks) actors = [Actor.remote() for i in range(len(actor_node_ids))] @@ -770,7 +770,7 @@ def test_train_test_split(ray_start_regular_shared): def test_split_is_not_disruptive(ray_start_cluster): ray.shutdown() - ds = ray.data.range(100, parallelism=10).map_batches(lambda x: x) + ds = ray.data.range(100, override_num_blocks=10).map_batches(lambda x: x) def verify_integrity(splits): for dss in splits: diff --git a/python/ray/data/tests/test_splitblocks.py b/python/ray/data/tests/test_splitblocks.py index aceff72b6715..a4006a9fb67c 100644 --- a/python/ray/data/tests/test_splitblocks.py +++ b/python/ray/data/tests/test_splitblocks.py @@ -31,7 +31,7 @@ def f(n, k): def test_small_file_split(ray_start_10_cpus_shared, restore_data_context): last_snapshot = get_initial_core_execution_metrics_snapshot() - ds = ray.data.read_csv("example://iris.csv", parallelism=1) + ds = ray.data.read_csv("example://iris.csv", override_num_blocks=1) materialized_ds = ds.materialize() assert materialized_ds._plan.initial_num_blocks() == 1 last_snapshot = assert_core_execution_metrics_equals( @@ -58,7 +58,7 @@ def test_small_file_split(ray_start_10_cpus_shared, restore_data_context): stats = materialized_ds.stats() assert "Operator 1 ReadCSV->MapBatches" in stats, stats - ds = ray.data.read_csv("example://iris.csv", parallelism=10) + ds = ray.data.read_csv("example://iris.csv", override_num_blocks=10) assert ds._plan.initial_num_blocks() == 1 assert ds.map_batches(lambda x: x).materialize()._plan.initial_num_blocks() == 10 last_snapshot = assert_core_execution_metrics_equals( @@ -82,7 +82,7 @@ def test_small_file_split(ray_start_10_cpus_shared, restore_data_context): last_snapshot, ) - ds = ray.data.read_csv("example://iris.csv", parallelism=100) + ds = ray.data.read_csv("example://iris.csv", override_num_blocks=100) assert ds._plan.initial_num_blocks() == 1 assert 
ds.map_batches(lambda x: x).materialize()._plan.initial_num_blocks() == 100 assert ds.materialize()._plan.initial_num_blocks() == 100 @@ -109,35 +109,39 @@ def test_large_file_additional_split(ray_start_10_cpus_shared, tmp_path): ds = ray.data.range_tensor(1000, shape=(10000,)) ds.repartition(1).write_parquet(tmp_path) - ds = ray.data.read_parquet(tmp_path, parallelism=1) + ds = ray.data.read_parquet(tmp_path, override_num_blocks=1) assert ds._plan.initial_num_blocks() == 1 print(ds.materialize().stats()) assert ( 5 < ds.materialize()._plan.initial_num_blocks() < 20 ) # Size-based block split - ds = ray.data.read_parquet(tmp_path, parallelism=10) + ds = ray.data.read_parquet(tmp_path, override_num_blocks=10) assert ds._plan.initial_num_blocks() == 1 assert 5 < ds.materialize()._plan.initial_num_blocks() < 20 - ds = ray.data.read_parquet(tmp_path, parallelism=100) + ds = ray.data.read_parquet(tmp_path, override_num_blocks=100) assert ds._plan.initial_num_blocks() == 1 assert 50 < ds.materialize()._plan.initial_num_blocks() < 200 - ds = ray.data.read_parquet(tmp_path, parallelism=1000) + ds = ray.data.read_parquet(tmp_path, override_num_blocks=1000) assert ds._plan.initial_num_blocks() == 1 assert 500 < ds.materialize()._plan.initial_num_blocks() < 2000 def test_map_batches_split(ray_start_10_cpus_shared, restore_data_context): - ds = ray.data.range(1000, parallelism=1).map_batches(lambda x: x, batch_size=1000) + ds = ray.data.range(1000, override_num_blocks=1).map_batches( + lambda x: x, batch_size=1000 + ) assert ds.materialize()._plan.initial_num_blocks() == 1 ctx = ray.data.context.DataContext.get_current() # 100 integer rows per block. ctx.target_max_block_size = 800 - ds = ray.data.range(1000, parallelism=1).map_batches(lambda x: x, batch_size=1000) + ds = ray.data.range(1000, override_num_blocks=1).map_batches( + lambda x: x, batch_size=1000 + ) assert ds.materialize()._plan.initial_num_blocks() == 10 # A single row is already larger than the target block diff --git a/python/ray/data/tests/test_sql.py b/python/ray/data/tests/test_sql.py index 42f635a0850a..0c1ff4771c96 100644 --- a/python/ray/data/tests/test_sql.py +++ b/python/ray/data/tests/test_sql.py @@ -37,7 +37,7 @@ def test_read_sql(temp_database: str, parallelism: int): dataset = ray.data.read_sql( "SELECT * FROM movie", lambda: sqlite3.connect(temp_database), - parallelism=parallelism, + override_num_blocks=parallelism, ) actual_values = [tuple(record.values()) for record in dataset.take_all()] @@ -267,7 +267,7 @@ def request_get_mock(url, params=None, **kwargs): table="table1", catalog="catalog1", schema="db1", - parallelism=5, + override_num_blocks=5, ).to_pandas() pd.testing.assert_frame_equal(result, expected_result_df) @@ -278,7 +278,7 @@ def request_get_mock(url, params=None, **kwargs): query="select * from table1", catalog="catalog1", schema="db1", - parallelism=5, + override_num_blocks=5, ).to_pandas() pd.testing.assert_frame_equal(result, expected_result_df) @@ -289,7 +289,7 @@ def request_get_mock(url, params=None, **kwargs): query="select * from table1", catalog="catalog1", schema="db1", - parallelism=100, + override_num_blocks=100, ).to_pandas() pd.testing.assert_frame_equal(result, expected_result_df) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 5454bda6ea5c..913cca0d79ac 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -174,7 +174,7 @@ def patch_update_stats_actor_iter(): def 
test_streaming_split_stats(ray_start_regular_shared, restore_data_context): context = DataContext.get_current() context.verbose_stats_logs = True - ds = ray.data.range(1000, parallelism=10) + ds = ray.data.range(1000, override_num_blocks=10) it = ds.map_batches(dummy_map_batches).streaming_split(1)[0] list(it.iter_batches()) stats = it.stats() @@ -220,7 +220,7 @@ def test_large_args_scheduling_strategy( ): context = DataContext.get_current() context.verbose_stats_logs = verbose_stats_logs - ds = ray.data.range_tensor(100, shape=(100000,), parallelism=1) + ds = ray.data.range_tensor(100, shape=(100000,), override_num_blocks=1) ds = ds.map_batches(dummy_map_batches, num_cpus=0.9).materialize() stats = ds.stats() expected_stats = ( @@ -264,7 +264,7 @@ def test_dataset_stats_basic( ) with patch.object(logger, "info") as mock_logger: - ds = ray.data.range(1000, parallelism=10) + ds = ray.data.range(1000, override_num_blocks=10) ds = ds.map_batches(dummy_map_batches).materialize() if enable_auto_log_stats: @@ -341,7 +341,7 @@ def test_dataset_stats_basic( def test_block_location_nums(ray_start_regular_shared, restore_data_context): context = DataContext.get_current() context.enable_get_object_locations_for_metrics = True - ds = ray.data.range(1000, parallelism=10) + ds = ray.data.range(1000, override_num_blocks=10) ds = ds.map_batches(dummy_map_batches).materialize() for batch in ds.iter_batches(): @@ -553,7 +553,7 @@ def check_stats2(): def test_dataset_stats_shuffle(ray_start_regular_shared): - ds = ray.data.range(1000, parallelism=10) + ds = ray.data.range(1000, override_num_blocks=10) ds = ds.random_shuffle().repartition(1, shuffle=True) stats = canonicalize(ds.materialize().stats()) assert ( @@ -606,28 +606,28 @@ def test_dataset_stats_shuffle(ray_start_regular_shared): def test_dataset_stats_repartition(ray_start_regular_shared): - ds = ray.data.range(1000, parallelism=10) + ds = ray.data.range(1000, override_num_blocks=10) ds = ds.repartition(1, shuffle=False) stats = ds.materialize().stats() assert "Repartition" in stats, stats def test_dataset_stats_union(ray_start_regular_shared): - ds = ray.data.range(1000, parallelism=10) + ds = ray.data.range(1000, override_num_blocks=10) ds = ds.union(ds) stats = ds.materialize().stats() assert "Union" in stats, stats def test_dataset_stats_zip(ray_start_regular_shared): - ds = ray.data.range(1000, parallelism=10) + ds = ray.data.range(1000, override_num_blocks=10) ds = ds.zip(ds) stats = ds.materialize().stats() assert "Zip" in stats, stats def test_dataset_stats_sort(ray_start_regular_shared): - ds = ray.data.range(1000, parallelism=10) + ds = ray.data.range(1000, override_num_blocks=10) ds = ds.sort("id") stats = ds.materialize().stats() assert "SortMap" in stats, stats @@ -641,7 +641,7 @@ def test_dataset_stats_from_items(ray_start_regular_shared): def test_dataset_stats_read_parquet(ray_start_regular_shared, tmp_path): - ds = ray.data.range(1000, parallelism=10) + ds = ray.data.range(1000, override_num_blocks=10) ds.write_parquet(str(tmp_path)) ds = ray.data.read_parquet(str(tmp_path)).map(lambda x: x) stats = canonicalize(ds.materialize().stats()) @@ -659,7 +659,9 @@ def test_dataset_stats_read_parquet(ray_start_regular_shared, tmp_path): def test_dataset_split_stats(ray_start_regular_shared, tmp_path): - ds = ray.data.range(100, parallelism=10).map(column_udf("id", lambda x: x + 1)) + ds = ray.data.range(100, override_num_blocks=10).map( + column_udf("id", lambda x: x + 1) + ) dses = ds.split_at_indices([49]) dses = [ds.map(column_udf("id", 
lambda x: x + 1)) for ds in dses] for ds_ in dses: @@ -864,7 +866,7 @@ def test_get_total_stats(ray_start_regular_shared, op_two_block): "See: https://github.com/ray-project/ray/pull/40173" ) def test_streaming_stats_full(ray_start_regular_shared, restore_data_context): - ds = ray.data.range(5, parallelism=5).map(column_udf("id", lambda x: x + 1)) + ds = ray.data.range(5, override_num_blocks=5).map(column_udf("id", lambda x: x + 1)) ds.take_all() stats = canonicalize(ds.stats()) assert ( @@ -893,7 +895,7 @@ def test_streaming_stats_full(ray_start_regular_shared, restore_data_context): def test_write_ds_stats(ray_start_regular_shared, tmp_path): - ds = ray.data.range(100, parallelism=100) + ds = ray.data.range(100, override_num_blocks=100) ds.write_parquet(str(tmp_path)) stats = ds.stats() @@ -913,7 +915,11 @@ def test_write_ds_stats(ray_start_regular_shared, tmp_path): assert stats == ds._write_ds.stats() - ds = ray.data.range(100, parallelism=100).map_batches(lambda x: x).materialize() + ds = ( + ray.data.range(100, override_num_blocks=100) + .map_batches(lambda x: x) + .materialize() + ) ds.write_parquet(str(tmp_path)) stats = ds.stats() @@ -1030,7 +1036,7 @@ def test_spilled_stats(shutdown_only, verbose_stats_logs, restore_data_context): # The size of dataset is around 50MB, there should be no spillage ds = ( - ray.data.range(250 * 80 * 80 * 4, parallelism=1) + ray.data.range(250 * 80 * 80 * 4, override_num_blocks=1) .map_batches(lambda x: x) .materialize() ) @@ -1088,7 +1094,7 @@ def test_stats_actor_iter_metrics(): def test_dataset_name(): - ds = ray.data.range(100, parallelism=20).map_batches(lambda x: x) + ds = ray.data.range(100, override_num_blocks=20).map_batches(lambda x: x) ds._set_name("test_ds") assert ds._name == "test_ds" assert str(ds) == ( @@ -1124,7 +1130,7 @@ def test_dataset_name(): assert update_fn.call_args_list[-1].args[0] == f"dataset_{mds._uuid}" - ds = ray.data.range(100, parallelism=20) + ds = ray.data.range(100, override_num_blocks=20) ds._set_name("very_loooooooong_name") assert ( str(ds) @@ -1172,7 +1178,7 @@ def test_op_state_logging(): def test_stats_actor_datasets(ray_start_cluster): - ds = ray.data.range(100, parallelism=20).map_batches(lambda x: x) + ds = ray.data.range(100, override_num_blocks=20).map_batches(lambda x: x) ds._set_name("test_stats_actor_datasets") ds.materialize() stats_actor = _get_or_create_stats_actor() diff --git a/python/ray/data/tests/test_streaming_executor_errored_blocks.py b/python/ray/data/tests/test_streaming_executor_errored_blocks.py index c98a6d10e774..1412ac087e58 100644 --- a/python/ray/data/tests/test_streaming_executor_errored_blocks.py +++ b/python/ray/data/tests/test_streaming_executor_errored_blocks.py @@ -40,7 +40,7 @@ def map_func(row): raise RuntimeError(f"Task failed: {id}") return row - ds = ray.data.range(num_tasks, parallelism=num_tasks).map(map_func) + ds = ray.data.range(num_tasks, override_num_blocks=num_tasks).map(map_func) should_fail = 0 <= max_errored_blocks < num_errored_blocks if should_fail: with pytest.raises(Exception, match="Task failed"): diff --git a/python/ray/data/tests/test_streaming_integration.py b/python/ray/data/tests/test_streaming_integration.py index dd518b5b155d..36454243b6ee 100644 --- a/python/ray/data/tests/test_streaming_integration.py +++ b/python/ray/data/tests/test_streaming_integration.py @@ -212,7 +212,7 @@ def run(self): lengths = get_lengths(i1, i2) assert lengths == [0, 1], lengths - ds = ray.data.range(1000, parallelism=10) + ds = ray.data.range(1000, 
override_num_blocks=10) for equal_split, use_iter_batches in itertools.product( [True, False], [True, False] ): @@ -226,7 +226,7 @@ def run(self): def test_streaming_split_barrier(ray_start_10_cpus_shared): - ds = ray.data.range(20, parallelism=20) + ds = ray.data.range(20, override_num_blocks=20) ( i1, i2, @@ -251,7 +251,7 @@ def consume(x, times): def test_streaming_split_invalid_iterator(ray_start_10_cpus_shared): - ds = ray.data.range(20, parallelism=20) + ds = ray.data.range(20, override_num_blocks=20) ( i1, i2, @@ -286,7 +286,7 @@ def test_streaming_split_independent_finish(ray_start_10_cpus_shared): num_splits = 2 ds = ray.data.range( num_splits * num_blocks_per_split, - parallelism=num_splits * num_blocks_per_split, + override_num_blocks=num_splits * num_blocks_per_split, ) ( i1, @@ -348,7 +348,7 @@ def consume(self, it, signal_actor, split_index): ) def test_e2e_option_propagation(ray_start_10_cpus_shared, restore_data_context): def run(): - ray.data.range(5, parallelism=5).map( + ray.data.range(5, override_num_blocks=5).map( lambda x: x, compute=ray.data.ActorPoolStrategy(size=2) ).take_all() @@ -374,7 +374,7 @@ def _test_hook(fn, args, strategy): DataContext.get_current().large_args_threshold = 0 # Simple 2-operator pipeline. - ray.data.range(2, parallelism=2).map(lambda x: x, num_cpus=2).take_all() + ray.data.range(2, override_num_blocks=2).map(lambda x: x, num_cpus=2).take_all() # Read tasks get SPREAD by default, subsequent ones use default policy. tasks = sorted(tasks) @@ -407,7 +407,7 @@ def func(x): # Only take the first item from the iterator. it = iter( - ray.data.range(100, parallelism=100) + ray.data.range(100, override_num_blocks=100) .map_batches(func, batch_size=None) .iter_batches(batch_size=None) ) @@ -447,7 +447,7 @@ def func(x): ctx.execution_options.resource_limits.object_store_memory = block_size # Only take the first item from the iterator. - ds = ray.data.range(100, parallelism=100).map_batches(func, batch_size=None) + ds = ray.data.range(100, override_num_blocks=100).map_batches(func, batch_size=None) it = iter(ds.iter_batches(batch_size=None, prefetch_batches=0)) next(it) time.sleep(3) # Pause a little so anything that would be executed runs. @@ -473,7 +473,7 @@ def test_e2e_liveness_with_output_backpressure_edge_case( ctx = DataContext.get_current() ctx.execution_options.preserve_order = True ctx.execution_options.resource_limits.object_store_memory = 1 - ds = ray.data.range(10000, parallelism=100).map(lambda x: x, num_cpus=2) + ds = ray.data.range(10000, override_num_blocks=100).map(lambda x: x, num_cpus=2) # This will hang forever if the liveness logic is wrong, since the output # backpressure will prevent any operators from running at all. assert extract_values("id", ds.take_all()) == list(range(10000)) @@ -519,7 +519,7 @@ def __call__(self, x): # Tests that we autoscale up to necessary size. # 6 tasks + 1 tasks in flight per actor => need at least 6 actors to run. - ray.data.range(6, parallelism=6).map_batches( + ray.data.range(6, override_num_blocks=6).map_batches( BarrierWaiter, fn_constructor_args=(b1,), compute=ray.data.ActorPoolStrategy( @@ -533,7 +533,7 @@ def __call__(self, x): # Tests that we don't over-scale up. 
# 6 tasks + 2 tasks in flight per actor => only scale up to 3 actors - ray.data.range(6, parallelism=6).map_batches( + ray.data.range(6, override_num_blocks=6).map_batches( BarrierWaiter, fn_constructor_args=(b2,), compute=ray.data.ActorPoolStrategy( @@ -548,7 +548,7 @@ def __call__(self, x): # This will hang, since the actor pool is too small. with pytest.raises(ray.exceptions.RayTaskError): - ray.data.range(6, parallelism=6).map( + ray.data.range(6, override_num_blocks=6).map( BarrierWaiter, fn_constructor_args=(b3,), compute=ray.data.ActorPoolStrategy(min_size=1, max_size=2), @@ -564,7 +564,7 @@ def __call__(self, x): # Tests that autoscaling works even when resource constrained via actor killing. # To pass this, we need to autoscale down to free up slots for task execution. DataContext.get_current().execution_options.resource_limits.cpu = 2 - ray.data.range(5, parallelism=5).map_batches( + ray.data.range(5, override_num_blocks=5).map_batches( UDFClass, compute=ray.data.ActorPoolStrategy(min_size=1, max_size=2), batch_size=None, @@ -592,7 +592,7 @@ def __call__(self, x): return x # Test recover. - base = ray.data.range(1000, parallelism=100) + base = ray.data.range(1000, override_num_blocks=100) ds1 = base.map_batches( RandomExit, compute=ray.data.ActorPoolStrategy(size=4), max_task_retries=999 ) diff --git a/python/ray/data/tests/test_tensor.py b/python/ray/data/tests/test_tensor.py index 206535982028..71ed971e6e30 100644 --- a/python/ray/data/tests/test_tensor.py +++ b/python/ray/data/tests/test_tensor.py @@ -26,7 +26,7 @@ def test_large_tensor_creation(ray_start_regular_shared): """Tests that large tensor read task creation can complete successfully without hanging.""" start_time = time.time() - ray.data.range_tensor(1000, parallelism=1000, shape=(80, 80, 100, 100)) + ray.data.range_tensor(1000, override_num_blocks=1000, shape=(80, 80, 100, 100)) end_time = time.time() # Should not take more than 20 seconds. @@ -36,7 +36,7 @@ def test_large_tensor_creation(ray_start_regular_shared): def test_tensors_basic(ray_start_regular_shared): # Create directly. tensor_shape = (3, 5) - ds = ray.data.range_tensor(6, shape=tensor_shape, parallelism=6) + ds = ray.data.range_tensor(6, shape=tensor_shape, override_num_blocks=6) assert str(ds) == ( "Dataset(num_rows=6, schema={data: numpy.ndarray(shape=(3, 5), dtype=int64)})" ) @@ -200,7 +200,7 @@ def mapper(arr): return arr res = ( - ray.data.range(10, parallelism=2) + ray.data.range(10, override_num_blocks=2) .map_batches(mapper, batch_format="numpy") .take() ) @@ -210,7 +210,9 @@ def mapper(arr): def test_batch_tensors(ray_start_regular_shared): import torch - ds = ray.data.from_items([torch.tensor([0, 0]) for _ in range(40)], parallelism=40) + ds = ray.data.from_items( + [torch.tensor([0, 0]) for _ in range(40)], override_num_blocks=40 + ) res = ( "MaterializedDataset(\n" " num_blocks=40,\n" @@ -298,7 +300,9 @@ def test_tensors_sort(ray_start_regular_shared): def test_tensors_inferred_from_map(ray_start_regular_shared): # Test map. - ds = ray.data.range(10, parallelism=10).map(lambda _: {"data": np.ones((4, 4))}) + ds = ray.data.range(10, override_num_blocks=10).map( + lambda _: {"data": np.ones((4, 4))} + ) ds = ds.materialize() assert str(ds) == ( "MaterializedDataset(\n" @@ -309,7 +313,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ) # Test map_batches. 
- ds = ray.data.range(16, parallelism=4).map_batches( + ds = ray.data.range(16, override_num_blocks=4).map_batches( lambda _: {"data": np.ones((3, 4, 4))}, batch_size=2 ) ds = ds.materialize() @@ -322,7 +326,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ) # Test flat_map. - ds = ray.data.range(10, parallelism=10).flat_map( + ds = ray.data.range(10, override_num_blocks=10).flat_map( lambda _: [{"data": np.ones((4, 4))}, {"data": np.ones((4, 4))}] ) ds = ds.materialize() @@ -335,7 +339,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ) # Test map_batches ndarray column. - ds = ray.data.range(16, parallelism=4).map_batches( + ds = ray.data.range(16, override_num_blocks=4).map_batches( lambda _: pd.DataFrame({"a": [np.ones((4, 4))] * 3}), batch_size=2 ) ds = ds.materialize() @@ -347,7 +351,7 @@ def test_tensors_inferred_from_map(ray_start_regular_shared): ")" ) - ds = ray.data.range(16, parallelism=4).map_batches( + ds = ray.data.range(16, override_num_blocks=4).map_batches( lambda _: pd.DataFrame({"a": [np.ones((2, 2)), np.ones((3, 3))]}), batch_size=2, ) diff --git a/python/ray/data/tests/test_text.py b/python/ray/data/tests/test_text.py index c2936fa62c55..47dd736fe3be 100644 --- a/python/ray/data/tests/test_text.py +++ b/python/ray/data/tests/test_text.py @@ -192,7 +192,7 @@ def get_node_id(): f.write("goodbye") ds = ray.data.read_text( - path, parallelism=2, ray_remote_args={"resources": {"bar": 1}} + path, override_num_blocks=2, ray_remote_args={"resources": {"bar": 1}} ) blocks = ds.get_internal_block_refs() diff --git a/python/ray/data/tests/test_tfrecords.py b/python/ray/data/tests/test_tfrecords.py index 508c7bbe9894..85e7cbbd5a44 100644 --- a/python/ray/data/tests/test_tfrecords.py +++ b/python/ray/data/tests/test_tfrecords.py @@ -499,10 +499,10 @@ def test_write_tfrecords( # The dataset we will write to a .tfrecords file. ds = ray.data.from_items( data_partial(with_tf_schema), - # Here, we specify `parallelism=1` to ensure that all rows end up in the same - # block, which is required for type inference involving - # partially missing columns. - parallelism=1, + # Here, we specify `override_num_blocks=1` to ensure that all rows end up in + # the same block, which is required for type inference involving partially + # missing columns. + override_num_blocks=1, ) # The corresponding tf.train.Example that we would expect to read @@ -602,10 +602,10 @@ def test_readback_tfrecords( """ # The dataset we will write to a .tfrecords file. - # Here and in the read_tfrecords call below, we specify `parallelism=1` + # Here and in the read_tfrecords call below, we specify `override_num_blocks=1` # to ensure that all rows end up in the same block, which is required # for type inference involving partially missing columns. - ds = ray.data.from_items(data_partial(with_tf_schema), parallelism=1) + ds = ray.data.from_items(data_partial(with_tf_schema), override_num_blocks=1) expected_records = tf_records_partial() tf_schema = None @@ -617,7 +617,7 @@ def test_readback_tfrecords( ds.write_tfrecords(tmp_path, tf_schema=tf_schema) # Read the TFRecords. readback_ds = read_tfrecords_with_tfx_read_override( - tmp_path, tf_schema=tf_schema, parallelism=1 + tmp_path, tf_schema=tf_schema, override_num_blocks=1 ) _ds_eq_streaming(ds, readback_ds) @@ -641,7 +641,7 @@ def test_readback_tfrecords_empty_features( # type inference on completely empty columns is ambiguous. 
         ds.write_tfrecords(tmp_path)
     else:
-        ds = ray.data.from_items(data_empty(with_tf_schema), parallelism=1)
+        ds = ray.data.from_items(data_empty(with_tf_schema), override_num_blocks=1)
 
     expected_records = tf_records_empty()
 
     features = expected_records[0].features
@@ -654,7 +654,7 @@ def test_readback_tfrecords_empty_features(
     readback_ds = read_tfrecords_with_tfx_read_override(
         tmp_path,
         tf_schema=tf_schema,
-        parallelism=1,
+        override_num_blocks=1,
     )
     _ds_eq_streaming(ds, readback_ds)
 
@@ -691,7 +691,7 @@ def test_read_with_invalid_schema(
     from tensorflow_metadata.proto.v0 import schema_pb2
 
     # The dataset we will write to a .tfrecords file.
-    ds = ray.data.from_items(data_partial(True), parallelism=1)
+    ds = ray.data.from_items(data_partial(True), override_num_blocks=1)
     expected_records = tf_records_partial()
 
     # Build fake schema proto with missing/incorrect field name
@@ -740,7 +740,7 @@ def test_read_with_invalid_schema(
 
 @pytest.mark.parametrize("num_rows_per_file", [5, 10, 50])
 def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file):
-    ray.data.range(100, parallelism=20).write_tfrecords(
+    ray.data.range(100, override_num_blocks=20).write_tfrecords(
         tmp_path, num_rows_per_file=num_rows_per_file
     )
 
diff --git a/python/ray/data/tests/test_webdataset.py b/python/ray/data/tests/test_webdataset.py
index 7f286ed5de8b..3f509f812baf 100644
--- a/python/ray/data/tests/test_webdataset.py
+++ b/python/ray/data/tests/test_webdataset.py
@@ -39,7 +39,7 @@ def test_webdataset_read(ray_start_2_cpus, tmp_path):
             tf.write(f"{i}.b", str(i**2).encode("utf-8"))
     assert os.path.exists(path)
     assert len(glob.glob(f"{tmp_path}/*.tar")) == 1
-    ds = ray.data.read_webdataset(paths=[str(tmp_path)], parallelism=1)
+    ds = ray.data.read_webdataset(paths=[str(tmp_path)], override_num_blocks=1)
     samples = ds.take(100)
     assert len(samples) == 100
     for i, sample in enumerate(samples):
@@ -62,7 +62,7 @@ def test_webdataset_suffixes(ray_start_2_cpus, tmp_path):
 
     # test simple suffixes
     ds = ray.data.read_webdataset(
-        paths=[str(tmp_path)], parallelism=1, suffixes=["txt", "cls"]
+        paths=[str(tmp_path)], override_num_blocks=1, suffixes=["txt", "cls"]
     )
     samples = ds.take(100)
     assert len(samples) == 100
@@ -71,7 +71,7 @@ def test_webdataset_suffixes(ray_start_2_cpus, tmp_path):
 
     # test fnmatch patterns for suffixes
     ds = ray.data.read_webdataset(
-        paths=[str(tmp_path)], parallelism=1, suffixes=["*.txt", "*.cls"]
+        paths=[str(tmp_path)], override_num_blocks=1, suffixes=["*.txt", "*.cls"]
     )
     samples = ds.take(100)
     assert len(samples) == 100
@@ -82,7 +82,9 @@ def test_webdataset_suffixes(ray_start_2_cpus, tmp_path):
     def select(name):
         return name.endswith("txt")
 
-    ds = ray.data.read_webdataset(paths=[str(tmp_path)], parallelism=1, suffixes=select)
+    ds = ray.data.read_webdataset(
+        paths=[str(tmp_path)], override_num_blocks=1, suffixes=select
+    )
     samples = ds.take(100)
     assert len(samples) == 100
     for i, sample in enumerate(samples):
@@ -95,7 +97,7 @@ def renamer(name):
         return result
 
     ds = ray.data.read_webdataset(
-        paths=[str(tmp_path)], parallelism=1, filerename=renamer
+        paths=[str(tmp_path)], override_num_blocks=1, filerename=renamer
     )
     samples = ds.take(100)
     assert len(samples) == 100
@@ -165,7 +167,7 @@ def test_webdataset_coding(ray_start_2_cpus, tmp_path):
     assert len(paths) == 1
     path = paths[0]
     assert os.path.exists(path)
-    ds = ray.data.read_webdataset(paths=[str(tmp_path)], parallelism=1)
+    ds = ray.data.read_webdataset(paths=[str(tmp_path)], override_num_blocks=1)
     samples = ds.take(1)
     assert len(samples) == 1
     for sample in samples:
@@ -185,7 +187,7 @@ def test_webdataset_coding(ray_start_2_cpus, tmp_path):
 
     # test the format argument to the default decoder and multiple decoders
     ds = ray.data.read_webdataset(
-        paths=[str(tmp_path)], parallelism=1, decoder=["PIL", custom_decoder]
+        paths=[str(tmp_path)], override_num_blocks=1, decoder=["PIL", custom_decoder]
     )
     samples = ds.take(1)
     assert len(samples) == 1
@@ -202,7 +204,7 @@ def test_webdataset_coding(ray_start_2_cpus, tmp_path):
 @pytest.mark.parametrize("num_rows_per_file", [5, 10, 50])
 def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file):
     ray.data.from_items(
-        [{"id": str(i)} for i in range(100)], parallelism=20
+        [{"id": str(i)} for i in range(100)], override_num_blocks=20
     ).write_webdataset(tmp_path, num_rows_per_file=num_rows_per_file)
 
     for filename in os.listdir(tmp_path):
diff --git a/python/ray/experimental/tqdm_ray.py b/python/ray/experimental/tqdm_ray.py
index cbd127e8bbf3..d0f2373606c4 100644
--- a/python/ray/experimental/tqdm_ray.py
+++ b/python/ray/experimental/tqdm_ray.py
@@ -388,7 +388,7 @@ def sleep(x):
         time.sleep(delay)
         return x
 
-    ray.data.range(1000, parallelism=100).map(
+    ray.data.range(1000, override_num_blocks=100).map(
         sleep, compute=ray.data.ActorPoolStrategy(size=1)
     ).count()
 
diff --git a/python/ray/tune/tests/test_tuner.py b/python/ray/tune/tests/test_tuner.py
index f5c054b08904..34602144e5d8 100644
--- a/python/ray/tune/tests/test_tuner.py
+++ b/python/ray/tune/tests/test_tuner.py
@@ -86,7 +86,7 @@ def load_data():
 
 def gen_dataset_func(do_shuffle: Optional[bool] = False) -> Dataset:
     test_datasource = TestDatasource(do_shuffle)
-    return read_datasource(test_datasource, parallelism=1)
+    return read_datasource(test_datasource, override_num_blocks=1)
 
 
 def gen_dataset_func_eager():
diff --git a/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py b/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py
index d597dd5565ee..a1c98c7f680d 100644
--- a/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py
+++ b/release/air_tests/air_benchmarks/mlperf-train/resnet50_ray_air.py
@@ -276,7 +276,7 @@ def build_synthetic_dataset(batch_size):
     empty = np.empty(image_dims, dtype=np.uint8)
     ds = ray.data.from_items(
         [{"image": empty, "label": 1} for _ in range(int(batch_size))],
-        parallelism=1,
+        override_num_blocks=1,
     )
     return ds
 
diff --git a/release/nightly_tests/dataset/data_ingest_benchmark.py b/release/nightly_tests/dataset/data_ingest_benchmark.py
index 23b96391f79b..ee38208226e6 100644
--- a/release/nightly_tests/dataset/data_ingest_benchmark.py
+++ b/release/nightly_tests/dataset/data_ingest_benchmark.py
@@ -119,7 +119,7 @@ def make_ds(size_gb: int, parallelism: int = -1):
     record_size = record_dim * 8
     num_records = int(total_size / record_size)
     dataset = ray.data.range_tensor(
-        num_records, shape=(record_dim,), parallelism=parallelism
+        num_records, shape=(record_dim,), override_num_blocks=parallelism
    )
     print("Created dataset", dataset, "of size", dataset.size_bytes())
     return dataset
diff --git a/release/nightly_tests/dataset/dataset_random_access.py b/release/nightly_tests/dataset/dataset_random_access.py
index dfe955520496..91aaf7224dc4 100644
--- a/release/nightly_tests/dataset/dataset_random_access.py
+++ b/release/nightly_tests/dataset/dataset_random_access.py
@@ -26,7 +26,7 @@ def main():
     num_workers = 400
     run_time = 15
 
-    ds = ray.data.range(nrow, parallelism=parallelism)
+    ds = ray.data.range(nrow, override_num_blocks=parallelism)
     rmap = ds.to_random_access_dataset("id", num_workers=num_workers)
 
     print("Multiget throughput: ", end="")
diff --git a/release/nightly_tests/dataset/image_loader_microbenchmark.py b/release/nightly_tests/dataset/image_loader_microbenchmark.py
index fa93fb530e12..a3ae3b7f15c5 100644
--- a/release/nightly_tests/dataset/image_loader_microbenchmark.py
+++ b/release/nightly_tests/dataset/image_loader_microbenchmark.py
@@ -372,7 +372,9 @@ def get_ray_mosaic_dataset(mosaic_data_root):
 
 def get_ray_parquet_dataset(parquet_data_root, parallelism=None):
     if parallelism is not None:
-        ray_dataset = ray.data.read_parquet(parquet_data_root, parallelism=parallelism)
+        ray_dataset = ray.data.read_parquet(
+            parquet_data_root, override_num_blocks=parallelism
+        )
     else:
         ray_dataset = ray.data.read_parquet(parquet_data_root)
     ray_dataset = ray_dataset.map(decode_image_crop_and_flip)
diff --git a/release/nightly_tests/dataset/map_batches_benchmark.py b/release/nightly_tests/dataset/map_batches_benchmark.py
index 7637eb97370d..556a6d96a734 100644
--- a/release/nightly_tests/dataset/map_batches_benchmark.py
+++ b/release/nightly_tests/dataset/map_batches_benchmark.py
@@ -165,7 +165,7 @@ def run_map_batches_benchmark(benchmark: Benchmark):
     # And the first one will generate multiple output blocks.
     ray.data._internal.logical.optimizers.PHYSICAL_OPTIMIZER_RULES = []
     parallelism = input_size // batch_size
-    input_ds = ray.data.range(input_size, parallelism=parallelism).materialize()
+    input_ds = ray.data.range(input_size, override_num_blocks=parallelism).materialize()
 
     def map_batches_fn(num_output_blocks, batch):
         """A map_batches function that generates num_output_blocks output blocks."""
diff --git a/release/nightly_tests/dataset/operator_fusion_benchmark.py b/release/nightly_tests/dataset/operator_fusion_benchmark.py
index eb04bc64be60..a6e1e36efb81 100644
--- a/release/nightly_tests/dataset/operator_fusion_benchmark.py
+++ b/release/nightly_tests/dataset/operator_fusion_benchmark.py
@@ -84,7 +84,7 @@ def make_ds(
         block_size=block_size,
         data_format=data_format,
         num_columns=num_columns,
-        parallelism=num_tasks,
+        override_num_blocks=num_tasks,
     )
     for op_spec in ops_spec:
         op = op_spec.pop("op")
diff --git a/release/nightly_tests/dataset/read_parquet_benchmark.py b/release/nightly_tests/dataset/read_parquet_benchmark.py
index 1820341556a0..4de1a35c802b 100644
--- a/release/nightly_tests/dataset/read_parquet_benchmark.py
+++ b/release/nightly_tests/dataset/read_parquet_benchmark.py
@@ -17,7 +17,7 @@ def read_parquet(
 ) -> Dataset:
     return ray.data.read_parquet(
         paths=root,
-        parallelism=parallelism,
+        override_num_blocks=parallelism,
         use_threads=use_threads,
         filter=filter,
         columns=columns,
@@ -33,7 +33,7 @@ def run_read_parquet_benchmark(benchmark: Benchmark):
                 test_name,
                 read_parquet,
                 root="s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet",  # noqa: E501
-                parallelism=parallelism,
+                override_num_blocks=parallelism,
                 use_threads=use_threads,
             )
 
@@ -79,7 +79,7 @@ def run_read_parquet_benchmark(benchmark: Benchmark):
         test_name,
         read_parquet,
         root=data_dirs[-1],
-        parallelism=1,  # We are testing one task to handle N files
+        override_num_blocks=1,  # We are testing one task to handle N files
     )
     for dir in data_dirs:
         shutil.rmtree(dir)
diff --git a/release/nightly_tests/dataset/sort.py b/release/nightly_tests/dataset/sort.py
index 845c7a4a9923..5cdf512b45d2 100644
--- a/release/nightly_tests/dataset/sort.py
+++ b/release/nightly_tests/dataset/sort.py
@@ -129,7 +129,7 @@ def run_benchmark(args):
     num_rows_per_partition = partition_size // (8 + args.row_size_bytes)
     ds = ray.data.read_datasource(
         source,
-        parallelism=num_partitions,
+        override_num_blocks=num_partitions,
         n=num_rows_per_partition * num_partitions,
         row_size_bytes=args.row_size_bytes,
     )
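For anyone sanity-checking the rename locally, here is a minimal sketch of the call pattern the touched files now use (this assumes a Ray build whose ray.data.range already accepts override_num_blocks; the printed block count is illustrative only and is not asserted):

    import ray

    ray.init()

    # Previously spelled ray.data.range(100, parallelism=20); the keyword is now
    # override_num_blocks, and the produced rows are unchanged.
    ds = ray.data.range(100, override_num_blocks=20)

    materialized = ds.materialize()
    assert materialized.count() == 100                  # same data either way
    print("num blocks:", materialized.num_blocks())     # should reflect the override

    ray.shutdown()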