diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 88a918aebf763..a1f2827248891 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1269,11 +1269,55 @@ abstract class RDD[T: ClassTag](
 
   /** A description of this RDD and its recursive dependencies for debugging. */
   def toDebugString: String = {
-    def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
-      Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++
-        rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + "  "))
+    // Apply a different rule to the last child
+    def debugChildren(rdd: RDD[_], prefix: String): Seq[String] = {
+      val len = rdd.dependencies.length
+      len match {
+        case 0 => Seq.empty
+        case 1 =>
+          val d = rdd.dependencies.head
+          debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]], true)
+        case _ =>
+          val frontDeps = rdd.dependencies.take(len - 1)
+          val frontDepStrings = frontDeps.flatMap(
+            d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]]))
+
+          val lastDep = rdd.dependencies.last
+          val lastDepStrings =
+            debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_,_,_]], true)
+
+          (frontDepStrings ++ lastDepStrings)
+      }
+    }
+    // The first RDD in the dependency stack has no parents, so no need for a +-
+    def firstDebugString(rdd: RDD[_]): Seq[String] = {
+      val partitionStr = "(" + rdd.partitions.size + ")"
+      val leftOffset = (partitionStr.length - 1) / 2
+      val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
+      Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+    }
+    def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
+      val partitionStr = "(" + rdd.partitions.size + ")"
+      val leftOffset = (partitionStr.length - 1) / 2
+      val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
+      val nextPrefix = (
+        thisPrefix
+        + (if (isLastChild) "  " else "| ")
+        + (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))
+      Seq(thisPrefix + "+-" + partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+    }
+    def debugString(rdd: RDD[_],
+                    prefix: String = "",
+                    isShuffle: Boolean = true,
+                    isLastChild: Boolean = false): Seq[String] = {
+      if (isShuffle) {
+        shuffleDebugString(rdd, prefix, isLastChild)
+      }
+      else {
+        Seq(prefix + rdd) ++ debugChildren(rdd, prefix)
+      }
     }
-    debugString(this).mkString("\n")
+    firstDebugString(this).mkString("\n")
   }
 
   override def toString: String = "%s%s[%d] at %s".format(
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index e0f433b26f7ff..4d86e1a0d8bbf 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -61,6 +61,14 @@ object MimaExcludes {
               "org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1"),
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.storage.MemoryStore.Entry"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$debugChildren$1"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$firstDebugString$1"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$shuffleDebugString$1"),
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$debugString$1"),
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$" +
                 "createZero$1")