[Data] Remove Ray Data read parallelism from all libraries and tests (#43692)

This PR removes the Ray Data read `parallelism` argument and replaces it with `override_num_blocks` across all libraries and unit tests. The corresponding change in Ray Data itself is #43113.
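A minimal sketch of the migration this commit applies across the codebase (the row count and block count below are illustrative, not taken from any file in this diff): a caller that previously passed `parallelism` to a read API now passes `override_num_blocks`, and everything downstream of the read is unchanged.

import ray

# Before this change, the number of output blocks for a read was
# requested with the `parallelism` keyword:
#     ds = ray.data.range(1000, parallelism=10)

# After this change, the same request uses `override_num_blocks`.
ds = ray.data.range(1000, override_num_blocks=10)
assert ds._plan.initial_num_blocks() == 10  # same internal check the tests below rely on

# Downstream operations are unaffected by the rename.
ds = ds.map_batches(lambda batch: batch)
print(ds.take(3))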

Signed-off-by: Cheng Su <[email protected]>
c21 authored Mar 5, 2024
1 parent 13ad9fd commit 9a36755
Showing 53 changed files with 403 additions and 338 deletions.
2 changes: 1 addition & 1 deletion dashboard/modules/data/tests/test_data_head.py
@@ -37,7 +37,7 @@
)
def test_get_datasets():
ray.init()
ds = ray.data.range(100, parallelism=20).map_batches(lambda x: x)
ds = ray.data.range(100, override_num_blocks=20).map_batches(lambda x: x)
ds._set_name("data_head_test")
ds.materialize()

12 changes: 6 additions & 6 deletions python/ray/air/tests/test_new_dataset_config.py
@@ -239,15 +239,15 @@ def train_loop_per_worker():


def test_per_epoch_preprocessing(ray_start_4_cpus):
ds = ray.data.range(100, parallelism=100).randomize_block_order()
ds = ray.data.range(100, override_num_blocks=100).randomize_block_order()
test = TestRandom(2, True, datasets={"train": ds})
test.fit()

ds = ray.data.range(100, parallelism=100).random_shuffle()
ds = ray.data.range(100, override_num_blocks=100).random_shuffle()
test = TestRandom(2, True, datasets={"train": ds})
test.fit()

ds = ray.data.range(100, parallelism=100).map(
ds = ray.data.range(100, override_num_blocks=100).map(
lambda x: {"id": x["id"] * random.random()}
)
test = TestRandom(2, True, datasets={"train": ds})
@@ -257,7 +257,7 @@ def test_per_epoch_preprocessing(ray_start_4_cpus):
def test_materialized_preprocessing(ray_start_4_cpus):
# TODO(ekl) we should test all these configs with splitting enabled, but this
# requires implementing deterministic streaming split.
ds = ray.data.range(100, parallelism=100).randomize_block_order()
ds = ray.data.range(100, override_num_blocks=100).randomize_block_order()
ds = ds.materialize()
test = TestRandom(
2,
@@ -267,7 +267,7 @@ def test_materialized_preprocessing(ray_start_4_cpus):
)
test.fit()

ds = ray.data.range(100, parallelism=100).random_shuffle()
ds = ray.data.range(100, override_num_blocks=100).random_shuffle()
ds = ds.materialize()
test = TestRandom(
2,
@@ -277,7 +277,7 @@ def test_materialized_preprocessing(ray_start_4_cpus):
)
test.fit()

ds = ray.data.range(100, parallelism=100).map(
ds = ray.data.range(100, override_num_blocks=100).map(
lambda x: {"id": x["id"] * random.random()}
)
ds = ds.materialize()
4 changes: 2 additions & 2 deletions python/ray/air/util/check_ingest.py
@@ -169,8 +169,8 @@ def make_local_dataset_iterator(
args = parser.parse_args()

# Generate a synthetic dataset of ~10GiB of float64 data. The dataset is sharded
# into 100 blocks (parallelism=100).
ds = ray.data.range_tensor(50000, shape=(80, 80, 4), parallelism=100)
# into 100 blocks (override_num_blocks=100).
ds = ray.data.range_tensor(50000, shape=(80, 80, 4), override_num_blocks=100)

# An example preprocessing chain that just scales all values by 4.0 in two stages.
ds = ds.map_batches(lambda df: df * 2, batch_format="pandas")
6 changes: 3 additions & 3 deletions python/ray/data/dataset.py
@@ -4398,7 +4398,7 @@ def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]:
Examples:
>>> import ray
>>> ds = ray.data.range(10, parallelism=2)
>>> ds = ray.data.range(10, override_num_blocks=2)
>>> refs = ds.to_pandas_refs()
>>> len(refs)
2
@@ -4426,7 +4426,7 @@ def to_numpy_refs(
Examples:
>>> import ray
>>> ds = ray.data.range(10, parallelism=2)
>>> ds = ray.data.range(10, override_num_blocks=2)
>>> refs = ds.to_numpy_refs()
>>> len(refs)
2
@@ -4461,7 +4461,7 @@ def to_arrow_refs(self) -> List[ObjectRef["pyarrow.Table"]]:
Examples:
>>> import ray
>>> ds = ray.data.range(10, parallelism=2)
>>> ds = ray.data.range(10, override_num_blocks=2)
>>> refs = ds.to_arrow_refs()
>>> len(refs)
2
10 changes: 5 additions & 5 deletions python/ray/data/read_api.py
@@ -468,7 +468,7 @@ def read_mongo(
... collection="my_collection",
... pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": "sort_field"}], # noqa: E501
... schema=Schema({"col1": pa.string(), "col2": pa.int64()}),
... parallelism=10,
... override_num_blocks=10,
... )
Args:
@@ -1955,10 +1955,10 @@ def read_sql(
``LIMIT`` and ``OFFSET`` to fetch a subset of the rows. However, for many
databases, ``OFFSET`` is slow.
As a workaround, set ``parallelism=1`` to directly fetch all rows in a single
task. Note that this approach requires all result rows to fit in the memory of
single task. If the rows don't fit, your program may raise an out of memory
error.
As a workaround, set ``override_num_blocks=1`` to directly fetch all rows in a
single task. Note that this approach requires all result rows to fit in the
memory of single task. If the rows don't fit, your program may raise an out of
memory error.
Examples:
62 changes: 32 additions & 30 deletions python/ray/data/tests/test_all_to_all.py
@@ -20,8 +20,10 @@


def test_zip(ray_start_regular_shared):
ds1 = ray.data.range(5, parallelism=5)
ds2 = ray.data.range(5, parallelism=5).map(column_udf("id", lambda x: x + 1))
ds1 = ray.data.range(5, override_num_blocks=5)
ds2 = ray.data.range(5, override_num_blocks=5).map(
column_udf("id", lambda x: x + 1)
)
ds = ds1.zip(ds2)
assert ds.schema().names == ["id", "id_1"]
assert ds.take() == named_values(
@@ -39,8 +41,8 @@ def test_zip_different_num_blocks_combinations(
ray_start_regular_shared, num_blocks1, num_blocks2
):
n = 12
ds1 = ray.data.range(n, parallelism=num_blocks1)
ds2 = ray.data.range(n, parallelism=num_blocks2).map(
ds1 = ray.data.range(n, override_num_blocks=num_blocks1)
ds2 = ray.data.range(n, override_num_blocks=num_blocks2).map(
column_udf("id", lambda x: x + 1)
)
ds = ds1.zip(ds2)
@@ -70,11 +72,11 @@ def test_zip_different_num_blocks_split_smallest(
num_blocks1 = 4
num_blocks2 = 2
ds1 = ray.data.from_items(
[{str(i): i for i in range(num_cols1)}] * n, parallelism=num_blocks1
[{str(i): i for i in range(num_cols1)}] * n, override_num_blocks=num_blocks1
)
ds2 = ray.data.from_items(
[{str(i): i for i in range(num_cols1, num_cols1 + num_cols2)}] * n,
parallelism=num_blocks2,
override_num_blocks=num_blocks2,
)
ds = ds1.zip(ds2).materialize()
num_blocks = ds._plan._snapshot_blocks.executed_num_blocks()
@@ -129,8 +131,8 @@ def foo(x):

num_items = 10
items = list(range(num_items))
ds1 = ray.data.from_items(items, parallelism=num_items)
ds2 = ray.data.from_items(items, parallelism=num_items)
ds1 = ray.data.from_items(items, override_num_blocks=num_items)
ds2 = ray.data.from_items(items, override_num_blocks=num_items)
ds2 = ds2.map_batches(foo, batch_size=1)
result = ds1.zip(ds2).take_all()
assert result == named_values(
@@ -139,15 +141,15 @@ def foo(x):


def test_empty_shuffle(ray_start_regular_shared):
ds = ray.data.range(100, parallelism=100)
ds = ray.data.range(100, override_num_blocks=100)
ds = ds.filter(lambda x: x)
ds = ds.map_batches(lambda x: x)
ds = ds.random_shuffle() # Would prev. crash with AssertionError: pyarrow.Table.
ds.show()


def test_repartition_shuffle(ray_start_regular_shared):
ds = ray.data.range(20, parallelism=10)
ds = ray.data.range(20, override_num_blocks=10)
assert ds._plan.initial_num_blocks() == 10
assert ds.sum() == 190
assert ds._block_num_rows() == [2] * 10
@@ -162,13 +164,13 @@ def test_repartition_shuffle(ray_start_regular_shared):
assert ds3.sum() == 190
assert ds3._block_num_rows() == [2] * 10 + [0] * 10

large = ray.data.range(10000, parallelism=10)
large = ray.data.range(10000, override_num_blocks=10)
large = large.repartition(20, shuffle=True)
assert large._block_num_rows() == [500] * 20


def test_repartition_noshuffle(ray_start_regular_shared):
ds = ray.data.range(20, parallelism=10)
ds = ray.data.range(20, override_num_blocks=10)
assert ds._plan.initial_num_blocks() == 10
assert ds.sum() == 190
assert ds._block_num_rows() == [2] * 10
@@ -194,13 +196,13 @@ def test_repartition_noshuffle(ray_start_regular_shared):
assert ds5._plan.initial_num_blocks() == 4
assert ds5._block_num_rows() == [5, 6, 5, 6]

large = ray.data.range(10000, parallelism=10)
large = ray.data.range(10000, override_num_blocks=10)
large = large.repartition(20)
assert large._block_num_rows() == [500] * 20


def test_repartition_shuffle_arrow(ray_start_regular_shared):
ds = ray.data.range(20, parallelism=10)
ds = ray.data.range(20, override_num_blocks=10)
assert ds._plan.initial_num_blocks() == 10
assert ds.count() == 20
assert ds._block_num_rows() == [2] * 10
@@ -215,7 +217,7 @@ def test_repartition_shuffle_arrow(ray_start_regular_shared):
assert ds3.count() == 20
assert ds3._block_num_rows() == [2] * 10 + [0] * 10

large = ray.data.range(10000, parallelism=10)
large = ray.data.range(10000, override_num_blocks=10)
large = large.repartition(20, shuffle=True)
assert large._block_num_rows() == [500] * 20

@@ -1132,7 +1134,7 @@ def test_random_block_order(ray_start_regular_shared, restore_data_context):
assert results == expected

# Test LazyBlockList.randomize_block_order.
lazy_blocklist_ds = ray.data.range(12, parallelism=4)
lazy_blocklist_ds = ray.data.range(12, override_num_blocks=4)
lazy_blocklist_ds = lazy_blocklist_ds.randomize_block_order(seed=0)
lazy_blocklist_results = lazy_blocklist_ds.take()
lazy_blocklist_expected = named_values("id", [6, 7, 8, 0, 1, 2, 3, 4, 5, 9, 10, 11])
@@ -1148,8 +1150,8 @@ def test_random_shuffle(shutdown_only, use_push_based_shuffle):
r2 = ray.data.range(100).random_shuffle().take(999)
assert r1 != r2, (r1, r2)

r1 = ray.data.range(100, parallelism=1).random_shuffle().take(999)
r2 = ray.data.range(100, parallelism=1).random_shuffle().take(999)
r1 = ray.data.range(100, override_num_blocks=1).random_shuffle().take(999)
r2 = ray.data.range(100, override_num_blocks=1).random_shuffle().take(999)
assert r1 != r2, (r1, r2)

assert (
@@ -1160,22 +1162,22 @@ def test_random_shuffle(shutdown_only, use_push_based_shuffle):
r2 = ray.data.range(100).random_shuffle().repartition(1).take(999)
assert r1 != r2, (r1, r2)

r0 = ray.data.range(100, parallelism=5).take(999)
r1 = ray.data.range(100, parallelism=5).random_shuffle(seed=0).take(999)
r2 = ray.data.range(100, parallelism=5).random_shuffle(seed=0).take(999)
r3 = ray.data.range(100, parallelism=5).random_shuffle(seed=12345).take(999)
r0 = ray.data.range(100, override_num_blocks=5).take(999)
r1 = ray.data.range(100, override_num_blocks=5).random_shuffle(seed=0).take(999)
r2 = ray.data.range(100, override_num_blocks=5).random_shuffle(seed=0).take(999)
r3 = ray.data.range(100, override_num_blocks=5).random_shuffle(seed=12345).take(999)
assert r1 == r2, (r1, r2)
assert r1 != r0, (r1, r0)
assert r1 != r3, (r1, r3)

r0 = ray.data.range(100, parallelism=5).take(999)
r1 = ray.data.range(100, parallelism=5).random_shuffle(seed=0).take(999)
r2 = ray.data.range(100, parallelism=5).random_shuffle(seed=0).take(999)
r0 = ray.data.range(100, override_num_blocks=5).take(999)
r1 = ray.data.range(100, override_num_blocks=5).random_shuffle(seed=0).take(999)
r2 = ray.data.range(100, override_num_blocks=5).random_shuffle(seed=0).take(999)
assert r1 == r2, (r1, r2)
assert r1 != r0, (r1, r0)

# Test move.
ds = ray.data.range(100, parallelism=2)
ds = ray.data.range(100, override_num_blocks=2)
r1 = ds.random_shuffle().take(999)
ds = ds.map(lambda x: x).take(999)
r2 = ray.data.range(100).random_shuffle().take(999)
@@ -1193,7 +1195,7 @@ def test_random_shuffle_check_random(shutdown_only):
num_files = 10
num_rows = 100
items = [i for i in range(num_files) for _ in range(num_rows)]
ds = ray.data.from_items(items, parallelism=num_files)
ds = ray.data.from_items(items, override_num_blocks=num_files)
out = ds.random_shuffle().take(num_files * num_rows)
for i in range(num_files):
part = out[i * num_rows : (i + 1) * num_rows]
@@ -1220,7 +1222,7 @@ def test_random_shuffle_check_random(shutdown_only):
num_files = 10
num_rows = 100
items = [j for i in range(num_files) for j in range(num_rows)]
ds = ray.data.from_items(items, parallelism=num_files)
ds = ray.data.from_items(items, override_num_blocks=num_files)
out = ds.random_shuffle().take(num_files * num_rows)
for i in range(num_files):
part = out[i * num_rows : (i + 1) * num_rows]
@@ -1252,7 +1254,7 @@ def test_random_shuffle_with_custom_resource(ray_start_cluster, use_push_based_s
# Run dataset in "bar" nodes.
ds = ray.data.read_parquet(
"example://parquet_images_mini",
parallelism=2,
override_num_blocks=2,
ray_remote_args={"resources": {"bar": 1}},
)
ds = ds.random_shuffle(resources={"bar": 1}).materialize()
@@ -1279,7 +1281,7 @@ def get_node_id():
node1_id = ray.get(get_node_id.options(resources={"bar:1": 1}).remote())
node2_id = ray.get(get_node_id.options(resources={"bar:2": 1}).remote())

ds = ray.data.range(100, parallelism=2).random_shuffle()
ds = ray.data.range(100, override_num_blocks=2).random_shuffle()
blocks = ds.get_internal_block_refs()
ray.wait(blocks, num_returns=len(blocks), fetch_local=False)
location_data = ray.experimental.get_object_locations(blocks)
8 changes: 4 additions & 4 deletions python/ray/data/tests/test_auto_parallelism.py
@@ -128,14 +128,14 @@ def test_auto_parallelism_basic(shutdown_only):
context = DataContext.get_current()
context.min_parallelism = 1
# Datasource bound.
ds = ray.data.range_tensor(5, shape=(100,), parallelism=-1)
ds = ray.data.range_tensor(5, shape=(100,), override_num_blocks=-1)
assert ds._plan.initial_num_blocks() == 5, ds
# CPU bound. TODO(ekl) we should fix range datasource to respect parallelism more
# properly, currently it can go a little over.
ds = ray.data.range_tensor(10000, shape=(100,), parallelism=-1)
ds = ray.data.range_tensor(10000, shape=(100,), override_num_blocks=-1)
assert ds._plan.initial_num_blocks() == 16, ds
# Block size bound.
ds = ray.data.range_tensor(100000000, shape=(100,), parallelism=-1)
ds = ray.data.range_tensor(100000000, shape=(100,), override_num_blocks=-1)
assert ds._plan.initial_num_blocks() >= 590, ds
assert ds._plan.initial_num_blocks() <= 600, ds

@@ -147,7 +147,7 @@ def test_auto_parallelism_placement_group(shutdown_only):
def run():
context = DataContext.get_current()
context.min_parallelism = 1
ds = ray.data.range_tensor(2000, shape=(100,), parallelism=-1)
ds = ray.data.range_tensor(2000, shape=(100,), override_num_blocks=-1)
return ds._plan.initial_num_blocks()

# 1/16 * 4 * 16 = 4
10 changes: 5 additions & 5 deletions python/ray/data/tests/test_backpressure_e2e.py
@@ -59,7 +59,7 @@ def consume(batch):

last_snapshot = get_initial_core_execution_metrics_snapshot()

ds = ray.data.range(NUM_ROWS_TOTAL, parallelism=NUM_TASKS)
ds = ray.data.range(NUM_ROWS_TOTAL, override_num_blocks=NUM_TASKS)
ds = ds.map_batches(produce, batch_size=NUM_ROWS_PER_TASK)
ds = ds.map_batches(consume, batch_size=None, num_cpus=0.9)
# Check core execution metrics every 10 rows, because it's expensive.
@@ -106,7 +106,7 @@ def consumer(batch):
del batch["data"]
return batch

ds = ray.data.range(1, parallelism=1).materialize()
ds = ray.data.range(1, override_num_blocks=1).materialize()
ds = ds.map_batches(producer, batch_size=None, num_cpus=producer_num_cpus)
# Add a limit op in the middle, to test that ReservationOpResourceAllocator
# will account limit op's resource usage to the previous producer map op.
@@ -210,7 +210,7 @@ def map_fn(batch):
batch["data"] = [np.zeros(block_size, dtype=np.uint8)]
return batch

ds = ray.data.range(num_blocks, parallelism=num_blocks)
ds = ray.data.range(num_blocks, override_num_blocks=num_blocks)
ds = ds.map_batches(map_fn, batch_size=None, num_cpus=1)
assert len(ds.take_all()) == num_blocks

@@ -265,7 +265,7 @@ def range_(i):
ctx.execution_options.resource_limits.object_store_memory = 10e6

# 10GiB dataset.
ds = ray.data.read_datasource(source, n=10000, parallelism=1000)
ds = ray.data.read_datasource(source, n=10000, override_num_blocks=1000)
it = iter(ds.iter_batches(batch_size=None, prefetch_batches=0))
next(it)
time.sleep(3)
@@ -290,7 +290,7 @@ def __call__(self, df: np.ndarray):
return {"id": np.random.randn(1, 20, 1024, 1024)}

ctx = ray.init(object_store_memory=4e9)
ds = ray.data.range_tensor(20, shape=(3, 1024, 1024), parallelism=20)
ds = ray.data.range_tensor(20, shape=(3, 1024, 1024), override_num_blocks=20)

pipe = ds.map_batches(
TestFast,