Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Datasets] Add Path Partitioning Support for All Content Types #23624

Merged
merged 10 commits into from
Apr 22, 2022

Conversation

pdames
Copy link
Member

@pdames pdames commented Mar 31, 2022

Why are these changes needed?

Adds a content-type-agnostic partition parser with support for filtering files. Also adds some corner-case bug fixes and usability improvements for supporting more robust input path types. This is the first PR of a series originally proposed in #23179.

The primary difference from #23179 is that this PR (1) only includes changes related to path-based partitioning and extended input path type support, (2) includes unit tests for path-based partitioning and corner-case path type support, and (3) refactors the single PathPartitioning class into PathPartitionBase, PathPartitionGenerator, and PathPartitionParser.

Related issue number

Partially resolves #22910.

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@pdames
Copy link
Member Author

pdames commented Apr 4, 2022

@jianoaix @ericl @clarkzinzow This should be ready for review - it doesn't look like the failing tests are related.

@ericl
Copy link
Contributor

ericl commented Apr 4, 2022

@clarkzinzow @jianoaix can you review?

@ericl ericl removed their assignment Apr 4, 2022
Copy link
Contributor

@jianoaix jianoaix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you Patrick for splitting the PR and adding unit tests!
Overall looking good to me, just some small comments.

python/ray/data/datasource/partitioning.py Outdated Show resolved Hide resolved
paths: List[str],
filesystem: "pyarrow.fs.FileSystem",
) -> List[str]:
"""Removes all paths that don't pass this partition scheme's partition filter.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one-liner reads like it's going to mutate the state of this class. Maybe just say "Returns ...."

@@ -114,7 +117,7 @@ def _get_write_path_for_block(
# Use forward slashes for cross-filesystem compatibility, since PyArrow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update to reflect this is choosing posix format for cross-fs compability?

@@ -93,7 +110,116 @@ def _get_write_path_for_block(
suffix = (
f"{block_index:06}_{num_rows:02}_{dataset_uuid}" f".test.{file_format}"
)
print(f"Writing to: {base_path}/{suffix}")
return f"{base_path}/{suffix}"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use posix util?

python/ray/data/datasource/partitioning.py Show resolved Hide resolved
python/ray/data/datasource/partitioning.py Outdated Show resolved Hide resolved
@pdames
Copy link
Member Author

pdames commented Apr 8, 2022

Hmm, seems like the latest change to just python/ray/data/datasource/partitioning.py didn't trigger the CI datasets tests. Do we need to change a CI script somewhere to ensure that changes to this file automatically run dataset tests?

@clarkzinzow
Copy link
Contributor

@pdames CI was broken yesterday due to a TypeScript dependency change causing the dashboard compilation to fail, if you rebase onto latest master it should be fixed!

@pdames
Copy link
Member Author

pdames commented Apr 11, 2022

@pdames CI was broken yesterday due to a TypeScript dependency change causing the dashboard compilation to fail, if you rebase onto latest master it should be fixed!

Tests are passing after rebase. Failing checks appear to be unrelated.

@pdames
Copy link
Member Author

pdames commented Apr 13, 2022

@jianoaix @clarkzinzow This should be ready for a final review.

@jianoaix
Copy link
Contributor

@jianoaix @clarkzinzow This should be ready for a final review.

Thank you for your patience. I'll get to it soon after 1.12 release (last push for it now, should happen these 1-2 days).

def _assert_base_partitioned_ds(
ds,
count=6,
input_files=2,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: num_input_files

assert path_partition_generator.normalized_base_dir is None
partition_values = ["1", "2"]
partition_path = path_partition_generator(partition_values, fs)
assert path_partition_generator.normalized_base_dir is not None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this assert its actual content?

)
from ray.data.tests.conftest import * # noqa


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding unit tests, the coverage looks good!

"""Gets the partition key field names."""
return self._field_names

def _normalize_base_dir(self, filesystem: "pyarrow.fs.FileSystem"):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could it make more sense to have filesystem in the constructor?
It looks a path or partition is always in the context of a filesystem. There is no need to use the same PathPartitionBase object for different filesystems?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed - I've moved the filesystem to the constructor. We just have to make sure that FileBasedDatasource and PathPartitionScheme keep their filesystem resolution methods in-sync going forward. Passing the filesystem in post-construction made more sense in an earlier iteration when filesystem resolution wasn't built into this class.



@DeveloperAPI
class PathPartitionGenerator(PathPartitionBase):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding of roles for these 3 classes is that

  • PathPartitionBase (or its subclass - do we need that?) describes a path-based partition
  • PathPartitionGenerator encodes the path-based partition (into string): so it seems not a IS-A relationship, hence not a good fit to model as a subclass
  • PathPartitionParser decodes a path-based partition (out of string), which can then be used for use cases like filtering: so similar it's not a IS-A relationship
    So it looks we should model them with composition not inheritance. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can discuss this, but developing on top of the above, I think we can create abstractions and their roles:

  • PathPartition: describes a path-based partition
  • PathPartitionEncoder: encodes the path-based partition into string, so we can pass it around and use it in writing out Dataset to file systems
  • PathPartitionParser (or PathPartitionDecoder): decodes a path-based partition out of string, so we can easily access each fields/values, and use them for e.g. filtering
  • PathPartitionSelector (or PathPartitionFilter): given the PathPartition, the selector/filter function, and the field/values, it produces a subset of partitions (to read into Dataset)

Btw, we can have an offline meeting if this is a bit complex or inefficient to discuss over GitHub :)

Copy link
Member Author

@pdames pdames Apr 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I definitely like the cleaner separation of responsibilities that composition provides vs. inheritance. Let me know what you think of the latest refactor here. One naming difference is that I explicitly renamed PathPartitionBase to PathPartitionScheme instead of PathPartition, since I felt like the latter gave the impression that it was only referring to one partition, when in fact it holds the spec for an arbitrarily large number of partitions.

One thing I didn't like initially was that constructing something like a PathPartitionFilter required first constructing a PathPartitionParser and a PathPartitionScheme, but I decided to strike a balance here with static factories that provide alternate constructors with flattened arguments.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! For the naming, the PathPartitionScheme looks great!

What do you think making PathPartitionFilter as a function v.s. as a class?

)
self._filter_fn = filter_fn

def filter_paths(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking more about abstractions we are building here, how about we separate this out from Parser as a Filter?
I mean whether this will make sense depends on what you think about the roles of those classes as mentioned above :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've separated the filter out from the parser in the latest revision. One tangential benefit is the ability to now test the path partition parser independent of partition filtering.

Copy link
Contributor

@jianoaix jianoaix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for making the change, looks nice!

)
return {field_names[i]: d for i, d in enumerate(dirs)} if dirs else {}

def __call__(self, path: str) -> Dict[str, str]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: move this up before private methods and other public methods.

)
return self._encoder_fn(values)

def __call__(self, partition_values: List[str]) -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: move magic method up - I think it's the core of this class.

) -> "PathPartitionParser":
"""Creates a path-based partition parser using a flattened argument list.

Args:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add "filesystem" arg to this section?

self._filter_fn = filter_fn

@property
def parser(self) -> PathPartitionParser:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to expose parser? It looks never used; if so, can we remove this method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's currently only used by tests, but I'd expect it to also be useful to anyone constructing a filter since it provides the only path back to the properties of the underlying partition scheme (e.g. via filter.parser.scheme.* to retrieve the base directory, partition field names, etc.).

) -> "PathPartitionFilter":
"""Creates a path-based partition filter using a flattened argument list.

Args:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add 'filesystem' to Args as well here.

Copy link
Contributor

@jianoaix jianoaix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work, LGTM

Copy link
Contributor

@clarkzinzow clarkzinzow left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! @pdames awesome tests and documentation, and @jianoaix great reviewing! 🙌

return parsed.netloc + parsed.path
parsed = urllib.parse.urlparse(path, allow_fragments=False) # support '#' in path
query = "?" + parsed.query if parsed.query else "" # support '?' in path
return parsed.netloc + parsed.path + query
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

@clarkzinzow
Copy link
Contributor

Test failures appear to be unrelated, merging.

@clarkzinzow clarkzinzow merged commit 9f4cb9b into ray-project:master Apr 22, 2022
clarkzinzow pushed a commit that referenced this pull request Apr 29, 2022
…ource (#24094)

Adds a fast file metadata provider that trades comprehensive file metadata collection for speed of metadata collection, and which also disabled directory path expansion which can be very slow on some cloud storage service providers. This PR also refactors the Parquet datasource to be able to take advantage of both these changes and the content-type agnostic partitioning support from #23624.

This is the second PR of a series originally proposed in #23179.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature] Ray dataset loading large list of parquet files is extremely slow
4 participants