Address comments
sunchao committed Aug 30, 2020
1 parent 86c2013 commit bfa37cc
Showing 3 changed files with 22 additions and 21 deletions.
35 changes: 18 additions & 17 deletions core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -45,7 +45,8 @@ private[spark] object HadoopFSUtils extends Logging {
* @param paths Input paths to list
* @param hadoopConf Hadoop configuration
* @param filter Path filter used to exclude leaf files from result
* @param areSQLRootPaths Whether the input paths are SQL root paths
* @param isRootLevel Whether the input paths are at the root level, i.e., they are the root
* paths as opposed to nested paths encountered during recursive calls of this.
* @param ignoreMissingFiles Ignore missing files that happen during recursive listing
* (e.g., due to race conditions)
* @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false,
@@ -54,8 +55,8 @@ private[spark] object HadoopFSUtils extends Logging {
* is smaller than this value, this will fallback to use
* sequential listing.
* @param parallelismMax The maximum parallelism for listing. If the number of input paths is
* larger than this value, parallelism will be throttled to this value
* to avoid generating too many tasks.
* larger than this value, parallelism will be throttled to this value
* to avoid generating too many tasks.
* @param filterFun Optional predicate on the leaf files. Files who failed the check will be
* excluded from the results
* @return for each input path, the set of discovered files for the path
@@ -65,7 +66,7 @@ private[spark] object HadoopFSUtils extends Logging {
paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,
areSQLRootPaths: Boolean,
isRootLevel: Boolean,
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
parallelismThreshold: Int,
@@ -82,9 +83,9 @@ private[spark] object HadoopFSUtils extends Logging {
Some(sc),
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isSQLRootPath = areSQLRootPaths,
isRootPath = isRootLevel,
parallelismThreshold = parallelismThreshold,
parallelismDefault = parallelismMax,
parallelismMax = parallelismMax,
filterFun = filterFun)
(path, leafFiles)
}
@@ -124,10 +125,10 @@ private[spark] object HadoopFSUtils extends Logging {
contextOpt = None, // Can't execute parallel scans on workers
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isSQLRootPath = areSQLRootPaths,
isRootPath = isRootLevel,
filterFun = filterFun,
parallelismThreshold = Int.MaxValue,
parallelismDefault = 0)
parallelismMax = 0)
(path, leafFiles)
}.iterator
}.map { case (path, statuses) =>
@@ -181,8 +182,8 @@ private[spark] object HadoopFSUtils extends Logging {

// scalastyle:off argcount
/**
* Lists a single filesystem path recursively. If a `SparkContext`` object is specified, this
* function may launch Spark jobs to parallelize listing based on parallelismThreshold.
* Lists a single filesystem path recursively. If a `SparkContext` object is specified, this
* function may launch Spark jobs to parallelize listing based on `parallelismThreshold`.
*
* If sessionOpt is None, this may be called on executors.
*
@@ -195,10 +196,10 @@ private[spark] object HadoopFSUtils extends Logging {
contextOpt: Option[SparkContext],
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
isSQLRootPath: Boolean,
isRootPath: Boolean,
filterFun: Option[String => Boolean],
parallelismThreshold: Int,
parallelismDefault: Int): Seq[FileStatus] = {
parallelismMax: Int): Seq[FileStatus] = {

logTrace(s"Listing $path")
val fs = path.getFileSystem(hadoopConf)
Expand Down Expand Up @@ -239,7 +240,7 @@ private[spark] object HadoopFSUtils extends Logging {
// able to detect race conditions involving root paths being deleted during
// InMemoryFileIndex construction. However, it's still a net improvement to detect and
// fail-fast on the non-root cases. For more info see the SPARK-27676 review discussion.
case _: FileNotFoundException if isSQLRootPath || ignoreMissingFiles =>
case _: FileNotFoundException if isRootPath || ignoreMissingFiles =>
logWarning(s"The directory $path was not found. Was it deleted very recently?")
Array.empty[FileStatus]
}
@@ -261,12 +262,12 @@ private[spark] object HadoopFSUtils extends Logging {
dirs.map(_.getPath),
hadoopConf = hadoopConf,
filter = filter,
areSQLRootPaths = false,
isRootLevel = false,
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
filterFun = filterFun,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismDefault
parallelismMax = parallelismMax
).flatMap(_._2)
case _ =>
dirs.flatMap { dir =>
@@ -277,10 +278,10 @@ private[spark] object HadoopFSUtils extends Logging {
contextOpt = contextOpt,
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isSQLRootPath = false,
isRootPath = false,
filterFun = filterFun,
parallelismThreshold = parallelismThreshold,
parallelismDefault = parallelismDefault)
parallelismMax = parallelismMax)
}
}
val allFiles = topLevelFiles ++ nestedFiles
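For reference, a minimal usage sketch of the renamed entry point. The full signature is not shown in the diff, so the leading `sc: SparkContext` parameter and the trailing `filterFun` are assumptions based on the call site in InMemoryFileIndex further below; the paths, filter, and threshold values are illustrative only, and `HadoopFSUtils` is `private[spark]`, so a call like this only compiles inside Spark's own source tree.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.spark.SparkContext
import org.apache.spark.util.HadoopFSUtils

// Illustrative driver-side call; `sc` is assumed to be an existing SparkContext.
def listWarehouseFiles(sc: SparkContext): Unit = {
  val acceptAll = new PathFilter {
    override def accept(path: Path): Boolean = true // keep every leaf file
  }

  val perPathFiles = HadoopFSUtils.parallelListLeafFiles(
    sc = sc,
    paths = Seq(new Path("hdfs://nn:8020/warehouse/db.db/events")),
    hadoopConf = new Configuration(),
    filter = acceptAll,
    isRootLevel = true,            // these are user-supplied root paths
    ignoreMissingFiles = false,
    ignoreLocality = false,
    parallelismThreshold = 32,     // below this, fall back to sequential listing
    parallelismMax = 10000,        // cap the number of listing tasks
    filterFun = None)

  // One entry per input path: (root path, discovered leaf files under it).
  perPathFiles.foreach { case (root, files) =>
    println(s"$root -> ${files.size} leaf files")
  }
}
```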
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -163,7 +163,7 @@ object CommandUtils extends Logging {
.getConfString("hive.exec.stagingdir", ".hive-staging")
val filter = new PathFilterIgnoreNonData(stagingDir)
val sizes = InMemoryFileIndex.bulkListLeafFiles(paths.flatten,
sparkSession.sessionState.newHadoopConf(), filter, sparkSession, areRootPaths = true).map {
sparkSession.sessionState.newHadoopConf(), filter, sparkSession, isRootLevel = true).map {
case (_, files) => files.map(_.getLen).sum
}
// the size is 0 where paths(i) is not defined and sizes(i) where it is defined
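The comment about sizes being 0 for undefined paths refers to re-aligning the per-path sums, which were computed over `paths.flatten` (only the defined paths), with the original `Seq[Option[Path]]`. A hypothetical, self-contained sketch of that alignment; the names and types here are illustrative, not the committed code:

```scala
// Hypothetical illustration of the re-alignment described in the comment above:
// `sizes` was computed only over the defined paths, so walk it with an iterator
// and emit 0 for every slot where the original Option was None.
def alignSizes(paths: Seq[Option[String]], sizes: Seq[Long]): Seq[Long] = {
  val it = sizes.iterator
  paths.map(p => if (p.isDefined) it.next() else 0L)
}

// defined, missing, defined -> the missing slot gets size 0
assert(alignSizes(Seq(Some("/t/p0"), None, Some("/t/p2")), Seq(10L, 20L)) == Seq(10L, 0L, 20L))
```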
6 changes: 3 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -128,7 +128,7 @@ class InMemoryFileIndex(
}
val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
val discovered = InMemoryFileIndex.bulkListLeafFiles(
pathsToFetch.toSeq, hadoopConf, filter, sparkSession, areRootPaths = true)
pathsToFetch.toSeq, hadoopConf, filter, sparkSession, isRootLevel = true)
discovered.foreach { case (path, leafFiles) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -147,13 +147,13 @@ object InMemoryFileIndex extends Logging {
hadoopConf: Configuration,
filter: PathFilter,
sparkSession: SparkSession,
areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = {
isRootLevel: Boolean): Seq[(Path, Seq[FileStatus])] = {
HadoopFSUtils.parallelListLeafFiles(
sc = sparkSession.sparkContext,
paths = paths,
hadoopConf = hadoopConf,
filter = filter,
areSQLRootPaths = areRootPaths,
isRootLevel = isRootLevel,
ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
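Finally, a minimal sketch of how a caller uses the renamed `bulkListLeafFiles` helper, mirroring the CommandUtils and InMemoryFileIndex call sites shown above. The wrapper function and parameter names are assumptions for illustration, and the helper is likely package-private to Spark SQL, so in practice it is only called from within Spark itself.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

// Sketch only: mirrors the call sites above. The caller supplies the root paths,
// Hadoop configuration, path filter, and an active SparkSession.
def discoverLeafFiles(
    rootPaths: Seq[Path],
    hadoopConf: Configuration,
    filter: PathFilter,
    sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
  // isRootLevel = true because these are the user-supplied root paths,
  // not nested directories encountered during recursive listing.
  InMemoryFileIndex.bulkListLeafFiles(
    rootPaths, hadoopConf, filter, sparkSession, isRootLevel = true)
}
```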
