From c3bfcae2ae12e1ebc2a817df4eb9dca8fcce463f Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 27 Apr 2015 16:21:04 -0700 Subject: [PATCH] Re-implement scopes using closures instead of annotations The problem with annotations is that there is no way to associate an RDD's scope with another's. This is because the stack trace simply does not expose enough information for us to associate one instance of a method invocation with another. So, we're back to closures. Note that this still suffers from the same not serializable issue previously discussed, and this is being fixed in the ClosureCleaner separately. --- .../scala/org/apache/spark/SparkContext.scala | 98 +++--- .../apache/spark/annotation/RDDScoped.java | 29 -- .../apache/spark/rdd/AsyncRDDActions.scala | 16 +- .../apache/spark/rdd/PairRDDFunctions.scala | 229 ++++++-------- .../main/scala/org/apache/spark/rdd/RDD.scala | 280 ++++++++---------- .../scala/org/apache/spark/rdd/RDDScope.scala | 104 +++---- 6 files changed, 313 insertions(+), 443 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/annotation/RDDScoped.java diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e9ef785e2cbb7..3b29c999b9e5b 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor import org.apache.mesos.MesosNativeLibrary -import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScoped} +import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump} @@ -631,6 +631,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null) } + /** + * Execute a block of code in a scope. + * All new RDDs created in this body will be part of the same scope. + */ + private def withRDDScope[U](body: => U): U = RDDScope.withScope[U](this)(body) + // Methods for creating RDDs /** Distribute a local Scala collection to form an RDD. @@ -641,8 +647,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions. */ - @RDDScoped - def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { + def parallelize[T: ClassTag]( + seq: Seq[T], + numSlices: Int = defaultParallelism): RDD[T] = withRDDScope { assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) } @@ -651,16 +658,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * This method is identical to `parallelize`. */ - @RDDScoped - def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { + def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withRDDScope { parallelize(seq, numSlices) } /** Distribute a local Scala collection to form an RDD, with one or more * location preferences (hostnames of Spark nodes) for each object. * Create a new partition for each collection item. */ - @RDDScoped - def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = { + def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withRDDScope { assertNotStopped() val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs) @@ -670,8 +675,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. */ - @RDDScoped - def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = { + def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withRDDScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString) @@ -704,9 +708,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * * @param minPartitions A suggestion value of the minimal splitting number for input data. */ - @RDDScoped - def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): - RDD[(String, String)] = { + def wholeTextFiles( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withRDDScope { assertNotStopped() val job = new NewHadoopJob(hadoopConfiguration) NewFileInputFormat.addInputPath(job, new Path(path)) @@ -751,9 +755,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @note Small files are preferred; very large files may cause bad performance. */ @Experimental - @RDDScoped - def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): - RDD[(String, PortableDataStream)] = { + def binaryFiles( + path: String, + minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withRDDScope { assertNotStopped() val job = new NewHadoopJob(hadoopConfiguration) NewFileInputFormat.addInputPath(job, new Path(path)) @@ -780,9 +784,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * @return An RDD of data with values, represented as byte arrays */ @Experimental - @RDDScoped - def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration) - : RDD[Array[Byte]] = { + def binaryRecords( + path: String, + recordLength: Int, + conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withRDDScope { assertNotStopped() conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path, @@ -818,14 +823,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScoped def hadoopRDD[K, V]( conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minPartitions: Int = defaultMinPartitions - ): RDD[(K, V)] = { + minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withRDDScope { assertNotStopped() // Add necessary security credentials to the JobConf before broadcasting it. SparkHadoopUtil.get.addCredentials(conf) @@ -840,14 +843,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScoped def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], - minPartitions: Int = defaultMinPartitions - ): RDD[(K, V)] = { + minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withRDDScope { assertNotStopped() // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration)) @@ -876,10 +877,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScoped def hadoopFile[K, V, F <: InputFormat[K, V]] (path: String, minPartitions: Int) - (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope { hadoopFile(path, fm.runtimeClass.asInstanceOf[Class[F]], km.runtimeClass.asInstanceOf[Class[K]], @@ -901,16 +901,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScoped def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) - (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope { hadoopFile[K, V, F](path, defaultMinPartitions) + } /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */ - @RDDScoped def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]] (path: String) - (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = { + (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withRDDScope { newAPIHadoopFile( path, fm.runtimeClass.asInstanceOf[Class[F]], @@ -928,13 +927,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScoped def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]( path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], - conf: Configuration = hadoopConfiguration): RDD[(K, V)] = { + conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withRDDScope { assertNotStopped() // The call to new NewHadoopJob automatically adds security credentials to conf, // so we don't need to explicitly add them ourselves @@ -962,12 +960,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScoped def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]]( conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], - vClass: Class[V]): RDD[(K, V)] = { + vClass: Class[V]): RDD[(K, V)] = withRDDScope { assertNotStopped() // Add necessary security credentials to the JobConf. Required to access secure HDFS. val jconf = new JobConf(conf) @@ -983,12 +980,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScoped def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], minPartitions: Int - ): RDD[(K, V)] = { + ): RDD[(K, V)] = withRDDScope { assertNotStopped() val inputFormatClass = classOf[SequenceFileInputFormat[K, V]] hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions) @@ -1002,8 +998,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. * */ - @RDDScoped - def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = { + def sequenceFile[K, V]( + path: String, + keyClass: Class[K], + valueClass: Class[V]): RDD[(K, V)] = withRDDScope { assertNotStopped() sequenceFile(path, keyClass, valueClass, defaultMinPartitions) } @@ -1030,12 +1028,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first * copy them using a `map` function. */ - @RDDScoped def sequenceFile[K, V] (path: String, minPartitions: Int = defaultMinPartitions) (implicit km: ClassTag[K], vm: ClassTag[V], - kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]) - : RDD[(K, V)] = { + kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = withRDDScope { assertNotStopped() val kc = kcf() val vc = vcf() @@ -1054,26 +1050,20 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * though the nice thing about it is that there's very little effort required to save arbitrary * objects. */ - @RDDScoped def objectFile[T: ClassTag]( path: String, - minPartitions: Int = defaultMinPartitions - ): RDD[T] = { + minPartitions: Int = defaultMinPartitions): RDD[T] = withRDDScope { assertNotStopped() sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions) .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader)) } - @RDDScoped - protected[spark] def checkpointFile[T: ClassTag]( - path: String - ): RDD[T] = { + protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withRDDScope { new CheckpointRDD[T](this, path) } /** Build the union of a list of RDDs. */ - @RDDScoped - def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = { + def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withRDDScope { val partitioners = rdds.flatMap(_.partitioner).toSet if (partitioners.size == 1) { new PartitionerAwareUnionRDD(this, rdds) @@ -1083,9 +1073,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } /** Build the union of a list of RDDs passed as variable-length arguments. */ - @RDDScoped - def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = + def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withRDDScope { union(Seq(first) ++ rest) + } /** Get an RDD that has no partitions or elements. */ def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T](this) @@ -2039,10 +2029,10 @@ object SparkContext extends Logging { } private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description" - private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id" - private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel" + private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope" + private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride" /** * Executor id for the driver. In earlier versions of Spark, this was ``, but this was diff --git a/core/src/main/scala/org/apache/spark/annotation/RDDScoped.java b/core/src/main/scala/org/apache/spark/annotation/RDDScoped.java deleted file mode 100644 index 565e7d631e48e..0000000000000 --- a/core/src/main/scala/org/apache/spark/annotation/RDDScoped.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.annotation; - -import java.lang.annotation.*; - -/** - * An annotation to mark a method as an RDD operation that encloses its body in a scope. - * This is used to compute the scope of an RDD when it is instantiated. - */ -// TODO: This should really be private[spark] -@Retention(RetentionPolicy.RUNTIME) -@Target({ElementType.METHOD}) -public @interface RDDScoped {} diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index de6e0b99f08af..ec185340c3a2d 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -24,7 +24,6 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.reflect.ClassTag import org.apache.spark.{ComplexFutureAction, FutureAction, Logging} -import org.apache.spark.annotation.RDDScoped /** * A set of asynchronous RDD actions available through an implicit conversion. @@ -34,8 +33,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Returns a future for counting the number of elements in the RDD. */ - @RDDScoped - def countAsync(): FutureAction[Long] = { + def countAsync(): FutureAction[Long] = self.withScope { val totalCount = new AtomicLong self.context.submitJob( self, @@ -55,8 +53,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Returns a future for retrieving all elements of this RDD. */ - @RDDScoped - def collectAsync(): FutureAction[Seq[T]] = { + def collectAsync(): FutureAction[Seq[T]] = self.withScope { val results = new Array[Array[T]](self.partitions.length) self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.length), (index, data) => results(index) = data, results.flatten.toSeq) @@ -65,8 +62,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Returns a future for retrieving the first num elements of the RDD. */ - @RDDScoped - def takeAsync(num: Int): FutureAction[Seq[T]] = { + def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope { val f = new ComplexFutureAction[Seq[T]] f.run { @@ -113,8 +109,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Applies a function f to all elements of this RDD. */ - @RDDScoped - def foreachAsync(f: T => Unit): FutureAction[Unit] = { + def foreachAsync(f: T => Unit): FutureAction[Unit] = self.withScope { val cleanF = self.context.clean(f) self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.length), (index, data) => Unit, Unit) @@ -123,8 +118,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi /** * Applies a function f to each partition of this RDD. */ - @RDDScoped - def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = { + def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = self.withScope { self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.length), (index, data) => Unit, Unit) } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 65f0948f65b56..14ee02d9b6314 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -29,7 +29,7 @@ import scala.util.DynamicVariable import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.conf.{Configurable, Configuration} -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} @@ -38,7 +38,7 @@ import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewO import org.apache.spark._ import org.apache.spark.Partitioner.defaultPartitioner -import org.apache.spark.annotation.{Experimental, RDDScoped} +import org.apache.spark.annotation.Experimental import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.{DataWriteMethod, OutputMetrics} import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil @@ -70,13 +70,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * In addition, users can control the partitioning of the output RDD, and whether to perform * map-side aggregation (if a mapper can produce multiple items with the same key). */ - @RDDScoped def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, - serializer: Serializer = null): RDD[(K, C)] = { + serializer: Serializer = null): RDD[(K, C)] = self.withScope { require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0 if (keyClass.isArray) { if (mapSideCombine) { @@ -106,11 +105,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** * Simplified version of combineByKey that hash-partitions the output RDD. */ - @RDDScoped def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, - numPartitions: Int): RDD[(K, C)] = { + numPartitions: Int): RDD[(K, C)] = self.withScope { combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions)) } @@ -123,9 +121,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * allocation, both of these functions are allowed to modify and return their first argument * instead of creating a new U. */ - @RDDScoped def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, - combOp: (U, U) => U): RDD[(K, U)] = { + combOp: (U, U) => U): RDD[(K, U)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) @@ -146,9 +143,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * allocation, both of these functions are allowed to modify and return their first argument * instead of creating a new U. */ - @RDDScoped def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, - combOp: (U, U) => U): RDD[(K, U)] = { + combOp: (U, U) => U): RDD[(K, U)] = self.withScope { aggregateByKey(zeroValue, new HashPartitioner(numPartitions))(seqOp, combOp) } @@ -161,9 +157,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * allocation, both of these functions are allowed to modify and return their first argument * instead of creating a new U. */ - @RDDScoped def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, - combOp: (U, U) => U): RDD[(K, U)] = { + combOp: (U, U) => U): RDD[(K, U)] = self.withScope { aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp) } @@ -172,8 +167,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * may be added to the result an arbitrary number of times, and must not change the result * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). */ - @RDDScoped - def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = { + def foldByKey( + zeroValue: V, + partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) @@ -191,8 +187,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * may be added to the result an arbitrary number of times, and must not change the result * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). */ - @RDDScoped - def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = { + def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = self.withScope { foldByKey(zeroValue, new HashPartitioner(numPartitions))(func) } @@ -201,8 +196,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * may be added to the result an arbitrary number of times, and must not change the result * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). */ - @RDDScoped - def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = { + def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope { foldByKey(zeroValue, defaultPartitioner(self))(func) } @@ -219,10 +213,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @param seed seed for the random number generator * @return RDD containing the sampled subset */ - @RDDScoped def sampleByKey(withReplacement: Boolean, fractions: Map[K, Double], - seed: Long = Utils.random.nextLong): RDD[(K, V)] = { + seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope { require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.") @@ -251,11 +244,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @return RDD containing the sampled subset */ @Experimental - @RDDScoped def sampleByKeyExact( withReplacement: Boolean, fractions: Map[K, Double], - seed: Long = Utils.random.nextLong): RDD[(K, V)] = { + seed: Long = Utils.random.nextLong): RDD[(K, V)] = self.withScope { require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.") @@ -272,8 +264,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. */ - @RDDScoped - def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { + def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKey[V]((v: V) => v, func, func, partitioner) } @@ -282,8 +273,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * the merging locally on each mapper before sending results to a reducer, similarly to a * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions. */ - @RDDScoped - def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = { + def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope { reduceByKey(new HashPartitioner(numPartitions), func) } @@ -293,8 +283,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. */ - @RDDScoped - def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { + def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope { reduceByKey(defaultPartitioner(self), func) } @@ -303,8 +292,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * immediately to the master as a Map. This will also perform the merging locally on each mapper * before sending results to a reducer, similarly to a "combiner" in MapReduce. */ - @RDDScoped - def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = { + def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope { if (keyClass.isArray) { throw new SparkException("reduceByKeyLocally() does not support array keys") @@ -332,8 +320,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) /** Alias for reduceByKeyLocally */ @deprecated("Use reduceByKeyLocally", "1.0.0") - @RDDScoped - def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func) + def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = self.withScope { + reduceByKeyLocally(func) + } /** * Count the number of elements for each key, collecting the results to a local Map. @@ -343,8 +332,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. */ - @RDDScoped - def countByKey(): Map[K, Long] = self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap + def countByKey(): Map[K, Long] = self.withScope { + self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap + } /** * :: Experimental :: @@ -352,9 +342,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * not finish within a timeout. */ @Experimental - @RDDScoped def countByKeyApprox(timeout: Long, confidence: Double = 0.95) - : PartialResult[Map[K, BoundedDouble]] = { + : PartialResult[Map[K, BoundedDouble]] = self.withScope { self.map(_._1).countByValueApprox(timeout, confidence) } @@ -378,8 +367,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @param partitioner Partitioner to use for the resulting RDD. */ @Experimental - @RDDScoped - def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = { + def countApproxDistinctByKey( + p: Int, + sp: Int, + partitioner: Partitioner): RDD[(K, Long)] = self.withScope { require(p >= 4, s"p ($p) must be >= 4") require(sp <= 32, s"sp ($sp) must be <= 32") require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") @@ -411,8 +402,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * It must be greater than 0.000017. * @param partitioner partitioner of the resulting RDD */ - @RDDScoped - def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { + def countApproxDistinctByKey( + relativeSD: Double, + partitioner: Partitioner): RDD[(K, Long)] = self.withScope { require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017") val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt assert(p <= 32) @@ -430,8 +422,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * It must be greater than 0.000017. * @param numPartitions number of partitions of the resulting RDD */ - @RDDScoped - def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { + def countApproxDistinctByKey( + relativeSD: Double, + numPartitions: Int): RDD[(K, Long)] = self.withScope { countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) } @@ -445,8 +438,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * @param relativeSD Relative accuracy. Smaller values create counters that require more space. * It must be greater than 0.000017. */ - @RDDScoped - def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { + def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = self.withScope { countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) } @@ -463,8 +455,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]. */ - @RDDScoped - def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = { + def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. @@ -488,16 +479,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]. */ - @RDDScoped - def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = { + def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope { groupByKey(new HashPartitioner(numPartitions)) } /** * Return a copy of the RDD partitioned using the specified partitioner. */ - @RDDScoped - def partitionBy(partitioner: Partitioner): RDD[(K, V)] = { + def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope { if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -513,8 +502,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */ - @RDDScoped - def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { + def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues( pair => for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w) ) @@ -526,8 +514,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to * partition the output RDD. */ - @RDDScoped - def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W]( + other: RDD[(K, W)], + partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._2.isEmpty) { pair._1.iterator.map(v => (v, None)) @@ -543,9 +532,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to * partition the output RDD. */ - @RDDScoped def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) - : RDD[(K, (Option[V], W))] = { + : RDD[(K, (Option[V], W))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { pair => if (pair._1.isEmpty) { pair._2.iterator.map(w => (None, w)) @@ -563,9 +551,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements * in `this` have key k. Uses the given Partitioner to partition the output RDD. */ - @RDDScoped def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) - : RDD[(K, (Option[V], Option[W]))] = { + : RDD[(K, (Option[V], Option[W]))] = self.withScope { this.cogroup(other, partitioner).flatMapValues { case (vs, Seq()) => vs.iterator.map(v => (Some(v), None)) case (Seq(), ws) => ws.iterator.map(w => (None, Some(w))) @@ -577,9 +564,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Simplified version of combineByKey that hash-partitions the resulting RDD using the * existing partitioner/parallelism level. */ - @RDDScoped def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) - : RDD[(K, C)] = { + : RDD[(K, C)] = self.withScope { combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) } @@ -593,8 +579,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ - @RDDScoped - def groupByKey(): RDD[(K, Iterable[V])] = { + def groupByKey(): RDD[(K, Iterable[V])] = self.withScope { groupByKey(defaultPartitioner(self)) } @@ -603,8 +588,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - @RDDScoped - def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { + def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope { join(other, defaultPartitioner(self, other)) } @@ -613,8 +597,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Performs a hash join across the cluster. */ - @RDDScoped - def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { + def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = self.withScope { join(other, new HashPartitioner(numPartitions)) } @@ -624,8 +607,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * using the existing partitioner/parallelism level. */ - @RDDScoped - def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = self.withScope { leftOuterJoin(other, defaultPartitioner(self, other)) } @@ -635,8 +617,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output * into `numPartitions` partitions. */ - @RDDScoped - def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = { + def leftOuterJoin[W]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, (V, Option[W]))] = self.withScope { leftOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -646,8 +629,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD using the existing partitioner/parallelism level. */ - @RDDScoped - def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = self.withScope { rightOuterJoin(other, defaultPartitioner(self, other)) } @@ -657,8 +639,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting * RDD into the given number of partitions. */ - @RDDScoped - def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = { + def rightOuterJoin[W]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, (Option[V], W))] = self.withScope { rightOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -671,8 +654,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/ * parallelism level. */ - @RDDScoped - def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = { + def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = self.withScope { fullOuterJoin(other, defaultPartitioner(self, other)) } @@ -684,8 +666,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions. */ - @RDDScoped - def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = { + def fullOuterJoin[W]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = self.withScope { fullOuterJoin(other, new HashPartitioner(numPartitions)) } @@ -695,8 +678,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only * one value per key is preserved in the map returned) */ - @RDDScoped - def collectAsMap(): Map[K, V] = { + def collectAsMap(): Map[K, V] = self.withScope { val data = self.collect() val map = new mutable.HashMap[K, V] map.sizeHint(data.length) @@ -708,8 +690,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Pass each value in the key-value pair RDD through a map function without changing the keys; * this also retains the original RDD's partitioning. */ - @RDDScoped - def mapValues[U](f: V => U): RDD[(K, U)] = { + def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) }, @@ -720,8 +701,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Pass each value in the key-value pair RDD through a flatMap function without changing the * keys; this also retains the original RDD's partitioning. */ - @RDDScoped - def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { + def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope { val cleanF = self.context.clean(f) new MapPartitionsRDD[(K, U), (K, V)](self, (context, pid, iter) => iter.flatMap { case (k, v) => @@ -735,12 +715,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * return a resulting RDD that contains a tuple with the list of values * for that key in `this`, `other1`, `other2` and `other3`. */ - @RDDScoped def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], partitioner: Partitioner) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -757,9 +736,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - @RDDScoped def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) - : RDD[(K, (Iterable[V], Iterable[W]))] = { + : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -773,9 +751,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - @RDDScoped def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -792,9 +769,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * return a resulting RDD that contains a tuple with the list of values * for that key in `this`, `other1`, `other2` and `other3`. */ - @RDDScoped def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3)) } @@ -802,8 +778,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - @RDDScoped - def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { + def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { cogroup(other, defaultPartitioner(self, other)) } @@ -811,9 +786,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - @RDDScoped def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -821,8 +795,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - @RDDScoped - def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = { + def cogroup[W]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { cogroup(other, new HashPartitioner(numPartitions)) } @@ -830,9 +805,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a * tuple with the list of values for that key in `this`, `other1` and `other2`. */ - @RDDScoped def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { cogroup(other1, other2, new HashPartitioner(numPartitions)) } @@ -841,32 +815,28 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * return a resulting RDD that contains a tuple with the list of values * for that key in `this`, `other1`, `other2` and `other3`. */ - @RDDScoped def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)], numPartitions: Int) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { cogroup(other1, other2, other3, new HashPartitioner(numPartitions)) } /** Alias for cogroup. */ - @RDDScoped - def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { + def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { cogroup(other, defaultPartitioner(self, other)) } /** Alias for cogroup. */ - @RDDScoped def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = self.withScope { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } /** Alias for cogroup. */ - @RDDScoped def groupWith[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]) - : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope { cogroup(other1, other2, other3, defaultPartitioner(self, other1, other2, other3)) } @@ -876,26 +846,25 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ - @RDDScoped - def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = + def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] = self.withScope { subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.length))) + } /** Return an RDD with the pairs from `this` whose keys are not in `other`. */ - @RDDScoped - def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] = + def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] = self.withScope { subtractByKey(other, new HashPartitioner(numPartitions)) + } /** Return an RDD with the pairs from `this` whose keys are not in `other`. */ - @RDDScoped - def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = + def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = self.withScope { new SubtractedRDD[K, V, W](self, other, p) + } /** * Return the list of values in the RDD for key `key`. This operation is done efficiently if the * RDD has a known partitioner by only searching the partition that the key maps to. */ - @RDDScoped - def lookup(key: K): Seq[V] = { + def lookup(key: K): Seq[V] = self.withScope { self.partitioner match { case Some(p) => val index = p.getPartition(key) @@ -917,8 +886,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class * supporting the key and value types K and V in this RDD. */ - @RDDScoped - def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) { + def saveAsHadoopFile[F <: OutputFormat[K, V]]( + path: String)(implicit fm: ClassTag[F]): Unit = self.withScope { saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -927,9 +896,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * supporting the key and value types K and V in this RDD. Compress the result with the * supplied codec. */ - @RDDScoped def saveAsHadoopFile[F <: OutputFormat[K, V]]( - path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) { + path: String, + codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit = self.withScope { val runtimeClass = fm.runtimeClass saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec) } @@ -938,8 +907,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat` * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD. */ - @RDDScoped - def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) { + def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]]( + path: String)(implicit fm: ClassTag[F]): Unit = self.withScope { saveAsNewAPIHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]]) } @@ -947,14 +916,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat` * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD. */ - @RDDScoped def saveAsNewAPIHadoopFile( path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], - conf: Configuration = self.context.hadoopConfiguration) - { + conf: Configuration = self.context.hadoopConfiguration): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf val job = new NewAPIHadoopJob(hadoopConf) @@ -969,13 +936,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class * supporting the key and value types K and V in this RDD. Compress with the supplied codec. */ - @RDDScoped def saveAsHadoopFile( path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], - codec: Class[_ <: CompressionCodec]) { + codec: Class[_ <: CompressionCodec]): Unit = self.withScope { saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, new JobConf(self.context.hadoopConfiguration), Some(codec)) } @@ -984,14 +950,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class * supporting the key and value types K and V in this RDD. */ - @RDDScoped def saveAsHadoopFile( path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), - codec: Option[Class[_ <: CompressionCodec]] = None) { + codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf hadoopConf.setOutputKeyClass(keyClass) @@ -1024,8 +989,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * output paths required (e.g. a table name to write to) in the same way as it would be * configured for a Hadoop MapReduce job. */ - @RDDScoped - def saveAsNewAPIHadoopDataset(conf: Configuration) { + def saveAsNewAPIHadoopDataset(conf: Configuration): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf val job = new NewAPIHadoopJob(hadoopConf) @@ -1092,8 +1056,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop * MapReduce job. */ - @RDDScoped - def saveAsHadoopDataset(conf: JobConf) { + def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope { // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038). val hadoopConf = conf val wrappedConf = new SerializableWritable(hadoopConf) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index e96f425b8a4c2..863fc80f5f669 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -18,7 +18,6 @@ package org.apache.spark.rdd import java.util.Random -import java.util.concurrent.atomic.AtomicInteger import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer @@ -32,7 +31,7 @@ import org.apache.hadoop.mapred.TextOutputFormat import org.apache.spark._ import org.apache.spark.Partitioner._ -import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScoped} +import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaRDD import org.apache.spark.partial.BoundedDouble import org.apache.spark.partial.CountEvaluator @@ -278,13 +277,18 @@ abstract class RDD[T: ClassTag]( if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context) } + /** + * Execute a block of code in a scope. + * All new RDDs created in this body will be part of the same scope. + */ + private[spark] def withScope[U](body: => U): U = RDDScope.withScope[U](sc)(body) + // Transformations (return a new RDD) /** * Return a new RDD by applying a function to all elements of this RDD. */ - @RDDScoped - def map[U: ClassTag](f: T => U): RDD[U] = { + def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) } @@ -293,8 +297,7 @@ abstract class RDD[T: ClassTag]( * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ - @RDDScoped - def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = { + def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) } @@ -302,8 +305,7 @@ abstract class RDD[T: ClassTag]( /** * Return a new RDD containing only the elements that satisfy a predicate. */ - @RDDScoped - def filter(f: T => Boolean): RDD[T] = { + def filter(f: T => Boolean): RDD[T] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[T, T]( this, @@ -314,15 +316,16 @@ abstract class RDD[T: ClassTag]( /** * Return a new RDD containing the distinct elements in this RDD. */ - @RDDScoped - def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = + def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) + } /** * Return a new RDD containing the distinct elements in this RDD. */ - @RDDScoped - def distinct(): RDD[T] = distinct(partitions.length) + def distinct(): RDD[T] = withScope { + distinct(partitions.length) + } /** * Return a new RDD that has exactly numPartitions partitions. @@ -333,8 +336,7 @@ abstract class RDD[T: ClassTag]( * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, * which can avoid performing a shuffle. */ - @RDDScoped - def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = { + def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) } @@ -358,9 +360,8 @@ abstract class RDD[T: ClassTag]( * coalesce(1000, shuffle = true) will result in 1000 partitions with the * data distributed using a hash partitioner. */ - @RDDScoped def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null) - : RDD[T] = { + : RDD[T] = withScope { if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. */ val distributePartition = (index: Int, items: Iterator[T]) => { @@ -392,11 +393,10 @@ abstract class RDD[T: ClassTag]( * with replacement: expected number of times each element is chosen; fraction must be >= 0 * @param seed seed for the random number generator */ - @RDDScoped def sample( withReplacement: Boolean, fraction: Double, - seed: Long = Utils.random.nextLong): RDD[T] = { + seed: Long = Utils.random.nextLong): RDD[T] = withScope { require(fraction >= 0.0, "Negative fraction value: " + fraction) if (withReplacement) { new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed) @@ -413,8 +413,9 @@ abstract class RDD[T: ClassTag]( * * @return split RDDs in an array */ - @RDDScoped - def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] = { + def randomSplit( + weights: Array[Double], + seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope { val sum = weights.sum val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _) normalizedCumWeights.sliding(2).map { x => @@ -431,11 +432,10 @@ abstract class RDD[T: ClassTag]( * @param seed seed for the random number generator * @return sample of specified size in an array */ - @RDDScoped def takeSample( withReplacement: Boolean, num: Int, - seed: Long = Utils.random.nextLong): Array[T] = { + seed: Long = Utils.random.nextLong): Array[T] = withScope { val numStDev = 10.0 if (num < 0) { @@ -481,8 +481,7 @@ abstract class RDD[T: ClassTag]( * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */ - @RDDScoped - def union(other: RDD[T]): RDD[T] = { + def union(other: RDD[T]): RDD[T] = withScope { if (partitioner.isDefined && other.partitioner == partitioner) { new PartitionerAwareUnionRDD(sc, Array(this, other)) } else { @@ -494,21 +493,22 @@ abstract class RDD[T: ClassTag]( * Return the union of this RDD and another one. Any identical elements will appear multiple * times (use `.distinct()` to eliminate them). */ - @RDDScoped - def ++(other: RDD[T]): RDD[T] = this.union(other) + def ++(other: RDD[T]): RDD[T] = withScope { + this.union(other) + } /** * Return this RDD sorted by the given key function. */ - @RDDScoped def sortBy[K]( f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length) - (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = + (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope { this.keyBy[K](f) .sortByKey(ascending, numPartitions) .values + } /** * Return the intersection of this RDD and another one. The output will not contain any duplicate @@ -516,8 +516,7 @@ abstract class RDD[T: ClassTag]( * * Note that this method performs a shuffle internally. */ - @RDDScoped - def intersection(other: RDD[T]): RDD[T] = { + def intersection(other: RDD[T]): RDD[T] = withScope { this.map(v => (v, null)).cogroup(other.map(v => (v, null))) .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty } .keys @@ -531,9 +530,9 @@ abstract class RDD[T: ClassTag]( * * @param partitioner Partitioner to use for the resulting RDD */ - @RDDScoped - def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null) - : RDD[T] = { + def intersection( + other: RDD[T], + partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope { this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner) .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty } .keys @@ -547,16 +546,14 @@ abstract class RDD[T: ClassTag]( * * @param numPartitions How many partitions to use in the resulting RDD */ - @RDDScoped - def intersection(other: RDD[T], numPartitions: Int): RDD[T] = { + def intersection(other: RDD[T], numPartitions: Int): RDD[T] = withScope { intersection(other, new HashPartitioner(numPartitions)) } /** * Return an RDD created by coalescing all elements within each partition into an array. */ - @RDDScoped - def glom(): RDD[Array[T]] = { + def glom(): RDD[Array[T]] = withScope { new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray)) } @@ -564,8 +561,9 @@ abstract class RDD[T: ClassTag]( * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of * elements (a, b) where a is in `this` and b is in `other`. */ - @RDDScoped - def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other) + def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope { + new CartesianRDD(sc, this, other) + } /** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements @@ -576,9 +574,9 @@ abstract class RDD[T: ClassTag]( * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ - @RDDScoped - def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = + def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope { groupBy[K](f, defaultPartitioner(this)) + } /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements @@ -589,9 +587,11 @@ abstract class RDD[T: ClassTag]( * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ - @RDDScoped - def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = + def groupBy[K]( + f: T => K, + numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope { groupBy(f, new HashPartitioner(numPartitions)) + } /** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements @@ -602,9 +602,8 @@ abstract class RDD[T: ClassTag]( * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ - @RDDScoped def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null) - : RDD[(K, Iterable[T])] = { + : RDD[(K, Iterable[T])] = withScope { val cleanF = sc.clean(f) this.map(t => (cleanF(t), t)).groupByKey(p) } @@ -612,15 +611,16 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD created by piping elements to a forked external process. */ - @RDDScoped - def pipe(command: String): RDD[String] = new PipedRDD(this, command) + def pipe(command: String): RDD[String] = withScope { + new PipedRDD(this, command) + } /** * Return an RDD created by piping elements to a forked external process. */ - @RDDScoped - def pipe(command: String, env: Map[String, String]): RDD[String] = + def pipe(command: String, env: Map[String, String]): RDD[String] = withScope { new PipedRDD(this, command, env) + } /** * Return an RDD created by piping elements to a forked external process. @@ -641,13 +641,12 @@ abstract class RDD[T: ClassTag]( * @param separateWorkingDir Use separate working directories for each task. * @return the result RDD */ - @RDDScoped def pipe( command: Seq[String], env: Map[String, String] = Map(), printPipeContext: (String => Unit) => Unit = null, printRDDElement: (T, String => Unit) => Unit = null, - separateWorkingDir: Boolean = false): RDD[String] = { + separateWorkingDir: Boolean = false): RDD[String] = withScope { new PipedRDD(this, command, env, if (printPipeContext ne null) sc.clean(printPipeContext) else null, if (printRDDElement ne null) sc.clean(printRDDElement) else null, @@ -660,9 +659,8 @@ abstract class RDD[T: ClassTag]( * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ - @RDDScoped def mapPartitions[U: ClassTag]( - f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { + f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter) new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) } @@ -674,9 +672,9 @@ abstract class RDD[T: ClassTag]( * `preservesPartitioning` indicates whether the input function preserves the partitioner, which * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ - @RDDScoped def mapPartitionsWithIndex[U: ClassTag]( - f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { + f: (Int, Iterator[T]) => Iterator[U], + preservesPartitioning: Boolean = false): RDD[U] = withScope { val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter) new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) } @@ -690,11 +688,10 @@ abstract class RDD[T: ClassTag]( * should be `false` unless this is a pair RDD and the input function doesn't modify the keys. */ @DeveloperApi - @RDDScoped @deprecated("use TaskContext.get", "1.2.0") def mapPartitionsWithContext[U: ClassTag]( f: (TaskContext, Iterator[T]) => Iterator[U], - preservesPartitioning: Boolean = false): RDD[U] = { + preservesPartitioning: Boolean = false): RDD[U] = withScope { val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter) new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning) } @@ -704,9 +701,9 @@ abstract class RDD[T: ClassTag]( * of the original partition. */ @deprecated("use mapPartitionsWithIndex", "0.7.0") - @RDDScoped def mapPartitionsWithSplit[U: ClassTag]( - f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = { + f: (Int, Iterator[T]) => Iterator[U], + preservesPartitioning: Boolean = false): RDD[U] = withScope { mapPartitionsWithIndex(f, preservesPartitioning) } @@ -716,10 +713,9 @@ abstract class RDD[T: ClassTag]( * partition with the index of that partition. */ @deprecated("use mapPartitionsWithIndex", "1.0.0") - @RDDScoped def mapWith[A, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) - (f: (T, A) => U): RDD[U] = { + (f: (T, A) => U): RDD[U] = withScope { mapPartitionsWithIndex((index, iter) => { val a = constructA(index) iter.map(t => f(t, a)) @@ -732,10 +728,9 @@ abstract class RDD[T: ClassTag]( * partition with the index of that partition. */ @deprecated("use mapPartitionsWithIndex and flatMap", "1.0.0") - @RDDScoped def flatMapWith[A, U: ClassTag] (constructA: Int => A, preservesPartitioning: Boolean = false) - (f: (T, A) => Seq[U]): RDD[U] = { + (f: (T, A) => Seq[U]): RDD[U] = withScope { mapPartitionsWithIndex((index, iter) => { val a = constructA(index) iter.flatMap(t => f(t, a)) @@ -748,8 +743,7 @@ abstract class RDD[T: ClassTag]( * partition with the index of that partition. */ @deprecated("use mapPartitionsWithIndex and foreach", "1.0.0") - @RDDScoped - def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = { + def foreachWith[A](constructA: Int => A)(f: (T, A) => Unit): Unit = withScope { mapPartitionsWithIndex { (index, iter) => val a = constructA(index) iter.map(t => {f(t, a); t}) @@ -762,8 +756,7 @@ abstract class RDD[T: ClassTag]( * partition with the index of that partition. */ @deprecated("use mapPartitionsWithIndex and filter", "1.0.0") - @RDDScoped - def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = { + def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope { mapPartitionsWithIndex((index, iter) => { val a = constructA(index) iter.filter(t => p(t, a)) @@ -776,8 +769,7 @@ abstract class RDD[T: ClassTag]( * partitions* and the *same number of elements in each partition* (e.g. one was made through * a map on the other). */ - @RDDScoped - def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = { + def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope { zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) => new Iterator[(T, U)] { def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match { @@ -797,41 +789,41 @@ abstract class RDD[T: ClassTag]( * *same number of partitions*, but does *not* require them to have the same number * of elements in each partition. */ - @RDDScoped def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B], preservesPartitioning: Boolean) - (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = + (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope { new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning) + } - @RDDScoped def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B]) - (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = + (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = withScope { zipPartitions(rdd2, preservesPartitioning = false)(f) + } - @RDDScoped def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean) - (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = + (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope { new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning) + } - @RDDScoped def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C]) - (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = + (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = withScope { zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f) + } - @RDDScoped def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean) - (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = + (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope { new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning) + } - @RDDScoped def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag] (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D]) - (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = + (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = withScope { zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f) + } // Actions (launch a job to return a value to the user program) @@ -839,8 +831,7 @@ abstract class RDD[T: ClassTag]( /** * Applies a function f to all elements of this RDD. */ - @RDDScoped - def foreach(f: T => Unit): Unit = { + def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) } @@ -848,8 +839,7 @@ abstract class RDD[T: ClassTag]( /** * Applies a function f to each partition of this RDD. */ - @RDDScoped - def foreachPartition(f: Iterator[T] => Unit): Unit = { + def foreachPartition(f: Iterator[T] => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => cleanF(iter)) } @@ -857,8 +847,7 @@ abstract class RDD[T: ClassTag]( /** * Return an array that contains all of the elements in this RDD. */ - @RDDScoped - def collect(): Array[T] = { + def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } @@ -868,8 +857,7 @@ abstract class RDD[T: ClassTag]( * * The iterator will consume as much memory as the largest partition in this RDD. */ - @RDDScoped - def toLocalIterator: Iterator[T] = { + def toLocalIterator: Iterator[T] = withScope { def collectPartition(p: Int): Array[T] = { sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head } @@ -880,14 +868,14 @@ abstract class RDD[T: ClassTag]( * Return an array that contains all of the elements in this RDD. */ @deprecated("use collect", "1.0.0") - @RDDScoped - def toArray(): Array[T] = collect() + def toArray(): Array[T] = withScope { + collect() + } /** * Return an RDD that contains all matching values by applying `f`. */ - @RDDScoped - def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = { + def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope { filter(f.isDefinedAt).map(f) } @@ -897,22 +885,23 @@ abstract class RDD[T: ClassTag]( * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ - @RDDScoped - def subtract(other: RDD[T]): RDD[T] = + def subtract(other: RDD[T]): RDD[T] = withScope { subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length))) + } /** * Return an RDD with the elements from `this` that are not in `other`. */ - @RDDScoped - def subtract(other: RDD[T], numPartitions: Int): RDD[T] = + def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope { subtract(other, new HashPartitioner(numPartitions)) + } /** * Return an RDD with the elements from `this` that are not in `other`. */ - @RDDScoped - def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = { + def subtract( + other: RDD[T], + p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope { if (partitioner == Some(p)) { // Our partitioner knows how to handle T (which, since we have a partitioner, is // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples @@ -934,8 +923,7 @@ abstract class RDD[T: ClassTag]( * Reduces the elements of this RDD using the specified commutative and * associative binary operator. */ - @RDDScoped - def reduce(f: (T, T) => T): T = { + def reduce(f: (T, T) => T): T = withScope { val cleanF = sc.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { if (iter.hasNext) { @@ -964,8 +952,7 @@ abstract class RDD[T: ClassTag]( * @param depth suggested depth of the tree (default: 2) * @see [[org.apache.spark.rdd.RDD#reduce]] */ - @RDDScoped - def treeReduce(f: (T, T) => T, depth: Int = 2): T = { + def treeReduce(f: (T, T) => T, depth: Int = 2): T = withScope { require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.") val cleanF = context.clean(f) val reducePartition: Iterator[T] => Option[T] = iter => { @@ -997,8 +984,7 @@ abstract class RDD[T: ClassTag]( * modify t1 and return it as its result value to avoid object allocation; however, it should not * modify t2. */ - @RDDScoped - def fold(zeroValue: T)(op: (T, T) => T): T = { + def fold(zeroValue: T)(op: (T, T) => T): T = withScope { // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance()) val cleanOp = sc.clean(op) @@ -1016,8 +1002,7 @@ abstract class RDD[T: ClassTag]( * allowed to modify and return their first argument instead of creating a new U to avoid memory * allocation. */ - @RDDScoped - def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = { + def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope { // Clone the zero value since we will also be serializing it as part of tasks var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance()) val cleanSeqOp = sc.clean(seqOp) @@ -1034,11 +1019,10 @@ abstract class RDD[T: ClassTag]( * @param depth suggested depth of the tree (default: 2) * @see [[org.apache.spark.rdd.RDD#aggregate]] */ - @RDDScoped def treeAggregate[U: ClassTag](zeroValue: U)( seqOp: (U, T) => U, combOp: (U, U) => U, - depth: Int = 2): U = { + depth: Int = 2): U = withScope { require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.") if (partitions.length == 0) { return Utils.clone(zeroValue, context.env.closureSerializer.newInstance()) @@ -1071,8 +1055,9 @@ abstract class RDD[T: ClassTag]( * within a timeout, even if not all tasks have finished. */ @Experimental - @RDDScoped - def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { + def countApprox( + timeout: Long, + confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope { val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) => var result = 0L while (iter.hasNext) { @@ -1093,8 +1078,7 @@ abstract class RDD[T: ClassTag]( * To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which * returns an RDD[T, Long] instead of a map. */ - @RDDScoped - def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = { + def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long] = withScope { map(value => (value, null)).countByKey() } @@ -1103,11 +1087,9 @@ abstract class RDD[T: ClassTag]( * Approximate version of countByValue(). */ @Experimental - @RDDScoped def countByValueApprox(timeout: Long, confidence: Double = 0.95) (implicit ord: Ordering[T] = null) - : PartialResult[Map[T, BoundedDouble]] = - { + : PartialResult[Map[T, BoundedDouble]] = withScope { if (elementClassTag.runtimeClass.isArray) { throw new SparkException("countByValueApprox() does not support arrays") } @@ -1140,8 +1122,7 @@ abstract class RDD[T: ClassTag]( * If `sp` equals 0, the sparse representation is skipped. */ @Experimental - @RDDScoped - def countApproxDistinct(p: Int, sp: Int): Long = { + def countApproxDistinct(p: Int, sp: Int): Long = withScope { require(p >= 4, s"p ($p) must be at least 4") require(sp <= 32, s"sp ($sp) cannot be greater than 32") require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") @@ -1167,8 +1148,7 @@ abstract class RDD[T: ClassTag]( * @param relativeSD Relative accuracy. Smaller values create counters that require more space. * It must be greater than 0.000017. */ - @RDDScoped - def countApproxDistinct(relativeSD: Double = 0.05): Long = { + def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope { val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt countApproxDistinct(p, 0) } @@ -1186,8 +1166,9 @@ abstract class RDD[T: ClassTag]( * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee * the same index assignments, you should sort the RDD with sortByKey() or save it to a file. */ - @RDDScoped - def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this) + def zipWithIndex(): RDD[(T, Long)] = withScope { + new ZippedWithIndexRDD(this) + } /** * Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k, @@ -1199,8 +1180,7 @@ abstract class RDD[T: ClassTag]( * and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee * the same index assignments, you should sort the RDD with sortByKey() or save it to a file. */ - @RDDScoped - def zipWithUniqueId(): RDD[(T, Long)] = { + def zipWithUniqueId(): RDD[(T, Long)] = withScope { val n = this.partitions.length.toLong this.mapPartitionsWithIndex { case (k, iter) => iter.zipWithIndex.map { case (item, i) => @@ -1217,8 +1197,7 @@ abstract class RDD[T: ClassTag]( * @note due to complications in the internal implementation, this method will raise * an exception if called on an RDD of `Nothing` or `Null`. */ - @RDDScoped - def take(num: Int): Array[T] = { + def take(num: Int): Array[T] = withScope { if (num == 0) { return new Array[T](0) } @@ -1257,10 +1236,11 @@ abstract class RDD[T: ClassTag]( /** * Return the first element in this RDD. */ - @RDDScoped - def first(): T = take(1) match { - case Array(t) => t - case _ => throw new UnsupportedOperationException("empty collection") + def first(): T = withScope { + take(1) match { + case Array(t) => t + case _ => throw new UnsupportedOperationException("empty collection") + } } /** @@ -1278,8 +1258,9 @@ abstract class RDD[T: ClassTag]( * @param ord the implicit ordering for T * @return an array of top elements */ - @RDDScoped - def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse) + def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { + takeOrdered(num)(ord.reverse) + } /** * Returns the first k (smallest) elements from this RDD as defined by the specified @@ -1297,8 +1278,7 @@ abstract class RDD[T: ClassTag]( * @param ord the implicit ordering for T * @return an array of top elements */ - @RDDScoped - def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = { + def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { if (num == 0) { Array.empty } else { @@ -1323,15 +1303,17 @@ abstract class RDD[T: ClassTag]( * Returns the max of this RDD as defined by the implicit Ordering[T]. * @return the maximum element of the RDD * */ - @RDDScoped - def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max) + def max()(implicit ord: Ordering[T]): T = withScope { + this.reduce(ord.max) + } /** * Returns the min of this RDD as defined by the implicit Ordering[T]. * @return the minimum element of the RDD * */ - @RDDScoped - def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min) + def min()(implicit ord: Ordering[T]): T = withScope { + this.reduce(ord.min) + } /** * @note due to complications in the internal implementation, this method will raise an @@ -1341,14 +1323,14 @@ abstract class RDD[T: ClassTag]( * @return true if and only if the RDD contains no elements at all. Note that an RDD * may be empty even when it has at least 1 partition. */ - @RDDScoped - def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0 + def isEmpty(): Boolean = withScope { + partitions.length == 0 || take(1).length == 0 + } /** * Save this RDD as a text file, using string representations of elements. */ - @RDDScoped - def saveAsTextFile(path: String): Unit = { + def saveAsTextFile(path: String): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 // // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit @@ -1375,8 +1357,7 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a compressed text file, using string representations of elements. */ - @RDDScoped - def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = { + def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope { // https://issues.apache.org/jira/browse/SPARK-2075 val nullWritableClassTag = implicitly[ClassTag[NullWritable]] val textClassTag = implicitly[ClassTag[Text]] @@ -1394,8 +1375,7 @@ abstract class RDD[T: ClassTag]( /** * Save this RDD as a SequenceFile of serialized objects. */ - @RDDScoped - def saveAsObjectFile(path: String): Unit = { + def saveAsObjectFile(path: String): Unit = withScope { this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) .saveAsSequenceFile(path) @@ -1404,14 +1384,12 @@ abstract class RDD[T: ClassTag]( /** * Creates tuples of the elements in this RDD by applying `f`. */ - @RDDScoped - def keyBy[K](f: T => K): RDD[(K, T)] = { + def keyBy[K](f: T => K): RDD[(K, T)] = withScope { map(x => (f(x), x)) } /** A private method for tests, to look at the contents of each partition */ - @RDDScoped - private[spark] def collectPartitions(): Array[Array[T]] = { + private[spark] def collectPartitions(): Array[Array[T]] = withScope { sc.runJob(this, (iter: Iterator[T]) => iter.toArray) } @@ -1457,7 +1435,7 @@ abstract class RDD[T: ClassTag]( * For more detail, see the documentation of {{RDDScope}}. This scope is null if * the user instantiates this RDD himself without using any Spark operations. */ - @transient private[spark] val scope = RDDScope.getScope.orNull + @transient private[spark] val scope = sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY) private[spark] def getCreationSite: String = Option(creationSite).map(_.shortForm).getOrElse("") diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala index 2557a10c62b4d..b8986acff4263 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDDScope.scala @@ -18,8 +18,7 @@ package org.apache.spark.rdd import java.util.concurrent.atomic.AtomicInteger - -import org.apache.spark.annotation.RDDScoped +import org.apache.spark.SparkContext /** * A collection of utility methods to construct a hierarchical representation of RDD scopes. @@ -38,15 +37,6 @@ private[spark] object RDDScope { // between different scopes of the same name private val scopeCounter = new AtomicInteger(0) - // Consider only methods that belong to these classes as potential RDD operations - // This is to limit the amount of reflection we do when we traverse the stack trace - private val classesWithScopeMethods = Set( - "org.apache.spark.SparkContext", - "org.apache.spark.rdd.RDD", - "org.apache.spark.rdd.PairRDDFunctions", - "org.apache.spark.rdd.AsyncRDDActions" - ) - /** * Make a globally unique scope ID from the scope name. * @@ -62,64 +52,48 @@ private[spark] object RDDScope { } /** - * Retrieve the hierarchical scope from the stack trace when an RDD is first created. - * - * This considers all methods marked with the @RDDScoped annotation and chains them together - * in the order they are invoked. Each level in the scope hierarchy represents a unique - * invocation of a particular RDD operation. - * - * For example: treeAggregate_0;reduceByKey_1;combineByKey_2;mapPartitions_3 - * This means this RDD is created by the user calling treeAggregate, which calls - * `reduceByKey`, and then `combineByKey`, and then `mapPartitions` to create this RDD. + * Execute the given body such that all RDDs created in this body will have the same scope. + * The name of the scope will be the name of the method that immediately encloses this one. */ - private[spark] def getScope: Option[String] = { - - // TODO: Note that this approach does not correctly associate the same invocation across RDDs - // For instance, a call to `textFile` creates both a HadoopRDD and a MapPartitionsRDD, but - // there is no way to associate the invocation across these two RDDs to draw the same scope - // around them. This is because the stack trace simply does not provide information for us - // to make any reasonable association across RDDs. We may need a higher level approach that - // involves setting common variables before and after the RDD operation itself. + private[spark] def withScope[T]( + sc: SparkContext, + allowNesting: Boolean = false)(body: => T): T = { + val callerMethodName = Thread.currentThread.getStackTrace()(3).getMethodName + withScope[T](sc, callerMethodName, allowNesting)(body) + } - val rddScopeNames = Thread.currentThread.getStackTrace - // Avoid reflecting on all classes in the stack trace - .filter { ste => classesWithScopeMethods.contains(ste.getClassName) } - // Return the corresponding method if it has the @RDDScoped annotation - .flatMap { ste => - // Note that this is an approximation since we match the method only by name - // Unfortunate we cannot be more precise because the stack trace does not include - // parameter information - Class.forName(ste.getClassName).getDeclaredMethods.find { m => - m.getName == ste.getMethodName && - m.getDeclaredAnnotations.exists { a => - a.annotationType() == classOf[RDDScoped] - } - } + /** + * Execute the given body such that all RDDs created in this body will have the same scope. + * + * If nesting is allowed, this concatenates the previous scope with the new one in a way that + * signifies the hierarchy. Otherwise, if nesting is not allowed, then any children calls to + * this method executed in the body will have no effect. + */ + private[spark] def withScope[T]( + sc: SparkContext, + name: String, + allowNesting: Boolean = false)(body: => T): T = { + // Save the old scope to restore it later + val scopeKey = SparkContext.RDD_SCOPE_KEY + val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY + val oldScope = sc.getLocalProperty(scopeKey) + val oldNoOverride = sc.getLocalProperty(noOverrideKey) + try { + // Set the scope only if the higher level caller allows us to do so + if (sc.getLocalProperty(noOverrideKey) == null) { + val oldScopeId = Option(oldScope).map { _ + SCOPE_NESTING_DELIMITER }.getOrElse("") + val newScopeId = oldScopeId + makeScopeId(name) + sc.setLocalProperty(scopeKey, newScopeId) } - // Use the method name as the scope name for now - .map { m => m.getName } - - // It is common for such methods to internally invoke other methods with the same name - // as aliases (e.g. union, reduceByKey). Here we remove adjacent duplicates such that - // the scope chain does not capture this (e.g. a, a, b, c, b, c, c => a, b, c, b, c). - // This is surprisingly difficult to express even in Scala. - var prev: String = null - val dedupedRddScopeNames = rddScopeNames.flatMap { n => - if (n != prev) { - prev = n - Some(n) - } else { - None + // Optionally disallow the child body to override our scope + if (!allowNesting) { + sc.setLocalProperty(noOverrideKey, "true") } - } - - // Chain scope IDs to denote hierarchy, with outermost scope first - val rddScopeIds = dedupedRddScopeNames.map(makeScopeId) - if (rddScopeIds.nonEmpty) { - Some(rddScopeIds.reverse.mkString(SCOPE_NESTING_DELIMITER)) - } else { - None + body + } finally { + // Remember to restore any state that was modified before exiting + sc.setLocalProperty(scopeKey, oldScope) + sc.setLocalProperty(noOverrideKey, oldNoOverride) } } - }