[Data] Merge DefaultParquetMetadataProvider and ParquetMetadataProvider #45733

Merged
3 changes: 1 addition & 2 deletions doc/source/data/api/input_output.rst
@@ -312,9 +312,8 @@ MetadataProvider API

    datasource.FileMetadataProvider
    datasource.BaseFileMetadataProvider
-   datasource.ParquetMetadataProvider
    datasource.DefaultFileMetadataProvider
-   datasource.DefaultParquetMetadataProvider
+   datasource.ParquetMetadataProvider
    datasource.FastFileMetadataProvider


6 changes: 1 addition & 5 deletions python/ray/data/datasource/__init__.py
@@ -43,10 +43,7 @@
 from ray.data.datasource.parquet_bulk_datasource import ParquetBulkDatasource
 from ray.data.datasource.parquet_datasink import _ParquetDatasink
 from ray.data.datasource.parquet_datasource import ParquetDatasource
-from ray.data.datasource.parquet_meta_provider import (
-    DefaultParquetMetadataProvider,
-    ParquetMetadataProvider,
-)
+from ray.data.datasource.parquet_meta_provider import ParquetMetadataProvider
 from ray.data.datasource.partitioning import (
     Partitioning,
     PartitionStyle,
@@ -83,7 +80,6 @@
     "SQLDatasource",
     "DefaultBlockWritePathProvider",
     "DefaultFileMetadataProvider",
-    "DefaultParquetMetadataProvider",
     "DummyOutputDatasink",
     "FastFileMetadataProvider",
     "FileBasedDatasource",
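Downstream code that imported the removed class needs only a one-line change, since the merged `ParquetMetadataProvider` now carries the default behavior. A minimal migration sketch, based on the export change above:

```python
# Before this PR (import no longer available):
# from ray.data.datasource import DefaultParquetMetadataProvider
# meta_provider = DefaultParquetMetadataProvider()

# After this PR, the merged class provides the same default behavior:
from ray.data.datasource import ParquetMetadataProvider

meta_provider = ParquetMetadataProvider()
```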
4 changes: 2 additions & 2 deletions python/ray/data/datasource/_default_metadata_providers.py
@@ -5,7 +5,7 @@
     FastFileMetadataProvider,
 )
 from ray.data.datasource.image_datasource import _ImageFileMetadataProvider
-from ray.data.datasource.parquet_meta_provider import DefaultParquetMetadataProvider
+from ray.data.datasource.parquet_meta_provider import ParquetMetadataProvider


 def get_generic_metadata_provider(file_extensions: Optional[List[str]]):
@@ -15,7 +15,7 @@ def get_generic_metadata_provider(file_extensions: Optional[List[str]]):

 def get_parquet_metadata_provider():
     # Used by `read_parquet`
-    return DefaultParquetMetadataProvider()
+    return ParquetMetadataProvider()


 def get_parquet_bulk_metadata_provider():
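`get_parquet_metadata_provider` supplies the default provider for `read_parquet`, so reads that don't pass a provider are unaffected. A hedged usage sketch, assuming the `meta_provider` argument that `ray.data.read_parquet` accepted at the time of this PR (the path is illustrative):

```python
import ray

from ray.data.datasource import ParquetMetadataProvider

# With this change, the explicit provider below matches the default that
# get_parquet_metadata_provider() returns, so the two reads behave the same.
ds = ray.data.read_parquet("s3://my-bucket/data")  # illustrative path
ds = ray.data.read_parquet(
    "s3://my-bucket/data",
    meta_provider=ParquetMetadataProvider(),
)
```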
7 changes: 2 additions & 5 deletions python/ray/data/datasource/parquet_datasource.py
@@ -32,10 +32,7 @@
 )
 from ray.data.datasource.datasource import ReadTask
 from ray.data.datasource.file_meta_provider import _handle_read_os_error
-from ray.data.datasource.parquet_meta_provider import (
-    DefaultParquetMetadataProvider,
-    ParquetMetadataProvider,
-)
+from ray.data.datasource.parquet_meta_provider import ParquetMetadataProvider
 from ray.data.datasource.partitioning import PathPartitionFilter
 from ray.data.datasource.path_util import (
     _has_file_extension,
@@ -190,7 +187,7 @@ def __init__(
         _block_udf: Optional[Callable[[Block], Block]] = None,
         filesystem: Optional["pyarrow.fs.FileSystem"] = None,
         schema: Optional[Union[type, "pyarrow.lib.Schema"]] = None,
-        meta_provider: ParquetMetadataProvider = DefaultParquetMetadataProvider(),
+        meta_provider: ParquetMetadataProvider = ParquetMetadataProvider(),
         partition_filter: PathPartitionFilter = None,
         shuffle: Union[Literal["files"], None] = None,
         include_paths: bool = False,
72 changes: 17 additions & 55 deletions python/ray/data/datasource/parquet_meta_provider.py
@@ -1,4 +1,4 @@
-from typing import TYPE_CHECKING, Any, List, Optional, Union
+from typing import TYPE_CHECKING, List, Optional, Union

 from ray.data._internal.util import call_with_retry
 from ray.data.block import BlockMetadata
@@ -32,24 +32,15 @@

 @DeveloperAPI
 class ParquetMetadataProvider(FileMetadataProvider):
-    """Abstract callable that provides block metadata for Arrow Parquet file fragments.
-
-    All file fragments should belong to a single dataset block.
-
-    Supports optional pre-fetching of ordered metadata for all file fragments in
-    a single batch to help optimize metadata resolution.
-
-    Current subclasses:
-    - :class:`~ray.data.datasource.file_meta_provider.DefaultParquetMetadataProvider`
-    """  # noqa: E501
+    """Provides block metadata for Arrow Parquet file fragments."""

     def _get_block_metadata(
         self,
         paths: List[str],
         schema: Optional[Union[type, "pyarrow.lib.Schema"]],
         *,
         num_fragments: int,
-        prefetched_metadata: Optional[List[Any]],
+        prefetched_metadata: Optional[List["_ParquetFileFragmentMetaData"]],
     ) -> BlockMetadata:
         """Resolves and returns block metadata for files of a single dataset block.

@@ -66,49 +57,6 @@ def _get_block_metadata(
         Returns:
             BlockMetadata aggregated across the given file paths.
         """
-        raise NotImplementedError
-
-    def prefetch_file_metadata(
-        self,
-        fragments: List["pyarrow.dataset.ParquetFileFragment"],
-        **ray_remote_args,
-    ) -> Optional[List[Any]]:
-        """Pre-fetches file metadata for all Parquet file fragments in a single batch.
-
-        Subsets of the metadata returned will be provided as input to subsequent calls
-        to ``_get_block_metadata`` together with their corresponding Parquet file
-        fragments.
-
-        Implementations that don't support pre-fetching file metadata shouldn't
-        override this method.
-
-        Args:
-            fragments: The Parquet file fragments to fetch metadata for.
-
-        Returns:
-            Metadata resolved for each input file fragment, or `None`. Metadata
-            must be returned in the same order as all input file fragments, such
-            that `metadata[i]` always contains the metadata for `fragments[i]`.
-        """
-        return None
-
-
-@DeveloperAPI
-class DefaultParquetMetadataProvider(ParquetMetadataProvider):
-    """The default file metadata provider for ParquetDatasource.
-
-    Aggregates total block bytes and number of rows using the Parquet file metadata
-    associated with a list of Arrow Parquet dataset file fragments.
-    """
-
-    def _get_block_metadata(
-        self,
-        paths: List[str],
-        schema: Optional[Union[type, "pyarrow.lib.Schema"]],
-        *,
-        num_fragments: int,
-        prefetched_metadata: Optional[List["_ParquetFileFragmentMetaData"]],
-    ) -> BlockMetadata:
         if (
             prefetched_metadata is not None
             and len(prefetched_metadata) == num_fragments
@@ -140,6 +88,20 @@ def prefetch_file_metadata(
         fragments: List["pyarrow.dataset.ParquetFileFragment"],
         **ray_remote_args,
     ) -> Optional[List["pyarrow.parquet.FileMetaData"]]:
+        """Pre-fetches file metadata for all Parquet file fragments in a single batch.
+
+        Subsets of the metadata returned will be provided as input to subsequent calls
+        to ``_get_block_metadata`` together with their corresponding Parquet file
+        fragments.
+
+        Args:
+            fragments: The Parquet file fragments to fetch metadata for.
+
+        Returns:
+            Metadata resolved for each input file fragment, or `None`. Metadata
+            must be returned in the same order as all input file fragments, such
+            that `metadata[i]` always contains the metadata for `fragments[i]`.
+        """
         from ray.data.datasource.parquet_datasource import _SerializedFragment

         if len(fragments) > PARALLELIZE_META_FETCH_THRESHOLD:
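With the merge, `prefetch_file_metadata` is a concrete method rather than an abstract hook, so subclasses only override it to change how metadata is fetched. A minimal sketch under stated assumptions: `SerialMetadataProvider` is a hypothetical subclass that reads fragment metadata serially on the driver instead of in parallel Ray tasks, using pyarrow's standard `ParquetFileFragment.metadata` accessor:

```python
from typing import TYPE_CHECKING, List, Optional

from ray.data.datasource import ParquetMetadataProvider

if TYPE_CHECKING:
    import pyarrow


class SerialMetadataProvider(ParquetMetadataProvider):  # hypothetical subclass
    def prefetch_file_metadata(
        self,
        fragments: List["pyarrow.dataset.ParquetFileFragment"],
        **ray_remote_args,
    ) -> Optional[List["pyarrow.parquet.FileMetaData"]]:
        # Honors the documented contract: metadata[i] corresponds to fragments[i].
        return [fragment.metadata for fragment in fragments]
```

The test diff below exercises the same pattern: `TestMetadataProvider` subclasses the merged class and overrides `prefetch_file_metadata`.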
7 changes: 1 addition & 6 deletions python/ray/data/tests/test_metadata_provider.py
@@ -15,7 +15,6 @@
 from ray.data.datasource import (
     BaseFileMetadataProvider,
     DefaultFileMetadataProvider,
-    DefaultParquetMetadataProvider,
     FastFileMetadataProvider,
     FileMetadataProvider,
     ParquetMetadataProvider,
@@ -71,10 +70,6 @@ def test_file_metadata_providers_not_implemented():
         meta_provider(["/foo/bar.csv"], None, rows_per_file=None, file_sizes=[None])
     with pytest.raises(NotImplementedError):
         meta_provider.expand_paths(["/foo/bar.csv"], None)
-    meta_provider = ParquetMetadataProvider()
-    with pytest.raises(NotImplementedError):
-        meta_provider(["/foo/bar.csv"], None, num_fragments=0, prefetched_metadata=None)
-    assert meta_provider.prefetch_file_metadata(["test"]) is None


@pytest.mark.parametrize(
@@ -108,7 +103,7 @@ def test_default_parquet_metadata_provider(fs, data_path):
     table = pa.Table.from_pandas(df2)
     pq.write_table(table, paths[1], filesystem=fs)

-    meta_provider = DefaultParquetMetadataProvider()
+    meta_provider = ParquetMetadataProvider()
     pq_ds = pq.ParquetDataset(paths, filesystem=fs, use_legacy_dataset=False)
     file_metas = meta_provider.prefetch_file_metadata(pq_ds.fragments)
     fragment_file_metas = [_ParquetFileFragmentMetaData(m) for m in file_metas]
7 changes: 2 additions & 5 deletions python/ray/data/tests/test_parquet.py
@@ -13,10 +13,7 @@
 import ray
 from ray.data.block import BlockAccessor
 from ray.data.context import DataContext
-from ray.data.datasource import (
-    DefaultFileMetadataProvider,
-    DefaultParquetMetadataProvider,
-)
+from ray.data.datasource import DefaultFileMetadataProvider, ParquetMetadataProvider
 from ray.data.datasource.parquet_bulk_datasource import ParquetBulkDatasource
 from ray.data.datasource.parquet_datasource import (
     NUM_CPUS_FOR_META_FETCH_TASK,
@@ -205,7 +202,7 @@ def test_parquet_read_meta_provider(ray_start_regular_shared, fs, data_path):
     path2 = os.path.join(setup_data_path, "test2.parquet")
     pq.write_table(table, path2, filesystem=fs)

-    class TestMetadataProvider(DefaultParquetMetadataProvider):
+    class TestMetadataProvider(ParquetMetadataProvider):
         def prefetch_file_metadata(self, fragments, **ray_remote_args):
             assert ray_remote_args["num_cpus"] == NUM_CPUS_FOR_META_FETCH_TASK
             assert (