[SPARK-3694] RDD and Task serialization debugging output #3518

Closed. Wants to merge 25 commits.

Changes from all commits (25 commits)
6c99762
Created class to traverse dependency graph of RDD
Oct 30, 2014
47ccc22
Started walker code
Oct 30, 2014
a8d5332
RDD Walker updates
Nov 6, 2014
a63652f
Added debug output to task serialization. Added debug output to RDD s…
Nov 6, 2014
05f2cc0
Rebase
Nov 6, 2014
cbb1d77
Style errors
Nov 14, 2014
1831000
Merge remote-tracking branch 'upstream/master'
Nov 29, 2014
916a31c
Manual merge of updates
Nov 29, 2014
bfb723d
Added helper files
Nov 29, 2014
e0a8153
Fixed whitespace errors
Nov 30, 2014
cb6ebb1
Updated documentation to add debug parameter for rdd serialization
Nov 30, 2014
95fa69b
Incorporated feedback from PR. Cleaned up code and refactored interfa…
Dec 1, 2014
d2abbb5
Fixed incorrect boolean in test script
Dec 1, 2014
ef3dd39
Minor comment fixes
Dec 1, 2014
aff02e9
Added ObjectWalker class to traverse the references of a generic obje…
Dec 16, 2014
47b027a
Started on updating SerializationHelper to add generic ObjectWalker
Dec 16, 2014
8ac35dc
Merge remote-tracking branch 'upstream/master' into SPARK-3694B
Dec 16, 2014
25d5780
Code needs work, need to properly store references
Dec 17, 2014
7a19547
Updated SerializationHelper to be agnostic to the data type being tra…
Dec 18, 2014
bb5f700
Updated DAGScheduler and TaskSetManager classes to add debug printout…
Dec 18, 2014
07142ce
Moved code out of DAGScheduler into SerializationHelper class to mini…
Dec 18, 2014
8e5f710
Got rid of unnecessary EdgeRef class
Dec 18, 2014
a32f0ac
Fixed merge issues from SPARK-4737
Jan 12, 2015
1d2d563
Updated to use Scala queues instead of Google queues
Jan 16, 2015
5b93dc1
Fixed style issue
Jan 16, 2015
55 changes: 53 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -18,9 +18,12 @@
package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.nio.ByteBuffer
import java.util
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, Stack}
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -39,7 +42,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils}
import org.apache.spark.util.{CallSite, Clock, RDDTrace, SerializationHelper, SystemClock, Utils}
import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

/**
@@ -789,6 +792,44 @@ class DAGScheduler(
}
}

/**
* Helper function to check whether an RDD and its dependencies are serializable.
*
* This hook is exposed here primarily for testing purposes.
*
* Note: This function is defined separately from the SerializationHelper.isSerializable()
* since DAGScheduler.isSerializable() is passed as a parameter to the RDDWalker class's graph
* traversal, which would otherwise require knowledge of the closureSerializer
* (which was undesirable).
*
* @param rdd - RDD to attempt to serialize
* @return Array[RDDTrace] -
*         An array of Either objects, one for the RDD itself and one for each of its
*         dependencies, indicating whether serialization succeeded.
*         Success: ByteBuffer - The serialized RDD
*         Failure: String - The reason for the failure.
*
*/
private[spark] def tryToSerializeRddDeps(rdd: RDD[_]): Array[RDDTrace] = {
SerializationHelper.tryToSerializeRddAndDeps(closureSerializer, rdd)
}


/**
* Returns nicely formatted text representing the trace of the failed serialization
*
* Note: This is defined here because it uses the closure serializer. Although the serializer
* would more naturally live in the SerializationHelper, the Helper is not guaranteed to run in
* a single thread, unlike the DAGScheduler.
*
* @param rdd - The top-level reference that we are attempting to serialize
* @return a formatted failure trace, or a message noting that the RDD serialized successfully
*/
def traceBrokenRdd(rdd: RDD[_]): String = {
SerializationHelper.tryToSerializeRdd(closureSerializer, rdd)
.fold(l => l, r => "Successfully serialized " + rdd.toString)
}

/** Called when stage's parents are available and we can now do its task. */
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
@@ -827,9 +868,11 @@
// might modify state of objects referenced in their closures. This is necessary in Hadoop
// where the JobConf/Configuration object is not thread-safe.
var taskBinary: Broadcast[Array[Byte]] = null

try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).

val taskBinaryBytes: Array[Byte] =
if (stage.isShuffleMap) {
closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()
@@ -840,10 +883,18 @@
} catch {
// In the case of a failure during serialization, abort the stage.
case e: NotSerializableException =>
SerializationHelper
.tryToSerializeRdd(closureSerializer, stage.rdd)
.fold(l => logDebug(l), r => {})

abortStage(stage, "Task not serializable: " + e.toString)
runningStages -= stage
return
case NonFatal(e) =>
SerializationHelper
.tryToSerializeRdd(closureSerializer, stage.rdd)
.fold(l => logDebug(l), r => {})

abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
runningStages -= stage
return
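For context, the kind of job this debugging output targets looks like the following hypothetical driver program (the class and application names are made up). Depending on the Spark version, the closure cleaner may already reject the closure at the transformation; when the failure instead surfaces while the stage is being submitted, the DAGScheduler changes above log the reference trace at DEBUG level before aborting the stage.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical class that is not Serializable; capturing it in a closure
// makes the stage's task binary impossible to serialize.
class Widget(val id: Int)

object SerializationFailureExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("ser-debug"))
    val widget = new Widget(42)

    // The map closure captures `widget`, which is not Serializable, so the job
    // fails with "Task not serializable". When that failure is caught during
    // stage submission, the DAGScheduler now also logs the trace produced by
    // SerializationHelper.tryToSerializeRdd (visible with DEBUG logging enabled).
    val doubled = sc.parallelize(1 to 10).map(i => i + widget.id)
    doubled.count()

    sc.stop()
  }
}
```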
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -30,7 +30,7 @@ import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.TaskState.TaskState
import org.apache.spark.util.{Clock, SystemClock, Utils}
import org.apache.spark.util.{Clock, SerializationHelper, SystemClock, Utils}

/**
* Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
@@ -459,17 +459,29 @@ private[spark] class TaskSetManager(
}
// Serialize and return the task
val startTime = clock.getTime()

val serializedTask: ByteBuffer = try {
// We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
// we assume the task can be serialized without exceptions.

Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
} catch {
// If the task cannot be serialized, then there's no point to re-attempt the task,
// as it will always fail. So just abort the whole task-set.
// as it will always fail. So just abort the whole task-set and print a serialization
// trace to help identify the failure point.
case NonFatal(e) =>
SerializationHelper.tryToSerialize(ser, task).fold (
Contributor (review comment on this line): We should make sure this catches any exceptions
thrown by the serialization utility itself and in that case just say that we couldn't produce
debugging output.

l => logDebug("Un-serializable reference trace for " +
task.toString + ":\n" + l),
r => {}
)

val msg = s"Failed to serialize task $taskId, not attempting to retry it."
logError(msg, e)
abort(s"$msg Exception during serialization: $e")
throw new TaskNotSerializableException(e)
}

if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
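Following the review comment above about exceptions thrown by the serialization utility itself, here is a minimal sketch of one way the debug call could be guarded so that a failure inside the helper never masks the original error. It assumes SerializationHelper.tryToSerialize(ser, obj) returns Either[String, ByteBuffer] (failure trace or serialized bytes), as its use in resourceOffer suggests; the wrapper object is hypothetical and not part of this patch.

```scala
// Hypothetical guard around the tracing helper; placed under
// org.apache.spark.util so the helper is visible.
package org.apache.spark.util

import scala.util.{Failure, Success, Try}

import org.apache.spark.serializer.SerializerInstance

object SafeSerializationDebug {
  // If building the trace throws, return a plain fallback message instead of
  // letting a second exception mask the original serialization failure.
  def traceOrFallback(ser: SerializerInstance, obj: AnyRef): String = {
    Try(SerializationHelper.tryToSerialize(ser, obj)) match {
      case Success(Left(trace)) => trace  // the useful debug trace
      case Success(Right(_))    => s"$obj unexpectedly serialized on the second attempt"
      case Failure(e)           => s"Could not produce a serialization debug trace for $obj: $e"
    }
  }
}
```

The catch block could then call logDebug(SafeSerializationDebug.traceOrFallback(ser, task)) in place of the bare fold.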
111 changes: 111 additions & 0 deletions core/src/main/scala/org/apache/spark/util/ObjectWalker.scala
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util

import java.lang.reflect.{Modifier, Field}

import scala.collection.mutable


/**
* This class permits traversing a generic Object's reference graph. This is useful for debugging
* serialization errors. See SPARK-3694.
*
* This code is based on code written by Josh Rosen found here:
* https://gist.github.com/JoshRosen/d6a8972c99992e97d040
*/
private[spark] object ObjectWalker {
def isTransient(field: Field): Boolean = Modifier.isTransient(field.getModifiers)
def isStatic(field: Field): Boolean = Modifier.isStatic(field.getModifiers)
def isPrimitive(field: Field): Boolean = field.getType.isPrimitive

/**
* Traverse the graph representing all references between the provided root object, its
* members, and their references in turn.
*
* What we want to be able to do is readily identify un-serializable components AND the path
* to those components. To do this, store the traversal of the graph as a 2-tuple - the actual
* reference visited and its parent. Then, to get the path to the un-serializable reference
* we can simply follow the parent links.
*
* @param rootObj - The root object for which to generate the reference graph
* @return a mutable.LinkedList of all references reachable from rootObj that would be
*         serialized along with it
*/
def buildRefGraph(rootObj: AnyRef): mutable.LinkedList[AnyRef] = {
val visitedRefs = mutable.Set[AnyRef]()
val toVisit = new mutable.Queue[AnyRef]()
var results = mutable.LinkedList[AnyRef]()

toVisit += rootObj

while (toVisit.nonEmpty) {
val obj : AnyRef = toVisit.dequeue()
// Store the last parent reference to enable quick retrieval of the path to a broken node

if (!visitedRefs.contains(obj)) {
results = mutable.LinkedList(obj).append(results)
visitedRefs.add(obj)

// Extract all the fields from the object that would be serialized. Transient and
// static references are not serialized and primitive variables will always be serializable
// and will not contain further references.
for (field <- getFieldsToTest(obj)) {
// Extract the field object and pass to the visitor
val originalAccessibility = field.isAccessible
field.setAccessible(true)
val fieldObj = field.get(obj)
field.setAccessible(originalAccessibility)

if (fieldObj != null) {
toVisit += fieldObj
}
}
}
}
results
}

/**
* Get the serializable fields from an object reference
* @param obj - Reference to the object for which to generate a serialization trace
* @return a new Set containing the serializable fields of the object
*/
def getFieldsToTest(obj: AnyRef): mutable.Set[Field] = {
getAllFields(obj.getClass)
.filterNot(isStatic)
.filterNot(isTransient)
.filterNot(isPrimitive)
}

/**
* Get all fields (including private ones) from this class and its superclasses.
* @param cls - The class from which to retrieve fields
* @return a new mutable.Set representing the fields of the reference
*/
private def getAllFields(cls: Class[_]): mutable.Set[Field] = {
val fields = mutable.Set[Field]()
var _cls: Class[_] = cls
while (_cls != null) {
fields ++= _cls.getDeclaredFields
fields ++= _cls.getFields
_cls = _cls.getSuperclass
}

fields
}
}
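A small usage sketch for ObjectWalker, using hypothetical example classes; it sits under org.apache.spark.util only so the private[spark] object is visible. It walks the reference graph of an object with one non-serializable member and prints the offending references.

```scala
package org.apache.spark.util

// Hypothetical classes: Outer is Serializable but holds a Handle, which is not,
// so attempting to serialize an Outer instance would fail.
class Handle(val id: Int)
class Outer(val name: String, val handle: Handle) extends Serializable

object ObjectWalkerExample {
  def main(args: Array[String]): Unit = {
    val root = new Outer("job-1", new Handle(7))

    // Collect every reference reachable from `root` that would be serialized with
    // it, then keep only the ones that are not java.io.Serializable, the likely
    // culprits behind a NotSerializableException.
    val broken = ObjectWalker.buildRefGraph(root)
      .filterNot(_.isInstanceOf[java.io.Serializable])

    broken.foreach(ref => println("Not serializable: " + ref.getClass.getName))
  }
}
```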
61 changes: 61 additions & 0 deletions core/src/main/scala/org/apache/spark/util/RDDWalker.scala
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.language.existentials

import org.apache.spark.rdd.RDD

/**
* This class permits traversing the RDD's dependency graph. This is
* accomplished by walking the object graph linking these RDDs. This is useful for debugging
* internal RDD references. See SPARK-3694.
*/
private[spark] object RDDWalker {
/**
* Traverse the dependencies of the RDD and store them within an Array along with their depths.
* Return this data structure so that the caller can process or print it.
*
* @param rddToWalk - The RDD to traverse along with its dependencies
* @return Array[(RDD[_], Int)] - An array of (RDD, depth) pairs produced by the traversal
*/
def walk(rddToWalk : RDD[_]): Array[(RDD[_], Int)] = {

val walkQueue = new mutable.Queue[(RDD[_], Int)]()
val visited = mutable.Set[RDD[_]]()

// Keep track of both the RDD and its depth in the traversal graph.
val results = new ArrayBuffer[(RDD[_], Int)]()
// Implement as a queue to perform a BFS
walkQueue += ((rddToWalk,0))

while (!walkQueue.isEmpty) {
// Pop from the queue
val (rddToProcess : RDD[_], depth:Int) = walkQueue.dequeue()
if (!visited.contains(rddToProcess)) {
visited.add(rddToProcess)
rddToProcess.dependencies.foreach(s => walkQueue += ((s.rdd, depth + 1)))
results.append((rddToProcess, depth))
}
}

results.toArray
}
}
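A similar usage sketch for RDDWalker: build a short lineage in local mode and print each RDD in the dependency graph with its BFS depth. The application name is made up, and the example lives under org.apache.spark.util only because RDDWalker is private[spark].

```scala
package org.apache.spark.util

import org.apache.spark.{SparkConf, SparkContext}

object RDDWalkerExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("rdd-walker"))

    // A short lineage: parallelize -> map -> filter.
    val rdd = sc.parallelize(1 to 100).map(_ * 2).filter(_ % 4 == 0)

    // walk() performs a BFS over the dependency graph and returns (rdd, depth) pairs.
    RDDWalker.walk(rdd).foreach { case (r, depth) =>
      println(("  " * depth) + s"depth $depth: ${r.getClass.getSimpleName}")
    }

    sc.stop()
  }
}
```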