From 64d364580c58b8c9f390eeb732513c6979da9374 Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Tue, 6 Oct 2020 09:57:43 -0700
Subject: [PATCH 1/6] Remove extra path filter

---
 .../org/apache/spark/util/HadoopFSUtils.scala | 26 +++----------------
 .../datasources/InMemoryFileIndex.scala       | 13 +++++++---
 2 files changed, 13 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index c0a135e04bac5..1345dd928c02b 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.viewfs.ViewFileSystem
 import org.apache.hadoop.hdfs.DistributedFileSystem
 
 import org.apache.spark._
-import org.apache.spark.annotation.Private
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 
@@ -57,8 +56,6 @@ private[spark] object HadoopFSUtils extends Logging {
    * @param parallelismMax The maximum parallelism for listing. If the number of input paths is
    *                       larger than this value, parallelism will be throttled to this value
    *                       to avoid generating too many tasks.
-   * @param filterFun Optional predicate on the leaf files. Files who failed the check will be
-   *                  excluded from the results
    * @return for each input path, the set of discovered files for the path
    */
   def parallelListLeafFiles(
@@ -70,8 +67,7 @@ private[spark] object HadoopFSUtils extends Logging {
       ignoreMissingFiles: Boolean,
       ignoreLocality: Boolean,
       parallelismThreshold: Int,
-      parallelismMax: Int,
-      filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = {
+      parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= parallelismThreshold) {
@@ -85,8 +81,7 @@ private[spark] object HadoopFSUtils extends Logging {
           ignoreLocality = ignoreLocality,
           isRootPath = isRootLevel,
           parallelismThreshold = parallelismThreshold,
-          parallelismMax = parallelismMax,
-          filterFun = filterFun)
+          parallelismMax = parallelismMax)
         (path, leafFiles)
       }
     }
@@ -126,7 +121,6 @@ private[spark] object HadoopFSUtils extends Logging {
             ignoreMissingFiles = ignoreMissingFiles,
             ignoreLocality = ignoreLocality,
             isRootPath = isRootLevel,
-            filterFun = filterFun,
             parallelismThreshold = Int.MaxValue,
             parallelismMax = 0)
           (path, leafFiles)
@@ -197,7 +191,6 @@ private[spark] object HadoopFSUtils extends Logging {
       ignoreMissingFiles: Boolean,
       ignoreLocality: Boolean,
       isRootPath: Boolean,
-      filterFun: Option[String => Boolean],
       parallelismThreshold: Int,
       parallelismMax: Int): Seq[FileStatus] = {
@@ -245,16 +238,8 @@ private[spark] object HadoopFSUtils extends Logging {
         Array.empty[FileStatus]
       }
 
-    def doFilter(statuses: Array[FileStatus]) = filterFun match {
-      case Some(shouldFilterOut) =>
-        statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
-      case None =>
-        statuses
-    }
-
-    val filteredStatuses = doFilter(statuses)
     val allLeafStatuses = {
-      val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
+      val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
       val nestedFiles: Seq[FileStatus] = contextOpt match {
         case Some(context) if dirs.size > parallelismThreshold =>
           parallelListLeafFiles(
@@ -265,7 +250,6 @@ private[spark] object HadoopFSUtils extends Logging {
             isRootLevel = false,
             ignoreMissingFiles = ignoreMissingFiles,
             ignoreLocality = ignoreLocality,
-            filterFun = filterFun,
             parallelismThreshold = parallelismThreshold,
             parallelismMax = parallelismMax
           ).flatMap(_._2)
@@ -279,7 +263,6 @@ private[spark] object HadoopFSUtils extends Logging {
             ignoreMissingFiles = ignoreMissingFiles,
             ignoreLocality = ignoreLocality,
             isRootPath = false,
-            filterFun = filterFun,
             parallelismThreshold = parallelismThreshold,
             parallelismMax = parallelismMax)
       }
@@ -289,8 +272,7 @@ private[spark] object HadoopFSUtils extends Logging {
     }
 
     val missingFiles = mutable.ArrayBuffer.empty[String]
-    val filteredLeafStatuses = doFilter(allLeafStatuses)
-    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
+    val resolvedLeafStatuses = allLeafStatuses.flatMap {
       case f: LocatedFileStatus =>
         Some(f)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 130894e9bc025..5ec18e03a990f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -152,14 +152,13 @@ object InMemoryFileIndex extends Logging {
       sc = sparkSession.sparkContext,
       paths = paths,
       hadoopConf = hadoopConf,
-      filter = filter,
+      filter = new PathFilterWrapper(filter),
       isRootLevel = isRootLevel,
       ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
       ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
       parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
-      parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism,
-      filterFun = Some(shouldFilterOut))
-  }
+      parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
+  }
 
   /** Checks if we should filter out this path name. */
   def shouldFilterOut(pathName: String): Boolean = {
@@ -175,3 +174,9 @@ object InMemoryFileIndex extends Logging {
     exclude && !include
   }
 }
+
+private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
+  override def accept(path: Path): Boolean = {
+    (filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
+  }
+}
\ No newline at end of file
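Note on patch 1: instead of threading a separate filterFun string predicate through HadoopFSUtils, the user-supplied PathFilter and the built-in shouldFilterOut check are now combined into a single serializable PathFilterWrapper. The sketch below mirrors that accept() logic outside Spark so the combined behaviour is easy to see. It is illustrative only and not part of the patch: shouldFilterOut is simplified here, and parquetOnly is a made-up user filter.

    import org.apache.hadoop.fs.{Path, PathFilter}

    object PathFilterWrapperSketch {
      // Simplified stand-in for InMemoryFileIndex.shouldFilterOut (hidden/system files).
      def shouldFilterOut(pathName: String): Boolean =
        pathName.startsWith("_") || pathName.startsWith(".")

      // Same shape as PathFilterWrapper.accept: a null user filter accepts everything,
      // and internal file names are always excluded.
      def accepts(userFilter: PathFilter, path: Path): Boolean =
        (userFilter == null || userFilter.accept(path)) && !shouldFilterOut(path.getName)

      def main(args: Array[String]): Unit = {
        val parquetOnly: PathFilter = (p: Path) => p.getName.endsWith(".parquet")
        println(accepts(parquetOnly, new Path("/data/part-00000.parquet"))) // true
        println(accepts(parquetOnly, new Path("/data/_SUCCESS")))           // false
        println(accepts(null, new Path("/data/part-00000.parquet")))        // true
      }
    }

Because the wrapper extends Serializable, the same filter object can be shipped to executors when the listing runs as a Spark job.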
From c679d33926e31ead88ec0bb0f83a4d584223b5bd Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Tue, 6 Oct 2020 14:14:08 -0700
Subject: [PATCH 2/6] Remove SerializableBlockLocation and SerializableFileStatus

---
 .../org/apache/spark/util/HadoopFSUtils.scala | 61 +------------------
 1 file changed, 1 insertion(+), 60 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index 1345dd928c02b..e5bb4438bcd9f 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -125,53 +125,12 @@ private[spark] object HadoopFSUtils extends Logging {
             parallelismMax = 0)
           (path, leafFiles)
         }.iterator
-      }.map { case (path, statuses) =>
-        val serializableStatuses = statuses.map { status =>
-          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
-          val blockLocations = status match {
-            case f: LocatedFileStatus =>
-              f.getBlockLocations.map { loc =>
-                SerializableBlockLocation(
-                  loc.getNames,
-                  loc.getHosts,
-                  loc.getOffset,
-                  loc.getLength)
-              }
-
-            case _ =>
-              Array.empty[SerializableBlockLocation]
-          }
-
-          SerializableFileStatus(
-            status.getPath.toString,
-            status.getLen,
-            status.isDirectory,
-            status.getReplication,
-            status.getBlockSize,
-            status.getModificationTime,
-            status.getAccessTime,
-            blockLocations)
-        }
-        (path.toString, serializableStatuses)
       }.collect()
     } finally {
       sc.setJobDescription(previousJobDescription)
     }
 
-    // turn SerializableFileStatus back to Status
-    statusMap.map { case (path, serializableStatuses) =>
-      val statuses = serializableStatuses.map { f =>
-        val blockLocations = f.blockLocations.map { loc =>
-          new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
-        }
-        new LocatedFileStatus(
-          new FileStatus(
-            f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
-            new Path(f.path)),
-          blockLocations)
-      }
-      (new Path(path), statuses)
-    }
+    statusMap.toSeq
   }
 
   // scalastyle:off argcount
@@ -321,22 +280,4 @@ private[spark] object HadoopFSUtils extends Logging {
     resolvedLeafStatuses
   }
   // scalastyle:on argcount
-
-  /** A serializable variant of HDFS's BlockLocation. */
-  private case class SerializableBlockLocation(
-      names: Array[String],
-      hosts: Array[String],
-      offset: Long,
-      length: Long)
-
-  /** A serializable variant of HDFS's FileStatus. */
-  private case class SerializableFileStatus(
-      path: String,
-      length: Long,
-      isDir: Boolean,
-      blockReplication: Short,
-      blockSize: Long,
-      modificationTime: Long,
-      accessTime: Long,
-      blockLocations: Array[SerializableBlockLocation])
 }
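Note on patch 2: the hand-rolled SerializableFileStatus/SerializableBlockLocation round trip is dropped and Hadoop's own FileStatus objects are collected from the executors directly, presumably because the Hadoop versions Spark builds against provide Serializable FileStatus and BlockLocation classes. The sketch below shows the same collect-statuses-directly idea in isolation; it is illustrative only (the paths are made up) and is not code from this patch.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.spark.sql.SparkSession

    object DirectStatusCollection {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("listing-sketch").getOrCreate()
        val dirs = Seq("/tmp/warehouse/t1", "/tmp/warehouse/t2")

        // FileStatus is collected as-is; no wrapper case class is needed as long as
        // the FileStatus implementation in use is java.io.Serializable (an assumption here).
        val listed: Seq[(String, Seq[FileStatus])] =
          spark.sparkContext.parallelize(dirs, dirs.size).map { dir =>
            val path = new Path(dir)
            val fs = path.getFileSystem(new Configuration())
            val statuses = if (fs.exists(path)) fs.listStatus(path).toSeq else Seq.empty[FileStatus]
            (dir, statuses)
          }.collect().toSeq

        listed.foreach { case (dir, statuses) =>
          println(s"$dir -> ${statuses.map(_.getPath.getName).mkString(", ")}")
        }
        spark.stop()
      }
    }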
From 0df061cdb70779070f244fb8e3129d7812a4862e Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Tue, 6 Oct 2020 16:25:45 -0700
Subject: [PATCH 3/6] Hide isRootLevel parameter

---
 .../org/apache/spark/util/HadoopFSUtils.scala | 16 +++++++++++++---
 .../sql/execution/command/CommandUtils.scala  |  2 +-
 .../datasources/InMemoryFileIndex.scala       |  6 ++----
 3 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index e5bb4438bcd9f..7c3c2112f1ab7 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -44,8 +44,6 @@ private[spark] object HadoopFSUtils extends Logging {
    * @param paths Input paths to list
    * @param hadoopConf Hadoop configuration
    * @param filter Path filter used to exclude leaf files from result
-   * @param isRootLevel Whether the input paths are at the root level, i.e., they are the root
-   *                    paths as opposed to nested paths encountered during recursive calls of this.
    * @param ignoreMissingFiles Ignore missing files that happen during recursive listing
    *                           (e.g., due to race conditions)
    * @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false,
@@ -59,6 +57,19 @@ private[spark] object HadoopFSUtils extends Logging {
    * @return for each input path, the set of discovered files for the path
    */
   def parallelListLeafFiles(
+      sc: SparkContext,
+      paths: Seq[Path],
+      hadoopConf: Configuration,
+      filter: PathFilter,
+      ignoreMissingFiles: Boolean,
+      ignoreLocality: Boolean,
+      parallelismThreshold: Int,
+      parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
+    parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, true, ignoreMissingFiles,
+      ignoreLocality, parallelismThreshold, parallelismMax)
+  }
+
+  private def parallelListLeafFilesInternal(
       sc: SparkContext,
       paths: Seq[Path],
       hadoopConf: Configuration,
@@ -206,7 +217,6 @@ private[spark] object HadoopFSUtils extends Logging {
             dirs.map(_.getPath),
             hadoopConf = hadoopConf,
             filter = filter,
-            isRootLevel = false,
             ignoreMissingFiles = ignoreMissingFiles,
             ignoreLocality = ignoreLocality,
             parallelismThreshold = parallelismThreshold,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 8bf7504716f79..6495463be02c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -163,7 +163,7 @@ object CommandUtils extends Logging {
         .getConfString("hive.exec.stagingdir", ".hive-staging")
       val filter = new PathFilterIgnoreNonData(stagingDir)
       val sizes = InMemoryFileIndex.bulkListLeafFiles(paths.flatten,
-        sparkSession.sessionState.newHadoopConf(), filter, sparkSession, isRootLevel = true).map {
+        sparkSession.sessionState.newHadoopConf(), filter, sparkSession).map {
         case (_, files) => files.map(_.getLen).sum
       }
       // the size is 0 where paths(i) is not defined and sizes(i) where it is defined
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 5ec18e03a990f..1456da0288358 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -128,7 +128,7 @@ class InMemoryFileIndex(
     }
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(
-      pathsToFetch.toSeq, hadoopConf, filter, sparkSession, isRootLevel = true)
+      pathsToFetch.toSeq, hadoopConf, filter, sparkSession)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -146,14 +146,12 @@ object InMemoryFileIndex extends Logging {
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
-      sparkSession: SparkSession,
-      isRootLevel: Boolean): Seq[(Path, Seq[FileStatus])] = {
+      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
     HadoopFSUtils.parallelListLeafFiles(
       sc = sparkSession.sparkContext,
       paths = paths,
       hadoopConf = hadoopConf,
       filter = new PathFilterWrapper(filter),
-      isRootLevel = isRootLevel,
       ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
       ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
       parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,

From 76f5e23875b7a68ea549b7d159bd2b0c11ec317f Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Tue, 6 Oct 2020 17:44:14 -0700
Subject: [PATCH 4/6] Fix lint

---
 .../spark/sql/execution/datasources/InMemoryFileIndex.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 1456da0288358..21275951b5603 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -177,4 +177,4 @@ private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with
   override def accept(path: Path): Boolean = {
     (filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
   }
-}
\ No newline at end of file
+}
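Note on patches 3 and 4: isRootLevel becomes an implementation detail that only the private recursive variant carries, while the public entry point pins it to true; patch 4 only restores the missing newline at the end of the file. A generic sketch of that facade pattern follows; the names (list, listInternal, the toy recursion) are made up for illustration and are not part of the patch.

    // Generic facade sketch: the public method fixes the internal-only flag and
    // delegates; recursion stays on the private variant so callers can never
    // pass a wrong value for isRootLevel.
    object ListingFacade {
      def list(paths: Seq[String]): Seq[String] =
        listInternal(paths, isRootLevel = true)

      private def listInternal(paths: Seq[String], isRootLevel: Boolean): Seq[String] =
        paths.flatMap { p =>
          // Toy "directory" convention: a trailing slash means recurse one level.
          if (p.endsWith("/")) listInternal(Seq(p + "part-00000"), isRootLevel = false)
          else Seq(p)
        }

      def main(args: Array[String]): Unit = {
        println(list(Seq("/data/t1/", "/data/file.parquet")))
        // List(/data/t1/part-00000, /data/file.parquet)
      }
    }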
From cb76047370a71deaa3b1a50e709ffe68a2b2a52a Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Fri, 9 Oct 2020 11:02:09 -0700
Subject: [PATCH 5/6] Fix a silly mistake

---
 .../main/scala/org/apache/spark/util/HadoopFSUtils.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index 7c3c2112f1ab7..a3a528cddee37 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -65,8 +65,8 @@ private[spark] object HadoopFSUtils extends Logging {
       ignoreLocality: Boolean,
       parallelismThreshold: Int,
       parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
-    parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, true, ignoreMissingFiles,
-      ignoreLocality, parallelismThreshold, parallelismMax)
+    parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true,
+      ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax)
   }
 
   private def parallelListLeafFilesInternal(
@@ -212,11 +212,12 @@ private[spark] object HadoopFSUtils extends Logging {
       val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
       val nestedFiles: Seq[FileStatus] = contextOpt match {
         case Some(context) if dirs.size > parallelismThreshold =>
-          parallelListLeafFiles(
+          parallelListLeafFilesInternal(
             context,
             dirs.map(_.getPath),
             hadoopConf = hadoopConf,
             filter = filter,
+            isRootLevel = false,
             ignoreMissingFiles = ignoreMissingFiles,
             ignoreLocality = ignoreLocality,
             parallelismThreshold = parallelismThreshold,
             parallelismMax = parallelismMax

From e9d399de621a9cbfc32438b3460f78bef0e73de9 Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Tue, 17 Nov 2020 16:36:34 -0800
Subject: [PATCH 6/6] Empty commit to trigger CI again
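Note on the series as a whole: after these patches, parallelListLeafFiles no longer takes filterFun or isRootLevel, and patch 5 routes the recursive call through the internal variant with isRootLevel = false. Below is a hedged usage sketch of the resulting API; HadoopFSUtils is private[spark], so the caller must live under the org.apache.spark package, and the package name, paths, thresholds, and filter here are made up for illustration.

    package org.apache.spark.demo  // hypothetical package to satisfy private[spark] access

    import org.apache.hadoop.fs.{Path, PathFilter}
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.util.HadoopFSUtils

    object ParallelListingSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("listing"))
        val acceptAll: PathFilter = (_: Path) => true

        // Callers now pass only the listing knobs; filtering goes through the
        // single PathFilter and root-level handling is internal to HadoopFSUtils.
        val listed = HadoopFSUtils.parallelListLeafFiles(
          sc,
          paths = Seq(new Path("/tmp/warehouse/t1"), new Path("/tmp/warehouse/t2")),
          hadoopConf = sc.hadoopConfiguration,
          filter = acceptAll,
          ignoreMissingFiles = true,
          ignoreLocality = false,
          parallelismThreshold = 32,
          parallelismMax = 10000)

        listed.foreach { case (root, files) => println(s"$root: ${files.size} files") }
        sc.stop()
      }
    }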