[Data] Add partitioning parameter to read_parquet (ray-project#47553)

Currently, to extract path partition information with `read_parquet`, you have to pass a
PyArrow `partitioning` object through `dataset_kwargs`. For example:
```
schema = pa.schema([("one", pa.int32()), ("two", pa.string())])
partitioning = pa.dataset.partitioning(schema, flavor="hive")
ds = ray.data.read_parquet(..., dataset_kwargs=dict(partitioning=partitioning))
```

This is problematic for two reasons:
1. It tightly couples the interface with the implementation; partitioning only
works if the implementation uses `pyarrow.Dataset` in a specific way.
2. It's inconsistent with all of the other file-based APIs. The other APIs
expose a top-level `partitioning` parameter (rather than `dataset_kwargs`)
that takes a Ray Data `Partitioning` object (rather than a PyArrow
partitioning object), as in the sketch below.
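
For illustration, here's a minimal sketch of the call pattern this change enables, assuming the new top-level `partitioning` parameter behaves like the one on the other file-based readers; the `"data/"` path and the Hive-style layout are hypothetical:
```
import ray
from ray.data.datasource import Partitioning

# Hypothetical Hive-style layout, e.g. data/year=2024/month=01/part-0.parquet.
partitioning = Partitioning("hive")

# Pass a Ray Data Partitioning object at the top level instead of routing a
# PyArrow partitioning object through dataset_kwargs.
ds = ray.data.read_parquet("data/", partitioning=partitioning)
```
With `Partitioning("hive")`, the key-value pairs encoded in the directory names should surface as columns in the resulting dataset, matching how the other file-based readers treat this parameter.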

---------

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: ujjawal-khare <[email protected]>
bveeramani authored and ujjawal-khare committed Oct 15, 2024
1 parent 43009a3 commit 1acc942
Showing 2 changed files with 0 additions and 120 deletions.
8 changes: 0 additions & 8 deletions python/ray/data/_internal/datasource/parquet_datasource.py
@@ -230,14 +230,6 @@ def __init__(
        # duplicating the partition data, we disable PyArrow's partitioning.
        dataset_kwargs["partitioning"] = None

        # `read_schema` is the schema object that will be used to perform
        # read operations.
        # It should be None, unless user has specified the schema or columns.
        # We don't use the inferred schema for read, because the pyarrow only infers
        # schema based on the first file. Thus, files with different schemas will end
        # up producing blocks with wrong schema.
        # See https://github.com/ray-project/ray/issues/47960 for more context.
        read_schema = schema
        pq_ds = get_parquet_dataset(paths, filesystem, dataset_kwargs)

        if schema is None:
112 changes: 0 additions & 112 deletions python/ray/data/tests/test_parquet.py
@@ -1205,118 +1205,6 @@ def test_partitioning_in_dataset_kwargs_raises_error(ray_start_regular_shared):
)


def test_tensors_in_tables_parquet(
    ray_start_regular_shared, tmp_path, restore_data_context
):
    """This test verifies both V1 and V2 Tensor Type extensions of
    Arrow Array types
    """

    num_rows = 10_000
    num_groups = 10

    inner_shape = (2, 2, 2)
    shape = (num_rows,) + inner_shape
    num_tensor_elem = np.prod(np.array(shape))

    arr = np.arange(num_tensor_elem).reshape(shape)

    id_col_name = "_id"
    group_col_name = "group"
    tensor_col_name = "tensor"

    id_vals = list(range(num_rows))
    group_vals = [i % num_groups for i in id_vals]

    df = pd.DataFrame(
        {
            id_col_name: id_vals,
            group_col_name: group_vals,
            tensor_col_name: [a.tobytes() for a in arr],
        }
    )

    #
    # Test #1: Verify writing tensors as ArrowTensorType (v1)
    #

    tensor_v1_path = f"{tmp_path}/tensor_v1"

    ds = ray.data.from_pandas([df])
    ds.write_parquet(tensor_v1_path)

    ds = ray.data.read_parquet(
        tensor_v1_path,
        tensor_column_schema={tensor_col_name: (arr.dtype, inner_shape)},
        override_num_blocks=10,
    )

    assert isinstance(
        ds.schema().base_schema.field_by_name(tensor_col_name).type, ArrowTensorType
    )

    expected_tuples = list(zip(id_vals, group_vals, arr))

    def _assert_equal(rows, expected):
        values = [[s[id_col_name], s[group_col_name], s[tensor_col_name]] for s in rows]

        assert len(values) == len(expected)

        for v, e in zip(sorted(values, key=lambda v: v[0]), expected):
            np.testing.assert_equal(v, e)

    _assert_equal(ds.take_all(), expected_tuples)

    #
    # Test #2: Verify writing tensors as ArrowTensorTypeV2
    #

    DataContext.get_current().use_arrow_tensor_v2 = True

    tensor_v2_path = f"{tmp_path}/tensor_v2"

    ds = ray.data.from_pandas([df])
    ds.write_parquet(tensor_v2_path)

    ds = ray.data.read_parquet(
        tensor_v2_path,
        tensor_column_schema={tensor_col_name: (arr.dtype, inner_shape)},
        override_num_blocks=10,
    )

    assert isinstance(
        ds.schema().base_schema.field_by_name(tensor_col_name).type, ArrowTensorTypeV2
    )

    _assert_equal(ds.take_all(), expected_tuples)


def test_multiple_files_with_ragged_arrays(ray_start_regular_shared, tmp_path):
    # Test reading multiple parquet files, each of which has different-shaped
    # ndarrays in the same column.
    # See https://github.com/ray-project/ray/issues/47960 for more context.
    num_rows = 3
    ds = ray.data.range(num_rows)

    def map(row):
        id = row["id"] + 1
        row["data"] = np.zeros((id * 100, id * 100), dtype=np.int8)
        return row

    # Write 3 parquet files with different-shaped ndarray values in the
    # "data" column.
    ds.map(map).repartition(num_rows).write_parquet(tmp_path)

    # Read these 3 files, check that the result is correct.
    ds2 = ray.data.read_parquet(tmp_path, override_num_blocks=1)
    res = ds2.take_all()
    res = sorted(res, key=lambda row: row["id"])
    assert len(res) == num_rows
    for index, item in enumerate(res):
        assert item["id"] == index
        assert item["data"].shape == (100 * (index + 1), 100 * (index + 1))


if __name__ == "__main__":
    import sys

