Skip to content

Commit

Permalink
Fix checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed May 9, 2015
1 parent 4ae8806 commit 9217439
Showing 1 changed file with 9 additions and 17 deletions.
26 changes: 9 additions & 17 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -285,16 +285,6 @@ abstract class RDD[T: ClassTag](
*/
private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body)

/**
* Execute a block of code in a scope such that all new RDDs created in this body will
* be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.
*
* Note: Return statements are NOT allowed in the given body.
*/
private[spark] def withNamedScope[U](scopeName: String)(body: => U): U = {
RDDOperationScope.withScope[U](sc)(body)
}

// Transformations (return a new RDD)

/**
Expand Down Expand Up @@ -1532,13 +1522,15 @@ abstract class RDD[T: ClassTag](
* has completed (therefore the RDD has been materialized and potentially stored in memory).
* doCheckpoint() is called recursively on the parent RDDs.
*/
private[spark] def doCheckpoint(): Unit = withNamedScope("checkpoint") {
if (!doCheckpointCalled) {
doCheckpointCalled = true
if (checkpointData.isDefined) {
checkpointData.get.doCheckpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
private[spark] def doCheckpoint(): Unit = {
RDDOperationScope.withScope(sc, "checkpoint", false, true) {
if (!doCheckpointCalled) {
doCheckpointCalled = true
if (checkpointData.isDefined) {
checkpointData.get.doCheckpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}
}
Expand Down

0 comments on commit 9217439

Please sign in to comment.