[Data] Remove _default_metadata_providers #47575

Merged · 3 commits · Sep 11, 2024

4 changes: 2 additions & 2 deletions python/ray/data/_internal/datasource/image_datasource.py
@@ -60,7 +60,7 @@ def __init__(
         self.mode = mode
 
         meta_provider = file_based_datasource_kwargs.get("meta_provider", None)
-        if isinstance(meta_provider, _ImageFileMetadataProvider):
+        if isinstance(meta_provider, ImageFileMetadataProvider):
             self._encoding_ratio = self._estimate_files_encoding_ratio()
             meta_provider._set_encoding_ratio(self._encoding_ratio)
         else:
@@ -154,7 +154,7 @@ def _estimate_files_encoding_ratio(self) -> float:
         return max(ratio, IMAGE_ENCODING_RATIO_ESTIMATE_LOWER_BOUND)
 
 
-class _ImageFileMetadataProvider(DefaultFileMetadataProvider):
+class ImageFileMetadataProvider(DefaultFileMetadataProvider):
     def _set_encoding_ratio(self, encoding_ratio: int):
         """Set image file encoding ratio, to provide accurate size in bytes metadata."""
         self._encoding_ratio = encoding_ratio
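
Dropping the leading underscore makes the class importable by name from read_api.py and the tests below. A minimal sketch of passing it explicitly (the import path is taken as-is from this PR and may move in later releases; the image directory is hypothetical):

import ray

from ray.data._internal.datasource.image_datasource import ImageFileMetadataProvider

# An explicit provider of this type triggers the encoding-ratio branch in the
# __init__ hunk above, which scales size-in-bytes estimates for image files.
ds = ray.data.read_images(
    "/tmp/images",  # hypothetical path
    meta_provider=ImageFileMetadataProvider(),
)
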
10 changes: 5 additions & 5 deletions python/ray/data/_internal/datasource/parquet_datasource.py
@@ -29,11 +29,11 @@
 from ray.data.block import Block
 from ray.data.context import DataContext
 from ray.data.datasource import Datasource
-from ray.data.datasource._default_metadata_providers import (
-    get_generic_metadata_provider,
-)
 from ray.data.datasource.datasource import ReadTask
-from ray.data.datasource.file_meta_provider import _handle_read_os_error
+from ray.data.datasource.file_meta_provider import (
+    DefaultFileMetadataProvider,
+    _handle_read_os_error,
+)
 from ray.data.datasource.parquet_meta_provider import ParquetMetadataProvider
 from ray.data.datasource.partitioning import PathPartitionFilter
 from ray.data.datasource.path_util import (
@@ -194,7 +194,7 @@ def __init__(
         # files. To avoid this, we expand the input paths with the default metadata
         # provider and then apply the partition filter or file extensions.
         if partition_filter is not None or file_extensions is not None:
-            default_meta_provider = get_generic_metadata_provider(file_extensions=None)
+            default_meta_provider = DefaultFileMetadataProvider()
             expanded_paths, _ = map(
                 list, zip(*default_meta_provider.expand_paths(paths, filesystem))
             )
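
The replacement relies only on DefaultFileMetadataProvider.expand_paths, which yields (file_path, file_size) pairs. A minimal sketch of the expand-then-filter pattern used above, assuming a local directory:

from pyarrow.fs import LocalFileSystem

from ray.data.datasource.file_meta_provider import DefaultFileMetadataProvider

provider = DefaultFileMetadataProvider()
# expand_paths yields (file_path, file_size) pairs for each file found under
# the input paths; zip(*...) splits them into two parallel lists.
expanded_paths, file_sizes = map(
    list,
    zip(*provider.expand_paths(["/tmp/parquet_dir"], LocalFileSystem())),  # hypothetical path
)
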
28 changes: 0 additions & 28 deletions python/ray/data/datasource/_default_metadata_providers.py

This file was deleted.

44 changes: 20 additions & 24 deletions python/ray/data/read_api.py
@@ -28,7 +28,10 @@
     DeltaSharingDatasource,
 )
 from ray.data._internal.datasource.iceberg_datasource import IcebergDatasource
-from ray.data._internal.datasource.image_datasource import ImageDatasource
+from ray.data._internal.datasource.image_datasource import (
+    ImageDatasource,
+    ImageFileMetadataProvider,
+)
 from ray.data._internal.datasource.json_datasource import JSONDatasource
 from ray.data._internal.datasource.lance_datasource import LanceDatasource
 from ray.data._internal.datasource.mongo_datasource import MongoDatasource
@@ -67,19 +70,17 @@
     BaseFileMetadataProvider,
     Connection,
     Datasource,
-    ParquetMetadataProvider,
     PathPartitionFilter,
 )
-from ray.data.datasource._default_metadata_providers import (
-    get_generic_metadata_provider,
-    get_image_metadata_provider,
-    get_parquet_bulk_metadata_provider,
-    get_parquet_metadata_provider,
-)
 from ray.data.datasource.datasource import Reader
 from ray.data.datasource.file_based_datasource import (
     _unwrap_arrow_serialization_workaround,
 )
+from ray.data.datasource.file_meta_provider import (
+    DefaultFileMetadataProvider,
+    FastFileMetadataProvider,
+)
+from ray.data.datasource.parquet_meta_provider import ParquetMetadataProvider
 from ray.data.datasource.partitioning import Partitioning
 from ray.types import ObjectRef
 from ray.util.annotations import DeveloperAPI, PublicAPI
@@ -727,7 +728,7 @@ def read_parquet(
     _validate_shuffle_arg(shuffle)
 
     if meta_provider is None:
-        meta_provider = get_parquet_metadata_provider(override_num_blocks)
+        meta_provider = ParquetMetadataProvider()
     arrow_parquet_args = _resolve_parquet_args(
         tensor_column_schema,
         **arrow_parquet_args,
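
One nuance worth flagging: the removed helper received override_num_blocks, while the replacement is constructed with no arguments. The explicit override path is unaffected, since read_parquet only builds a provider when none is given. A hedged sketch (path hypothetical):

import ray
from ray.data.datasource.parquet_meta_provider import ParquetMetadataProvider

# read_parquet only constructs a provider itself when meta_provider is None,
# so an explicit argument takes precedence over the new default.
ds = ray.data.read_parquet(
    "/tmp/data.parquet",  # hypothetical path
    meta_provider=ParquetMetadataProvider(),
)
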
@@ -886,7 +887,7 @@ class string
         ValueError: if ``mode`` is unsupported.
     """
     if meta_provider is None:
-        meta_provider = get_image_metadata_provider()
+        meta_provider = ImageFileMetadataProvider()
 
     datasource = ImageDatasource(
         paths,
@@ -1012,7 +1013,7 @@ def read_parquet_bulk(
         :class:`~ray.data.Dataset` producing records read from the specified paths.
     """
     if meta_provider is None:
-        meta_provider = get_parquet_bulk_metadata_provider()
+        meta_provider = FastFileMetadataProvider()
     read_table_args = _resolve_parquet_args(
         tensor_column_schema,
         **arrow_parquet_args,
@@ -1157,7 +1158,7 @@ def read_json(
         :class:`~ray.data.Dataset` producing records read from the specified paths.
     """  # noqa: E501
     if meta_provider is None:
-        meta_provider = get_generic_metadata_provider(JSONDatasource._FILE_EXTENSIONS)
+        meta_provider = DefaultFileMetadataProvider()
 
     datasource = JSONDatasource(
         paths,
@@ -1323,7 +1324,7 @@ def read_csv(
         :class:`~ray.data.Dataset` producing records read from the specified paths.
     """
     if meta_provider is None:
-        meta_provider = get_generic_metadata_provider(CSVDatasource._FILE_EXTENSIONS)
+        meta_provider = DefaultFileMetadataProvider()
 
     datasource = CSVDatasource(
         paths,
@@ -1434,7 +1435,7 @@ def read_text(
         paths.
     """
     if meta_provider is None:
-        meta_provider = get_generic_metadata_provider(TextDatasource._FILE_EXTENSIONS)
+        meta_provider = DefaultFileMetadataProvider()
 
     datasource = TextDatasource(
         paths,
@@ -1542,7 +1543,7 @@ def read_avro(
         :class:`~ray.data.Dataset` holding records from the Avro files.
     """
     if meta_provider is None:
-        meta_provider = get_generic_metadata_provider(AvroDatasource._FILE_EXTENSIONS)
+        meta_provider = DefaultFileMetadataProvider()
 
     datasource = AvroDatasource(
         paths,
@@ -1637,7 +1638,7 @@ def read_numpy(
         Dataset holding Tensor records read from the specified paths.
     """  # noqa: E501
     if meta_provider is None:
-        meta_provider = get_generic_metadata_provider(NumpyDatasource._FILE_EXTENSIONS)
+        meta_provider = DefaultFileMetadataProvider()
 
     datasource = NumpyDatasource(
         paths,
@@ -1788,10 +1789,7 @@ def read_tfrecords(
     )
 
     if meta_provider is None:
-        meta_provider = get_generic_metadata_provider(
-            TFRecordDatasource._FILE_EXTENSIONS
-        )
-
+        meta_provider = DefaultFileMetadataProvider()
     datasource = TFRecordDatasource(
         paths,
         tf_schema=tf_schema,
@@ -1893,9 +1891,7 @@ def read_webdataset(
     .. _tf.train.Example: https://www.tensorflow.org/api_docs/python/tf/train/Example
     """  # noqa: E501
     if meta_provider is None:
-        meta_provider = get_generic_metadata_provider(
-            WebDatasetDatasource._FILE_EXTENSIONS
-        )
+        meta_provider = DefaultFileMetadataProvider()
 
     datasource = WebDatasetDatasource(
         paths,
@@ -2011,7 +2007,7 @@ def read_binary_files(
         :class:`~ray.data.Dataset` producing rows read from the specified paths.
     """
     if meta_provider is None:
-        meta_provider = get_generic_metadata_provider(BinaryDatasource._FILE_EXTENSIONS)
+        meta_provider = DefaultFileMetadataProvider()
 
     datasource = BinaryDatasource(
         paths,
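
Every read_* hunk above follows the same pattern: the helper lookup from _default_metadata_providers is replaced by constructing the provider class inline (DefaultFileMetadataProvider for most formats, FastFileMetadataProvider for read_parquet_bulk, ImageFileMetadataProvider for images). A hedged sketch of overriding the default from user code (path hypothetical):

import ray
from ray.data.datasource.file_meta_provider import FastFileMetadataProvider

# FastFileMetadataProvider skips up-front file-size collection, which can
# speed up job start on long file lists at the cost of size metadata.
ds = ray.data.read_csv(
    "/tmp/data.csv",  # hypothetical path
    meta_provider=FastFileMetadataProvider(),
)
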
4 changes: 3 additions & 1 deletion python/ray/data/tests/test_csv.py
@@ -804,7 +804,9 @@ def test_csv_read_filter_non_csv_file(shutdown_only, tmp_path):
 
     # Directory of CSV files.
     ds = ray.data.read_csv(tmp_path)
-    assert ds.to_pandas().equals(pd.concat([df, df], ignore_index=True))
+    actual_data = sorted(ds.to_pandas().itertuples(index=False))
+    expected_data = sorted(pd.concat([df, df]).itertuples(index=False))
+    assert actual_data == expected_data, (actual_data, expected_data)
 
     # Non-CSV file in Parquet format.
     table = pa.Table.from_pandas(df)
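
The new assertion sorts row tuples before comparing, so the test no longer depends on the order in which files in the directory are read. The idiom in isolation, as a minimal sketch:

import pandas as pd

df = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
shuffled = df.sample(frac=1, random_state=0)  # same rows, different order

# itertuples(index=False) drops the index; sorting removes order sensitivity.
assert sorted(df.itertuples(index=False)) == sorted(shuffled.itertuples(index=False))
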
4 changes: 2 additions & 2 deletions python/ray/data/tests/test_image.py
@@ -11,7 +11,7 @@
 import ray
 from ray.data._internal.datasource.image_datasource import (
     ImageDatasource,
-    _ImageFileMetadataProvider,
+    ImageFileMetadataProvider,
 )
 from ray.data.datasource import Partitioning
 from ray.data.datasource.file_meta_provider import FastFileMetadataProvider
@@ -241,7 +241,7 @@ def test_data_size_estimate(
         mode=image_mode,
         filesystem=LocalFileSystem(),
         partitioning=None,
-        meta_provider=_ImageFileMetadataProvider(),
+        meta_provider=ImageFileMetadataProvider(),
     )
     assert (
         datasource._encoding_ratio >= expected_ratio