Commit 494d5c2

Revert a few unintended style changes

Andrew Or committed Apr 22, 2015
1 parent 9fac6f3 commit 494d5c2
Showing 3 changed files with 50 additions and 101 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2004,10 +2004,10 @@ object SparkContext extends Logging {
}

private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"

/**
* Executor id for the driver. In earlier versions of Spark, this was `<driver>`, but this was
145 changes: 47 additions & 98 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -315,17 +315,14 @@ abstract class RDD[T: ClassTag](
* Return a new RDD containing the distinct elements in this RDD.
*/
@RDDScope
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = {
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] =
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

/**
* Return a new RDD containing the distinct elements in this RDD.
*/
@RDDScope
def distinct(): RDD[T] = {
distinct(partitions.length)
}
def distinct(): RDD[T] = distinct(partitions.length)
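
A quick usage sketch of the distinct() overloads above (an editorial illustration, not part of the commit; it assumes an active SparkContext bound to `sc`):

    // Deduplicate a small RDD; result order is not defined.
    val nums = sc.parallelize(Seq(1, 1, 2, 3, 3))
    nums.distinct().collect()    // e.g. Array(1, 2, 3)
    nums.distinct(2).collect()   // same elements, shuffled into 2 partitions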

/**
* Return a new RDD that has exactly numPartitions partitions.
@@ -362,10 +359,8 @@ abstract class RDD[T: ClassTag](
* data distributed using a hash partitioner.
*/
@RDDScope
def coalesce(
numPartitions: Int,
shuffle: Boolean = false)
(implicit ord: Ordering[T] = null): RDD[T] = {
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
: RDD[T] = {
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
@@ -419,9 +414,7 @@ abstract class RDD[T: ClassTag](
* @return split RDDs in an array
*/
@RDDScope
def randomSplit(
weights: Array[Double],
seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
def randomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong): Array[RDD[T]] = {
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { x =>
@@ -502,9 +495,7 @@ abstract class RDD[T: ClassTag](
* times (use `.distinct()` to eliminate them).
*/
@RDDScope
def ++(other: RDD[T]): RDD[T] = {
this.union(other)
}
def ++(other: RDD[T]): RDD[T] = this.union(other)
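
Usage sketch for `++` (editorial; same `sc` assumption as above):

    // Union keeps duplicates; use .distinct() to drop them.
    val a = sc.parallelize(Seq(1, 2))
    val b = sc.parallelize(Seq(2, 3))
    (a ++ b).collect()   // Array(1, 2, 2, 3), in some order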

/**
* Return this RDD sorted by the given key function.
@@ -514,11 +505,10 @@
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = {
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
this.keyBy[K](f)
.sortByKey(ascending, numPartitions)
.values
}
.sortByKey(ascending, numPartitions)
.values
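
Usage sketch for sortBy (editorial; assumes `sc`):

    // Sort pairs by their numeric field, descending.
    val pairs = sc.parallelize(Seq(("a", 3), ("b", 1), ("c", 2)))
    pairs.sortBy(_._2, ascending = false).collect()
    // Array(("a", 3), ("c", 2), ("b", 1))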

/**
* Return the intersection of this RDD and another one. The output will not contain any duplicate
@@ -529,8 +519,8 @@
@RDDScope
def intersection(other: RDD[T]): RDD[T] = {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}
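
Usage sketch for intersection (editorial; assumes `sc`):

    // Intersection deduplicates its output and requires a shuffle.
    val x = sc.parallelize(Seq(1, 2, 2, 3))
    val y = sc.parallelize(Seq(2, 3, 3, 4))
    x.intersection(y).collect()   // Array(2, 3), in some order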

/**
@@ -542,13 +532,11 @@
* @param partitioner Partitioner to use for the resulting RDD
*/
@RDDScope
def intersection(
other: RDD[T],
partitioner: Partitioner)
(implicit ord: Ordering[T] = null): RDD[T] = {
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null)
: RDD[T] = {
this.map(v => (v, null)).cogroup(other.map(v => (v, null)), partitioner)
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys
}

/**
@@ -577,9 +565,7 @@
* elements (a, b) where a is in `this` and b is in `other`.
*/
@RDDScope
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
new CartesianRDD(sc, this, other)
}
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
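
Usage sketch for cartesian (editorial; assumes `sc`):

    // Cartesian product yields m * n pairs, so keep inputs small.
    val letters = sc.parallelize(Seq("a", "b"))
    val digits = sc.parallelize(Seq(1, 2))
    letters.cartesian(digits).collect()
    // Array(("a", 1), ("a", 2), ("b", 1), ("b", 2))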

/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
@@ -591,9 +577,8 @@
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
@RDDScope
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = {
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
groupBy[K](f, defaultPartitioner(this))
}

/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
@@ -605,11 +590,8 @@
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
@RDDScope
def groupBy[K](
f: T => K,
numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = {
def groupBy[K](f: T => K, numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =
groupBy(f, new HashPartitioner(numPartitions))
}

/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
@@ -621,10 +603,8 @@
* or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
*/
@RDDScope
def groupBy[K](
f: T => K,
p: Partitioner)
(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])] = {
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
: RDD[(K, Iterable[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
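
Usage sketch for the simplest groupBy overload (editorial; assumes `sc`):

    // Group integers by parity. As the scaladoc above notes, prefer
    // aggregateByKey/reduceByKey when you only need a per-key summary.
    val nums = sc.parallelize(1 to 6)
    nums.groupBy(n => n % 2).collect()
    // e.g. Array((0, Seq(2, 4, 6)), (1, Seq(1, 3, 5)))
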
@@ -633,17 +613,14 @@
* Return an RDD created by piping elements to a forked external process.
*/
@RDDScope
def pipe(command: String): RDD[String] = {
new PipedRDD(this, command)
}
def pipe(command: String): RDD[String] = new PipedRDD(this, command)

/**
* Return an RDD created by piping elements to a forked external process.
*/
@RDDScope
def pipe(command: String, env: Map[String, String]): RDD[String] = {
def pipe(command: String, env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env)
}
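
Usage sketch for pipe (editorial; assumes `sc` and a Unix `tr` available on every executor):

    // Each element is written to the subprocess's stdin, one per line;
    // each line of its stdout becomes an output element.
    val words = sc.parallelize(Seq("spark", "rdd"))
    words.pipe("tr a-z A-Z").collect()   // Array("SPARK", "RDD")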

/**
* Return an RDD created by piping elements to a forked external process.
@@ -685,8 +662,7 @@
*/
@RDDScope
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = {
f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}
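
Usage sketch for mapPartitions (editorial; assumes `sc`):

    // One call per partition, useful for amortizing per-partition setup.
    val nums = sc.parallelize(1 to 10, 2)
    nums.mapPartitions { iter =>
      val factor = 10   // imagine expensive setup here, done once per partition
      iter.map(_ * factor)
    }.collect()
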
@@ -700,8 +676,7 @@
*/
@RDDScope
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = {
f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter)
new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}
@@ -731,8 +706,7 @@
@deprecated("use mapPartitionsWithIndex", "0.7.0")
@RDDScope
def mapPartitionsWithSplit[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = {
f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
mapPartitionsWithIndex(f, preservesPartitioning)
}

@@ -826,44 +800,39 @@
@RDDScope
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = {
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, preservesPartitioning)
}

@RDDScope
def zipPartitions[B: ClassTag, V: ClassTag]
(rdd2: RDD[B])
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = {
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
zipPartitions(rdd2, preservesPartitioning = false)(f)
}

@RDDScope
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = {
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
}

@RDDScope
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C])
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = {
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
zipPartitions(rdd2, rdd3, preservesPartitioning = false)(f)
}

@RDDScope
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = {
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
}

@RDDScope
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = {
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
zipPartitions(rdd2, rdd3, rdd4, preservesPartitioning = false)(f)
}


// Actions (launch a job to return a value to the user program)

@@ -929,26 +898,21 @@
* RDD will be <= us.
*/
@RDDScope
def subtract(other: RDD[T]): RDD[T] = {
def subtract(other: RDD[T]): RDD[T] =
subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}

/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
@RDDScope
def subtract(other: RDD[T], numPartitions: Int): RDD[T] = {
def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
subtract(other, new HashPartitioner(numPartitions))
}
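
Usage sketch for subtract (editorial; assumes `sc`):

    // Set difference: keep elements of `a` that never appear in `b`.
    val a = sc.parallelize(Seq(1, 2, 3, 4))
    val b = sc.parallelize(Seq(2, 4))
    a.subtract(b).collect()   // Array(1, 3), in some order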

/**
* Return an RDD with the elements from `this` that are not in `other`.
*/
@RDDScope
def subtract(
other: RDD[T],
p: Partitioner)
(implicit ord: Ordering[T] = null): RDD[T] = {
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {
if (partitioner == Some(p)) {
// Our partitioner knows how to handle T (which, since we have a partitioner, is
// really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
@@ -1108,9 +1072,7 @@
*/
@Experimental
@RDDScope
def countApprox(
timeout: Long,
confidence: Double = 0.95): PartialResult[BoundedDouble] = {
def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
var result = 0L
while (iter.hasNext) {
@@ -1144,7 +1106,8 @@
@RDDScope
def countByValueApprox(timeout: Long, confidence: Double = 0.95)
(implicit ord: Ordering[T] = null)
: PartialResult[Map[T, BoundedDouble]] = {
: PartialResult[Map[T, BoundedDouble]] =
{
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
@@ -1224,9 +1187,7 @@
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
@RDDScope
def zipWithIndex(): RDD[(T, Long)] = {
new ZippedWithIndexRDD(this)
}
def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this)
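
Usage sketch for zipWithIndex (editorial; assumes `sc`):

    // Indices follow partition order, then element order within a partition.
    sc.parallelize(Seq("a", "b", "c")).zipWithIndex().collect()
    // Array(("a", 0L), ("b", 1L), ("c", 2L))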

/**
* Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
@@ -1297,11 +1258,9 @@
* Return the first element in this RDD.
*/
@RDDScope
def first(): T = {
take(1) match {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
}
def first(): T = take(1) match {
case Array(t) => t
case _ => throw new UnsupportedOperationException("empty collection")
}
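
Usage sketch for first (editorial; assumes `sc`):

    sc.parallelize(Seq(5, 6, 7)).first()   // 5; throws on an empty RDD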

/**
@@ -1320,9 +1279,7 @@
* @return an array of top elements
*/
@RDDScope
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
takeOrdered(num)(ord.reverse)
}
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)
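
Usage sketch for top and its counterpart takeOrdered (editorial; assumes `sc`):

    val nums = sc.parallelize(Seq(10, 4, 2, 12, 3))
    nums.top(2)           // Array(12, 10), descending by the implicit Ordering
    nums.takeOrdered(2)   // Array(2, 3), the ascending counterpart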

/**
* Returns the first k (smallest) elements from this RDD as defined by the specified
@@ -1367,18 +1324,14 @@
* @return the maximum element of the RDD
*/
@RDDScope
def max()(implicit ord: Ordering[T]): T = {
this.reduce(ord.max)
}
def max()(implicit ord: Ordering[T]): T = this.reduce(ord.max)

/**
* Returns the min of this RDD as defined by the implicit Ordering[T].
* @return the minimum element of the RDD
*/
@RDDScope
def min()(implicit ord: Ordering[T]): T = {
this.reduce(ord.min)
}
def min()(implicit ord: Ordering[T]): T = this.reduce(ord.min)
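
Usage sketch for max and min (editorial; assumes `sc`):

    // Both reduce over the implicit Ordering[T].
    val nums = sc.parallelize(Seq(3, 1, 4, 1, 5))
    nums.max()   // 5
    nums.min()   // 1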

/**
* @note due to complications in the internal implementation, this method will raise an
@@ -1389,9 +1342,7 @@
* may be empty even when it has at least 1 partition.
*/
@RDDScope
def isEmpty(): Boolean = {
partitions.length == 0 || take(1).length == 0
}
def isEmpty(): Boolean = partitions.length == 0 || take(1).length == 0
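
Usage sketch for isEmpty (editorial; assumes `sc`):

    // Cheap emptiness check: partition count first, then take(1).
    sc.parallelize(Seq.empty[Int]).isEmpty()   // true
    sc.parallelize(Seq(1)).isEmpty()           // false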

/**
* Save this RDD as a text file, using string representations of elements.
@@ -1425,9 +1376,7 @@
* Save this RDD as a compressed text file, using string representations of elements.
*/
@RDDScope
def saveAsTextFile(
path: String,
codec: Class[_ <: CompressionCodec]): Unit = {
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = {
// https://issues.apache.org/jira/browse/SPARK-2075
val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
val textClassTag = implicitly[ClassTag[Text]]
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala
@@ -155,4 +155,4 @@ private[ui] object VizGraph {
subgraph.append(indent + "}\n")
subgraph.toString()
}
}
}
