[Datasets] Add bulk Parquet file reader API #23179
Conversation
Very exciting! @clarkzinzow @jianoaix want to help shepherd this?
partition filter is defined, then each unpartitioned file will be passed
in as an empty dictionary.

For Directory Partitioning, all directories under the base directory will
Hmm, is this a common partitioning style? If we only support the first (like Spark does) it may simplify the code.
It is relatively common, but not as ubiquitous as hive-style partitioning. CloudTrail and Kinesis Firehose are two notable AWS service examples that use directory partitioning, so this is useful for parsing/filtering partitioned datasets that they produce.
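For readers unfamiliar with the distinction, a rough illustration of the two layouts (bucket and field names are made up):

```python
# Hive-style partitioning: field names are embedded in each directory name.
hive_paths = [
    "s3://bucket/dataset/year=2022/month=03/data-0.parquet",
    "s3://bucket/dataset/year=2022/month=04/data-1.parquet",
]

# Directory partitioning: only the values appear in the path, so an ordered
# list of field names (e.g. ["year", "month"]) is needed to interpret them.
dir_paths = [
    "s3://bucket/dataset/2022/03/data-0.parquet",
    "s3://bucket/dataset/2022/04/data-1.parquet",
]
```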
self,
paths: List[str],
filesystem: "pyarrow.fs.FileSystem",
) -> List[str]:
Nice.
@pdames this looks pretty good at a high level. Any major items missing beyond tests?
I just updated the code to a version that passed a small integration test suite parsing and filtering hive-style partitioned datasets exported from Redshift. I think it's feature-complete at this point, so I'll start working on some associated unit tests.
Based on the 2K tiny file benchmarks I just ran, the good news is that the partition filter adds little-to-no overhead (and appropriately reduces overall read latency when filtering out files). The bad news is that, without a custom metadata provider to skip path expansion (when only file paths are given), read latency remains dominated by checking whether each input path is a directory.
Hmm, so the issue is that if you provide a list of files, we need to double-check that each file is not a directory? I guess maybe indeed we should make path expansion a flag that we can turn off, maybe even logging a warning if the user is passing a list and has expansion on.
So, instead of providing a boolean flag to skip path expansion, I created a default FastFileMetadataProvider that skips it. The end results are looking pretty good here, with ~80% performance improvement for 2K files vs. reads that perform path expansion.
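For reference, a minimal usage sketch of the approach described above, written against the names this PR eventually settles on (read_parquet_bulk with a meta_provider argument and FastFileMetadataProvider); the bucket and file names are placeholders:

```python
import ray
from ray.data.datasource import FastFileMetadataProvider

# All paths are assumed to point directly at Parquet files (no directories),
# so the provider can skip the expensive per-path expansion step.
paths = [f"s3://my-bucket/data/file-{i}.parquet" for i in range(2000)]

ds = ray.data.read_parquet_bulk(
    paths,
    meta_provider=FastFileMetadataProvider(),
)
```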
Thank you, Patrick, for making this improvement!
python/ray/data/read_api.py
block = existing_block_udf(block)
return block
By default, ONLY file paths should be provided as input (i.e. no directory paths).
If your use-case requires directory paths, then the metadata provider should be
Can you document what will happen if users do feed this with directories: a crash or an error? (I think we should avoid crashes in general.)
An OSError (identical to the OSError for a non-existent file path) is raised currently - I've added this to the documentation.
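A small illustrative sketch of that behavior (using the read_parquet_bulk name adopted later in this PR; the bucket path is a placeholder):

```python
import ray

# With the default fast metadata provider, directory paths are not expanded,
# so passing one surfaces an OSError, just like a non-existent file path would.
try:
    ray.data.read_parquet_bulk("s3://my-bucket/partitioned-dataset/")
except OSError:
    print("read_parquet_bulk expects file paths; pass a directory-expanding "
          "metadata provider or list the files explicitly.")
```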
input files due to collecting all file metadata on a single node.

Also supports a wider variety of input Parquet file types than `read_parquet` due
to not trying to merge and resolve a unified schema for all files.
This will bubble up to and break the schema semantics of the Dataset API, right? That is, Dataset.schema() would be undefined.
Yes, this uses the base prepare_read() method from FileBasedDatasource, so the schema remains undefined by default (just like with the CSV/NumPy/JSON/Binary datasources). If known, the schema can be added either by specifying it as a keyword argument or via a custom metadata provider.
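For example, a hedged sketch of supplying a known schema as a keyword argument (this assumes the extra keyword arguments are forwarded to pyarrow.parquet.read_table, as noted above; names and paths are placeholders):

```python
import pyarrow as pa
import ray

# The bulk reader does not merge or resolve a unified schema across files,
# so a known schema can be supplied explicitly instead.
schema = pa.schema([("user_id", pa.int64()), ("event", pa.string())])

ds = ray.data.read_parquet_bulk(
    ["s3://my-bucket/events/f1.parquet", "s3://my-bucket/events/f2.parquet"],
    schema=schema,
)
```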
partition path of the form "{value1}/{value2}/..." or an empty dictionary for
unpartitioned files.

Requires a corresponding ordered list of partition key field names to map the
nit: extra space
Removed!
@DeveloperAPI
class PathPartitioning:
Would it be more accurate to call this a filter (e.g. PathPartitionFilter)?
From my reading of the code, the role of this class is to filter paths based on a provided filter function, rather than partitioning a bunch of paths.
My thinking here is that this class is really a path-based partition parser that currently only exposes a filter to end-users. Ideally, it should be extended with methods to cover additional tasks in subsequent PRs, like (1) group(paths: List[str]) -> Dict[List[Tuple[str, str]], List[str]] to group paths by partition (e.g. to facilitate a partitioned dataset read), (2) partition_dir(base_dir: str, partition_values: List[str]) -> str to find the correct directory for a list of partition values (e.g. for a partitioned dataset block write), and (3) convert(paths: List[str], from: PartitionStyle, to: PartitionStyle) to convert a list of paths between partition styles, etc.
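To make (1) concrete, a purely hypothetical sketch of what such a group method could look like for hive-style paths (none of this is part of the PR; the hashable tuple key is an adaptation of the proposed signature):

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group(
    paths: List[str], field_names: List[str]
) -> Dict[Tuple[Tuple[str, str], ...], List[str]]:
    """Group hive-style partitioned file paths by their partition key/value pairs."""
    groups: Dict[Tuple[Tuple[str, str], ...], List[str]] = defaultdict(list)
    for path in paths:
        # Collect hive-style "key=value" directory segments, e.g. "year=2022".
        pairs = []
        for segment in path.split("/"):
            if "=" in segment:
                key, value = segment.split("=", 1)
                if key in field_names:
                    pairs.append((key, value))
        groups[tuple(pairs)].append(path)
    return dict(groups)
```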
Agree this is extendable. Shall we name it so, e.g. PathPartitionParser, or PathPartitionProcessor?
PathPartitionParser sounds good to me.
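For reference, a minimal sketch of the end-user filter contract described above (the callable receives a dict mapping partition field names to values, or an empty dict for unpartitioned files); the field name and policy here are illustrative:

```python
from typing import Dict


def keep_mondays(partition: Dict[str, str]) -> bool:
    # Unpartitioned files arrive as an empty dict; keep them by default here.
    if not partition:
        return True
    return partition.get("day") == "MO"
```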
return paths, np.empty(len(paths), dtype=object)


class ParquetFastDatasource(FileBasedDatasource):
I wonder if this is too generic, e.g. maybe there are other ways to optimize file reading in the future. Shall we make it more specific (e.g. s/Fast/Directoryless/g, or s/Fast/DirectFile/g)?
I had some similar thoughts. However, the class itself supports directories, so I'm not sure about either Directoryless or DirectFile. It's just the FastFileMetadataProvider default in the read_parquet_fast() API that alters it to not handle directories.
ParquetNoMetaDatasource was one thing I was thinking of, since not overriding prepare_read() means it's not populated with verbose metadata, but I can also imagine this changing in the future. I think it may be best to just call this something like ParquetBasicDatasource to indicate that it is a minimal implementation that reuses as much code as possible from FileBasedDatasource.
As far as read_parquet_fast() API naming goes, I've been thinking that read_parquet_bulk() would be more accurate, since the current implementation offers little to no performance benefit when reading a small number of files (esp. if each file is relatively large), but I expect it to continue to offer long-term benefits for bulk reads of a large number of files.
My last commit changes read_parquet_fast() to read_parquet_bulk(), ParquetFastDatasource to ParquetBaseDatasource, and then has ParquetDatasource inherit from ParquetBaseDatasource since they share the same file format and write path.
What do you think?
Looks good.
@@ -217,6 +220,11 @@ def expand_paths(
) -> Tuple[List[str], List[Optional[int]]]:
from pyarrow.fs import FileType

logger.warning(
I'm not sure we should log this unconditionally: the input paths may contain directories, so it's not appropriate to suggest FastFileMetadataProvider. How about commenting on the class that if users are dealing with only files, they should use FastFileMetadataProvider?
@@ -217,6 +220,11 @@ def expand_paths(
) -> Tuple[List[str], List[Optional[int]]]:
from pyarrow.fs import FileType

logger.warning(
Suggested change:
- logger.warning(
+ if len(paths) > 1:
+     logger.warning(
This should hide the warning for the common case, where a single directory is passed as the path.
Another option that I was considering here is to only log this warning after a fixed amount of time has elapsed expanding directory paths (e.g. if we've been expanding directories for more than 15 seconds, then log this warning).
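A rough sketch of that alternative, with an illustrative helper name and a 15-second threshold (not what the PR implements):

```python
import logging
import time

logger = logging.getLogger(__name__)

EXPAND_WARN_THRESHOLD_S = 15.0


def expand_with_latency_warning(paths, expand_one):
    """Expand each path, warning once if expansion is taking a long time."""
    expanded = []
    start = time.monotonic()
    warned = False
    for path in paths:
        expanded.extend(expand_one(path))
        if not warned and time.monotonic() - start > EXPAND_WARN_THRESHOLD_S:
            logger.warning(
                "Path expansion has taken over %.0f seconds; if all input paths "
                "point to files, consider a metadata provider that skips expansion.",
                EXPAND_WARN_THRESHOLD_S,
            )
            warned = True
    return expanded
```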
Ok, it seems fine to keep this simple since this is a precaution.
@@ -217,6 +220,11 @@ def expand_paths(
) -> Tuple[List[str], List[Optional[int]]]:
from pyarrow.fs import FileType

logger.warning(
f"Expanding {len(paths)} path(s). This may be a HIGH LATENCY operation "
f"on some cloud storage services. If faster reads are required, try "
f"on some cloud storage services. If faster reads are required, try " | |
f"on some cloud storage services. If the specified paths all point to files and never directories, use " |
Instead of adding a big PR and then a follow-up big unit test PR, would it be possible to shard this PR horizontally and add in corresponding unit tests for each smaller PR?
E.g. 1) partitioning.py etc.; 2) parquet_base_datasource.py etc.; 3) integration into read_api.py.
@DeveloperAPI
class PathPartitioning:
Agree this is extendable. Shall we name it so, e.g. PathPartitionParser, or PathPartitionProcessor?
Sounds good to me. Now that we understand the total scope of changes from this PR, I can start opening up smaller PRs and linking them back to this PR and the parent issue that we're working on. I think the proposed ordering of PRs given above also makes sense.
Thank you for addressing comments, LGTM once it's split into smaller PRs with unit tests.
Rebased on top of #23624 to ensure end-to-end compatibility of that PR with upcoming PRs. No need to review here.
Force-pushed from fa8793f to 2b5873d.
Force-pushed from c1d0229 to 0bc35f1.
The delta part looks good. Will take another look after the second PR is merged.
# Expect directory path expansion to fail.
with pytest.raises(OSError):
    ray.data.read_parquet_bulk(data_path, filesystem=fs)
Add a test for reading a directory with the right metadata provider, so it should have no issue?
Added test_parquet_read_bulk_meta_provider for this in my latest commit.
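Roughly, such a test could look like the sketch below (illustrative only; it assumes the directory-expanding provider is exposed as DefaultFileMetadataProvider, and the actual test's fixtures differ):

```python
import pyarrow as pa
import pyarrow.parquet as pq
import ray
from ray.data.datasource import DefaultFileMetadataProvider


def test_parquet_read_bulk_with_expanding_meta_provider(tmp_path):
    # Write a couple of small Parquet files under a directory.
    table = pa.table({"a": [1, 2, 3]})
    pq.write_table(table, str(tmp_path / "f1.parquet"))
    pq.write_table(table, str(tmp_path / "f2.parquet"))

    # A directory path works once a directory-expanding provider is supplied.
    ds = ray.data.read_parquet_bulk(
        str(tmp_path), meta_provider=DefaultFileMetadataProvider()
    )
    assert ds.count() == 6
```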
…ource (#24094) Adds a fast file metadata provider that trades comprehensive file metadata collection for speed of metadata collection, and which also disables directory path expansion, which can be very slow on some cloud storage service providers. This PR also refactors the Parquet datasource to be able to take advantage of both of these changes and the content-type agnostic partitioning support from #23624. This is the second PR of a series originally proposed in #23179.
@jianoaix @clarkzinzow Rebased on top of the latest from master and ready for final review.
LGTM!
paths: Union[str, List[str]],
*,
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
columns: Optional[List[str]] = None,
I wonder, do we perform the column projection at S3 (maybe with file offsets), or do we have to fetch to local first?
Datasets tests look good, merging! Great work @pdames! 🙌
Why are these changes needed?
Adds a reader suitable for quickly reading a large number (e.g. 1-100K+) of Parquet files into a Ray Dataset from either local disk or cloud storage.
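A minimal usage sketch of the new API (bucket, file names, and parallelism value are placeholders):

```python
import ray

# Bulk-read a large list of Parquet file paths; per-file metadata resolution
# and directory path expansion are skipped by default for speed.
paths = [f"s3://my-bucket/events/part-{i:05d}.parquet" for i in range(2000)]
ds = ray.data.read_parquet_bulk(paths, parallelism=200)
print(ds.count())
```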
Latest Benchmarks:
[Benchmark figures not recoverable from this page. The runs compared read_parquet_fast() with the default fast metadata provider against read_parquet() with the default Parquet metadata provider, across parallelism settings ranging from 160 to 2880.]
Old Benchmarks:
[Benchmark figures not recoverable from this page. read_parquet() was not benchmarked on indexed partition columns due to a ParquetDataset merge failure: https://issues.apache.org/jira/browse/ARROW-13851. The remaining entries covered read_parquet_fast() and read_parquet() with a cached Parquet metadata provider and path expansion skipped, with and without partition filters (e.g. partition_filter_fn=lambda d: d["day"] == "MO" keeping 1/7 files, and partition_filter_fn=lambda d: d["day"] != "NA" keeping all files).]
Related issue number
Closes #22910
Checks
I've run scripts/format.sh to lint the changes in this PR.