From 9217439ff2e190072b151ff8936440452655f0f1 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Fri, 8 May 2015 21:27:20 -0700 Subject: [PATCH] Fix checkpoints --- .../main/scala/org/apache/spark/rdd/RDD.scala | 26 +++++++------------ 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index bc8d9d892d9fe..c9ae15092d05e 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -285,16 +285,6 @@ abstract class RDD[T: ClassTag]( */ private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](sc)(body) - /** - * Execute a block of code in a scope such that all new RDDs created in this body will - * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}. - * - * Note: Return statements are NOT allowed in the given body. - */ - private[spark] def withNamedScope[U](scopeName: String)(body: => U): U = { - RDDOperationScope.withScope[U](sc)(body) - } - // Transformations (return a new RDD) /** @@ -1532,13 +1522,15 @@ abstract class RDD[T: ClassTag]( * has completed (therefore the RDD has been materialized and potentially stored in memory). * doCheckpoint() is called recursively on the parent RDDs. */ - private[spark] def doCheckpoint(): Unit = withNamedScope("checkpoint") { - if (!doCheckpointCalled) { - doCheckpointCalled = true - if (checkpointData.isDefined) { - checkpointData.get.doCheckpoint() - } else { - dependencies.foreach(_.rdd.doCheckpoint()) + private[spark] def doCheckpoint(): Unit = { + RDDOperationScope.withScope(sc, "checkpoint", false, true) { + if (!doCheckpointCalled) { + doCheckpointCalled = true + if (checkpointData.isDefined) { + checkpointData.get.doCheckpoint() + } else { + dependencies.foreach(_.rdd.doCheckpoint()) + } } } }