[SPARK-32381][CORE][SQL] Move and refactor parallel listing & non-location sensitive listing to core #29471

Status: Closed. Wants to merge 30 commits. The diff below reflects changes from 20 of the 30 commits.

Commits (30):
2d6add1  Start working on re-implementing the resolution logic to avoid gettin… (holdenk, Jul 2, 2020)
02ea25d  Revert "Start working on re-implementing the resolution logic to avoi… (holdenk, Jul 2, 2020)
fded394  Start moving InMemoryFileIndex over to HadoopFSUtils (holdenk, Jul 2, 2020)
f2fdcd7  Get the SQL layer compiling against the common shim layer (holdenk, Jul 6, 2020)
37253da  Keep working on plumbing through the type info we need to avoid direc… (holdenk, Jul 6, 2020)
e6eee1d  Ok core compiles now (holdenk, Jul 8, 2020)
0492bee  Get the input filter based on the jobContext (holdenk, Jul 8, 2020)
138e14a  Backout some small changes we don't need anymore (holdenk, Jul 8, 2020)
dace630  Revert the class change to BinaryFileRDD we don't depend on that anym… (holdenk, Jul 14, 2020)
20586d3  Fix dropped annotation (holdenk, Jul 14, 2020)
7bb0770  Back out NewHadoopRDD changes, we'll expose a trait instead and have … (holdenk, Jul 29, 2020)
4eb770a  Rework the HadoopFSUtils to: use the serilizablility of block locatio… (holdenk, Jul 29, 2020)
8a5fd8b  Fix the bug and remove default params so it's more difficult to write (holdenk, Jul 29, 2020)
d85c5a4  Remove un-used changes to NewHadoopRDD (holdenk, Aug 3, 2020)
3b0bf18  Put the utility function in HadoopFSUtils to make it easier. (holdenk, Aug 4, 2020)
8b7a2fa  Fix NPE (sunchao, Aug 19, 2020)
e0ec9a6  Defaults ignoreLocality to false and small cleanups (sunchao, Aug 19, 2020)
b1bf5e9  Don't use null as BlockLocations (sunchao, Aug 20, 2020)
5529047  Revert changes on WholeTextInputFileFormat (sunchao, Aug 20, 2020)
6f5c7e5  Remove unused import (sunchao, Aug 20, 2020)
43fc5a0  Address comments and limit this to only refactoring (sunchao, Aug 21, 2020)
1f9bbbc  Remove Hive metrics call and nits (sunchao, Aug 21, 2020)
0ab104d  Make checkstyle happy (sunchao, Aug 21, 2020)
2186a66  Fix checkstyle issue (for real) and change to Private API (sunchao, Aug 21, 2020)
549f335  Fix unidoc (sunchao, Aug 22, 2020)
2b1aacd  Bring back parallel listing metrics (sunchao, Aug 22, 2020)
f5e9581  Fix style issue again (sigh) (sunchao, Aug 22, 2020)
86c2013  Switch to private[spark] (sunchao, Aug 28, 2020)
bfa37cc  Address comments (sunchao, Aug 30, 2020)
2d8e64d  Remove redundant .toSeq (sunchao, Sep 14, 2020)

34 changes: 34 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -443,6 +443,40 @@ package object config {
.booleanConf
.createWithDefault(false)

val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
ConfigBuilder("spark.rdd.sources.parallelPartitionDiscovery.threshold")
.doc("The maximum number of paths allowed for listing files at driver side. If the number " +
"of detected paths exceeds this value during partition discovery, it tries to list the " +
"files with another Spark distributed job. This configuration is effective only when " +
"using file-based sources via HadoopFSUtils.")
.version("1.5.0")
.intConf
.checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " +
"files at driver side must not be negative")
.createWithDefault(2)

val PARALLEL_PARTITION_DISCOVERY_PARALLELISM =
ConfigBuilder("spark.rdd.sources.parallelPartitionDiscovery.parallelism")
.doc("The number of parallelism to list a collection of path recursively, Set the " +
"number to prevent file listing from generating too many tasks.")
.version("3.1.0")
.internal()
.intConf
.createWithDefault(10000)

val IGNORE_DATA_LOCALITY =
ConfigBuilder("spark.rdd.sources.ignoreDataLocality")
.doc("If true, Spark will not fetch the block locations for each file on " +
"listing files. This speeds up file listing, but the scheduler cannot " +
"schedule tasks to take advantage of data locality. It can be particularly " +
"useful if data is read from a remote cluster so the scheduler could never " +
"take advantage of locality anyway. This configuration is effective only when " +
"using file-based using HadoopFSUtils")
.version("3.1.0")
.internal()
.booleanConf
.createWithDefault(false)

private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
.internal()
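The three configurations added above control when HadoopFSUtils switches from driver-side to distributed listing and whether block locations are fetched at all. As a minimal sketch (the application name and the chosen values are illustrative, not defaults from this PR), they could be set like this when constructing a SparkContext:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values only: the keys match the ConfigBuilder entries above,
// but the numbers and the app name are hypothetical.
val conf = new SparkConf()
  .setAppName("hadoop-fs-utils-listing-example")
  // Switch to a distributed listing job once more than 32 paths are discovered.
  .set("spark.rdd.sources.parallelPartitionDiscovery.threshold", "32")
  // Cap the number of tasks used by the distributed listing job.
  .set("spark.rdd.sources.parallelPartitionDiscovery.parallelism", "100")
  // Skip fetching block locations while listing (e.g. when reading from a remote cluster).
  .set("spark.rdd.sources.ignoreDataLocality", "true")

val sc = new SparkContext(conf)
```

Note that the latter two entries are marked .internal() in the diff, so they are not part of the documented user-facing configuration surface.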
259 changes: 259 additions & 0 deletions core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -0,0 +1,259 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import java.io.FileNotFoundException

import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.viewfs.ViewFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics

/**
* :: DeveloperApi ::
* Utility functions to simplify and speed-up file listing.
*/
@DeveloperApi
object HadoopFSUtils extends Logging {
/**
* :: DeveloperApi ::
* Lists a collection of paths recursively. Picks the listing strategy adaptively depending
* on the number of paths to list.
*
* This may only be called on the driver.
*
* @return for each input path, the set of discovered files for the path
*/
@DeveloperApi
def parallelListLeafFiles(sc: SparkContext, paths: Seq[Path], hadoopConf: Configuration,
Member:
If you intend to expose this to Java users as well, we shouldn't use Scala-friendly collections like Seq or Option.

Member:
Since we need to pass SparkContext into this method, why not add it to SparkContext and return an RDD instead? Then the user could use RDD APIs to do other work in parallel, for example adding a filter to avoid pulling all paths to the driver.

Member Author:
Thanks. I'll address this in a separate PR.

I'm not sure about adding this to the RDD API though: what this method does is very specific, such as handling FileNotFoundException, dealing with locality, and thresholding on whether to enable or disable parallelism. It is composed of basic API calls like parallelize and mapPartitions, and I'm not sure which part of it is general enough to be extracted as a separate RDD API.

filter: PathFilter, areSQLRootPaths: Boolean, ignoreMissingFiles: Boolean,
ignoreLocality: Boolean, maxParallelism: Int,
filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = {
Member:
Following the comment above (https://github.com/apache/spark/pull/29471/files#r474382279), I am not sure String => Boolean is Java-friendly. If this is not meant to be exposed to Java users, it seems okay.

Member Author:
Thanks. I'll limit the current PR to refactoring and address this in a later PR.

HiveCatalogMetrics.incrementParallelListingJobCount(1)
Member:
Putting HiveCatalogMetrics here looks strange. This is "Metrics for access to the hive external catalog". Should we skip this or create another metric?

Member Author:
Good catch. It seems inappropriate here. I'm not sure what the best way is to plug Hive/SQL metrics in here, but I'll think it over later.

Contributor:
We could pass in a callback for listing metrics?

Member:
Or call HiveCatalogMetrics.incrementParallelListingJobCount(1) on the SQL side before calling parallelListLeafFiles?

Contributor:
The problem with that is that if we have a root directory with many sub-directories in it, we may initially choose to do non-parallel listing and then, as the sub-directories build up, switch to parallel listing.

Member Author:
Yeah, moving this to SQL will not work. I think we can perhaps add a callback later, just as @holdenk suggested (if you don't mind leaving it here in this PR).

On the other hand, I think HiveCatalogMetrics is already misleading: InMemoryFileIndex is used by non-Hive data sources such as file-based ones, and this particular metric has nothing to do with the Hive catalog. Perhaps a better approach is to move the metrics to some place in core that covers storage-specific things. In the future we can include more, such as total listing time.

Member:
A callback sounds good. We can do it later.

Contributor:
Sounds good to me. @sunchao, want to file a JIRA for switching this to a callback?

Member Author:
Sure, filed https://issues.apache.org/jira/browse/SPARK-32880 for the follow-ups.
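Following the callback idea discussed above (tracked in SPARK-32880), a minimal sketch of what such a hook might look like is shown below. The trait and object names are hypothetical; nothing like this exists in the PR as written.

```scala
// Hypothetical follow-up: a listing-metrics hook so that core code does not
// depend on HiveCatalogMetrics directly. All names here are illustrative.
trait ListingMetricsCallback {
  def onParallelListingJobStarted(numJobs: Int): Unit
}

// A no-op implementation for core callers that do not track listing metrics.
object NoopListingMetrics extends ListingMetricsCallback {
  override def onParallelListingJobStarted(numJobs: Int): Unit = ()
}

// The SQL layer could implement the trait by forwarding to
// HiveCatalogMetrics.incrementParallelListingJobCount(numJobs), and
// parallelListLeafFiles would invoke the callback in place of the direct
// metrics call above.
```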


val serializableConfiguration = new SerializableConfiguration(hadoopConf)
val serializedPaths = paths.map(_.toString)

// Cap the parallelism to prevent the following file listing from generating too many tasks
// in case of a large #defaultParallelism.
val numParallelism = Math.min(paths.size, maxParallelism)

val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
val statusMap = try {
val description = paths.size match {
case 0 =>
"Listing leaf files and directories for 0 paths"
case 1 =>
s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
case s =>
s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
}
sc.setJobDescription(description) // TODO(holden): should we use jobgroup?
sc
.parallelize(serializedPaths, numParallelism)
.mapPartitions { pathStrings =>
val hadoopConf = serializableConfiguration.value
pathStrings.map(new Path(_)).toSeq.map { path =>
val leafFiles = listLeafFiles(
contextOpt = None, // Can't execute parallel scans on workers
path = path,
hadoopConf = hadoopConf,
filter = filter,
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isSQLRootPath = areSQLRootPaths,
Member:
Shall we use a consistent name here?

Member Author:
This is copied from the current implementation in InMemoryFileIndex. I can resolve this in a follow-up PR if you like (as this PR is limited to refactoring).

Contributor:
The is/are difference is because listLeafFiles takes a single path while this method takes a list of paths.

filterFun = filterFun,
parallelismThreshold = Int.MaxValue,
maxParallelism = 0)
(path, leafFiles)
}.iterator
}.collect() // TODO(holden): should we use local itr here?
} finally {
sc.setJobDescription(previousJobDescription)
}

statusMap.toSeq
}
/**
* :: DeveloperApi ::
* Lists a single filesystem path recursively. If a SparkContext object is specified, this
* function may launch Spark jobs to parallelize listing based on parallelismThreshold.
*
* If contextOpt is None, this may be called on executors.
*
* @return all children of path that match the specified filter.
*/
// scalastyle:off argcount
@DeveloperApi
def listLeafFiles(
path: Path,
hadoopConf: Configuration,
filter: PathFilter,
contextOpt: Option[SparkContext],
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
isSQLRootPath: Boolean,
filterFun: Option[String => Boolean],
parallelismThreshold: Int,
Member:
This seems to be a new parameter that did not exist before. Why do we need it? If people want parallelized listing, they can invoke parallelListLeafFiles above.

Member Author:
This is because listLeafFiles also recursively calls parallelListLeafFiles, so we need a way to pass down the arguments. These used to be read from the conf in SparkSession but are now made explicit as parameters, since we no longer have a session object.

maxParallelism: Int): Seq[FileStatus] = {
// scalastyle:on argcount

logTrace(s"Listing $path")
val fs = path.getFileSystem(hadoopConf)

// Note that statuses only include FileStatus for the files and dirs directly under path,
// and do not include anything else recursively.
val statuses: Array[FileStatus] = try {
fs match {
// DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode
// to retrieve the file status with the file block location. The reason to still fall back
// to listStatus is that the default implementation would potentially throw a
Contributor:
I'd question that. It's much better to use the incremental listLocatedStatus for better performance on paged object-store listings, especially if you can do useful work while it takes place.

Member Author:
I think we should add S3AFileSystem (in a follow-up) and perhaps a few others here, since they also support pagination. The fallback part of the comment confuses me, since I don't really see a fallback here for DistributedFileSystem etc.

Member Author (@sunchao, Aug 22, 2020):
Also, do you think we can add something to CommonPathCapabilities for this, so that we don't have to enumerate all these here?

Contributor:
I would just switch to the incremental one everywhere.

HDFS likes it because when you have many, many files in a path it can release the lock on the NN; object stores always have to page in (S3, Azure, GCS), which gives the implementors an opportunity to move from the default implementation to exposing the incremental one. Add this and we can just let the relevant teams know.

w.r.t. CommonPathCapabilities, I suppose we could add one which declares that the listing ops are paged. But do you really want code to try to be that clever? I'm trying to use that feature to mark up:

  • where optional features will fail with UnimplementedException or similar without you having to try using them (append, truncate, xattrs)
  • where there are fundamental semantics it's important for algorithms to worry about (hflush)
  • where your store wants to expose some state to callers (e.g. s3a exposing its HADOOP-13230 dir marker policy)

If you want a performance option, we could add one.

BTW, PathCapabilities is in hadoop-3.2.x now and will be in the next release. I might do it for 3.1 too; it makes a good way to programmatically/CLI probe for the s3a dir marker policy.

Also, cloudstore has some CLI commands for the list calls (and pathcapabilities) to help explore what's going on, including, on s3a, listing cost in number of HTTP requests. Worth playing with to see what is good/bad, though as Mukund has been doing lots of work on the 3.3.x s3a list, it will look worse than you'd expect on a dir treewalk.
https://github.com/steveloughran/cloudstore

Contributor:
I think changing this in a follow-up PR sounds fine to me. I'd like us to use the faster method by default and fall back on exception, but that could be a follow-on. Want to file a JIRA for broadening the scope of using the faster method?

Contributor:
w.r.t. the faster (incremental) calls, yes, something to consider next. At the very least, you will be able to collect and report/aggregate stats. For example, here is me getting the stats on a large/deep list just by calling toString on the RemoteIterator after it's done its work:

Listing statistics:
  counters=((object_list_request=1) (object_continue_list_request=40)); gauges=(); minimums=((object_list_request.min=1298) (object_continue_list_request.min=414)); maximums=((object_continue_list_request.max=746) (object_list_request.max=1298)); means=((object_continue_list_request.mean=(sum=18210, samples=39, mean=466.9231)) (object_list_request.mean=(sum=1298, samples=1, mean=1298.0000)));

2020-08-27 13:25:21,122 [main] INFO  tools.MarkerTool (DurationInfo.java:close(98)) - marker scan s3a://landsat-pds/: duration 0:19.563s
Listed 40000 objects under s3a://landsat-pds/

Now, who wouldn't like to know things like that? And ideally, collect across threads and merge back in.

As an aside, I looked at org.apache.hadoop.mapred.LocatedFileStatusFetcher. This does multithreaded status fetching and collects those stats. Although it's tagged Private, I've noticed Parquet uses it, so my next PR will convert it to public/evolving and document that fact.

If you could use that, we could look at evolving it better, especially returning a RemoteIterator of results which we could incrementally fill in across threads rather than block for the final results. Anything which makes it possible for app code to process data (read footers, etc.) while the listing goes on has significant benefit in the world of object stores.

Member Author:
Yes, stats will be nice. Looking forward to that 👍 (is it tracked by this PR?)

w.r.t. LocatedFileStatusFetcher, yes, it will be great if this is exposed as a public API. I think we should also consider moving it to another module such as hadoop-common, and perhaps adding an option to turn locality on/off (so that we don't have to get the block locations if they are not needed). Pairing it with batch listing can also potentially help (although that is only available in HDFS currently).

Instead of multiple threads, Spark currently uses a distributed job, so I'm not sure whether there would be any regression by doing that, but we can explore this later.

Contributor:
I'd worry about the effect of moving the package as is. It'd have to be a copy and paste, or move and subclass.

Contributor:
@sunchao, yes, that's the stats API. Now is the time to review it and tell me where it doesn't suit your (perceived future) needs. I've been playing with a Parquet branch which uses it, not in Spark itself.

Member Author:
Thanks for the info @steveloughran. I'll check out the PR (it is a very big one though).

// FileNotFoundException which is better handled by doing the lookups manually below.
case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality =>
val remoteIter = fs.listLocatedStatus(path)
new Iterator[LocatedFileStatus]() {
def next(): LocatedFileStatus = remoteIter.next
def hasNext(): Boolean = remoteIter.hasNext
}.toArray
case _ => fs.listStatus(path)
}
} catch {
// If we are listing a root path for SQL (e.g. a top level directory of a table), we need to
// ignore FileNotFoundExceptions during this root level of the listing because
//
// (a) certain code paths might construct an InMemoryFileIndex with root paths that
// might not exist (i.e. not all callers are guaranteed to have checked
// path existence prior to constructing InMemoryFileIndex) and,
// (b) we need to ignore deleted root paths during REFRESH TABLE, otherwise we break
// existing behavior and break the ability to drop SessionCatalog tables when tables'
// root directories have been deleted (which breaks a number of Spark's own tests).
//
// If we are NOT listing a root path then a FileNotFoundException here means that the
// directory was present in a previous level of file listing but is absent in this
// listing, likely indicating a race condition (e.g. concurrent table overwrite or S3
// list inconsistency).
//
// The trade-off in supporting existing behaviors / use-cases is that we won't be
// able to detect race conditions involving root paths being deleted during
// InMemoryFileIndex construction. However, it's still a net improvement to detect and
// fail-fast on the non-root cases. For more info see the SPARK-27676 review discussion.
case _: FileNotFoundException if isSQLRootPath || ignoreMissingFiles =>
logWarning(s"The directory $path was not found. Was it deleted very recently?")
Array.empty[FileStatus]
}

def doFilter(statuses: Array[FileStatus]) = filterFun match {
case Some(shouldFilterOut) =>
statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
case None =>
statuses
}

val filteredStatuses = doFilter(statuses)
val allLeafStatuses = {
val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = contextOpt match {
case Some(context) if dirs.size > parallelismThreshold =>
parallelListLeafFiles(
context,
dirs.map(_.getPath),
hadoopConf = hadoopConf,
filter = filter,
areSQLRootPaths = false,
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
filterFun = filterFun,
maxParallelism = maxParallelism
).flatMap(_._2)
case _ =>
dirs.flatMap { dir =>
listLeafFiles(
path = dir.getPath,
hadoopConf = hadoopConf,
filter = filter,
contextOpt = contextOpt,
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isSQLRootPath = false,
filterFun = filterFun,
parallelismThreshold = parallelismThreshold,
maxParallelism = maxParallelism)
}
}
val allFiles = topLevelFiles ++ nestedFiles
if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
}

val missingFiles = mutable.ArrayBuffer.empty[String]
val filteredLeafStatuses = doFilter(allLeafStatuses)
val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
case f: LocatedFileStatus =>
Some(f)

// NOTE:
//
// - Although S3/S3A/S3N file system can be quite slow for remote file metadata
// operations, calling `getFileBlockLocations` does no harm here since these file system
// implementations don't actually issue RPC for this method.
//
// - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
// be a big deal since we always use `parallelListLeafFiles` when the number of
// paths exceeds threshold.
case f if !ignoreLocality =>
// The other constructor of LocatedFileStatus will call FileStatus.getPermission(),
// which is very slow on some file systems (RawLocalFileSystem, which launches a
// subprocess and parses the stdout).
try {
val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc =>
// Store BlockLocation objects to consume less memory
if (loc.getClass == classOf[BlockLocation]) {
loc
} else {
new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength)
}
}
val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize,
f.getModificationTime, 0, null, null, null, null, f.getPath, locations)
if (f.isSymlink) {
lfs.setSymlink(f.getSymlink)
}
Some(lfs)
} catch {
case _: FileNotFoundException if ignoreMissingFiles =>
missingFiles += f.getPath.toString
None
}

case f => Some(f)
}

if (missingFiles.nonEmpty) {
logWarning(
s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}")
}

resolvedLeafStatuses.toSeq
}
}
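
To make the intended use of the refactored API concrete, here is a minimal driver-side sketch of calling HadoopFSUtils.parallelListLeafFiles with the signature shown in this diff. The input path, filter, and parameter values are illustrative, and a later commit in this PR (86c2013) switches the object to private[spark], so outside Spark's own packages this is a sketch rather than a supported call.

```scala
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.HadoopFSUtils

object ListingExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hadoop-fs-utils-example"))
    val hadoopConf = sc.hadoopConfiguration

    // Accept every path; a real caller would typically filter out hidden files, etc.
    val acceptAll = new PathFilter {
      override def accept(path: Path): Boolean = true
    }

    // The table root below is hypothetical; parameter values are illustrative.
    val listed: Seq[(Path, Seq[FileStatus])] =
      HadoopFSUtils.parallelListLeafFiles(
        sc,
        Seq(new Path("hdfs://namenode/warehouse/my_table")),
        hadoopConf,
        filter = acceptAll,
        areSQLRootPaths = true,      // the paths are table root directories
        ignoreMissingFiles = false,
        ignoreLocality = false,
        maxParallelism = 100)        // filterFun is left at its default (None)

    listed.foreach { case (root, files) =>
      println(s"$root: ${files.size} leaf files")
    }
    sc.stop()
  }
}
```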