Moved code out of DAGScheduler into SerializationHelper class to minimize impact on existing classes. Also fixed a bug where unserializable RDD dependencies were not being appropriately printed.
Ilya Ganelin committed Dec 18, 2014
Parent: bb5f700 · Commit: 07142ce
Showing 4 changed files with 170 additions and 125 deletions.
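Note: the SerializationHelper and RDDTrace sources are presumably among the two changed files whose diffs are not shown below. The following is a minimal sketch of the helper API inferred from the call sites in this commit, not the committed implementation; the RDDTrace shape, the ByteBuffer payload, and the failure-message format are assumptions.

import java.nio.ByteBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.RDDWalker

// Hypothetical sketch of the helper API implied by the call sites in this
// commit; types and messages here are assumptions, not the committed code.
object SerializationHelperSketch {

  // One entry per RDD in the dependency graph: the RDD, its depth in the
  // traversal, and either a failure description (Left) or its bytes (Right).
  case class RDDTrace(rdd: RDD[_], depth: Int, result: Either[String, ByteBuffer])

  // Serialize a single reference, converting any exception into a Left so
  // that callers can fold over the result instead of catching.
  def tryToSerialize(ser: SerializerInstance, ref: AnyRef): Either[String, ByteBuffer] =
    try Right(ser.serialize(ref)) catch {
      case e: Exception => Left("Unable to serialize " + ref + ": " + e)
    }

  // Serialize an RDD and every dependency reached by RDDWalker's BFS,
  // pairing each result with its depth for per-dependency reporting.
  def tryToSerializeRddAndDeps(ser: SerializerInstance, rdd: RDD[_]): Array[RDDTrace] =
    RDDWalker.walk(rdd).map { case (cur, depth) =>
      RDDTrace(cur, depth, tryToSerialize(ser, cur))
    }
}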
72 changes: 14 additions & 58 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -42,9 +42,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
-import org.apache.spark.util.{CallSite, Clock, RDDWalker, SerializationHelper,
-  SerializationState, SystemClock, Utils}
-import org.apache.spark.util.SerializationHelper.{BrokenRef, SerializedRef}
+import org.apache.spark.util.{CallSite, Clock, RDDTrace, SerializationHelper, SystemClock, Utils}
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

/**
@@ -796,6 +794,8 @@ class DAGScheduler(
/**
* Helper function to check whether an RDD and its dependencies are serializable.
*
+ * This hook is exposed here primarily for testing purposes.
+ *
* Note: This function is defined separately from the SerializationHelper.isSerializable()
* since DAGScheduler.isSerializable() is passed as a parameter to the RDDWalker class's graph
* traversal, which would otherwise require knowledge of the closureSerializer
@@ -809,73 +809,27 @@
* Failure: String - The reason for the failure.
*
*/
-  def tryToSerializeRdd(rdd: RDD[_]): Array[SerializedRef] = {
-    // Walk the RDD so that we can display a trace on a per-dependency basis
-    val traversal : Array[(RDD[_], Int)] = RDDWalker.walk(rdd)
-
-    // Attempt to serialize each dependency of the RDD (track depth information to facilitate
-    // debugging)
-    val serialized = traversal.map {
-      case (curRdd, depth) =>
-        (curRdd, depth, SerializationHelper.tryToSerialize(closureSerializer, curRdd))
-    }
-
-    // If serialization was unsuccessful print failures
-    val anyFailed = serialized.filter(_._3.isLeft).length > 0
-
-    if (anyFailed) {
-      // For convenience, first output a trace by depth of whether each dependency was serializable
-      serialized.map {
-        case (curRdd, depth, result) =>
-          val out = "Depth " + depth + ": " + curRdd.toString + " - " + result.fold(l => l,
-            r => SerializationState.Success)
-          logDebug(out)
-      }
-
-      // Next, print a specific reference trace for the unserializable RDD
-      serialized.map {
-        case (curRdd, depth, result) =>
-          result.fold(l => logDebug(traceBrokenRef(curRdd)), r => {})
-      }
-    }
-
-    // Lastly return only the results of the attempted serialization for testing purposes
-    serialized.map(_._3)
+  def tryToSerializeRddDeps(rdd: RDD[_]): Array[RDDTrace] = {
+    SerializationHelper.tryToSerializeRddAndDeps(closureSerializer, rdd)
  }


/**
* Returns nicely formatted text representing the trace of the failed serialization
*
* Note: This is defined here since it uses the closure serializer. Although the better place for
* the serializer would be in the SerializationHelper, the Helper is not guaranteed to run in a
* single thread unlike the DAGScheduler.
*
-   * @param ref - The top-level reference that we are attempting to serialize
+   * @param rdd - The top-level RDD that we are attempting to serialize
   * @return a formatted trace of the failed serialization, or a success message if the
   *         RDD serialized cleanly
   */
-  def traceBrokenRef(ref: AnyRef): String = {
-    SerializationHelper.getSerializationTrace(closureSerializer, ref)
+  def traceBrokenRdd(rdd: RDD[_]): String = {
+    SerializationHelper.tryToSerializeRdd(closureSerializer, rdd)
+      .fold(l => l, r => "Successfully serialized " + rdd.toString)
  }

-  /**
-   * Use the SerializationHelper to execute a graph traversal of a broken reference to identify
-   * failures.
-   *
-   * Note: This is defined here since it uses the closure serializer. Although the better place for
-   * the serializer would be in the SerializationHelper, the Helper is not guaranteed to run in a
-   * single thread unlike the DAGScheduler.
-   *
-   * @param ref - The broken ref for which to generate a trace
-   * @return a Set of BrokenRef - a tuple of the un-serializable reference and the
-   *         path to that reference
-   *
-   */
-  def getBrokenRefs(ref: AnyRef): mutable.Set[BrokenRef] = {
-    SerializationHelper.getPathsToBrokenRefs(closureSerializer, ref)
-  }

  /** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
@@ -923,7 +877,9 @@ class DAGScheduler(

// Before serialization print out the RDD and its references.
if (debugSerialization) {
-        tryToSerializeRdd(stage.rdd)
+        SerializationHelper
+          .tryToSerializeRdd(closureSerializer, stage.rdd)
+          .fold(l => logDebug(l), r => {})
}

val taskBinaryBytes: Array[Byte] =
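The testing hook above now returns the raw per-dependency results rather than logging them. Below is a sketch of how a suite might drive it; the names scheduler and sc, and the RDDTrace field names, are assumptions carried over from the sketch near the top of this page.

// Sketch only: scheduler, sc, and the RDDTrace field names are assumptions.
val rdd = sc.parallelize(1 to 100).map(_ * 2)

val traces = scheduler.tryToSerializeRddDeps(rdd)
traces.foreach { t =>
  t.result match {
    case Left(reason) => println("Depth " + t.depth + ": " + t.rdd + " failed: " + reason)
    case Right(_)     => println("Depth " + t.depth + ": " + t.rdd + " serialized cleanly")
  }
}

// A test can then assert that nothing in the graph was left unserializable.
assert(traces.forall(_.result.isRight))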
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/util/RDDWalker.scala
@@ -31,11 +31,6 @@ import org.apache.spark.rdd.RDD
* internal RDD references. See SPARK-3694.
*/
object RDDWalker {

-  // Keep track of both the RDD and its depth in the traversal graph.
-  val walkQueue = Queues.newArrayDeque[(RDD[_], Int)]()
-  var visited = mutable.Set[RDD[_]]()

/**
* Traverse the dependencies of the RDD and store them within an Array along with their depths.
* Return this data structure and subsequently process it.
@@ -44,6 +39,11 @@ object RDDWalker
   * @return Array[(RDD[_], Int)] - An array of results generated by the traversal function
*/
def walk(rddToWalk : RDD[_]): Array[(RDD[_], Int)] = {

+    val walkQueue = Queues.newArrayDeque[(RDD[_], Int)]()
+    val visited = mutable.Set[RDD[_]]()

+    // Keep track of both the RDD and its depth in the traversal graph.
val results = new ArrayBuffer[(RDD[_], Int)]()
// Implement as a queue to perform a BFS
walkQueue.addFirst(rddToWalk,0)
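Moving walkQueue and visited inside walk() makes each call self-contained instead of sharing mutable state across invocations. The diff is truncated before the body of the BFS loop; a minimal completion consistent with the visible lines, assuming the usual queue-and-visited-set traversal, might read:

// Sketch of the elided loop body; the committed code is not shown above.
while (!walkQueue.isEmpty) {
  val (rdd, depth) = walkQueue.pollFirst()
  if (!visited.contains(rdd)) {
    visited += rdd
    results.append((rdd, depth))
    // Each dependency sits one level deeper in the traversal graph.
    rdd.dependencies.foreach(dep => walkQueue.addLast((dep.rdd, depth + 1)))
  }
}
results.toArray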
[Diffs for the remaining two changed files are not shown.]
