[SPARK-32381][CORE][SQL][FOLLOWUP] More cleanup on HadoopFSUtils #29959

Status: Closed · 6 commits
104 changes: 19 additions & 85 deletions core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.viewfs.ViewFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem

import org.apache.spark._
import org.apache.spark.annotation.Private
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics

@@ -45,8 +44,6 @@ private[spark] object HadoopFSUtils extends Logging {
* @param paths Input paths to list
* @param hadoopConf Hadoop configuration
* @param filter Path filter used to exclude leaf files from result
* @param isRootLevel Whether the input paths are at the root level, i.e., they are the root
* paths as opposed to nested paths encountered during recursive calls of this.
* @param ignoreMissingFiles Ignore missing files that happen during recursive listing
* (e.g., due to race conditions)
* @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false,
@@ -57,11 +54,22 @@
* @param parallelismMax The maximum parallelism for listing. If the number of input paths is
* larger than this value, parallelism will be throttled to this value
* to avoid generating too many tasks.
* @param filterFun Optional predicate on the leaf files. Files who failed the check will be
* excluded from the results
* @return for each input path, the set of discovered files for the path
*/
def parallelListLeafFiles(
sc: SparkContext,
paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
parallelismThreshold: Int,
parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true,
ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax)
}
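
For orientation, a minimal usage sketch of the narrowed public API (hypothetical paths and filter, with an in-scope SparkContext `sc`; callable only from Spark-internal code since the object is private[spark]):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}

    // Sketch: list leaf files under two roots. Listing runs as a distributed
    // Spark job only when the number of paths exceeds parallelismThreshold.
    val hadoopConf = new Configuration()
    val filter: PathFilter = (p: Path) => !p.getName.startsWith(".")
    val listed: Seq[(Path, Seq[FileStatus])] = HadoopFSUtils.parallelListLeafFiles(
      sc,
      Seq(new Path("/data/a"), new Path("/data/b")),
      hadoopConf,
      filter,
      ignoreMissingFiles = true,
      ignoreLocality = false,
      parallelismThreshold = 32,
      parallelismMax = 10000)
    listed.foreach { case (root, files) => println(s"$root -> ${files.size} leaf files") }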

private def parallelListLeafFilesInternal(
sc: SparkContext,
paths: Seq[Path],
hadoopConf: Configuration,
@@ -70,8 +78,7 @@
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
parallelismThreshold: Int,
parallelismMax: Int,
filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = {
parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= parallelismThreshold) {
@@ -85,8 +92,7 @@
ignoreLocality = ignoreLocality,
isRootPath = isRootLevel,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax,
filterFun = filterFun)
parallelismMax = parallelismMax)
(path, leafFiles)
}
}
@@ -126,58 +132,16 @@
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isRootPath = isRootLevel,
filterFun = filterFun,
parallelismThreshold = Int.MaxValue,
parallelismMax = 0)
(path, leafFiles)
}.iterator
}.map { case (path, statuses) =>
val serializableStatuses = statuses.map { status =>
// Turn FileStatus into SerializableFileStatus so we can send it back to the driver
val blockLocations = status match {
case f: LocatedFileStatus =>
f.getBlockLocations.map { loc =>
SerializableBlockLocation(
loc.getNames,
loc.getHosts,
loc.getOffset,
loc.getLength)
}

case _ =>
Array.empty[SerializableBlockLocation]
}

SerializableFileStatus(
status.getPath.toString,
status.getLen,
status.isDirectory,
status.getReplication,
status.getBlockSize,
status.getModificationTime,
status.getAccessTime,
blockLocations)
}
(path.toString, serializableStatuses)
}.collect()
} finally {
sc.setJobDescription(previousJobDescription)
}

// turn SerializableFileStatus back to Status
statusMap.map { case (path, serializableStatuses) =>
val statuses = serializableStatuses.map { f =>
val blockLocations = f.blockLocations.map { loc =>
new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
}
new LocatedFileStatus(
new FileStatus(
f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
new Path(f.path)),
blockLocations)
}
(new Path(path), statuses)
}
statusMap.toSeq
}

// scalastyle:off argcount
@@ -197,7 +161,6 @@
ignoreMissingFiles: Boolean,
ignoreLocality: Boolean,
isRootPath: Boolean,
filterFun: Option[String => Boolean],
parallelismThreshold: Int,
parallelismMax: Int): Seq[FileStatus] = {

@@ -245,27 +208,18 @@
Array.empty[FileStatus]
}

def doFilter(statuses: Array[FileStatus]) = filterFun match {
case Some(shouldFilterOut) =>
statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
case None =>
statuses
}

val filteredStatuses = doFilter(statuses)
val allLeafStatuses = {
val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
gengliangwang (Member) commented:
@sunchao the dirs here may contain hidden directories. We still need to filter them before listing leaf files.

sunchao (Member Author) replied:
@gengliangwang you're right. Thanks for catching this! and sorry for introducing this regression.
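
For concreteness, a sketch of the kind of fix the thread points at (hypothetical; the actual follow-up commit may differ): apply the same PathFilter to directories before recursing, so hidden subtrees such as _temporary or .hive-staging are pruned early rather than listed.

    // Sketch only: apply the filter to directories as well as leaf files.
    val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
    val visibleDirs =
      if (filter != null) dirs.filter(d => filter.accept(d.getPath)) else dirs
    // ... then recurse into visibleDirs rather than dirs ...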

val nestedFiles: Seq[FileStatus] = contextOpt match {
case Some(context) if dirs.size > parallelismThreshold =>
parallelListLeafFiles(
parallelListLeafFilesInternal(
context,
dirs.map(_.getPath),
hadoopConf = hadoopConf,
filter = filter,
isRootLevel = false,
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
filterFun = filterFun,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax
).flatMap(_._2)
@@ -279,7 +233,6 @@
ignoreMissingFiles = ignoreMissingFiles,
ignoreLocality = ignoreLocality,
isRootPath = false,
filterFun = filterFun,
parallelismThreshold = parallelismThreshold,
parallelismMax = parallelismMax)
}
@@ -289,8 +242,7 @@
}

val missingFiles = mutable.ArrayBuffer.empty[String]
val filteredLeafStatuses = doFilter(allLeafStatuses)
val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
val resolvedLeafStatuses = allLeafStatuses.flatMap {
case f: LocatedFileStatus =>
Some(f)

@@ -339,22 +291,4 @@
resolvedLeafStatuses
}
// scalastyle:on argcount

/** A serializable variant of HDFS's BlockLocation. */
private case class SerializableBlockLocation(
names: Array[String],
hosts: Array[String],
offset: Long,
length: Long)

/** A serializable variant of HDFS's FileStatus. */
private case class SerializableFileStatus(
path: String,
length: Long,
isDir: Boolean,
blockReplication: Short,
blockSize: Long,
modificationTime: Long,
accessTime: Long,
blockLocations: Array[SerializableBlockLocation])
}
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -163,7 +163,7 @@ object CommandUtils extends Logging {
.getConfString("hive.exec.stagingdir", ".hive-staging")
val filter = new PathFilterIgnoreNonData(stagingDir)
val sizes = InMemoryFileIndex.bulkListLeafFiles(paths.flatten,
sparkSession.sessionState.newHadoopConf(), filter, sparkSession, isRootLevel = true).map {
sparkSession.sessionState.newHadoopConf(), filter, sparkSession).map {
case (_, files) => files.map(_.getLen).sum
}
// the size is 0 where paths(i) is not defined and sizes(i) where it is defined
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -128,7 +128,7 @@ class InMemoryFileIndex(
}
val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
val discovered = InMemoryFileIndex.bulkListLeafFiles(
pathsToFetch.toSeq, hadoopConf, filter, sparkSession, isRootLevel = true)
pathsToFetch.toSeq, hadoopConf, filter, sparkSession)
discovered.foreach { case (path, leafFiles) =>
HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -146,20 +146,17 @@ object InMemoryFileIndex extends Logging {
paths: Seq[Path],
hadoopConf: Configuration,
filter: PathFilter,
sparkSession: SparkSession,
isRootLevel: Boolean): Seq[(Path, Seq[FileStatus])] = {
sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
HadoopFSUtils.parallelListLeafFiles(
sc = sparkSession.sparkContext,
paths = paths,
hadoopConf = hadoopConf,
filter = filter,
isRootLevel = isRootLevel,
filter = new PathFilterWrapper(filter),
ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism,
filterFun = Some(shouldFilterOut))
}
parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
}

/** Checks if we should filter out this path name. */
def shouldFilterOut(pathName: String): Boolean = {
@@ -175,3 +172,9 @@
exclude && !include
}
}
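
The body of shouldFilterOut is mostly collapsed in this diff; to illustrate its intent, here is the expected behavior, assuming Spark's usual hidden-path conventions (dot- and underscore-prefixed names are excluded, with Parquet metadata files re-included):

    InMemoryFileIndex.shouldFilterOut(".DS_Store")           // true: dot-prefixed
    InMemoryFileIndex.shouldFilterOut("_temporary")          // true: underscore-prefixed
    InMemoryFileIndex.shouldFilterOut("_metadata")           // false: re-included by the include rule
    InMemoryFileIndex.shouldFilterOut("_common_metadata")    // false: re-included by the include rule
    InMemoryFileIndex.shouldFilterOut("part-00000.parquet")  // false: ordinary data file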

private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
override def accept(path: Path): Boolean = {
(filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
}
}
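
Design note: Hadoop's PathFilter interface is not Serializable, so wrapping the user-supplied filter in a class that mixes in Serializable lets parallelListLeafFiles ship a single combined filter (the user filter plus the built-in hidden-path rules) to executors, which is what replaces the removed filterFun parameter. A quick behavioral sketch with a hypothetical user filter:

    // Hypothetical user filter combined with the built-in rules:
    val parquetOnly: PathFilter = (p: Path) => p.getName.endsWith(".parquet")
    val wrapped = new PathFilterWrapper(parquetOnly)
    wrapped.accept(new Path("/tbl/part-0.parquet"))  // true
    wrapped.accept(new Path("/tbl/_temporary"))      // false: hidden-path rule
    wrapped.accept(new Path("/tbl/x.json"))          // false: rejected by the user filter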