From 2d6add1ef6a22bc263444a3f8a05c8cd5e1e23ce Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 2 Jul 2020 15:02:37 -0700 Subject: [PATCH 01/30] Start working on re-implementing the resolution logic to avoid getting locations --- .../org/apache/spark/rdd/NewHadoopRDD.scala | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index a7a6cf43b14a0..004dc5e1bdd49 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -120,6 +120,34 @@ class NewHadoopRDD[K, V]( } } + protected def getSplits(): Array[FileSplit] = { + val jobContext = new JobContextImpl(_conf, jobId) + if (!ignoreLocatity) { + inputFormat.getSplits(jobContext).asScala + } else { + inputFormat match { + case fileFormat: FileInputFormat => + // dirs can be a mixture of dirs and files, but this matches the Hadoop impl + val dirs = fileFormat.getInputPaths(jobContext) + val filter = fileFormat.getInputPathFilter(jobContext) + def processInputPath(p: Path): Try[FileStatus] = { + val fs = p.getFileSystem(jobContext) + try { + if + Success() + } catch { + } + } + val fileStatuses = dirs.map { p => + processInputPath(p) + } + case _ => + throw new SparkException( + s"Input form ${inputFormat} was not a FileInputFormat but asked to skip locations") + } + } + } + override def getPartitions: Array[Partition] = { val inputFormat = inputFormatClass.getConstructor().newInstance() inputFormat match { @@ -128,7 +156,7 @@ class NewHadoopRDD[K, V]( case _ => } try { - val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala + val allRowSplits = getSplits() val rawSplits = if (ignoreEmptySplits) { allRowSplits.filter(_.getLength > 0) } else { From 02ea25da560bf052de95c5ba3c7e4996d72c8309 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 2 Jul 2020 15:02:47 -0700 Subject: [PATCH 02/30] Revert "Start working on re-implementing the resolution logic to avoid getting locations" This reverts commit bbe63444b4cac0bd6858f1c684a38578e9c4ab5e. 
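Note: the getSplits() draft added in PATCH 01 (and reverted here) was left visibly incomplete: the if inside processInputPath has no condition or branches and the catch block is empty. For reference, a minimal sketch of what that helper appears to be reaching for (resolving each input path to a FileStatus and treating a missing path as a Failure) could look like the following; the Configuration parameter and the FileNotFoundException handling are assumptions for illustration, not the committed code:

    import java.io.FileNotFoundException

    import scala.util.{Failure, Success, Try}

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path}

    // Hypothetical completion of the processInputPath helper sketched in PATCH 01:
    // resolve a single input path to its FileStatus, capturing a missing path as Failure.
    def processInputPath(p: Path, conf: Configuration): Try[FileStatus] = {
      val fs = p.getFileSystem(conf)
      try {
        Success(fs.getFileStatus(p))
      } catch {
        case e: FileNotFoundException => Failure(e)
      }
    }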
--- .../org/apache/spark/rdd/NewHadoopRDD.scala | 30 +------------------ 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 004dc5e1bdd49..a7a6cf43b14a0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -120,34 +120,6 @@ class NewHadoopRDD[K, V]( } } - protected def getSplits(): Array[FileSplit] = { - val jobContext = new JobContextImpl(_conf, jobId) - if (!ignoreLocatity) { - inputFormat.getSplits(jobContext).asScala - } else { - inputFormat match { - case fileFormat: FileInputFormat => - // dirs can be a mixture of dirs and files, but this matches the Hadoop impl - val dirs = fileFormat.getInputPaths(jobContext) - val filter = fileFormat.getInputPathFilter(jobContext) - def processInputPath(p: Path): Try[FileStatus] = { - val fs = p.getFileSystem(jobContext) - try { - if - Success() - } catch { - } - } - val fileStatuses = dirs.map { p => - processInputPath(p) - } - case _ => - throw new SparkException( - s"Input form ${inputFormat} was not a FileInputFormat but asked to skip locations") - } - } - } - override def getPartitions: Array[Partition] = { val inputFormat = inputFormatClass.getConstructor().newInstance() inputFormat match { @@ -156,7 +128,7 @@ class NewHadoopRDD[K, V]( case _ => } try { - val allRowSplits = getSplits() + val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala val rawSplits = if (ignoreEmptySplits) { allRowSplits.filter(_.getLength > 0) } else { From fded3948f73159509827ac75b4b82e5ed03fcc93 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 2 Jul 2020 16:06:41 -0700 Subject: [PATCH 03/30] Start moving InMemoryFileIndex over to HadoopFSUtils --- .../org/apache/spark/util/HadoopFSUtils.scala | 270 ++++++++++++++++++ .../datasources/InMemoryFileIndex.scala | 251 ++-------------- 2 files changed, 286 insertions(+), 235 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala new file mode 100644 index 0000000000000..d0091fb71d1df --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +private[spark] HadoopFSUtils extends Logging { + /** + * Lists a collection of paths recursively. Picks the listing strategy adaptively depending + * on the number of paths to list. + * + * This may only be called on the driver. 
+   *
+   * @return for each input path, the set of discovered files for the path
+   */
+
+  def parallelListLeafFiles(sc: SparkContext, paths: Seq[Path], hadoopConf: HadoopConf,
+    filter: Filter, areSQLRootPaths: Boolean, ignoreMissingFiles: Boolean, ignoreLocality: Boolean,
+    maxParallelism: Int, parallelScanCallBack: Option[() => Unit] = None,
+    filterFun: Option[String => Boolean] = None): Seq[FileStatus] = {
+
+    val sparkContext = sparkSession.sparkContext
+    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val serializedPaths = paths.map(_.toString)
+
+    // Set the number of parallelism to prevent following file listing from generating many tasks
+    // in case of large #defaultParallelism.
+    val numParallelism = Math.min(paths.size, maxParallelism)
+
+    val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+    val statusMap = try {
+      val description = paths.size match {
+        case 0 =>
+          s"Listing leaf files and directories 0 paths"
+        case 1 =>
+          s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
+        case s =>
+          s"Listing leaf files and directories for $s paths:<br/>
${paths(0)}, ..." + } + sparkContext.setJobDescription(description) + sparkContext + .parallelize(serializedPaths, numParallelism) + .mapPartitions { pathStrings => + val hadoopConf = serializableConfiguration.value + pathStrings.map(new Path(_)).toSeq.map { path => + val leafFiles = listLeafFiles( + path, + hadoopConf, + filter, + ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, + isRootPath = areSQLRootPaths, + filterFun) + (path, leafFiles) + }.iterator + }.map { case (path, statuses) => + val serializableStatuses = statuses.map { status => + // Turn FileStatus into SerializableFileStatus so we can send it back to the driver + val blockLocations = status match { + case f: LocatedFileStatus => + f.getBlockLocations.map { loc => + SerializableBlockLocation( + loc.getNames, + loc.getHosts, + loc.getOffset, + loc.getLength) + } + + case _ => + Array.empty[SerializableBlockLocation] + } + + SerializableFileStatus( + status.getPath.toString, + status.getLen, + status.isDirectory, + status.getReplication, + status.getBlockSize, + status.getModificationTime, + status.getAccessTime, + blockLocations) + } + (path.toString, serializableStatuses) + }.collect() + } finally { + sparkContext.setJobDescription(previousJobDescription) + } + + // turn SerializableFileStatus back to Status + statusMap.map { case (path, serializableStatuses) => + val statuses = serializableStatuses.map { f => + val blockLocations = f.blockLocations.map { loc => + new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) + } + new LocatedFileStatus( + new FileStatus( + f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, + new Path(f.path)), + blockLocations) + } + (new Path(path), statuses) + } + } + /** + * Lists a single filesystem path recursively. If a Sparkcontext object is specified, this + * function may launch Spark jobs to parallelize listing based on parallelismThreshold. + * + * If sessionOpt is None, this may be called on executors. + * + * @return all children of path that match the specified filter. + */ + private def listLeafFiles( + path: Path, + hadoopConf: Configuration, + filter: PathFilter, + contextOpt: Option[SparkContext], + ignoreMissingFiles: Boolean, + ignoreLocality: Boolean, + isRootPath: Boolean, + filterFun: Option[String => Boolean] = None, + parallelisThreshold: Option[Int]): Seq[FileStatus] = { + logTrace(s"Listing $path") + val fs = path.getFileSystem(hadoopConf) + + // Note that statuses only include FileStatus for the files and dirs directly under path, + // and does not include anything else recursively. + val statuses: Array[FileStatus] = try { + fs match { + // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode + // to retrieve the file status with the file block location. The reason to still fallback + // to listStatus is because the default implementation would potentially throw a + // FileNotFoundException which is better handled by doing the lookups manually below. + case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality => + val remoteIter = fs.listLocatedStatus(path) + new Iterator[LocatedFileStatus]() { + def next(): LocatedFileStatus = remoteIter.next + def hasNext(): Boolean = remoteIter.hasNext + }.toArray + case _ => fs.listStatus(path) + } + } catch { + // If we are listing a root path (e.g. 
a top level directory of a table), we need to + // ignore FileNotFoundExceptions during this root level of the listing because + // + // (a) certain code paths might construct an InMemoryFileIndex with root paths that + // might not exist (i.e. not all callers are guaranteed to have checked + // path existence prior to constructing InMemoryFileIndex) and, + // (b) we need to ignore deleted root paths during REFRESH TABLE, otherwise we break + // existing behavior and break the ability drop SessionCatalog tables when tables' + // root directories have been deleted (which breaks a number of Spark's own tests). + // + // If we are NOT listing a root path then a FileNotFoundException here means that the + // directory was present in a previous level of file listing but is absent in this + // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3 + // list inconsistency). + // + // The trade-off in supporting existing behaviors / use-cases is that we won't be + // able to detect race conditions involving root paths being deleted during + // InMemoryFileIndex construction. However, it's still a net improvement to detect and + // fail-fast on the non-root cases. For more info see the SPARK-27676 review discussion. + case _: FileNotFoundException if isRootPath || ignoreMissingFiles => + logWarning(s"The directory $path was not found. Was it deleted very recently?") + Array.empty[FileStatus] + } + + val filteredStatuses = filterFun match { + case Some(shouldfilterOut) => + statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) + case None => + statuses + } + + val allLeafStatuses = { + val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) + val nestedFiles: Seq[FileStatus] = contextOpt match { + case Some(context) => + bulkListLeafFiles( + context, + dirs.map(_.getPath), + hadoopConf, + filter, + , + areSQLRootPaths = false + ).flatMap(_._2) + case _ => + dirs.flatMap { dir => + listLeafFiles( + dir.getPath, + hadoopConf, + filter, + sessionOpt, + ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, + isSQLRootPath = false) + } + } + val allFiles = topLevelFiles ++ nestedFiles + if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles + } + + val missingFiles = mutable.ArrayBuffer.empty[String] + val filteredLeafStatuses = filterFun match { + case Some(shouldFilterOut) => + allLeafStatuses.filterNot( + status => shouldFilterOut(status.getPath.getName)) + case None => + allLeafStatuses + } + val resolvedLeafStatuses = filteredLeafStatuses.flatMap { + case f: LocatedFileStatus => + Some(f) + + // NOTE: + // + // - Although S3/S3A/S3N file system can be quite slow for remote file metadata + // operations, calling `getFileBlockLocations` does no harm here since these file system + // implementations don't actually issue RPC for this method. + // + // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not + // be a big deal since we always use to `bulkListLeafFiles` when the number of + // paths exceeds threshold. + case f if !ignoreLocality => + // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), + // which is very slow on some file system (RawLocalFileSystem, which is launch a + // subprocess and parse the stdout). 
+ try { + val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc => + // Store BlockLocation objects to consume less memory + if (loc.getClass == classOf[BlockLocation]) { + loc + } else { + new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength) + } + } + val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, + f.getModificationTime, 0, null, null, null, null, f.getPath, locations) + if (f.isSymlink) { + lfs.setSymlink(f.getSymlink) + } + Some(lfs) + } catch { + case _: FileNotFoundException if ignoreMissingFiles => + missingFiles += f.getPath.toString + None + } + + case f => Some(f) + } + + if (missingFiles.nonEmpty) { + logWarning( + s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}") + } + + resolvedLeafStatuses + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index a488ed16a835a..a58e312aeafbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -165,14 +165,6 @@ object InMemoryFileIndex extends Logging { accessTime: Long, blockLocations: Array[SerializableBlockLocation]) - /** - * Lists a collection of paths recursively. Picks the listing strategy adaptively depending - * on the number of paths to list. - * - * This may only be called on the driver. - * - * @return for each input path, the set of discovered files for the path - */ private[sql] def bulkListLeafFiles( paths: Seq[Path], hadoopConf: Configuration, @@ -183,249 +175,38 @@ object InMemoryFileIndex extends Logging { val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles val ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality + val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) + // Short-circuits parallel listing when serial listing is likely to be faster. - if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) { + val threshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold + if (paths.size <= threshold) { return paths.map { path => - val leafFiles = listLeafFiles( + val leafFiles = HadoopFSUtils.listLeafFiles( path, hadoopConf, filter, Some(sparkSession), ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, - isRootPath = areRootPaths) + isSQLRootPath = areRootPaths, + filterFun = Some(shouldFilterOut), + parallelismThreshold = Some(threshold)) (path, leafFiles) } } + val parallelPartitionDiscoveryParallelism = + sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism + } + private def parallelScanCallBack(): Unit = { logInfo(s"Listing leaf files and directories in parallel under ${paths.length} paths." + s" The first several paths are: ${paths.take(10).mkString(", ")}.") HiveCatalogMetrics.incrementParallelListingJobCount(1) - - val sparkContext = sparkSession.sparkContext - val serializableConfiguration = new SerializableConfiguration(hadoopConf) - val serializedPaths = paths.map(_.toString) - val parallelPartitionDiscoveryParallelism = - sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism - - // Set the number of parallelism to prevent following file listing from generating many tasks - // in case of large #defaultParallelism. 
-    val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
-
-    val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
-    val statusMap = try {
-      val description = paths.size match {
-        case 0 =>
-          s"Listing leaf files and directories 0 paths"
-        case 1 =>
-          s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
-        case s =>
-          s"Listing leaf files and directories for $s paths:<br/>
${paths(0)}, ..." - } - sparkContext.setJobDescription(description) - sparkContext - .parallelize(serializedPaths, numParallelism) - .mapPartitions { pathStrings => - val hadoopConf = serializableConfiguration.value - pathStrings.map(new Path(_)).toSeq.map { path => - val leafFiles = listLeafFiles( - path, - hadoopConf, - filter, - None, - ignoreMissingFiles = ignoreMissingFiles, - ignoreLocality = ignoreLocality, - isRootPath = areRootPaths) - (path, leafFiles) - }.iterator - }.map { case (path, statuses) => - val serializableStatuses = statuses.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) - } - - case _ => - Array.empty[SerializableBlockLocation] - } - - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) - } - (path.toString, serializableStatuses) - }.collect() - } finally { - sparkContext.setJobDescription(previousJobDescription) - } - - // turn SerializableFileStatus back to Status - statusMap.map { case (path, serializableStatuses) => - val statuses = serializableStatuses.map { f => - val blockLocations = f.blockLocations.map { loc => - new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) - } - new LocatedFileStatus( - new FileStatus( - f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, - new Path(f.path)), - blockLocations) - } - (new Path(path), statuses) - } } - - /** - * Lists a single filesystem path recursively. If a SparkSession object is specified, this - * function may launch Spark jobs to parallelize listing. - * - * If sessionOpt is None, this may be called on executors. - * - * @return all children of path that match the specified filter. - */ - private def listLeafFiles( - path: Path, - hadoopConf: Configuration, - filter: PathFilter, - sessionOpt: Option[SparkSession], - ignoreMissingFiles: Boolean, - ignoreLocality: Boolean, - isRootPath: Boolean): Seq[FileStatus] = { - logTrace(s"Listing $path") - val fs = path.getFileSystem(hadoopConf) - - // Note that statuses only include FileStatus for the files and dirs directly under path, - // and does not include anything else recursively. - val statuses: Array[FileStatus] = try { - fs match { - // DistributedFileSystem overrides listLocatedStatus to make 1 single call to namenode - // to retrieve the file status with the file block location. The reason to still fallback - // to listStatus is because the default implementation would potentially throw a - // FileNotFoundException which is better handled by doing the lookups manually below. - case (_: DistributedFileSystem | _: ViewFileSystem) if !ignoreLocality => - val remoteIter = fs.listLocatedStatus(path) - new Iterator[LocatedFileStatus]() { - def next(): LocatedFileStatus = remoteIter.next - def hasNext(): Boolean = remoteIter.hasNext - }.toArray - case _ => fs.listStatus(path) - } - } catch { - // If we are listing a root path (e.g. a top level directory of a table), we need to - // ignore FileNotFoundExceptions during this root level of the listing because - // - // (a) certain code paths might construct an InMemoryFileIndex with root paths that - // might not exist (i.e. 
not all callers are guaranteed to have checked - // path existence prior to constructing InMemoryFileIndex) and, - // (b) we need to ignore deleted root paths during REFRESH TABLE, otherwise we break - // existing behavior and break the ability drop SessionCatalog tables when tables' - // root directories have been deleted (which breaks a number of Spark's own tests). - // - // If we are NOT listing a root path then a FileNotFoundException here means that the - // directory was present in a previous level of file listing but is absent in this - // listing, likely indicating a race condition (e.g. concurrent table overwrite or S3 - // list inconsistency). - // - // The trade-off in supporting existing behaviors / use-cases is that we won't be - // able to detect race conditions involving root paths being deleted during - // InMemoryFileIndex construction. However, it's still a net improvement to detect and - // fail-fast on the non-root cases. For more info see the SPARK-27676 review discussion. - case _: FileNotFoundException if isRootPath || ignoreMissingFiles => - logWarning(s"The directory $path was not found. Was it deleted very recently?") - Array.empty[FileStatus] - } - - val filteredStatuses = statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) - - val allLeafStatuses = { - val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) - val nestedFiles: Seq[FileStatus] = sessionOpt match { - case Some(session) => - bulkListLeafFiles( - dirs.map(_.getPath), - hadoopConf, - filter, - session, - areRootPaths = false - ).flatMap(_._2) - case _ => - dirs.flatMap { dir => - listLeafFiles( - dir.getPath, - hadoopConf, - filter, - sessionOpt, - ignoreMissingFiles = ignoreMissingFiles, - ignoreLocality = ignoreLocality, - isRootPath = false) - } - } - val allFiles = topLevelFiles ++ nestedFiles - if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles - } - - val missingFiles = mutable.ArrayBuffer.empty[String] - val filteredLeafStatuses = allLeafStatuses.filterNot( - status => shouldFilterOut(status.getPath.getName)) - val resolvedLeafStatuses = filteredLeafStatuses.flatMap { - case f: LocatedFileStatus => - Some(f) - - // NOTE: - // - // - Although S3/S3A/S3N file system can be quite slow for remote file metadata - // operations, calling `getFileBlockLocations` does no harm here since these file system - // implementations don't actually issue RPC for this method. - // - // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `bulkListLeafFiles` when the number of - // paths exceeds threshold. - case f if !ignoreLocality => - // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), - // which is very slow on some file system (RawLocalFileSystem, which is launch a - // subprocess and parse the stdout). 
- try { - val locations = fs.getFileBlockLocations(f, 0, f.getLen).map { loc => - // Store BlockLocation objects to consume less memory - if (loc.getClass == classOf[BlockLocation]) { - loc - } else { - new BlockLocation(loc.getNames, loc.getHosts, loc.getOffset, loc.getLength) - } - } - val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, f.getReplication, f.getBlockSize, - f.getModificationTime, 0, null, null, null, null, f.getPath, locations) - if (f.isSymlink) { - lfs.setSymlink(f.getSymlink) - } - Some(lfs) - } catch { - case _: FileNotFoundException if ignoreMissingFiles => - missingFiles += f.getPath.toString - None - } - - case f => Some(f) - } - - if (missingFiles.nonEmpty) { - logWarning( - s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}") - } - - resolvedLeafStatuses + HadoopFSUtils.parallelListLeafFiles(sparkSession.sparkContext, paths, hadoopConf, filter, + areSQLRootPaths = areRootPaths, ignoreMissingFiles = ignorMissingFiles, + ignoreLocality = ignoreLocality, parallelPartitionDiscoveryParallelism, + Some(parallelScanCallBack), Some(shouldFilterOut)) } /** Checks if we should filter out this path name. */ From f2fdcd7b900bb879258a16b43532b6af4e7bd57d Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 6 Jul 2020 12:15:12 -0700 Subject: [PATCH 04/30] Get the SQL layer compiling against the common shim layer --- .../org/apache/spark/util/HadoopFSUtils.scala | 112 ++++++++++++------ .../datasources/InMemoryFileIndex.scala | 63 ++++------ 2 files changed, 102 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index d0091fb71d1df..8a8466ecdbd3e 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -17,7 +17,20 @@ package org.apache.spark.util -private[spark] HadoopFSUtils extends Logging { +import java.io.FileNotFoundException + +import scala.collection.mutable + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.viewfs.ViewFileSystem +import org.apache.hadoop.hdfs.DistributedFileSystem +import org.apache.hadoop.mapred.{FileInputFormat, JobConf} + +import org.apache.spark.SparkContext +import org.apache.spark.internal.Logging + +private[spark] object HadoopFSUtils extends Logging { /** * Lists a collection of paths recursively. Picks the listing strategy adaptively depending * on the number of paths to list. 
@@ -27,12 +40,11 @@ private[spark] HadoopFSUtils extends Logging { * @return for each input path, the set of discovered files for the path */ - def parallelListLeafFiles(sc: SparkContext, paths: Seq[Path], hadoopConf: HadoopConf, - filter: Filter, areSQLRootPaths: Boolean, ignoreMissingFiles: Boolean, ignoreLocality: Boolean, - maxParallelism: Int, parallelScanCallBack: Option[() => Unit] = None, - filterFun: Option[String => Boolean] = None): Seq[FileStatus] = { + def parallelListLeafFiles(sc: SparkContext, paths: Seq[Path], hadoopConf: Configuration, + filter: PathFilter, areSQLRootPaths: Boolean, ignoreMissingFiles: Boolean, + ignoreLocality: Boolean, maxParallelism: Int, + filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = { - val sparkContext = sparkSession.sparkContext val serializableConfiguration = new SerializableConfiguration(hadoopConf) val serializedPaths = paths.map(_.toString) @@ -40,7 +52,7 @@ private[spark] HadoopFSUtils extends Logging { // in case of large #defaultParallelism. val numParallelism = Math.min(paths.size, maxParallelism) - val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) + val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) val statusMap = try { val description = paths.size match { case 0 => @@ -50,20 +62,23 @@ private[spark] HadoopFSUtils extends Logging { case s => s"Listing leaf files and directories for $s paths:
${paths(0)}, ..." } - sparkContext.setJobDescription(description) - sparkContext + sc.setJobDescription(description) + sc .parallelize(serializedPaths, numParallelism) .mapPartitions { pathStrings => val hadoopConf = serializableConfiguration.value pathStrings.map(new Path(_)).toSeq.map { path => val leafFiles = listLeafFiles( - path, - hadoopConf, - filter, + contextOpt = None, // Can't execute parallel scans on workers + path = path, + hadoopConf = hadoopConf, + filter = filter, ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, - isRootPath = areSQLRootPaths, - filterFun) + isSQLRootPath = areSQLRootPaths, + filterFun = filterFun, + parallelScanCallBack = None // Can't execute parallel scans on workers + ) (path, leafFiles) }.iterator }.map { case (path, statuses) => @@ -96,7 +111,7 @@ private[spark] HadoopFSUtils extends Logging { (path.toString, serializableStatuses) }.collect() } finally { - sparkContext.setJobDescription(previousJobDescription) + sc.setJobDescription(previousJobDescription) } // turn SerializableFileStatus back to Status @@ -122,16 +137,21 @@ private[spark] HadoopFSUtils extends Logging { * * @return all children of path that match the specified filter. */ - private def listLeafFiles( + // scalastyle:off argcount + def listLeafFiles( path: Path, hadoopConf: Configuration, filter: PathFilter, contextOpt: Option[SparkContext], ignoreMissingFiles: Boolean, ignoreLocality: Boolean, - isRootPath: Boolean, - filterFun: Option[String => Boolean] = None, - parallelisThreshold: Option[Int]): Seq[FileStatus] = { + isSQLRootPath: Boolean, + parallelScanCallBack: Option[() => Unit], + filterFun: Option[String => Boolean], + parallelismThreshold: Int = 1, + maxParallelism: Int = 1): Seq[FileStatus] = { + // scalastyle:on argcount + logTrace(s"Listing $path") val fs = path.getFileSystem(hadoopConf) @@ -152,7 +172,7 @@ private[spark] HadoopFSUtils extends Logging { case _ => fs.listStatus(path) } } catch { - // If we are listing a root path (e.g. a top level directory of a table), we need to + // If we are listing a root path for SQL (e.g. a top level directory of a table), we need to // ignore FileNotFoundExceptions during this root level of the listing because // // (a) certain code paths might construct an InMemoryFileIndex with root paths that @@ -171,13 +191,13 @@ private[spark] HadoopFSUtils extends Logging { // able to detect race conditions involving root paths being deleted during // InMemoryFileIndex construction. However, it's still a net improvement to detect and // fail-fast on the non-root cases. For more info see the SPARK-27676 review discussion. - case _: FileNotFoundException if isRootPath || ignoreMissingFiles => + case _: FileNotFoundException if isSQLRootPath || ignoreMissingFiles => logWarning(s"The directory $path was not found. 
Was it deleted very recently?") Array.empty[FileStatus] } val filteredStatuses = filterFun match { - case Some(shouldfilterOut) => + case Some(shouldFilterOut) => statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) case None => statuses @@ -186,25 +206,31 @@ private[spark] HadoopFSUtils extends Logging { val allLeafStatuses = { val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) val nestedFiles: Seq[FileStatus] = contextOpt match { - case Some(context) => - bulkListLeafFiles( + case Some(context) if dirs.size > parallelismThreshold => + parallelScanCallBack.foreach(f => f()) + parallelListLeafFiles( context, dirs.map(_.getPath), - hadoopConf, - filter, - , - areSQLRootPaths = false + hadoopConf = hadoopConf, + filter = filter, + areSQLRootPaths = false, + ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, + filterFun = filterFun, + maxParallelism = maxParallelism ).flatMap(_._2) case _ => dirs.flatMap { dir => listLeafFiles( - dir.getPath, - hadoopConf, - filter, - sessionOpt, + path = dir.getPath, + hadoopConf = hadoopConf, + filter = filter, + contextOpt = contextOpt, ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, - isSQLRootPath = false) + isSQLRootPath = false, + parallelScanCallBack = parallelScanCallBack, + filterFun = filterFun) } } val allFiles = topLevelFiles ++ nestedFiles @@ -230,7 +256,7 @@ private[spark] HadoopFSUtils extends Logging { // implementations don't actually issue RPC for this method. // // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not - // be a big deal since we always use to `bulkListLeafFiles` when the number of + // be a big deal since we always use to `parallelListLeafFiles` when the number of // paths exceeds threshold. case f if !ignoreLocality => // The other constructor of LocatedFileStatus will call FileStatus.getPermission(), @@ -267,4 +293,22 @@ private[spark] HadoopFSUtils extends Logging { resolvedLeafStatuses } + + /** A serializable variant of HDFS's BlockLocation. */ + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + /** A serializable variant of HDFS's FileStatus. 
*/ + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index a58e312aeafbd..b602ebfa7fe89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.datasources -import java.io.FileNotFoundException - import scala.collection.mutable import org.apache.hadoop.conf.Configuration @@ -33,7 +31,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.FileStreamSink import org.apache.spark.sql.types.StructType -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.HadoopFSUtils /** @@ -147,24 +145,6 @@ class InMemoryFileIndex( object InMemoryFileIndex extends Logging { - /** A serializable variant of HDFS's BlockLocation. */ - private case class SerializableBlockLocation( - names: Array[String], - hosts: Array[String], - offset: Long, - length: Long) - - /** A serializable variant of HDFS's FileStatus. */ - private case class SerializableFileStatus( - path: String, - length: Long, - isDir: Boolean, - blockReplication: Short, - blockSize: Long, - modificationTime: Long, - accessTime: Long, - blockLocations: Array[SerializableBlockLocation]) - private[sql] def bulkListLeafFiles( paths: Seq[Path], hadoopConf: Configuration, @@ -177,36 +157,41 @@ object InMemoryFileIndex extends Logging { val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) + val parallelPartitionDiscoveryParallelism = + sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism + val sc = sparkSession.sparkContext + + def parallelScanCallBack(): Unit = { + logInfo(s"Listing leaf files and directories in parallel under ${paths.length} paths." + + s" The first several paths are: ${paths.take(10).mkString(", ")}.") + HiveCatalogMetrics.incrementParallelListingJobCount(1) + } + // Short-circuits parallel listing when serial listing is likely to be faster. val threshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold if (paths.size <= threshold) { return paths.map { path => val leafFiles = HadoopFSUtils.listLeafFiles( - path, - hadoopConf, - filter, - Some(sparkSession), + path = path, + hadoopConf = hadoopConf, + filter = filter, + contextOpt = Some(sc), ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, isSQLRootPath = areRootPaths, - filterFun = Some(shouldFilterOut), - parallelismThreshold = Some(threshold)) + parallelScanCallBack = Some(parallelScanCallBack _), + filterFun = Some(shouldFilterOut _), + parallelismThreshold = threshold, + maxParallelism = parallelPartitionDiscoveryParallelism) (path, leafFiles) } } - val parallelPartitionDiscoveryParallelism = - sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism - } - private def parallelScanCallBack(): Unit = { - logInfo(s"Listing leaf files and directories in parallel under ${paths.length} paths." 
+ - s" The first several paths are: ${paths.take(10).mkString(", ")}.") - HiveCatalogMetrics.incrementParallelListingJobCount(1) - } - HadoopFSUtils.parallelListLeafFiles(sparkSession.sparkContext, paths, hadoopConf, filter, - areSQLRootPaths = areRootPaths, ignoreMissingFiles = ignorMissingFiles, - ignoreLocality = ignoreLocality, parallelPartitionDiscoveryParallelism, - Some(parallelScanCallBack), Some(shouldFilterOut)) + parallelScanCallBack() + HadoopFSUtils.parallelListLeafFiles(sparkSession.sparkContext, paths, hadoopConf, filter, + areSQLRootPaths = areRootPaths, ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, parallelPartitionDiscoveryParallelism, + Some(shouldFilterOut _)) } /** Checks if we should filter out this path name. */ From 37253da81bd8ccd67fa437f57756cf16b0fd430e Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 6 Jul 2020 14:23:05 -0700 Subject: [PATCH 05/30] Keep working on plumbing through the type info we need to avoid directly manipulating bytecode --- .../org/apache/spark/rdd/BinaryFileRDD.scala | 6 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 89 +++++++++++++++++-- .../apache/spark/rdd/WholeTextFileRDD.scala | 6 +- 3 files changed, 86 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index 039dbcbd5e035..6b207977925b9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -26,14 +26,14 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl import org.apache.spark.{Partition, SparkContext} import org.apache.spark.input.StreamFileInputFormat -private[spark] class BinaryFileRDD[T]( +private[spark] class BinaryFileRDD[T, F <: <: StreamFileInputFormat[T]]( @transient private val sc: SparkContext, - inputFormatClass: Class[_ <: StreamFileInputFormat[T]], + inputFormatClass: Class[F], keyClass: Class[String], valueClass: Class[T], conf: Configuration, minPartitions: Int) - extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) { + extends NewHadoopRDD[String, T, F](sc, inputFormatClass, keyClass, valueClass, conf) { override def getPartitions: Array[Partition] = { val conf = getConf diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index a7a6cf43b14a0..de1007b9fec10 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -68,9 +68,9 @@ private[spark] class NewHadoopPartition( * `org.apache.spark.SparkContext.newAPIHadoopRDD()` */ @DeveloperApi -class NewHadoopRDD[K, V]( +class NewHadoopRDD[K, V, F <: InputFormat[K, V]]( sc : SparkContext, - inputFormatClass: Class[_ <: InputFormat[K, V]], + inputFormatClass: Class[F], keyClass: Class[K], valueClass: Class[V], @transient private val _conf: Configuration) @@ -95,6 +95,10 @@ class NewHadoopRDD[K, V]( private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + private val listingParallelismThreshold = 2 + + private val accelerateListing = true + def getConf: Configuration = { val conf: Configuration = confBroadcast.value.value if (shouldCloneJobConf) { @@ -121,14 +125,8 @@ class NewHadoopRDD[K, V]( } override def getPartitions: Array[Partition] = { - val inputFormat = inputFormatClass.getConstructor().newInstance() - inputFormat match { - case configurable: 
Configurable => - configurable.setConf(_conf) - case _ => - } try { - val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala + val allRowSplits = getSplits() val rawSplits = if (ignoreEmptySplits) { allRowSplits.filter(_.getLength > 0) } else { @@ -164,6 +162,79 @@ class NewHadoopRDD[K, V]( } } + /** + * If configured and the format supports it, uses acceleratedGetSplits, otherwise + * asks the InputFormat directly for it's splits. + */ + private def getSplits(): Array[InputSplit] = { + if (accelerateListing) { + inputFormatClass.getMethod("listStatus") + acceleratedGetSplits() + } + delegateGetSplits() + } + + /** + * A version of get partitions which can use jobs on multiple workers, + * like in SQL. This does not delegate to getSplits, so any file + * format with custom getSplits logic should not be used. + * Only enable skipLocation if you know that location information is not + * used in split calculation. + */ + private final def acceleratedGetSplits(): + Array[InputSplit] = { + // Override listStatus to use Spark listing code + object OverriddenFormat extends F { + override def listStatus(job: JobConf): Array[FileStatus] = { + // get tokens for all the required FileSystems.. + TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job); + + val filters = List(hiddenFileFilter, getInputPathFilter(job)).filter(_ != null) + val filter = MultiPathFilter(filters) + val recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false); + + if (dirs.size < parallelListThreshold) { + dirs.flatMap{dir => + HadoopFSUtils.listLeafFiles( + path = dir, + hadoopConf = jobConf, + filter = filter, + contextOpt = Some(sc), + ignoreMissingFiles = ignoreMissingFiles, + isSQLRootPath = false, + ignoreLocality = ignoreLocality, + parallelScanCallBack = None, + filterFun = None, + parallelismThreshold = listingParallelismThreshold, + maxParallelism = maxListingParallelism) + } + } else { + HadoopFSUtils.parallelListLeafFiles( + sc = sc, + paths = dirs, + hadoopConf = jobConf, + filter = filter, + areSQLRootPaths = false, + ignoreMissingFiles = ignoreMissingFiles, + ignorLocality = ignoreLocality, + maxParallelism = maxListingParallelism, + filterFun = None) + } + } + } + OverriddenFormat.getSplits() + } + + private def delegateGetSplits(): Array[InputSplit] = { + val inputFormat = inputFormatClass.getConstructor().newInstance() + inputFormat match { + case configurable: Configurable => + configurable.setConf(_conf) + case _ => + } + inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala + } + override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new Iterator[(K, V)] { private val split = theSplit.asInstanceOf[NewHadoopPartition] diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala index eada762b99c8e..0b83c03673289 100644 --- a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala @@ -29,14 +29,14 @@ import org.apache.spark.input.WholeTextFileInputFormat /** * An RDD that reads a bunch of text files in, and each text file becomes one record. 
*/ -private[spark] class WholeTextFileRDD( +private[spark] class WholeTextFileRDD[F <: WholeTextFileInputFormat]( sc : SparkContext, - inputFormatClass: Class[_ <: WholeTextFileInputFormat], + inputFormatClass: Class[F], keyClass: Class[Text], valueClass: Class[Text], conf: Configuration, minPartitions: Int) - extends NewHadoopRDD[Text, Text](sc, inputFormatClass, keyClass, valueClass, conf) { + extends NewHadoopRDD[Text, Text, F](sc, inputFormatClass, keyClass, valueClass, conf) { override def getPartitions: Array[Partition] = { val conf = getConf From e6eee1db58d1844cfa970658b8e7b807fa84d895 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 7 Jul 2020 18:47:12 -0700 Subject: [PATCH 06/30] Ok core compiles now --- .../org/apache/spark/rdd/BinaryFileRDD.scala | 4 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 176 +++++++++++++----- .../apache/spark/rdd/WholeTextFileRDD.scala | 2 +- 3 files changed, 134 insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index 6b207977925b9..590a0bb1840ae 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -26,14 +26,14 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl import org.apache.spark.{Partition, SparkContext} import org.apache.spark.input.StreamFileInputFormat -private[spark] class BinaryFileRDD[T, F <: <: StreamFileInputFormat[T]]( +private[spark] class BinaryFileRDD[T, F <: StreamFileInputFormat[T]]( @transient private val sc: SparkContext, inputFormatClass: Class[F], keyClass: Class[String], valueClass: Class[T], conf: Configuration, minPartitions: Int) - extends NewHadoopRDD[String, T, F](sc, inputFormatClass, keyClass, valueClass, conf) { + extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) { override def getPartitions: Array[Partition] = { val conf = getConf diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index de1007b9fec10..acff86e5a63c4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -22,9 +22,11 @@ import java.text.SimpleDateFormat import java.util.{Date, Locale} import scala.collection.JavaConverters.asScalaBufferConverter +import scala.collection.mutable import scala.reflect.ClassTag import org.apache.hadoop.conf.{Configurable, Configuration} +import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path, PathFilter} import org.apache.hadoop.io.Writable import org.apache.hadoop.io.compress.CompressionCodecFactory import org.apache.hadoop.mapred.JobConf @@ -39,7 +41,8 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils} +import org.apache.spark.util.{HadoopFSUtils, SerializableConfiguration, ShutdownHookManager, Utils} + private[spark] class NewHadoopPartition( rddId: Int, @@ -68,9 +71,9 @@ private[spark] class NewHadoopPartition( * `org.apache.spark.SparkContext.newAPIHadoopRDD()` */ @DeveloperApi -class NewHadoopRDD[K, V, F <: InputFormat[K, V]]( +class NewHadoopRDD[K, V]( sc : SparkContext, - inputFormatClass: Class[F], + inputFormatClass: Class[_ <: InputFormat[K, V]], 
keyClass: Class[K], valueClass: Class[V], @transient private val _conf: Configuration) @@ -99,6 +102,10 @@ class NewHadoopRDD[K, V, F <: InputFormat[K, V]]( private val accelerateListing = true + private val ignoreLocality = true + + private val maxListingParallelism = 100 + def getConf: Configuration = { val conf: Configuration = confBroadcast.value.value if (shouldCloneJobConf) { @@ -166,10 +173,21 @@ class NewHadoopRDD[K, V, F <: InputFormat[K, V]]( * If configured and the format supports it, uses acceleratedGetSplits, otherwise * asks the InputFormat directly for it's splits. */ - private def getSplits(): Array[InputSplit] = { + private def getSplits(): Seq[InputSplit] = { + val inputFormat = inputFormatClass.getConstructor().newInstance() + inputFormat match { + case configurable: Configurable => + configurable.setConf(_conf) + case _ => + } + if (accelerateListing) { - inputFormatClass.getMethod("listStatus") - acceleratedGetSplits() + inputFormat match { + case fileFormat: FileInputFormat[K, V] => + acceleratedGetSplits(fileFormat) + case _ => + delegateGetSplits() + } } delegateGetSplits() } @@ -181,51 +199,120 @@ class NewHadoopRDD[K, V, F <: InputFormat[K, V]]( * Only enable skipLocation if you know that location information is not * used in split calculation. */ - private final def acceleratedGetSplits(): - Array[InputSplit] = { - // Override listStatus to use Spark listing code - object OverriddenFormat extends F { - override def listStatus(job: JobConf): Array[FileStatus] = { - // get tokens for all the required FileSystems.. - TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job); - - val filters = List(hiddenFileFilter, getInputPathFilter(job)).filter(_ != null) - val filter = MultiPathFilter(filters) - val recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false); - - if (dirs.size < parallelListThreshold) { - dirs.flatMap{dir => - HadoopFSUtils.listLeafFiles( - path = dir, - hadoopConf = jobConf, - filter = filter, - contextOpt = Some(sc), - ignoreMissingFiles = ignoreMissingFiles, - isSQLRootPath = false, - ignoreLocality = ignoreLocality, - parallelScanCallBack = None, - filterFun = None, - parallelismThreshold = listingParallelismThreshold, - maxParallelism = maxListingParallelism) - } - } else { - HadoopFSUtils.parallelListLeafFiles( - sc = sc, - paths = dirs, - hadoopConf = jobConf, + private final def acceleratedGetSplits(fileFormat: FileInputFormat[K, V]): + Seq[InputSplit] = { + + val jobContext = new JobContextImpl(_conf, jobId) + val conf = getConf + + // Get the file statuses + def listStatus(): Seq[FileStatus] = { + // The hiddenFileFilter is private but we want it, but it's final so + // we can recreate it safely. 
+ val filter = new PathFilter() { + override def accept(p: Path): Boolean = { + val name = p.getName() + !name.startsWith("_") && !name.startsWith(".") + } + } + + val recursive = conf.getBoolean("mapred.input.dir.recursive", false) + val dirs = FileInputFormat.getInputPaths(jobContext) + + if (dirs.size < listingParallelismThreshold) { + dirs.flatMap{dir => + HadoopFSUtils.listLeafFiles( + path = dir, + hadoopConf = conf, filter = filter, - areSQLRootPaths = false, + contextOpt = Some(sc), ignoreMissingFiles = ignoreMissingFiles, - ignorLocality = ignoreLocality, - maxParallelism = maxListingParallelism, - filterFun = None) + isSQLRootPath = false, + ignoreLocality = ignoreLocality, + parallelScanCallBack = None, + filterFun = None, + parallelismThreshold = listingParallelismThreshold, + maxParallelism = maxListingParallelism) } + } else { + HadoopFSUtils.parallelListLeafFiles( + sc = sc, + paths = dirs, + hadoopConf = conf, + filter = filter, + areSQLRootPaths = false, + ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, + maxParallelism = maxListingParallelism, + filterFun = None).flatMap(_._2) } } - OverriddenFormat.getSplits() + + val files = listStatus() + val totalSize = files.map(_.getLen()).sum + val minSize = Math.max(1, FileInputFormat.getMinSplitSize(jobContext)) + val maxSize = FileInputFormat.getMaxSplitSize(jobContext) + val ret = new mutable.ArrayBuilder.ofRef[FileSplit]() + + files.foreach { file => + val path = file.getPath() + file.getLen() match { + // Empty files are easier + case 0 => + ret += new FileSplit(file.getPath(), 0, 0, null) + case length => + lazy val blockLocations = file match { + case located: LocatedFileStatus => + located.getBlockLocations() + case _ => + val fs = path.getFileSystem(jobContext.getConfiguration()) + fs.getFileBlockLocations(file, 0, length); + } + + def blockHosts(idx: Int) = { + ignoreLocality match { + case true => null + case false => blockLocations(idx).getHosts() + } + } + def getBlockIndex(offset: Long): Int = { + ignoreLocality match { + case true => 0 + case false => + val loc = blockLocations.indexWhere{loc => + (loc.getOffset() <= offset) && (offset <= loc.getOffset() + loc.getLength()) + } + if (loc == -1) { + throw new IllegalArgumentException( + s"Offset ${offset} is outside of file") + } + loc + } + } + val blockSize = file.getBlockSize() + val splitSize = Math.max(minSize, Math.min(blockSize, maxSize)) + + var bytesRemaining = length + + while ((bytesRemaining.toDouble) / splitSize > 1.1) { + val blkIndex = getBlockIndex(length-bytesRemaining) + ret += (new FileSplit(file.getPath(), length-bytesRemaining, + bytesRemaining, blockHosts(blkIndex))) + } + // If we've got any bytes remaining one last split + if (bytesRemaining != 0) { + val blkIndex = getBlockIndex(length-bytesRemaining) + ret += (new FileSplit(file.getPath(), length-bytesRemaining, + bytesRemaining, blockHosts(blkIndex))) + } + } + } + ret.result() } - private def delegateGetSplits(): Array[InputSplit] = { + + + private def delegateGetSplits(): Seq[InputSplit] = { val inputFormat = inputFormatClass.getConstructor().newInstance() inputFormat match { case configurable: Configurable => @@ -407,7 +494,6 @@ class NewHadoopRDD[K, V, F <: InputFormat[K, V]]( } super.persist(storageLevel) } - } private[spark] object NewHadoopRDD { diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala index 0b83c03673289..7f75200e98917 100644 --- 
a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala @@ -36,7 +36,7 @@ private[spark] class WholeTextFileRDD[F <: WholeTextFileInputFormat]( valueClass: Class[Text], conf: Configuration, minPartitions: Int) - extends NewHadoopRDD[Text, Text, F](sc, inputFormatClass, keyClass, valueClass, conf) { + extends NewHadoopRDD[Text, Text](sc, inputFormatClass, keyClass, valueClass, conf) { override def getPartitions: Array[Partition] = { val conf = getConf From 0492bee70476d1595b09f48fcf5ddc92fd9af777 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 7 Jul 2020 18:50:11 -0700 Subject: [PATCH 07/30] Get the input filter based on the jobContext --- core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index acff86e5a63c4..f3f97deb44cd2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -210,9 +210,10 @@ class NewHadoopRDD[K, V]( // The hiddenFileFilter is private but we want it, but it's final so // we can recreate it safely. val filter = new PathFilter() { + val parentFilter = FileInputFormat.getInputPathFilter(jobContext) override def accept(p: Path): Boolean = { val name = p.getName() - !name.startsWith("_") && !name.startsWith(".") + !name.startsWith("_") && !name.startsWith(".") && parentFilter.accept(p) } } From 138e14a867faa26af4539e2b1a3110deb6e6649e Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 7 Jul 2020 18:55:27 -0700 Subject: [PATCH 08/30] Backout some small changes we don't need anymore --- core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala | 2 +- .../main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index 590a0bb1840ae..efb3180d61e79 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl import org.apache.spark.{Partition, SparkContext} import org.apache.spark.input.StreamFileInputFormat -private[spark] class BinaryFileRDD[T, F <: StreamFileInputFormat[T]]( +private[spark] class BinaryFileRDD[T]( @transient private val sc: SparkContext, inputFormatClass: Class[F], keyClass: Class[String], diff --git a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala index 7f75200e98917..eada762b99c8e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/WholeTextFileRDD.scala @@ -29,9 +29,9 @@ import org.apache.spark.input.WholeTextFileInputFormat /** * An RDD that reads a bunch of text files in, and each text file becomes one record. 
*/ -private[spark] class WholeTextFileRDD[F <: WholeTextFileInputFormat]( +private[spark] class WholeTextFileRDD( sc : SparkContext, - inputFormatClass: Class[F], + inputFormatClass: Class[_ <: WholeTextFileInputFormat], keyClass: Class[Text], valueClass: Class[Text], conf: Configuration, From dace630637d1c77e840752566890bd705f4fffbc Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 13 Jul 2020 18:14:29 -0700 Subject: [PATCH 09/30] Revert the class change to BinaryFileRDD we don't depend on that anymore. --- core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala index efb3180d61e79..039dbcbd5e035 100644 --- a/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala @@ -28,7 +28,7 @@ import org.apache.spark.input.StreamFileInputFormat private[spark] class BinaryFileRDD[T]( @transient private val sc: SparkContext, - inputFormatClass: Class[F], + inputFormatClass: Class[_ <: StreamFileInputFormat[T]], keyClass: Class[String], valueClass: Class[T], conf: Configuration, From 20586d3b90ed70952247288276ac0dcf17f9234a Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 14 Jul 2020 05:52:31 -0700 Subject: [PATCH 10/30] Fix dropped annotation --- core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index f3f97deb44cd2..902c290d06124 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -72,8 +72,8 @@ private[spark] class NewHadoopPartition( */ @DeveloperApi class NewHadoopRDD[K, V]( - sc : SparkContext, - inputFormatClass: Class[_ <: InputFormat[K, V]], + @transient sc: SparkContext, + inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], @transient private val _conf: Configuration) From 7bb0770433bb6252aa42510e0b6c0bc98c738ae9 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 28 Jul 2020 20:37:12 -0700 Subject: [PATCH 11/30] Back out NewHadoopRDD changes, we'll expose a trait instead and have people mix it in --- .../org/apache/spark/rdd/NewHadoopRDD.scala | 156 +----------------- 1 file changed, 2 insertions(+), 154 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 902c290d06124..34587b17dddb0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -22,11 +22,9 @@ import java.text.SimpleDateFormat import java.util.{Date, Locale} import scala.collection.JavaConverters.asScalaBufferConverter -import scala.collection.mutable import scala.reflect.ClassTag import org.apache.hadoop.conf.{Configurable, Configuration} -import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path, PathFilter} import org.apache.hadoop.io.Writable import org.apache.hadoop.io.compress.CompressionCodecFactory import org.apache.hadoop.mapred.JobConf @@ -41,7 +39,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD import 
org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{HadoopFSUtils, SerializableConfiguration, ShutdownHookManager, Utils} +import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils} private[spark] class NewHadoopPartition( @@ -98,14 +96,6 @@ class NewHadoopRDD[K, V]( private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) - private val listingParallelismThreshold = 2 - - private val accelerateListing = true - - private val ignoreLocality = true - - private val maxListingParallelism = 100 - def getConf: Configuration = { val conf: Configuration = confBroadcast.value.value if (shouldCloneJobConf) { @@ -169,10 +159,6 @@ class NewHadoopRDD[K, V]( } } - /** - * If configured and the format supports it, uses acceleratedGetSplits, otherwise - * asks the InputFormat directly for it's splits. - */ private def getSplits(): Seq[InputSplit] = { val inputFormat = inputFormatClass.getConstructor().newInstance() inputFormat match { @@ -181,145 +167,6 @@ class NewHadoopRDD[K, V]( case _ => } - if (accelerateListing) { - inputFormat match { - case fileFormat: FileInputFormat[K, V] => - acceleratedGetSplits(fileFormat) - case _ => - delegateGetSplits() - } - } - delegateGetSplits() - } - - /** - * A version of get partitions which can use jobs on multiple workers, - * like in SQL. This does not delegate to getSplits, so any file - * format with custom getSplits logic should not be used. - * Only enable skipLocation if you know that location information is not - * used in split calculation. - */ - private final def acceleratedGetSplits(fileFormat: FileInputFormat[K, V]): - Seq[InputSplit] = { - - val jobContext = new JobContextImpl(_conf, jobId) - val conf = getConf - - // Get the file statuses - def listStatus(): Seq[FileStatus] = { - // The hiddenFileFilter is private but we want it, but it's final so - // we can recreate it safely. 
- val filter = new PathFilter() { - val parentFilter = FileInputFormat.getInputPathFilter(jobContext) - override def accept(p: Path): Boolean = { - val name = p.getName() - !name.startsWith("_") && !name.startsWith(".") && parentFilter.accept(p) - } - } - - val recursive = conf.getBoolean("mapred.input.dir.recursive", false) - val dirs = FileInputFormat.getInputPaths(jobContext) - - if (dirs.size < listingParallelismThreshold) { - dirs.flatMap{dir => - HadoopFSUtils.listLeafFiles( - path = dir, - hadoopConf = conf, - filter = filter, - contextOpt = Some(sc), - ignoreMissingFiles = ignoreMissingFiles, - isSQLRootPath = false, - ignoreLocality = ignoreLocality, - parallelScanCallBack = None, - filterFun = None, - parallelismThreshold = listingParallelismThreshold, - maxParallelism = maxListingParallelism) - } - } else { - HadoopFSUtils.parallelListLeafFiles( - sc = sc, - paths = dirs, - hadoopConf = conf, - filter = filter, - areSQLRootPaths = false, - ignoreMissingFiles = ignoreMissingFiles, - ignoreLocality = ignoreLocality, - maxParallelism = maxListingParallelism, - filterFun = None).flatMap(_._2) - } - } - - val files = listStatus() - val totalSize = files.map(_.getLen()).sum - val minSize = Math.max(1, FileInputFormat.getMinSplitSize(jobContext)) - val maxSize = FileInputFormat.getMaxSplitSize(jobContext) - val ret = new mutable.ArrayBuilder.ofRef[FileSplit]() - - files.foreach { file => - val path = file.getPath() - file.getLen() match { - // Empty files are easier - case 0 => - ret += new FileSplit(file.getPath(), 0, 0, null) - case length => - lazy val blockLocations = file match { - case located: LocatedFileStatus => - located.getBlockLocations() - case _ => - val fs = path.getFileSystem(jobContext.getConfiguration()) - fs.getFileBlockLocations(file, 0, length); - } - - def blockHosts(idx: Int) = { - ignoreLocality match { - case true => null - case false => blockLocations(idx).getHosts() - } - } - def getBlockIndex(offset: Long): Int = { - ignoreLocality match { - case true => 0 - case false => - val loc = blockLocations.indexWhere{loc => - (loc.getOffset() <= offset) && (offset <= loc.getOffset() + loc.getLength()) - } - if (loc == -1) { - throw new IllegalArgumentException( - s"Offset ${offset} is outside of file") - } - loc - } - } - val blockSize = file.getBlockSize() - val splitSize = Math.max(minSize, Math.min(blockSize, maxSize)) - - var bytesRemaining = length - - while ((bytesRemaining.toDouble) / splitSize > 1.1) { - val blkIndex = getBlockIndex(length-bytesRemaining) - ret += (new FileSplit(file.getPath(), length-bytesRemaining, - bytesRemaining, blockHosts(blkIndex))) - } - // If we've got any bytes remaining one last split - if (bytesRemaining != 0) { - val blkIndex = getBlockIndex(length-bytesRemaining) - ret += (new FileSplit(file.getPath(), length-bytesRemaining, - bytesRemaining, blockHosts(blkIndex))) - } - } - } - ret.result() - } - - - - private def delegateGetSplits(): Seq[InputSplit] = { - val inputFormat = inputFormatClass.getConstructor().newInstance() - inputFormat match { - case configurable: Configurable => - configurable.setConf(_conf) - case _ => - } inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala } @@ -495,6 +342,7 @@ class NewHadoopRDD[K, V]( } super.persist(storageLevel) } + } private[spark] object NewHadoopRDD { From 4eb770a96be0c2b9f3f01bcc1ea8a83573690ea7 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 28 Jul 2020 20:38:09 -0700 Subject: [PATCH 12/30] Rework the HadoopFSUtils to: use the serilizablility of block locations, 
try and avoid expensive S3A location lookups (and other general file systems), etc. --- .../org/apache/spark/util/HadoopFSUtils.scala | 81 ++++--------------- 1 file changed, 17 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 8a8466ecdbd3e..905be8eeff1f3 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -62,7 +62,7 @@ private[spark] object HadoopFSUtils extends Logging { case s => s"Listing leaf files and directories for $s paths:
${paths(0)}, ..." } - sc.setJobDescription(description) + sc.setJobDescription(description) // TODO(holden): should we use jobgroup? sc .parallelize(serializedPaths, numParallelism) .mapPartitions { pathStrings => @@ -81,53 +81,12 @@ private[spark] object HadoopFSUtils extends Logging { ) (path, leafFiles) }.iterator - }.map { case (path, statuses) => - val serializableStatuses = statuses.map { status => - // Turn FileStatus into SerializableFileStatus so we can send it back to the driver - val blockLocations = status match { - case f: LocatedFileStatus => - f.getBlockLocations.map { loc => - SerializableBlockLocation( - loc.getNames, - loc.getHosts, - loc.getOffset, - loc.getLength) - } - - case _ => - Array.empty[SerializableBlockLocation] - } - - SerializableFileStatus( - status.getPath.toString, - status.getLen, - status.isDirectory, - status.getReplication, - status.getBlockSize, - status.getModificationTime, - status.getAccessTime, - blockLocations) - } - (path.toString, serializableStatuses) - }.collect() + }.collect() // TODO(holden): should we use local itr here? } finally { sc.setJobDescription(previousJobDescription) } - // turn SerializableFileStatus back to Status - statusMap.map { case (path, serializableStatuses) => - val statuses = serializableStatuses.map { f => - val blockLocations = f.blockLocations.map { loc => - new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) - } - new LocatedFileStatus( - new FileStatus( - f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, - new Path(f.path)), - blockLocations) - } - (new Path(path), statuses) - } + statusMap.toSeq } /** * Lists a single filesystem path recursively. If a Sparkcontext object is specified, this @@ -169,7 +128,19 @@ private[spark] object HadoopFSUtils extends Logging { def next(): LocatedFileStatus = remoteIter.next def hasNext(): Boolean = remoteIter.hasNext }.toArray - case _ => fs.listStatus(path) + case _ => + // Try and use the accelerated code path even if it isn't known + // to support it, and fall back. + try { + val remoteIter = fs.listLocatedStatus(path) + new Iterator[LocatedFileStatus]() { + def next(): LocatedFileStatus = remoteIter.next + def hasNext(): Boolean = remoteIter.hasNext + }.toArray + } catch { + case e: FileNotFoundException => + fs.listStatus(path) + } } } catch { // If we are listing a root path for SQL (e.g. a top level directory of a table), we need to @@ -291,24 +262,6 @@ private[spark] object HadoopFSUtils extends Logging { s"the following files were missing during file scan:\n ${missingFiles.mkString("\n ")}") } - resolvedLeafStatuses + resolvedLeafStatuses.toSeq } - - /** A serializable variant of HDFS's BlockLocation. */ - private case class SerializableBlockLocation( - names: Array[String], - hosts: Array[String], - offset: Long, - length: Long) - - /** A serializable variant of HDFS's FileStatus. 
*/ - private case class SerializableFileStatus( - path: String, - length: Long, - isDir: Boolean, - blockReplication: Short, - blockSize: Long, - modificationTime: Long, - accessTime: Long, - blockLocations: Array[SerializableBlockLocation]) } From 8a5fd8bc1741dc203cf0c08751f1bccd50b35445 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Wed, 29 Jul 2020 15:35:04 -0700 Subject: [PATCH 13/30] Fix the bug and remove default params so it's more difficult to write --- .../org/apache/spark/util/HadoopFSUtils.scala | 22 ++++++++++-------- .../datasources/InMemoryFileIndex.scala | 23 +++++++------------ .../datasources/FileIndexSuite.scala | 5 ++-- 3 files changed, 24 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 905be8eeff1f3..02ba9cb729255 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -29,6 +29,8 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.SparkContext import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.HiveCatalogMetrics + private[spark] object HadoopFSUtils extends Logging { /** @@ -44,6 +46,7 @@ private[spark] object HadoopFSUtils extends Logging { filter: PathFilter, areSQLRootPaths: Boolean, ignoreMissingFiles: Boolean, ignoreLocality: Boolean, maxParallelism: Int, filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = { + HiveCatalogMetrics.incrementParallelListingJobCount(1) val serializableConfiguration = new SerializableConfiguration(hadoopConf) val serializedPaths = paths.map(_.toString) @@ -77,8 +80,8 @@ private[spark] object HadoopFSUtils extends Logging { ignoreLocality = ignoreLocality, isSQLRootPath = areSQLRootPaths, filterFun = filterFun, - parallelScanCallBack = None // Can't execute parallel scans on workers - ) + parallelismThreshold = Int.MaxValue, + maxParallelism = 0) (path, leafFiles) }.iterator }.collect() // TODO(holden): should we use local itr here? @@ -105,10 +108,9 @@ private[spark] object HadoopFSUtils extends Logging { ignoreMissingFiles: Boolean, ignoreLocality: Boolean, isSQLRootPath: Boolean, - parallelScanCallBack: Option[() => Unit], filterFun: Option[String => Boolean], - parallelismThreshold: Int = 1, - maxParallelism: Int = 1): Seq[FileStatus] = { + parallelismThreshold: Int, + maxParallelism: Int): Seq[FileStatus] = { // scalastyle:on argcount logTrace(s"Listing $path") @@ -128,7 +130,7 @@ private[spark] object HadoopFSUtils extends Logging { def next(): LocatedFileStatus = remoteIter.next def hasNext(): Boolean = remoteIter.hasNext }.toArray - case _ => + case _ if !ignoreLocality => // Try and use the accelerated code path even if it isn't known // to support it, and fall back. try { @@ -141,6 +143,8 @@ private[spark] object HadoopFSUtils extends Logging { case e: FileNotFoundException => fs.listStatus(path) } + case _ => + fs.listStatus(path) } } catch { // If we are listing a root path for SQL (e.g. 
a top level directory of a table), we need to @@ -178,7 +182,6 @@ private[spark] object HadoopFSUtils extends Logging { val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) val nestedFiles: Seq[FileStatus] = contextOpt match { case Some(context) if dirs.size > parallelismThreshold => - parallelScanCallBack.foreach(f => f()) parallelListLeafFiles( context, dirs.map(_.getPath), @@ -200,8 +203,9 @@ private[spark] object HadoopFSUtils extends Logging { ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, isSQLRootPath = false, - parallelScanCallBack = parallelScanCallBack, - filterFun = filterFun) + filterFun = filterFun, + parallelismThreshold = parallelismThreshold, + maxParallelism = maxParallelism) } } val allFiles = topLevelFiles ++ nestedFiles diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index b602ebfa7fe89..58c0faec6d37f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -161,16 +161,10 @@ object InMemoryFileIndex extends Logging { sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism val sc = sparkSession.sparkContext - def parallelScanCallBack(): Unit = { - logInfo(s"Listing leaf files and directories in parallel under ${paths.length} paths." + - s" The first several paths are: ${paths.take(10).mkString(", ")}.") - HiveCatalogMetrics.incrementParallelListingJobCount(1) - } - // Short-circuits parallel listing when serial listing is likely to be faster. val threshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold - if (paths.size <= threshold) { - return paths.map { path => + val result = if (paths.size <= threshold) { + paths.map { path => val leafFiles = HadoopFSUtils.listLeafFiles( path = path, hadoopConf = hadoopConf, @@ -179,19 +173,18 @@ object InMemoryFileIndex extends Logging { ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, isSQLRootPath = areRootPaths, - parallelScanCallBack = Some(parallelScanCallBack _), filterFun = Some(shouldFilterOut _), parallelismThreshold = threshold, maxParallelism = parallelPartitionDiscoveryParallelism) (path, leafFiles) } + } else { + HadoopFSUtils.parallelListLeafFiles(sparkSession.sparkContext, paths, hadoopConf, filter, + areSQLRootPaths = areRootPaths, ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, parallelPartitionDiscoveryParallelism, + Some(shouldFilterOut _)) } - - parallelScanCallBack() - HadoopFSUtils.parallelListLeafFiles(sparkSession.sparkContext, paths, hadoopConf, filter, - areSQLRootPaths = areRootPaths, ignoreMissingFiles = ignoreMissingFiles, - ignoreLocality = ignoreLocality, parallelPartitionDiscoveryParallelism, - Some(shouldFilterOut _)) + result } /** Checks if we should filter out this path name. 
*/ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 02be8c9221704..3f67fede7740e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -290,9 +290,9 @@ class FileIndexSuite extends SharedSparkSession { } } HiveCatalogMetrics.reset() - assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0) + assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() === 0) new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None) - assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar) + assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() === expectedNumPar) } } } @@ -516,6 +516,7 @@ class FileIndexSuite extends SharedSparkSession { SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue) } } + } object DeletionRaceFileSystem { From d85c5a4c8264f8ebae70dc930b72670b7240cc11 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 3 Aug 2020 14:07:51 -0700 Subject: [PATCH 14/30] Remove un-used changes to NewHadoopRDD --- .../org/apache/spark/rdd/NewHadoopRDD.scala | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 34587b17dddb0..a7a6cf43b14a0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -41,7 +41,6 @@ import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD import org.apache.spark.storage.StorageLevel import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils} - private[spark] class NewHadoopPartition( rddId: Int, val index: Int, @@ -70,7 +69,7 @@ private[spark] class NewHadoopPartition( */ @DeveloperApi class NewHadoopRDD[K, V]( - @transient sc: SparkContext, + sc : SparkContext, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], @@ -122,8 +121,14 @@ class NewHadoopRDD[K, V]( } override def getPartitions: Array[Partition] = { + val inputFormat = inputFormatClass.getConstructor().newInstance() + inputFormat match { + case configurable: Configurable => + configurable.setConf(_conf) + case _ => + } try { - val allRowSplits = getSplits() + val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala val rawSplits = if (ignoreEmptySplits) { allRowSplits.filter(_.getLength > 0) } else { @@ -159,17 +164,6 @@ class NewHadoopRDD[K, V]( } } - private def getSplits(): Seq[InputSplit] = { - val inputFormat = inputFormatClass.getConstructor().newInstance() - inputFormat match { - case configurable: Configurable => - configurable.setConf(_conf) - case _ => - } - - inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala - } - override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = { val iter = new Iterator[(K, V)] { private val split = theSplit.asInstanceOf[NewHadoopPartition] From 3b0bf1823ed4e74d3cd35ecdd72c081a90ab95d4 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Mon, 3 Aug 2020 21:15:02 -0700 Subject: [PATCH 15/30] Put the utility function in HadoopFSUtils to make it easier. 
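The WholeTextFileInputFormat change below shows the intended usage of the helper this patch adds: an input format keeps its own getSplits logic and only swaps out the file-listing step. A minimal sketch of the same pattern for an arbitrary format follows; the class name and the stubbed record reader are illustrative assumptions, and HadoopFSUtils.alternativeStatus is the helper introduced here (and backed out again in PATCH 19), not a stable Spark API.

import java.util.{List => JList}

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat

import org.apache.spark.util.HadoopFSUtils

// Hypothetical input format that opts in to the accelerated listing.
class ListingAwareInputFormat extends CombineFileInputFormat[Text, Text] {

  // Record reading is unchanged and omitted from this sketch.
  override def createRecordReader(
      split: InputSplit, context: TaskAttemptContext): RecordReader[Text, Text] = ???

  // Only the listing step is replaced; getSplits stays with the parent class,
  // mirroring the WholeTextFileInputFormat change in the diff below.
  override protected def listStatus(job: JobContext): JList[FileStatus] =
    HadoopFSUtils.alternativeStatus(job, this)
}
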
--- .../input/WholeTextFileInputFormat.scala | 7 ++ .../spark/internal/config/package.scala | 34 ++++++ .../org/apache/spark/util/HadoopFSUtils.scala | 100 ++++++++++++++++-- 3 files changed, 134 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index 692deb7a3282f..395be076c1ffc 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -24,6 +24,8 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat +import org.apache.spark.util.HadoopFSUtils + /** * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for * reading whole text files. Each file is read as key-value pair, where the key is the file path and @@ -68,4 +70,9 @@ private[spark] class WholeTextFileInputFormat } super.setMaxSplitSize(maxSplitSize) } + + // Note: we override listStatus but not getSplits because our parent class overrides it. + override protected def listStatus(job: JobContext) = { + HadoopFSUtils.alternativeStatus(job, this) + } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index b308115935d64..ca303b10a2a52 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -443,6 +443,40 @@ package object config { .booleanConf .createWithDefault(false) + val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = + ConfigBuilder("spark.rdd.sources.parallelPartitionDiscovery.threshold") + .doc("The maximum number of paths allowed for listing files at driver side. If the number " + + "of detected paths exceeds this value during partition discovery, it tries to list the " + + "files with another Spark distributed job. This configuration is effective only when " + + "using file-based using HadoopFSUtils") + .version("1.5.0") + .intConf + .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " + + "files at driver side must not be negative") + .createWithDefault(2) + + val PARALLEL_PARTITION_DISCOVERY_PARALLELISM = + ConfigBuilder("spark.rdd.sources.parallelPartitionDiscovery.parallelism") + .doc("The number of parallelism to list a collection of path recursively, Set the " + + "number to prevent file listing from generating too many tasks.") + .version("3.1.0") + .internal() + .intConf + .createWithDefault(10000) + + val IGNORE_DATA_LOCALITY = + ConfigBuilder("spark.rdd.sources.ignoreDataLocality") + .doc("If true, Spark will not fetch the block locations for each file on " + + "listing files. This speeds up file listing, but the scheduler cannot " + + "schedule tasks to take advantage of data locality. It can be particularly " + + "useful if data is read from a remote cluster so the scheduler could never " + + "take advantage of locality anyway. 
This configuration is effective only when " + + "using file-based using HadoopFSUtils") + .version("3.1.0") + .internal() + .booleanConf + .createWithDefault(true) + private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK = ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock") .internal() diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 02ba9cb729255..6f48319e537a6 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -18,22 +18,33 @@ package org.apache.spark.util import java.io.FileNotFoundException +import java.util.{List => jList} +import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.viewfs.ViewFileSystem import org.apache.hadoop.hdfs.DistributedFileSystem -import org.apache.hadoop.mapred.{FileInputFormat, JobConf} +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce.JobContext +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging +import org.apache.spark._ +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.internal.{config, Logging} import org.apache.spark.metrics.source.HiveCatalogMetrics - -private[spark] object HadoopFSUtils extends Logging { +/** + * :: DeveloperApi :: + * Utility functions to simplify and speed-up file listing. + * To see how to use these look at + */ +@DeveloperApi +object HadoopFSUtils extends Logging { /** + * :: DeveloperApi :: * Lists a collection of paths recursively. Picks the listing strategy adaptively depending * on the number of paths to list. * @@ -41,7 +52,7 @@ private[spark] object HadoopFSUtils extends Logging { * * @return for each input path, the set of discovered files for the path */ - + @DeveloperApi def parallelListLeafFiles(sc: SparkContext, paths: Seq[Path], hadoopConf: Configuration, filter: PathFilter, areSQLRootPaths: Boolean, ignoreMissingFiles: Boolean, ignoreLocality: Boolean, maxParallelism: Int, @@ -92,6 +103,7 @@ private[spark] object HadoopFSUtils extends Logging { statusMap.toSeq } /** + * :: DeveloperApi :: * Lists a single filesystem path recursively. If a Sparkcontext object is specified, this * function may launch Spark jobs to parallelize listing based on parallelismThreshold. * @@ -100,6 +112,7 @@ private[spark] object HadoopFSUtils extends Logging { * @return all children of path that match the specified filter. */ // scalastyle:off argcount + @DeveloperApi def listLeafFiles( path: Path, hadoopConf: Configuration, @@ -144,7 +157,13 @@ private[spark] object HadoopFSUtils extends Logging { fs.listStatus(path) } case _ => - fs.listStatus(path) + // We are ignoring locality, so we'll use a null locality + fs.listStatus(path).map{f => + f match { + case _: LocatedFileStatus => f + case _ => new LocatedFileStatus(f, null) + } + } } } catch { // If we are listing a root path for SQL (e.g. a top level directory of a table), we need to @@ -268,4 +287,71 @@ private[spark] object HadoopFSUtils extends Logging { resolvedLeafStatuses.toSeq } + + /** + * Utility function to make it easier for FileInputFormats to use + * HadoopFSUtils. 
+ */ + def alternativeStatus(job: JobContext, format: FileInputFormat[_, _]): + jList[FileStatus] = { + + val conf = job.getConfiguration() + + val sc = SparkContext.getActive match { + case None => throw new SparkException("No active SparkContext.") + case Some(x) => x + } + + // TODO: use real configurations + + val listingParallelismThreshold = sc.conf.get(config.PARALLEL_PARTITION_DISCOVERY_THRESHOLD) + + val ignoreLocality = sc.conf.get(config.IGNORE_DATA_LOCALITY) + + val maxListingParallelism = sc.conf.get(config.PARALLEL_PARTITION_DISCOVERY_PARALLELISM) + + val ignoreMissingFiles = sc.conf.get(config.IGNORE_MISSING_FILES) + + val recursive = conf.getBoolean("mapred.input.dir.recursive", false) + + // The hiddenFileFilter is private but we want it, but it's final so + // we can recreate it safely. + val filter = new PathFilter() { + val parentFilter = FileInputFormat.getInputPathFilter(job) + override def accept(p: Path): Boolean = { + val name = p.getName() + !name.startsWith("_") && !name.startsWith(".") && parentFilter.accept(p) + } + } + + val dirs: Seq[Path] = FileInputFormat.getInputPaths(job) + + val statuses = if (dirs.size < listingParallelismThreshold) { + dirs.flatMap{dir => + listLeafFiles( + path = dir, + hadoopConf = conf, + filter = filter, + contextOpt = Some(sc), + ignoreMissingFiles = ignoreMissingFiles, + isSQLRootPath = false, + ignoreLocality = ignoreLocality, + filterFun = None, + parallelismThreshold = listingParallelismThreshold, + maxParallelism = maxListingParallelism) + }.toList + } else { + parallelListLeafFiles( + sc = sc, + paths = dirs, + hadoopConf = conf, + filter = filter, + areSQLRootPaths = false, + ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, + maxParallelism = maxListingParallelism, + filterFun = None).flatMap(_._2).toList + } + statuses.asJava + } } From 8b7a2fa9015278c654f1d0bbb01727d3d77d7157 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Tue, 18 Aug 2020 22:26:38 -0700 Subject: [PATCH 16/30] Fix NPE --- core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 6f48319e537a6..b878a20bcaa43 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -320,7 +320,8 @@ object HadoopFSUtils extends Logging { val parentFilter = FileInputFormat.getInputPathFilter(job) override def accept(p: Path): Boolean = { val name = p.getName() - !name.startsWith("_") && !name.startsWith(".") && parentFilter.accept(p) + !name.startsWith("_") && !name.startsWith(".") && + parentFilter != null && parentFilter.accept(p) } } From e0ec9a6fa7e87e32b59f687f6cb88125660bde75 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 19 Aug 2020 11:01:18 -0700 Subject: [PATCH 17/30] Defaults ignoreLocality to false and small cleanups --- .../spark/internal/config/package.scala | 2 +- .../org/apache/spark/util/HadoopFSUtils.scala | 28 ++----------------- 2 files changed, 4 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index ca303b10a2a52..6096d66318a39 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -475,7 +475,7 @@ package 
object config { .version("3.1.0") .internal() .booleanConf - .createWithDefault(true) + .createWithDefault(false) private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK = ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock") diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index b878a20bcaa43..db284d441ed0b 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.viewfs.ViewFileSystem import org.apache.hadoop.hdfs.DistributedFileSystem -import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.JobContext import org.apache.hadoop.mapreduce.lib.input.FileInputFormat @@ -143,19 +142,6 @@ object HadoopFSUtils extends Logging { def next(): LocatedFileStatus = remoteIter.next def hasNext(): Boolean = remoteIter.hasNext }.toArray - case _ if !ignoreLocality => - // Try and use the accelerated code path even if it isn't known - // to support it, and fall back. - try { - val remoteIter = fs.listLocatedStatus(path) - new Iterator[LocatedFileStatus]() { - def next(): LocatedFileStatus = remoteIter.next - def hasNext(): Boolean = remoteIter.hasNext - }.toArray - } catch { - case e: FileNotFoundException => - fs.listStatus(path) - } case _ => // We are ignoring locality, so we'll use a null locality fs.listStatus(path).map{f => @@ -294,34 +280,26 @@ object HadoopFSUtils extends Logging { */ def alternativeStatus(job: JobContext, format: FileInputFormat[_, _]): jList[FileStatus] = { - - val conf = job.getConfiguration() - + val conf = job.getConfiguration val sc = SparkContext.getActive match { case None => throw new SparkException("No active SparkContext.") case Some(x) => x } // TODO: use real configurations - val listingParallelismThreshold = sc.conf.get(config.PARALLEL_PARTITION_DISCOVERY_THRESHOLD) - val ignoreLocality = sc.conf.get(config.IGNORE_DATA_LOCALITY) - val maxListingParallelism = sc.conf.get(config.PARALLEL_PARTITION_DISCOVERY_PARALLELISM) - val ignoreMissingFiles = sc.conf.get(config.IGNORE_MISSING_FILES) - val recursive = conf.getBoolean("mapred.input.dir.recursive", false) - // The hiddenFileFilter is private but we want it, but it's final so // we can recreate it safely. 
val filter = new PathFilter() { val parentFilter = FileInputFormat.getInputPathFilter(job) override def accept(p: Path): Boolean = { - val name = p.getName() + val name = p.getName !name.startsWith("_") && !name.startsWith(".") && - parentFilter != null && parentFilter.accept(p) + (parentFilter == null || parentFilter.accept(p)) } } From b1bf5e98d82b76b088efce40938e5098e84c736a Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Wed, 19 Aug 2020 22:47:05 -0700 Subject: [PATCH 18/30] Don't use null as BlockLocations --- .../main/scala/org/apache/spark/util/HadoopFSUtils.scala | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index db284d441ed0b..860912eb936f0 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -142,14 +142,7 @@ object HadoopFSUtils extends Logging { def next(): LocatedFileStatus = remoteIter.next def hasNext(): Boolean = remoteIter.hasNext }.toArray - case _ => - // We are ignoring locality, so we'll use a null locality - fs.listStatus(path).map{f => - f match { - case _: LocatedFileStatus => f - case _ => new LocatedFileStatus(f, null) - } - } + case _ => fs.listStatus(path) } } catch { // If we are listing a root path for SQL (e.g. a top level directory of a table), we need to From 5529047f44b0f451f63e79651a19704721d21920 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Thu, 20 Aug 2020 11:41:45 -0700 Subject: [PATCH 19/30] Revert changes on WholeTextInputFileFormat --- .../input/WholeTextFileInputFormat.scala | 5 -- .../org/apache/spark/util/HadoopFSUtils.scala | 78 +------------------ .../datasources/InMemoryFileIndex.scala | 10 +-- 3 files changed, 6 insertions(+), 87 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index 395be076c1ffc..6a9ebbb7471ea 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -70,9 +70,4 @@ private[spark] class WholeTextFileInputFormat } super.setMaxSplitSize(maxSplitSize) } - - // Note: we override listStatus but not getSplits because our parent class overrides it. 
- override protected def listStatus(job: JobContext) = { - HadoopFSUtils.alternativeStatus(job, this) - } } diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 860912eb936f0..5023a5faae614 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -18,27 +18,22 @@ package org.apache.spark.util import java.io.FileNotFoundException -import java.util.{List => jList} -import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.viewfs.ViewFileSystem import org.apache.hadoop.hdfs.DistributedFileSystem -import org.apache.hadoop.mapreduce.JobContext -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics /** * :: DeveloperApi :: * Utility functions to simplify and speed-up file listing. - * To see how to use these look at */ @DeveloperApi object HadoopFSUtils extends Logging { @@ -169,13 +164,14 @@ object HadoopFSUtils extends Logging { Array.empty[FileStatus] } - val filteredStatuses = filterFun match { + def doFilter(statuses: Array[FileStatus]) = filterFun match { case Some(shouldFilterOut) => statuses.filterNot(status => shouldFilterOut(status.getPath.getName)) case None => statuses } + val filteredStatuses = doFilter(statuses) val allLeafStatuses = { val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory) val nestedFiles: Seq[FileStatus] = contextOpt match { @@ -211,13 +207,7 @@ object HadoopFSUtils extends Logging { } val missingFiles = mutable.ArrayBuffer.empty[String] - val filteredLeafStatuses = filterFun match { - case Some(shouldFilterOut) => - allLeafStatuses.filterNot( - status => shouldFilterOut(status.getPath.getName)) - case None => - allLeafStatuses - } + val filteredLeafStatuses = doFilter(allLeafStatuses) val resolvedLeafStatuses = filteredLeafStatuses.flatMap { case f: LocatedFileStatus => Some(f) @@ -266,64 +256,4 @@ object HadoopFSUtils extends Logging { resolvedLeafStatuses.toSeq } - - /** - * Utility function to make it easier for FileInputFormats to use - * HadoopFSUtils. - */ - def alternativeStatus(job: JobContext, format: FileInputFormat[_, _]): - jList[FileStatus] = { - val conf = job.getConfiguration - val sc = SparkContext.getActive match { - case None => throw new SparkException("No active SparkContext.") - case Some(x) => x - } - - // TODO: use real configurations - val listingParallelismThreshold = sc.conf.get(config.PARALLEL_PARTITION_DISCOVERY_THRESHOLD) - val ignoreLocality = sc.conf.get(config.IGNORE_DATA_LOCALITY) - val maxListingParallelism = sc.conf.get(config.PARALLEL_PARTITION_DISCOVERY_PARALLELISM) - val ignoreMissingFiles = sc.conf.get(config.IGNORE_MISSING_FILES) - - // The hiddenFileFilter is private but we want it, but it's final so - // we can recreate it safely. 
- val filter = new PathFilter() { - val parentFilter = FileInputFormat.getInputPathFilter(job) - override def accept(p: Path): Boolean = { - val name = p.getName - !name.startsWith("_") && !name.startsWith(".") && - (parentFilter == null || parentFilter.accept(p)) - } - } - - val dirs: Seq[Path] = FileInputFormat.getInputPaths(job) - - val statuses = if (dirs.size < listingParallelismThreshold) { - dirs.flatMap{dir => - listLeafFiles( - path = dir, - hadoopConf = conf, - filter = filter, - contextOpt = Some(sc), - ignoreMissingFiles = ignoreMissingFiles, - isSQLRootPath = false, - ignoreLocality = ignoreLocality, - filterFun = None, - parallelismThreshold = listingParallelismThreshold, - maxParallelism = maxListingParallelism) - }.toList - } else { - parallelListLeafFiles( - sc = sc, - paths = dirs, - hadoopConf = conf, - filter = filter, - areSQLRootPaths = false, - ignoreMissingFiles = ignoreMissingFiles, - ignoreLocality = ignoreLocality, - maxParallelism = maxListingParallelism, - filterFun = None).flatMap(_._2).toList - } - statuses.asJava - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 58c0faec6d37f..27c78dfbd5436 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -21,11 +21,8 @@ import scala.collection.mutable import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ -import org.apache.hadoop.fs.viewfs.ViewFileSystem -import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.hadoop.mapred.{FileInputFormat, JobConf} -import org.apache.spark.SparkContext import org.apache.spark.internal.Logging import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession @@ -154,9 +151,6 @@ object InMemoryFileIndex extends Logging { val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles val ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality - - val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) - val parallelPartitionDiscoveryParallelism = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism val sc = sparkSession.sparkContext @@ -173,7 +167,7 @@ object InMemoryFileIndex extends Logging { ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, isSQLRootPath = areRootPaths, - filterFun = Some(shouldFilterOut _), + filterFun = Some(shouldFilterOut), parallelismThreshold = threshold, maxParallelism = parallelPartitionDiscoveryParallelism) (path, leafFiles) @@ -182,7 +176,7 @@ object InMemoryFileIndex extends Logging { HadoopFSUtils.parallelListLeafFiles(sparkSession.sparkContext, paths, hadoopConf, filter, areSQLRootPaths = areRootPaths, ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, parallelPartitionDiscoveryParallelism, - Some(shouldFilterOut _)) + Some(shouldFilterOut)) } result } From 6f5c7e5dd52e484ec56e1cfd8435d4f00f682c57 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Thu, 20 Aug 2020 16:38:28 -0700 Subject: [PATCH 20/30] Remove unused import --- .../scala/org/apache/spark/input/WholeTextFileInputFormat.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala 
b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index 6a9ebbb7471ea..692deb7a3282f 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -24,8 +24,6 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat -import org.apache.spark.util.HadoopFSUtils - /** * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for * reading whole text files. Each file is read as key-value pair, where the key is the file path and From 43fc5a0a418cf9d48b0adb849fcdc374b4c146d4 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Fri, 21 Aug 2020 10:26:39 -0700 Subject: [PATCH 21/30] Address comments and limit this to only refactoring --- .../spark/internal/config/package.scala | 34 ----- .../org/apache/spark/util/HadoopFSUtils.scala | 141 +++++++++++++++--- .../datasources/InMemoryFileIndex.scala | 45 ++---- .../datasources/FileIndexSuite.scala | 5 +- 4 files changed, 136 insertions(+), 89 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 6096d66318a39..b308115935d64 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -443,40 +443,6 @@ package object config { .booleanConf .createWithDefault(false) - val PARALLEL_PARTITION_DISCOVERY_THRESHOLD = - ConfigBuilder("spark.rdd.sources.parallelPartitionDiscovery.threshold") - .doc("The maximum number of paths allowed for listing files at driver side. If the number " + - "of detected paths exceeds this value during partition discovery, it tries to list the " + - "files with another Spark distributed job. This configuration is effective only when " + - "using file-based using HadoopFSUtils") - .version("1.5.0") - .intConf - .checkValue(parallel => parallel >= 0, "The maximum number of paths allowed for listing " + - "files at driver side must not be negative") - .createWithDefault(2) - - val PARALLEL_PARTITION_DISCOVERY_PARALLELISM = - ConfigBuilder("spark.rdd.sources.parallelPartitionDiscovery.parallelism") - .doc("The number of parallelism to list a collection of path recursively, Set the " + - "number to prevent file listing from generating too many tasks.") - .version("3.1.0") - .internal() - .intConf - .createWithDefault(10000) - - val IGNORE_DATA_LOCALITY = - ConfigBuilder("spark.rdd.sources.ignoreDataLocality") - .doc("If true, Spark will not fetch the block locations for each file on " + - "listing files. This speeds up file listing, but the scheduler cannot " + - "schedule tasks to take advantage of data locality. It can be particularly " + - "useful if data is read from a remote cluster so the scheduler could never " + - "take advantage of locality anyway. 
This configuration is effective only when " + - "using file-based using HadoopFSUtils") - .version("3.1.0") - .internal() - .booleanConf - .createWithDefault(false) - private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK = ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock") .internal() diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 5023a5faae614..684cb91e0f861 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -44,13 +44,57 @@ object HadoopFSUtils extends Logging { * * This may only be called on the driver. * + * @param sc Spark context used to run parallel listing. + * @param paths Input paths to list + * @param hadoopConf Hadoop configuration + * @param filter Path filter used to exclude leaf files from result + * @param areSQLRootPaths Whether the input paths are SQL root paths + * @param ignoreMissingFiles Ignore missing files that happen during recursive listing + * (e.g., due to race conditions) + * @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false, + * this will return [[FileStatus]]es without [[BlockLocation]] info. + * @param parallelismThreshold The threshold to enable parallelism. If the number of input paths + * is smaller than this value, this will fallback to use + * sequential listing. + * @param parallelismMax The maximum parallelism for listing. If the number of input paths is + * larger than this value, parallelism will be throttled to this value + * to avoid generating too many tasks. + * @param filterFun Optional predicate on the leaf files. Files who failed the check will be + * excluded from the results * @return for each input path, the set of discovered files for the path */ - @DeveloperApi - def parallelListLeafFiles(sc: SparkContext, paths: Seq[Path], hadoopConf: Configuration, - filter: PathFilter, areSQLRootPaths: Boolean, ignoreMissingFiles: Boolean, - ignoreLocality: Boolean, maxParallelism: Int, - filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = { + def parallelListLeafFiles( + sc: SparkContext, + paths: Seq[Path], + hadoopConf: Configuration, + filter: PathFilter, + areSQLRootPaths: Boolean, + ignoreMissingFiles: Boolean, + ignoreLocality: Boolean, + parallelismThreshold: Int, + parallelismMax: Int, + filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = { + + // Short-circuits parallel listing when serial listing is likely to be faster. + if (paths.size <= parallelismThreshold) { + return paths.map { path => + val leafFiles = listLeafFiles( + path, + hadoopConf, + filter, + Some(sc), + ignoreMissingFiles = ignoreMissingFiles, + ignoreLocality = ignoreLocality, + isSQLRootPath = areSQLRootPaths, + parallelismThreshold = parallelismThreshold, + parallelismDefault = parallelismMax, + filterFun = filterFun) + (path, leafFiles) + } + } + + logInfo(s"Listing leaf files and directories in parallel under ${paths.length} paths." + + s" The first several paths are: ${paths.take(10).mkString(", ")}.") HiveCatalogMetrics.incrementParallelListingJobCount(1) val serializableConfiguration = new SerializableConfiguration(hadoopConf) @@ -58,7 +102,7 @@ object HadoopFSUtils extends Logging { // Set the number of parallelism to prevent following file listing from generating many tasks // in case of large #defaultParallelism. 
- val numParallelism = Math.min(paths.size, maxParallelism) + val numParallelism = Math.min(paths.size, parallelismMax) val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION) val statusMap = try { @@ -70,44 +114,83 @@ object HadoopFSUtils extends Logging { case s => s"Listing leaf files and directories for $s paths:
${paths(0)}, ..." } - sc.setJobDescription(description) // TODO(holden): should we use jobgroup? + sc.setJobDescription(description) sc .parallelize(serializedPaths, numParallelism) .mapPartitions { pathStrings => val hadoopConf = serializableConfiguration.value pathStrings.map(new Path(_)).toSeq.map { path => val leafFiles = listLeafFiles( - contextOpt = None, // Can't execute parallel scans on workers path = path, hadoopConf = hadoopConf, filter = filter, + contextOpt = None, // Can't execute parallel scans on workers ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, isSQLRootPath = areSQLRootPaths, filterFun = filterFun, parallelismThreshold = Int.MaxValue, - maxParallelism = 0) + parallelismDefault = 0) (path, leafFiles) }.iterator - }.collect() // TODO(holden): should we use local itr here? + }.map { case (path, statuses) => + val serializableStatuses = statuses.map { status => + // Turn FileStatus into SerializableFileStatus so we can send it back to the driver + val blockLocations = status match { + case f: LocatedFileStatus => + f.getBlockLocations.map { loc => + SerializableBlockLocation( + loc.getNames, + loc.getHosts, + loc.getOffset, + loc.getLength) + } + + case _ => + Array.empty[SerializableBlockLocation] + } + + SerializableFileStatus( + status.getPath.toString, + status.getLen, + status.isDirectory, + status.getReplication, + status.getBlockSize, + status.getModificationTime, + status.getAccessTime, + blockLocations) + } + (path.toString, serializableStatuses) + }.collect() } finally { sc.setJobDescription(previousJobDescription) } - statusMap.toSeq + // turn SerializableFileStatus back to Status + statusMap.map { case (path, serializableStatuses) => + val statuses = serializableStatuses.map { f => + val blockLocations = f.blockLocations.map { loc => + new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length) + } + new LocatedFileStatus( + new FileStatus( + f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime, + new Path(f.path)), + blockLocations) + } + (new Path(path), statuses) + } } + /** - * :: DeveloperApi :: - * Lists a single filesystem path recursively. If a Sparkcontext object is specified, this + * Lists a single filesystem path recursively. If a `SparkContext`` object is specified, this * function may launch Spark jobs to parallelize listing based on parallelismThreshold. * * If sessionOpt is None, this may be called on executors. * * @return all children of path that match the specified filter. 
*/ - // scalastyle:off argcount - @DeveloperApi - def listLeafFiles( + private def listLeafFiles( path: Path, hadoopConf: Configuration, filter: PathFilter, @@ -117,8 +200,7 @@ object HadoopFSUtils extends Logging { isSQLRootPath: Boolean, filterFun: Option[String => Boolean], parallelismThreshold: Int, - maxParallelism: Int): Seq[FileStatus] = { - // scalastyle:on argcount + parallelismDefault: Int): Seq[FileStatus] = { logTrace(s"Listing $path") val fs = path.getFileSystem(hadoopConf) @@ -185,7 +267,8 @@ object HadoopFSUtils extends Logging { ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, filterFun = filterFun, - maxParallelism = maxParallelism + parallelismThreshold = parallelismThreshold, + parallelismMax = parallelismDefault, ).flatMap(_._2) case _ => dirs.flatMap { dir => @@ -199,7 +282,7 @@ object HadoopFSUtils extends Logging { isSQLRootPath = false, filterFun = filterFun, parallelismThreshold = parallelismThreshold, - maxParallelism = maxParallelism) + parallelismDefault = parallelismDefault) } } val allFiles = topLevelFiles ++ nestedFiles @@ -256,4 +339,22 @@ object HadoopFSUtils extends Logging { resolvedLeafStatuses.toSeq } + + /** A serializable variant of HDFS's BlockLocation. */ + private case class SerializableBlockLocation( + names: Array[String], + hosts: Array[String], + offset: Long, + length: Long) + + /** A serializable variant of HDFS's FileStatus. */ + private case class SerializableFileStatus( + path: String, + length: Long, + isDir: Boolean, + blockReplication: Short, + blockSize: Long, + modificationTime: Long, + accessTime: Long, + blockLocations: Array[SerializableBlockLocation]) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 27c78dfbd5436..9609a05934c2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -148,38 +148,19 @@ object InMemoryFileIndex extends Logging { filter: PathFilter, sparkSession: SparkSession, areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = { - - val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles - val ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality - val parallelPartitionDiscoveryParallelism = - sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism - val sc = sparkSession.sparkContext - - // Short-circuits parallel listing when serial listing is likely to be faster. 
- val threshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold - val result = if (paths.size <= threshold) { - paths.map { path => - val leafFiles = HadoopFSUtils.listLeafFiles( - path = path, - hadoopConf = hadoopConf, - filter = filter, - contextOpt = Some(sc), - ignoreMissingFiles = ignoreMissingFiles, - ignoreLocality = ignoreLocality, - isSQLRootPath = areRootPaths, - filterFun = Some(shouldFilterOut), - parallelismThreshold = threshold, - maxParallelism = parallelPartitionDiscoveryParallelism) - (path, leafFiles) - } - } else { - HadoopFSUtils.parallelListLeafFiles(sparkSession.sparkContext, paths, hadoopConf, filter, - areSQLRootPaths = areRootPaths, ignoreMissingFiles = ignoreMissingFiles, - ignoreLocality = ignoreLocality, parallelPartitionDiscoveryParallelism, - Some(shouldFilterOut)) - } - result - } + HadoopFSUtils.parallelListLeafFiles( + sc = sparkSession.sparkContext, + paths = paths, + hadoopConf = hadoopConf, + filter = filter, + areSQLRootPaths = areRootPaths, + ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles, + ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality, + parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold, + parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism, + filterFun = Some(shouldFilterOut), + ) + } /** Checks if we should filter out this path name. */ def shouldFilterOut(pathName: String): Boolean = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 3f67fede7740e..02be8c9221704 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -290,9 +290,9 @@ class FileIndexSuite extends SharedSparkSession { } } HiveCatalogMetrics.reset() - assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() === 0) + assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0) new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None) - assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() === expectedNumPar) + assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar) } } } @@ -516,7 +516,6 @@ class FileIndexSuite extends SharedSparkSession { SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue) } } - } object DeletionRaceFileSystem { From 1f9bbbc7228a88fee10cc7381193b46f8e05d505 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Fri, 21 Aug 2020 11:38:06 -0700 Subject: [PATCH 22/30] Remove Hive metrics call and nits --- core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 684cb91e0f861..b19796a4514ec 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging -import org.apache.spark.metrics.source.HiveCatalogMetrics /** * :: DeveloperApi :: @@ -95,7 +94,6 @@ object 
HadoopFSUtils extends Logging { logInfo(s"Listing leaf files and directories in parallel under ${paths.length} paths." + s" The first several paths are: ${paths.take(10).mkString(", ")}.") - HiveCatalogMetrics.incrementParallelListingJobCount(1) val serializableConfiguration = new SerializableConfiguration(hadoopConf) val serializedPaths = paths.map(_.toString) @@ -108,7 +106,7 @@ object HadoopFSUtils extends Logging { val statusMap = try { val description = paths.size match { case 0 => - s"Listing leaf files and directories 0 paths" + "Listing leaf files and directories 0 paths" case 1 => s"Listing leaf files and directories for 1 path:
${paths(0)}" case s => From 0ab104d84782c9689daefbcbc0b9d0d3b1b97c16 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Fri, 21 Aug 2020 11:52:03 -0700 Subject: [PATCH 23/30] Make checkstyle happy --- core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index b19796a4514ec..2af549736a5f7 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -180,6 +180,7 @@ object HadoopFSUtils extends Logging { } } + // scalastyle:off argcount /** * Lists a single filesystem path recursively. If a `SparkContext`` object is specified, this * function may launch Spark jobs to parallelize listing based on parallelismThreshold. @@ -337,6 +338,7 @@ object HadoopFSUtils extends Logging { resolvedLeafStatuses.toSeq } + // scalastyle:on argcount /** A serializable variant of HDFS's BlockLocation. */ private case class SerializableBlockLocation( From 2186a6606b0ddc84fd27ad39010f1c187fd5cb84 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Fri, 21 Aug 2020 15:17:54 -0700 Subject: [PATCH 24/30] Fix checkstyle issue (for real) and change to Private API --- .../main/scala/org/apache/spark/util/HadoopFSUtils.scala | 8 +++----- .../sql/execution/datasources/InMemoryFileIndex.scala | 3 +-- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 2af549736a5f7..c8f017cb1dc2e 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -27,17 +27,15 @@ import org.apache.hadoop.fs.viewfs.ViewFileSystem import org.apache.hadoop.hdfs.DistributedFileSystem import org.apache.spark._ -import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.annotation.Private import org.apache.spark.internal.Logging /** - * :: DeveloperApi :: * Utility functions to simplify and speed-up file listing. */ -@DeveloperApi +@Private object HadoopFSUtils extends Logging { /** - * :: DeveloperApi :: * Lists a collection of paths recursively. Picks the listing strategy adaptively depending * on the number of paths to list. 
* @@ -267,7 +265,7 @@ object HadoopFSUtils extends Logging { ignoreLocality = ignoreLocality, filterFun = filterFun, parallelismThreshold = parallelismThreshold, - parallelismMax = parallelismDefault, + parallelismMax = parallelismDefault ).flatMap(_._2) case _ => dirs.flatMap { dir => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 9609a05934c2d..a25d353a87b1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -158,8 +158,7 @@ object InMemoryFileIndex extends Logging { ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality, parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold, parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism, - filterFun = Some(shouldFilterOut), - ) + filterFun = Some(shouldFilterOut)) } /** Checks if we should filter out this path name. */ From 549f3354aa2c25373bbc4b613c315c42416d2bf5 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Fri, 21 Aug 2020 17:06:00 -0700 Subject: [PATCH 25/30] Fix unidoc --- core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index c8f017cb1dc2e..eb20b3d7e5e55 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -49,7 +49,7 @@ object HadoopFSUtils extends Logging { * @param ignoreMissingFiles Ignore missing files that happen during recursive listing * (e.g., due to race conditions) * @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false, - * this will return [[FileStatus]]es without [[BlockLocation]] info. + * this will return `FileStatus` without `BlockLocation` info. * @param parallelismThreshold The threshold to enable parallelism. If the number of input paths * is smaller than this value, this will fallback to use * sequential listing. From 2b1aacdb69576e878f592f36a5676a174d2c2cb7 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Sat, 22 Aug 2020 00:54:05 -0700 Subject: [PATCH 26/30] Bring back parallel listing metrics --- core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index eb20b3d7e5e55..71b23cbb1b40c 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -20,15 +20,14 @@ package org.apache.spark.util import java.io.FileNotFoundException import scala.collection.mutable - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.viewfs.ViewFileSystem import org.apache.hadoop.hdfs.DistributedFileSystem - import org.apache.spark._ import org.apache.spark.annotation.Private import org.apache.spark.internal.Logging +import org.apache.spark.metrics.source.HiveCatalogMetrics /** * Utility functions to simplify and speed-up file listing. 
@@ -92,6 +91,7 @@ object HadoopFSUtils extends Logging { logInfo(s"Listing leaf files and directories in parallel under ${paths.length} paths." + s" The first several paths are: ${paths.take(10).mkString(", ")}.") + HiveCatalogMetrics.incrementParallelListingJobCount(1) val serializableConfiguration = new SerializableConfiguration(hadoopConf) val serializedPaths = paths.map(_.toString) From f5e9581694d680406fe1f0f53290add076cb3f36 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Sat, 22 Aug 2020 09:03:02 -0700 Subject: [PATCH 27/30] Fix style issue again (sigh) --- core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 71b23cbb1b40c..0456294b1ecd0 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -20,10 +20,12 @@ package org.apache.spark.util import java.io.FileNotFoundException import scala.collection.mutable + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.hadoop.fs.viewfs.ViewFileSystem import org.apache.hadoop.hdfs.DistributedFileSystem + import org.apache.spark._ import org.apache.spark.annotation.Private import org.apache.spark.internal.Logging From 86c2013abbf5b0365813b502cbfc9e2a5d5bc224 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Thu, 27 Aug 2020 17:50:04 -0700 Subject: [PATCH 28/30] Switch to private[spark] --- core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index 0456294b1ecd0..bb915d7b56efc 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -34,8 +34,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics /** * Utility functions to simplify and speed-up file listing. */ -@Private -object HadoopFSUtils extends Logging { +private[spark] object HadoopFSUtils extends Logging { /** * Lists a collection of paths recursively. Picks the listing strategy adaptively depending * on the number of paths to list. From bfa37cc6e9e3b07e366cab982559d49f94a93248 Mon Sep 17 00:00:00 2001 From: Chao Sun Date: Sun, 30 Aug 2020 11:01:45 -0700 Subject: [PATCH 29/30] Address comments --- .../org/apache/spark/util/HadoopFSUtils.scala | 35 ++++++++++--------- .../sql/execution/command/CommandUtils.scala | 2 +- .../datasources/InMemoryFileIndex.scala | 6 ++-- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala index bb915d7b56efc..b426401cb22b4 100644 --- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala @@ -45,7 +45,8 @@ private[spark] object HadoopFSUtils extends Logging { * @param paths Input paths to list * @param hadoopConf Hadoop configuration * @param filter Path filter used to exclude leaf files from result - * @param areSQLRootPaths Whether the input paths are SQL root paths + * @param isRootLevel Whether the input paths are at the root level, i.e., they are the root + * paths as opposed to nested paths encountered during recursive calls of this. 
* @param ignoreMissingFiles Ignore missing files that happen during recursive listing * (e.g., due to race conditions) * @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false, @@ -54,8 +55,8 @@ private[spark] object HadoopFSUtils extends Logging { * is smaller than this value, this will fallback to use * sequential listing. * @param parallelismMax The maximum parallelism for listing. If the number of input paths is - * larger than this value, parallelism will be throttled to this value - * to avoid generating too many tasks. + * larger than this value, parallelism will be throttled to this value + * to avoid generating too many tasks. * @param filterFun Optional predicate on the leaf files. Files who failed the check will be * excluded from the results * @return for each input path, the set of discovered files for the path @@ -65,7 +66,7 @@ private[spark] object HadoopFSUtils extends Logging { paths: Seq[Path], hadoopConf: Configuration, filter: PathFilter, - areSQLRootPaths: Boolean, + isRootLevel: Boolean, ignoreMissingFiles: Boolean, ignoreLocality: Boolean, parallelismThreshold: Int, @@ -82,9 +83,9 @@ private[spark] object HadoopFSUtils extends Logging { Some(sc), ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, - isSQLRootPath = areSQLRootPaths, + isRootPath = isRootLevel, parallelismThreshold = parallelismThreshold, - parallelismDefault = parallelismMax, + parallelismMax = parallelismMax, filterFun = filterFun) (path, leafFiles) } @@ -124,10 +125,10 @@ private[spark] object HadoopFSUtils extends Logging { contextOpt = None, // Can't execute parallel scans on workers ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, - isSQLRootPath = areSQLRootPaths, + isRootPath = isRootLevel, filterFun = filterFun, parallelismThreshold = Int.MaxValue, - parallelismDefault = 0) + parallelismMax = 0) (path, leafFiles) }.iterator }.map { case (path, statuses) => @@ -181,8 +182,8 @@ private[spark] object HadoopFSUtils extends Logging { // scalastyle:off argcount /** - * Lists a single filesystem path recursively. If a `SparkContext`` object is specified, this - * function may launch Spark jobs to parallelize listing based on parallelismThreshold. + * Lists a single filesystem path recursively. If a `SparkContext` object is specified, this + * function may launch Spark jobs to parallelize listing based on `parallelismThreshold`. * * If sessionOpt is None, this may be called on executors. * @@ -195,10 +196,10 @@ private[spark] object HadoopFSUtils extends Logging { contextOpt: Option[SparkContext], ignoreMissingFiles: Boolean, ignoreLocality: Boolean, - isSQLRootPath: Boolean, + isRootPath: Boolean, filterFun: Option[String => Boolean], parallelismThreshold: Int, - parallelismDefault: Int): Seq[FileStatus] = { + parallelismMax: Int): Seq[FileStatus] = { logTrace(s"Listing $path") val fs = path.getFileSystem(hadoopConf) @@ -239,7 +240,7 @@ private[spark] object HadoopFSUtils extends Logging { // able to detect race conditions involving root paths being deleted during // InMemoryFileIndex construction. However, it's still a net improvement to detect and // fail-fast on the non-root cases. For more info see the SPARK-27676 review discussion. - case _: FileNotFoundException if isSQLRootPath || ignoreMissingFiles => + case _: FileNotFoundException if isRootPath || ignoreMissingFiles => logWarning(s"The directory $path was not found. 
Was it deleted very recently?") Array.empty[FileStatus] } @@ -261,12 +262,12 @@ private[spark] object HadoopFSUtils extends Logging { dirs.map(_.getPath), hadoopConf = hadoopConf, filter = filter, - areSQLRootPaths = false, + isRootLevel = false, ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, filterFun = filterFun, parallelismThreshold = parallelismThreshold, - parallelismMax = parallelismDefault + parallelismMax = parallelismMax ).flatMap(_._2) case _ => dirs.flatMap { dir => @@ -277,10 +278,10 @@ private[spark] object HadoopFSUtils extends Logging { contextOpt = contextOpt, ignoreMissingFiles = ignoreMissingFiles, ignoreLocality = ignoreLocality, - isSQLRootPath = false, + isRootPath = false, filterFun = filterFun, parallelismThreshold = parallelismThreshold, - parallelismDefault = parallelismDefault) + parallelismMax = parallelismMax) } } val allFiles = topLevelFiles ++ nestedFiles diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala index c047be774d99a..8bf7504716f79 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala @@ -163,7 +163,7 @@ object CommandUtils extends Logging { .getConfString("hive.exec.stagingdir", ".hive-staging") val filter = new PathFilterIgnoreNonData(stagingDir) val sizes = InMemoryFileIndex.bulkListLeafFiles(paths.flatten, - sparkSession.sessionState.newHadoopConf(), filter, sparkSession, areRootPaths = true).map { + sparkSession.sessionState.newHadoopConf(), filter, sparkSession, isRootLevel = true).map { case (_, files) => files.map(_.getLen).sum } // the size is 0 where paths(i) is not defined and sizes(i) where it is defined diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index a25d353a87b1d..130894e9bc025 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -128,7 +128,7 @@ class InMemoryFileIndex( } val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) val discovered = InMemoryFileIndex.bulkListLeafFiles( - pathsToFetch.toSeq, hadoopConf, filter, sparkSession, areRootPaths = true) + pathsToFetch.toSeq, hadoopConf, filter, sparkSession, isRootLevel = true) discovered.foreach { case (path, leafFiles) => HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size) fileStatusCache.putLeafFiles(path, leafFiles.toArray) @@ -147,13 +147,13 @@ object InMemoryFileIndex extends Logging { hadoopConf: Configuration, filter: PathFilter, sparkSession: SparkSession, - areRootPaths: Boolean): Seq[(Path, Seq[FileStatus])] = { + isRootLevel: Boolean): Seq[(Path, Seq[FileStatus])] = { HadoopFSUtils.parallelListLeafFiles( sc = sparkSession.sparkContext, paths = paths, hadoopConf = hadoopConf, filter = filter, - areSQLRootPaths = areRootPaths, + isRootLevel = isRootLevel, ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles, ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality, parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold, From 2d8e64d72a014ed5067be5772f8b2db9129ce2f2 Mon Sep 17 00:00:00 2001 From: 
Chao Sun
Date: Mon, 14 Sep 2020 11:54:25 -0700
Subject: [PATCH 30/30] Remove redundant .toSeq

---
 core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index b426401cb22b4..c0a135e04bac5 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -336,7 +336,7 @@ private[spark] object HadoopFSUtils extends Logging {
         s"the following files were missing during file scan:\n  ${missingFiles.mkString("\n  ")}")
     }
 
-    resolvedLeafStatuses.toSeq
+    resolvedLeafStatuses
   }
   // scalastyle:on argcount
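
Note on the final API shape: after PATCH 28 the entry point is a `private[spark] object HadoopFSUtils`, so `parallelListLeafFiles` is only callable from code that lives under the org.apache.spark package, such as the `InMemoryFileIndex.bulkListLeafFiles` call site above. A minimal sketch of such a caller follows, using the parameter names as of PATCH 29; the package name, object name, filters, and threshold values are illustrative assumptions, not values taken from these patches.

    package org.apache.spark.example  // hypothetical subpackage of org.apache.spark, so private[spark] is visible

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}

    import org.apache.spark.SparkContext
    import org.apache.spark.util.HadoopFSUtils

    object ListingSketch {
      // Lists all leaf files under the given root paths, mirroring how
      // InMemoryFileIndex.bulkListLeafFiles forwards its configuration.
      def listLeafFiles(
          sc: SparkContext,
          hadoopConf: Configuration,
          roots: Seq[Path]): Seq[(Path, Seq[FileStatus])] = {
        // Illustrative path filter: skip metadata files such as _SUCCESS.
        val filter = new PathFilter {
          override def accept(path: Path): Boolean = !path.getName.startsWith("_")
        }
        HadoopFSUtils.parallelListLeafFiles(
          sc = sc,
          paths = roots,
          hadoopConf = hadoopConf,
          filter = filter,
          isRootLevel = true,          // these are the user-supplied root paths
          ignoreMissingFiles = false,  // a missing non-root path should fail the scan
          ignoreLocality = false,      // also fetch BlockLocation info
          parallelismThreshold = 32,   // list sequentially on the driver when there are at most this many paths
          parallelismMax = 10000,      // cap on the parallelism of the listing job
          filterFun = Some((name: String) => !name.startsWith(".")))  // name-based filter, like shouldFilterOut
      }
    }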
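
The two knobs interact as the scaladoc describes: at or below `parallelismThreshold` the listing stays on the driver, above it a Spark job is launched with its parallelism capped by `parallelismMax`. A rough sketch of just that decision, assuming only the documented semantics (the real method additionally broadcasts the Hadoop configuration and runs the per-path listing as Spark tasks):

    // Sketch of the strategy choice only; returns None for sequential driver-side
    // listing, or Some(parallelism) for a distributed listing job.
    def chooseListingParallelism(
        numPaths: Int,
        parallelismThreshold: Int,
        parallelismMax: Int): Option[Int] = {
      if (numPaths <= parallelismThreshold) {
        None  // few paths: list them sequentially on the driver
      } else {
        // one listing task per path, throttled to avoid generating too many tasks
        Some(math.min(numPaths, parallelismMax))
      }
    }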
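
PATCH 29's rename from isSQLRootPath to isRootPath also keeps the error-handling contract visible in its hunk: a FileNotFoundException during listing is swallowed only for a root path (root-path races are detected later, per the SPARK-27676 note in the code) or when ignoreMissingFiles is set; for nested paths it propagates so the scan fails fast. A condensed sketch of that guard, not the full listing logic (the real code also logs a warning here, as shown in the hunk):

    import java.io.FileNotFoundException

    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    // Condensed from the try/catch shown in the patch: tolerate a vanished root path
    // or any missing path when ignoreMissingFiles is set, otherwise rethrow.
    def statusesOrEmpty(
        fs: FileSystem,
        path: Path,
        isRootPath: Boolean,
        ignoreMissingFiles: Boolean): Array[FileStatus] = {
      try {
        fs.listStatus(path)
      } catch {
        case _: FileNotFoundException if isRootPath || ignoreMissingFiles =>
          Array.empty[FileStatus]
      }
    }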