
[SPARK-32381][CORE][SQL] Move and refactor parallel listing & non-location sensitive listing to core #29471

Status: Closed · 30 commits

Conversation

@sunchao (Member) commented Aug 18, 2020

What changes were proposed in this pull request?

This moves and refactors the parallel listing utilities from InMemoryFileIndex to Spark core so they can be reused by modules besides SQL. Along the way, this also includes some cleanups/refactorings:

  • Created a HadoopFSUtils class under core
  • Moved InMemoryFileIndex.bulkListLeafFiles into HadoopFSUtils.parallelListLeafFiles. It now depends on a SparkContext instead of the SQL SparkSession. Also added a few parameters which used to be read from SparkSession.conf: ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax and filterFun (for additional filtering support, though we may be able to merge this with the filter parameter in the future).
  • Moved InMemoryFileIndex.listLeafFiles into HadoopFSUtils.listLeafFiles with similar changes above.
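For orientation, the listing flow the bullets above describe can be sketched as a small local stand-in. This is illustrative only: the real parallelListLeafFiles takes a SparkContext plus Hadoop Path/FileStatus types and runs the parallel branch as a distributed Spark job, whereas ListingSketch, the future-based fan-out, and the timeout below are assumptions made for the sketch.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._

object ListingSketch {
  // Recursively collect leaf (non-directory) entries under a path.
  def listLeafFiles(path: Path): Seq[Path] = {
    val stream = Files.list(path)
    try {
      val entries = stream.iterator.asScala.toSeq
      val (dirs, files) = entries.partition(p => Files.isDirectory(p))
      files ++ dirs.flatMap(listLeafFiles)
    } finally stream.close()
  }

  // Below the threshold, list serially on the caller's thread; above it,
  // fan out (locally via futures here; in Spark, via a distributed job).
  def parallelListLeafFiles(paths: Seq[Path], parallelismThreshold: Int): Seq[Path] =
    if (paths.length < parallelismThreshold) {
      paths.flatMap(listLeafFiles)
    } else {
      implicit val ec: ExecutionContext = ExecutionContext.global
      val all = Future.sequence(paths.map(p => Future(listLeafFiles(p))))
      Await.result(all, Duration(60, TimeUnit.SECONDS)).flatten
    }
}
```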

Why are the changes needed?

Currently the locality-aware parallel listing mechanism only applies to InMemoryFileIndex. By moving this to core, we can potentially reuse the same mechanism for other code paths as well.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Since this is mostly a refactoring, it relies on existing unit tests such as those for InMemoryFileIndex.

@dongjoon-hyun (Member):

ok to test

@dongjoon-hyun (Member):

cc @holdenk

@viirya (Member) commented Aug 20, 2020

BTW, we should use Spark's PR template. But this is still a WIP, so I suppose the template will be filled in later.

@holdenk (Contributor) left a comment:

Thanks for picking up this PR. Let me know when you want me to do a review pass (I see it's still marked as draft).

@SparkQA

SparkQA commented Aug 20, 2020

Test build #127705 has finished for PR 29471 at commit 5529047.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sunchao sunchao marked this pull request as ready for review August 20, 2020 23:43
/**
* Utility functions to simplify and speed-up file listing.
*/
@Private
Contributor:
Private maybe seems a bit too restrictive in scope, what about DeveloperAPI?

Member (Author):

@zsxwing suggested to make this private in the meanwhile and change it after the follow-up PR is done.

Member:

Yep, I suggested making this private. I think these APIs are not ready to be exposed yet. For example, 10 parameters in a method is not user-friendly. It's better to design a better API for this.
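As an aside on the ten-parameter concern, one conventional way to shrink such a signature is a parameter object with defaults. This is purely illustrative and not the API this PR ships; ListingOptions, its default values, and the stub body are made up.

```scala
// Hypothetical parameter object: call sites name only what they override.
case class ListingOptions(
    ignoreMissingFiles: Boolean = false,
    ignoreLocality: Boolean = false,
    parallelismThreshold: Int = 32,
    filter: String => Boolean = _ => true)

object OptionsSketch {
  // Stub entry point: applies only the filter, to keep the sketch
  // self-contained; a real implementation would walk the file system.
  def listLeafFiles(paths: Seq[String], opts: ListingOptions = ListingOptions()): Seq[String] =
    paths.filter(opts.filter)
}
```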

filter: PathFilter, areSQLRootPaths: Boolean, ignoreMissingFiles: Boolean,
ignoreLocality: Boolean, maxParallelism: Int,
filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = {
HiveCatalogMetrics.incrementParallelListingJobCount(1)
Contributor:

We could pass in a callback for listing metrics?
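The callback idea could look roughly like this (hypothetical names; the point is only that core takes a function value instead of referencing the SQL-side HiveCatalogMetrics class directly):

```scala
object MetricsCallbackSketch {
  // Core-side listing entry point with a caller-supplied metrics hook.
  // SQL would pass HiveCatalogMetrics.incrementParallelListingJobCount
  // here; core itself stays decoupled from SQL.
  def parallelListLeafFiles(
      paths: Seq[String],
      incrementParallelListingJobCount: Int => Unit = _ => ()): Seq[String] = {
    incrementParallelListingJobCount(1) // one parallel listing job launched
    paths // stub: a real implementation would go list the paths
  }
}
```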

filter = filter,
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isSQLRootPath = areSQLRootPaths,
Contributor:

So the is/are is because listLeafFiles takes a single name and this method takes in a list of files.

fs match {
// DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode
// to retrieve the file status with the file block location. The reason to still fallback
// to listStatus is because the default implementation would potentially throw a
Contributor:

I think changing this in a follow-up PR sounds fine to me. I'd like us to use the faster method by default and fall back on exception, but that could be a follow-on. Want to file a JIRA for broadening the scope of using the faster method?

length: Long)

/** A serializable variant of HDFS's FileStatus. */
private case class SerializableFileStatus(
Contributor:

Yeah, a separate JIRA is best.

@sunchao (Member, Author) commented Aug 25, 2020

@HyukjinKwon @gengliangwang @viirya @zsxwing this PR is mostly a refactoring now. Could you take another look? Thanks!

@SparkQA

SparkQA commented Aug 28, 2020

Test build #127973 has finished for PR 29471 at commit 86c2013.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* @param paths Input paths to list
* @param hadoopConf Hadoop configuration
* @param filter Path filter used to exclude leaf files from result
* @param areSQLRootPaths Whether the input paths are SQL root paths
Member:

Can we add a few more words for areSQLRootPaths? Seeing SQL in core is already a bit strange, so it's nicer to let developers quickly get a better idea just from reading the doc.

Member (Author):

Yes this is unfortunate. I think this parameter doesn't have to be visible to the callers though as it is set to true on the initial call and false on subsequent recursive calls. We can potentially add another overloaded method without this parameter and make this one private.
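That overloading idea might look like the following sketch, with deliberately simplified types (a Map stands in for the file system, and all names are hypothetical):

```scala
object OverloadSketch {
  // Public overload: callers never see the root-level flag; it is
  // always true on the initial call.
  def listFiles(children: Map[String, Seq[String]], roots: Seq[String]): Seq[(String, Boolean)] =
    listFiles(children, roots, isRootLevel = true)

  // Private worker: records the flag it received, then recurses into
  // child paths with isRootLevel = false.
  private def listFiles(
      children: Map[String, Seq[String]],
      paths: Seq[String],
      isRootLevel: Boolean): Seq[(String, Boolean)] =
    paths.flatMap { p =>
      (p, isRootLevel) +: listFiles(children, children.getOrElse(p, Nil), isRootLevel = false)
    }
}
```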

Contributor:

I like that idea @sunchao

@SparkQA

SparkQA commented Aug 30, 2020

Test build #128051 has finished for PR 29471 at commit bfa37cc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@steveloughran (Contributor):

BTW, wrote something up on listing.
https://github.com/steveloughran/engineering-proposals/blob/trunk/listing-performance.md

Anywhere you do listStatus(path): List[FileStatus], switch to listStatusIterator, but if the returned iterator is Closeable, make sure you close it afterwards. Then I or someone else will not only add the s3a and abfs speedups (alongside today's HDFS), but also do the same for the local FS.
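The close-if-Closeable pattern described here can be modelled with plain Scala types. An ordinary Iterator stands in for Hadoop's RemoteIterator; this is a sketch of the pattern, not Hadoop API code.

```scala
import java.io.Closeable

object IteratorSketch {
  // Consume a listing iterator fully, then close it if and only if the
  // concrete class also implements Closeable.
  def drainAndMaybeClose[T](it: Iterator[T]): Seq[T] =
    try it.toSeq
    finally it match {
      case c: Closeable => c.close()
      case _            => ()
    }
}
```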

@holdenk (Contributor) left a comment:

LGTM. If no one objects I'll merge this on Monday so we can unblock the follow-on work that folks seem interested in.

filter: PathFilter, areSQLRootPaths: Boolean, ignoreMissingFiles: Boolean,
ignoreLocality: Boolean, maxParallelism: Int,
filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = {
HiveCatalogMetrics.incrementParallelListingJobCount(1)
Contributor:

Sounds good to me, @sunchao want to file a JIRA for switching this to a callback?

ignoreLocality: Boolean,
isRootPath: Boolean,
filterFun: Option[String => Boolean],
parallelismThreshold: Int,
Member:

This seems to be a new parameter that did not exist before. Why do we need it? If people want parallelized listing, they can invoke parallelListLeafFiles above.

Member (Author):

This is because listLeafFiles also recursively calls parallelListLeafFiles inside, so we need a way to pass down the arguments. These used to be read from the SparkSession conf but are now made explicit as parameters, since we no longer have a session object.

@HyukjinKwon (Member):

@sunchao Let's update the PR title as well. This PR doesn't "expose" something but just moves the code within the codebase. Also:

Along the process this also did some cleanups/refactorings.

Do you mind clarifying the additional diffs this PR introduces? I took a cursory look; it seems there are some diffs above.

@HyukjinKwon (Member):

Also, let's make sure to file a JIRA at #29471 (comment) as @holdenk suggested. I agree with @viirya's comment there.

@sunchao sunchao changed the title [SPARK-32381][CORE][SQL] Explore allowing parallel listing & non-location sensitive listing in core [SPARK-32381][CORE][SQL] Move and refactor parallel listing & non-location sensitive listing to core Sep 14, 2020
@sunchao (Member, Author) commented Sep 14, 2020

Thanks @HyukjinKwon and @holdenk for the review! I updated the PR title as well as description. Also created SPARK-32880 for the follow-up work.

@SparkQA

SparkQA commented Sep 14, 2020

Test build #128666 has finished for PR 29471 at commit 2d8e64d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sunchao (Member, Author) commented Sep 21, 2020

ping @HyukjinKwon @holdenk: any more comments?

@holdenk (Contributor) commented Sep 23, 2020

Looks like we've reached a lazy consensus here. I'll merge this today :)

@holdenk (Contributor) commented Sep 24, 2020

Ok I meant to merge this yesterday but I got distracted with the K8s stuff.

@asfgit closed this in 8ccfbc1 on Sep 24, 2020
@holdenk (Contributor) commented Sep 24, 2020

Merged to the current development branch :)

@sunchao (Member, Author) commented Sep 24, 2020

Thanks @holdenk for the merge, and all for the review comments!

asfgit pushed a commit that referenced this pull request Nov 18, 2020
### What changes were proposed in this pull request?

This PR is a follow-up of #29471 and does the following improvements for `HadoopFSUtils`:
1. Removes the extra `filterFun` from the listing API and combines it with the `filter`.
2. Removes `SerializableBlockLocation` and `SerializableFileStatus` given that `BlockLocation` and `FileStatus` are already serializable.
3. Hides the `isRootLevel` flag from the top-level API.
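Item 1 above (folding filterFun into the single filter) amounts to composing two predicates into one. A simplified sketch, where PathFilter is a local stand-in for org.apache.hadoop.fs.PathFilter and the String path type is an assumption:

```scala
// Simplified stand-in for org.apache.hadoop.fs.PathFilter.
trait PathFilter { def accept(path: String): Boolean }

object FilterSketch {
  // Combine the base filter with an optional extra predicate: a path is
  // accepted only if both agree (a missing predicate accepts everything).
  def combine(filter: PathFilter, filterFun: Option[String => Boolean]): PathFilter =
    new PathFilter {
      def accept(path: String): Boolean =
        filter.accept(path) && filterFun.forall(f => f(path))
    }
}
```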

### Why are the changes needed?

Main purpose is to simplify the logic within `HadoopFSUtils` as well as cleanup the API.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests (e.g., `FileIndexSuite`)

Closes #29959 from sunchao/hadoop-fs-utils-followup.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Holden Karau <[email protected]>