From 1acc94277259235dc29c52fa0f3937aaf5fc496c Mon Sep 17 00:00:00 2001
From: Balaji Veeramani
Date: Mon, 16 Sep 2024 14:15:25 +0900
Subject: [PATCH] [Data] Add `partitioning` parameter to `read_parquet` (#47553)

To extract path partition information with `read_parquet`, you pass a PyArrow
`partitioning` object to `dataset_kwargs`. For example:

```
schema = pa.schema([("one", pa.int32()), ("two", pa.string())])
partitioning = pa.dataset.partitioning(schema, flavor="hive")
ds = ray.data.read_parquet(..., dataset_kwargs=dict(partitioning=partitioning))
```

This is problematic for two reasons:
1. It tightly couples the interface with the implementation; partitioning only
   works if we use `pyarrow.Dataset` in a specific way in the implementation.
2. It's inconsistent with all of the other file-based APIs. All other APIs
   expose a top-level `partitioning` parameter (rather than `dataset_kwargs`)
   where you pass a Ray Data `Partitioning` object (rather than a PyArrow
   partitioning object).

---------

Signed-off-by: Balaji Veeramani
Signed-off-by: ujjawal-khare
---
 .../datasource/parquet_datasource.py          |   8 --
 python/ray/data/tests/test_parquet.py         | 112 ------------------
 2 files changed, 120 deletions(-)

diff --git a/python/ray/data/_internal/datasource/parquet_datasource.py b/python/ray/data/_internal/datasource/parquet_datasource.py
index b688c2630d68..e361faf2d9e6 100644
--- a/python/ray/data/_internal/datasource/parquet_datasource.py
+++ b/python/ray/data/_internal/datasource/parquet_datasource.py
@@ -230,14 +230,6 @@ def __init__(
         # duplicating the partition data, we disable PyArrow's partitioning.
         dataset_kwargs["partitioning"] = None
-        # `read_schema` is the schema object that will be used to perform
-        # read operations.
-        # It should be None, unless user has specified the schema or columns.
-        # We don't use the inferred schema for read, because the pyarrow only infers
-        # schema based on the first file. Thus, files with different schemas will end
-        # up producing blocks with wrong schema.
-        # See https://github.com/ray-project/ray/issues/47960 for more context.
-        read_schema = schema

         pq_ds = get_parquet_dataset(paths, filesystem, dataset_kwargs)

         if schema is None:

diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py
index 23969d736f04..1615e9922aca 100644
--- a/python/ray/data/tests/test_parquet.py
+++ b/python/ray/data/tests/test_parquet.py
@@ -1205,118 +1205,6 @@ def test_partitioning_in_dataset_kwargs_raises_error(ray_start_regular_shared):
     )


-def test_tensors_in_tables_parquet(
-    ray_start_regular_shared, tmp_path, restore_data_context
-):
-    """This test verifies both V1 and V2 Tensor Type extensions of
-    Arrow Array types
-    """
-
-    num_rows = 10_000
-    num_groups = 10
-
-    inner_shape = (2, 2, 2)
-    shape = (num_rows,) + inner_shape
-    num_tensor_elem = np.prod(np.array(shape))
-
-    arr = np.arange(num_tensor_elem).reshape(shape)
-
-    id_col_name = "_id"
-    group_col_name = "group"
-    tensor_col_name = "tensor"
-
-    id_vals = list(range(num_rows))
-    group_vals = [i % num_groups for i in id_vals]
-
-    df = pd.DataFrame(
-        {
-            id_col_name: id_vals,
-            group_col_name: group_vals,
-            tensor_col_name: [a.tobytes() for a in arr],
-        }
-    )
-
-    #
-    # Test #1: Verify writing tensors as ArrowTensorType (v1)
-    #
-
-    tensor_v1_path = f"{tmp_path}/tensor_v1"
-
-    ds = ray.data.from_pandas([df])
-    ds.write_parquet(tensor_v1_path)
-
-    ds = ray.data.read_parquet(
-        tensor_v1_path,
-        tensor_column_schema={tensor_col_name: (arr.dtype, inner_shape)},
-        override_num_blocks=10,
-    )
-
-    assert isinstance(
-        ds.schema().base_schema.field_by_name(tensor_col_name).type, ArrowTensorType
-    )
-
-    expected_tuples = list(zip(id_vals, group_vals, arr))
-
-    def _assert_equal(rows, expected):
-        values = [[s[id_col_name], s[group_col_name], s[tensor_col_name]] for s in rows]
-
-        assert len(values) == len(expected)
-
-        for v, e in zip(sorted(values, key=lambda v: v[0]), expected):
-            np.testing.assert_equal(v, e)
-
-    _assert_equal(ds.take_all(), expected_tuples)
-
-    #
-    # Test #2: Verify writing tensors as ArrowTensorTypeV2
-    #
-
-    DataContext.get_current().use_arrow_tensor_v2 = True
-
-    tensor_v2_path = f"{tmp_path}/tensor_v2"
-
-    ds = ray.data.from_pandas([df])
-    ds.write_parquet(tensor_v2_path)
-
-    ds = ray.data.read_parquet(
-        tensor_v2_path,
-        tensor_column_schema={tensor_col_name: (arr.dtype, inner_shape)},
-        override_num_blocks=10,
-    )
-
-    assert isinstance(
-        ds.schema().base_schema.field_by_name(tensor_col_name).type, ArrowTensorTypeV2
-    )
-
-    _assert_equal(ds.take_all(), expected_tuples)
-
-
-def test_multiple_files_with_ragged_arrays(ray_start_regular_shared, tmp_path):
-    # Test reading multiple parquet files, each of which has different-shaped
-    # ndarrays in the same column.
-    # See https://github.com/ray-project/ray/issues/47960 for more context.
-    num_rows = 3
-    ds = ray.data.range(num_rows)
-
-    def map(row):
-        id = row["id"] + 1
-        row["data"] = np.zeros((id * 100, id * 100), dtype=np.int8)
-        return row
-
-    # Write 3 parquet files with different-shaped ndarray values in the
-    # "data" column.
-    ds.map(map).repartition(num_rows).write_parquet(tmp_path)
-
-    # Read these 3 files, check that the result is correct.
-    ds2 = ray.data.read_parquet(tmp_path, override_num_blocks=1)
-    res = ds2.take_all()
-    res = sorted(res, key=lambda row: row["id"])
-    assert len(res) == num_rows
-    for index, item in enumerate(res):
-        assert item["id"] == index
-        assert item["data"].shape == (100 * (index + 1), 100 * (index + 1))
-
-
 if __name__ == "__main__":
     import sys
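
For context, here is a minimal sketch of the top-level `partitioning` usage this PR describes, in contrast to the old `dataset_kwargs` workaround shown in the commit message. It assumes the Ray Data `Partitioning` class is imported from `ray.data.datasource` and that the data uses a Hive-style directory layout; the exact signature added by the PR is not visible in the excerpted diff, which contains only deletions.

```
import pyarrow as pa
import pyarrow.parquet as pq
import ray
from ray.data.datasource import Partitioning  # assumed import path

# Write a small Hive-partitioned dataset (paths like .../two=a/<file>.parquet).
table = pa.table({"one": [1, 2, 3], "two": ["a", "a", "b"]})
pq.write_to_dataset(table, root_path="/tmp/demo_partitioned", partition_cols=["two"])

# New-style call described in the PR: pass a Ray Data `Partitioning` object
# through the top-level `partitioning` parameter instead of routing a PyArrow
# partitioning object through `dataset_kwargs`.
ds = ray.data.read_parquet(
    "/tmp/demo_partitioned",
    partitioning=Partitioning("hive"),
)
print(ds.schema())  # the "two" partition column appears as a regular column
```

This mirrors how the other file-based readers (for example `read_csv`) already accept partitioning information, which is the consistency argument made in reason 2 of the commit message.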