
[Datasets] Add bulk Parquet file reader API #23179

Merged (2 commits) on Apr 29, 2022
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -4,6 +4,7 @@
range_arrow,
range_tensor,
read_parquet,
read_parquet_bulk,
read_json,
read_csv,
read_binary_files,
@@ -59,5 +60,6 @@
"read_json",
"read_numpy",
"read_parquet",
"read_parquet_bulk",
"set_progress_bars",
]
8 changes: 0 additions & 8 deletions python/ray/data/datasource/binary_datasource.py
@@ -42,13 +42,5 @@ def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args):
else:
return [data]

def _open_input_source(
self,
filesystem: "pyarrow.fs.FileSystem",
path: str,
**open_args,
) -> "pyarrow.NativeFile":
return filesystem.open_input_stream(path, **open_args)

def _rows_per_file(self):
return 1
8 changes: 0 additions & 8 deletions python/ray/data/datasource/csv_datasource.py
@@ -43,14 +43,6 @@ def _read_stream(
except StopIteration:
return

def _open_input_source(
self,
filesystem: "pyarrow.fs.FileSystem",
path: str,
**open_args,
) -> "pyarrow.NativeFile":
return filesystem.open_input_stream(path, **open_args)

def _write_block(
self,
f: "pyarrow.NativeFile",
12 changes: 6 additions & 6 deletions python/ray/data/datasource/file_based_datasource.py
@@ -264,14 +264,14 @@ def _open_input_source(
path: str,
**open_args,
) -> "pyarrow.NativeFile":
"""Opens a source path for reading and returns the associated Arrow
NativeFile.
"""Opens a source path for reading and returns the associated Arrow NativeFile.

This method should be implemented by subclasses.
The default implementation opens the source path as a sequential input stream.

Implementations that do not support streaming reads (e.g. that require random
access) should override this method.
"""
raise NotImplementedError(
"Subclasses of FileBasedDatasource must implement _open_input_source()."
)
return filesystem.open_input_stream(path, **open_args)

def do_write(
self,
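With this change, streaming opens become the base-class default, which is why the identical per-format overrides in the binary, CSV, JSON, and NumPy datasources above could be deleted. A minimal sketch, assuming the public FileBasedDatasource import path, of how a format that needs random access would override the default (the class name is hypothetical, not code from this PR):

import pyarrow

from ray.data.datasource import FileBasedDatasource


class RandomAccessDatasource(FileBasedDatasource):
    """Hypothetical subclass, shown only to illustrate overriding the new default."""

    def _open_input_source(
        self,
        filesystem: "pyarrow.fs.FileSystem",
        path: str,
        **open_args,
    ) -> "pyarrow.NativeFile":
        # open_input_file() returns a seekable, random-access handle, unlike the
        # sequential open_input_stream() used by the base-class default above.
        return filesystem.open_input_file(path, **open_args)

    # _read_file() / _read_stream() would still be implemented as usual for the format.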
8 changes: 0 additions & 8 deletions python/ray/data/datasource/json_datasource.py
@@ -31,14 +31,6 @@ def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args):
)
return json.read_json(f, read_options=read_options, **reader_args)

def _open_input_source(
self,
filesystem: "pyarrow.fs.FileSystem",
path: str,
**open_args,
) -> "pyarrow.NativeFile":
return filesystem.open_input_stream(path, **open_args)

def _write_block(
self,
f: "pyarrow.NativeFile",
8 changes: 0 additions & 8 deletions python/ray/data/datasource/numpy_datasource.py
@@ -37,14 +37,6 @@ def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args):
{"value": TensorArray(np.load(buf, allow_pickle=True))}
)

def _open_input_source(
self,
filesystem: "pyarrow.fs.FileSystem",
path: str,
**open_args,
) -> "pyarrow.NativeFile":
return filesystem.open_input_stream(path, **open_args)

def _write_block(
self,
f: "pyarrow.NativeFile",
155 changes: 130 additions & 25 deletions python/ray/data/read_api.py
@@ -43,9 +43,11 @@
ReadTask,
BaseFileMetadataProvider,
DefaultFileMetadataProvider,
FastFileMetadataProvider,
ParquetMetadataProvider,
DefaultParquetMetadataProvider,
PathPartitionFilter,
ParquetBaseDatasource,
)
from ray.data.datasource.file_based_datasource import (
_wrap_arrow_serialization_workaround,
@@ -307,42 +309,110 @@ def read_parquet(
Returns:
Dataset holding Arrow records read from the specified paths.
"""
if tensor_column_schema is not None:
existing_block_udf = arrow_parquet_args.pop("_block_udf", None)
arrow_parquet_args = _resolve_parquet_args(
tensor_column_schema,
**arrow_parquet_args,
)
return read_datasource(
ParquetDatasource(),
parallelism=parallelism,
paths=paths,
filesystem=filesystem,
columns=columns,
ray_remote_args=ray_remote_args,
meta_provider=meta_provider,
**arrow_parquet_args,
)

def _block_udf(block: "pyarrow.Table") -> "pyarrow.Table":
from ray.data.extensions import ArrowTensorArray

for tensor_col_name, (dtype, shape) in tensor_column_schema.items():
# NOTE(Clark): We use NumPy to consolidate these potentially
# non-contiguous buffers, and to do buffer bookkeeping in
# general.
np_col = np.array(
[
np.ndarray(shape, buffer=buf.as_buffer(), dtype=dtype)
for buf in block.column(tensor_col_name)
]
)
@PublicAPI
def read_parquet_bulk(
paths: Union[str, List[str]],
*,
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
columns: Optional[List[str]] = None,
Review comment (Contributor): Do we perform the column projection at S3 (maybe using file offsets), or do we have to fetch the files to local storage first?

Reply (Contributor): @jianoaix I believe that column projection will happen via Arrow's pq.read_table() API, where the column selection is passed through to that call when using this bulk Parquet API.

parallelism: int = 200,
ray_remote_args: Dict[str, Any] = None,
arrow_open_file_args: Optional[Dict[str, Any]] = None,
tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
meta_provider: BaseFileMetadataProvider = FastFileMetadataProvider(),
partition_filter: PathPartitionFilter = None,
**arrow_parquet_args,
) -> Dataset[ArrowRow]:
"""Create an Arrow dataset from a large number (e.g. >1K) of parquet files quickly.

block = block.set_column(
block._ensure_integer_index(tensor_col_name),
tensor_col_name,
ArrowTensorArray.from_numpy(np_col),
)
if existing_block_udf is not None:
# Apply UDF after casting the tensor columns.
block = existing_block_udf(block)
return block
By default, ONLY file paths should be provided as input (i.e. no directory paths),
and an OSError will be raised if one or more paths point to directories. If your
use-case requires directory paths, then the metadata provider should be changed to
one that supports directory expansion (e.g. DefaultFileMetadataProvider).

arrow_parquet_args["_block_udf"] = _block_udf
Offers improved performance vs. `read_parquet` due to not using PyArrow's
`ParquetDataset` abstraction, whose latency scales linearly with the number of
input files due to collecting all file metadata on a single node.

Also supports a wider variety of input Parquet file types than `read_parquet` due
to not trying to merge and resolve a unified schema for all files.
Review comment (Contributor): This will bubble up to and break the schema semantics of the Dataset API, right? That is, Dataset.schema() would be undefined.

Reply (Member, author): Yes, this uses the base prepare_read() method from FileBasedDatasource, so the schema remains undefined by default (just like with the CSV/NumPy/JSON/Binary datasources). If known, the schema can be added by either specifying it as a keyword argument and/or via a custom metadata provider.


However, unlike `read_parquet`, this does not offer file metadata resolution by
default, so a custom metadata provider should be provided if your use-case requires
a unified dataset schema, block sizes, row counts, etc.

Examples:
>>> # Read multiple local files. You should always provide only input file
>>> # paths (i.e. no directory paths) when known to minimize read latency.
>>> ray.data.read_parquet_bulk(["/path/to/file1", "/path/to/file2"])

>>> # Read a directory of files in remote storage. Caution should be taken
>>> # when providing directory paths, since the time to both check each path
>>> # type and expand its contents may result in greatly increased latency
>>> # and/or request rate throttling from cloud storage service providers.
>>> ray.data.read_parquet_bulk(
>>> "s3://bucket/path",
>>> meta_provider=DefaultFileMetadataProvider(),
>>> )

Args:
paths: A single file path or a list of file paths. If one or more directories
are provided, then `meta_provider` should also be set to an implementation
that supports directory expansion (e.g. DefaultFileMetadataProvider).
filesystem: The filesystem implementation to read from.
columns: A list of column names to read.
parallelism: The requested parallelism of the read. Parallelism may be
limited by the number of files of the dataset.
ray_remote_args: kwargs passed to ray.remote in the read tasks.
arrow_open_file_args: kwargs passed to
pyarrow.fs.FileSystem.open_input_file
tensor_column_schema: A dict of column name --> tensor dtype and shape
mappings for converting a Parquet column containing serialized
tensors (ndarrays) as their elements to our tensor column extension
type. This assumes that the tensors were serialized in the raw
NumPy array format in C-contiguous order (e.g. via
`arr.tobytes()`).
meta_provider: File metadata provider. Defaults to a fast file metadata
provider that skips file size collection and requires all input paths to be
files. Change to DefaultFileMetadataProvider or a custom metadata provider
if directory expansion and/or file metadata resolution is required.
partition_filter: Path-based partition filter, if any. Can be used
with a custom callback to read only selected partitions of a dataset.
arrow_parquet_args: Other parquet read options to pass to pyarrow.

Returns:
Dataset holding Arrow records read from the specified paths.
"""
arrow_parquet_args = _resolve_parquet_args(
tensor_column_schema,
**arrow_parquet_args,
)
return read_datasource(
ParquetDatasource(),
ParquetBaseDatasource(),
parallelism=parallelism,
paths=paths,
filesystem=filesystem,
columns=columns,
ray_remote_args=ray_remote_args,
open_stream_args=arrow_open_file_args,
meta_provider=meta_provider,
partition_filter=partition_filter,
**arrow_parquet_args,
)
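Per the review thread above, column projection is pushed down into Arrow's pq.read_table() call rather than applied after fetching whole files. A minimal sketch of both sides of that flow, with illustrative paths and column names; the per-file reader below is an approximation of what a ParquetBaseDatasource-style _read_file does, not the actual PR code:

import pyarrow.parquet as pq
import ray

# Approximation of the per-file read: the column selection is forwarded to
# pq.read_table(), so only the projected columns are decoded from each file.
def _read_parquet_file(f, columns=None, **reader_args):
    return pq.read_table(f, columns=columns, **reader_args)

# User-facing call (illustrative paths): read many files in parallel while
# projecting just two columns.
ds = ray.data.read_parquet_bulk(
    ["/data/part-0.parquet", "/data/part-1.parquet"],
    columns=["user_id", "ts"],
)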

@@ -932,3 +1002,38 @@ def _prepare_read(
kwargs = _unwrap_arrow_serialization_workaround(kwargs)
DatasetContext._set_current(ctx)
return ds.prepare_read(parallelism, **kwargs)


def _resolve_parquet_args(
tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
**arrow_parquet_args,
) -> Dict[str, Any]:
if tensor_column_schema is not None:
existing_block_udf = arrow_parquet_args.pop("_block_udf", None)

def _block_udf(block: "pyarrow.Table") -> "pyarrow.Table":
from ray.data.extensions import ArrowTensorArray

for tensor_col_name, (dtype, shape) in tensor_column_schema.items():
# NOTE(Clark): We use NumPy to consolidate these potentially
# non-contiguous buffers, and to do buffer bookkeeping in
# general.
np_col = np.array(
[
np.ndarray(shape, buffer=buf.as_buffer(), dtype=dtype)
for buf in block.column(tensor_col_name)
]
)

block = block.set_column(
block._ensure_integer_index(tensor_col_name),
tensor_col_name,
ArrowTensorArray.from_numpy(np_col),
)
if existing_block_udf is not None:
# Apply UDF after casting the tensor columns.
block = existing_block_udf(block)
return block

arrow_parquet_args["_block_udf"] = _block_udf
return arrow_parquet_args
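
The _block_udf built here is what turns serialized tensor bytes back into the tensor extension type after each Arrow table is read. A minimal usage sketch; the paths, column name, dtype, and shape are assumptions for illustration only:

import numpy as np
import ray

# Assumes each row of the "image" column stores raw bytes written via
# arr.tobytes() from a C-contiguous float32 array of shape (28, 28).
ds = ray.data.read_parquet_bulk(
    ["/data/images-0.parquet", "/data/images-1.parquet"],
    tensor_column_schema={"image": (np.dtype(np.float32), (28, 28))},
)
# Each block now holds "image" as a tensor extension column (ArrowTensorArray);
# any pre-existing _block_udf runs after this cast.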