Changes after building against Shark.
pwendell committed Apr 6, 2014
1 parent 8452309 commit c581dce
Showing 8 changed files with 32 additions and 16 deletions.
@@ -23,7 +23,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.ObjectWritable
 import org.apache.hadoop.io.Writable
 
-private[spark] class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
+/** <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span> */
+class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
   def value = t
   override def toString = t.toString
 
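For reference, the now-public wrapper makes a Hadoop Writable usable wherever Java serialization is required. A minimal sketch (the Text value is illustrative, not from the commit):

    import org.apache.hadoop.io.Text
    import org.apache.spark.SerializableWritable

    // Writables are not java.io.Serializable themselves; the wrapper
    // round-trips them through Hadoop's ObjectWritable machinery.
    val wrapped = new SerializableWritable(new Text("hello"))
    val restored: Text = wrapped.value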
31 changes: 22 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -53,18 +53,28 @@ import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerTy
  *
  * @param config a Spark Config object describing the application configuration. Any settings in
  *   this config overrides the default configs as well as system properties.
- * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
- *   be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
- *   from a list of input files or InputFormats for the application.
  */
-class SparkContext(
-    config: SparkConf,
-    // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
-    // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
-    // contains a map from hostname to a list of input format splits on the host.
-    val preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map())
+class SparkContext(config: SparkConf)
   extends Logging {
 
+  // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
+  // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
+  // contains a map from hostname to a list of input format splits on the host.
+  private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()
+
+  /**
+   * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
+   * Alternative constructor for setting preferred locations where Spark will create executors.
+   *
+   * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Can
+   *   be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
+   *   from a list of input files or InputFormats for the application.
+   */
+  def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
+    this(config)
+    this.preferredNodeLocationData = preferredNodeLocationData
+  }
 
   /**
    * Alternative constructor that allows setting common Spark properties directly
   *
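For reference, a sketch of calling the new auxiliary constructor; the app name and host name are placeholders, and the map would normally come from InputFormatInfo.computePreferredLocations:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.SplitInfo

    val conf = new SparkConf().setAppName("locality-demo")  // hypothetical app name
    val prefs: Map[String, Set[SplitInfo]] =
      Map("host1.example.com" -> Set.empty[SplitInfo])      // placeholder locality data
    val sc = new SparkContext(conf, prefs)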
@@ -606,6 +616,9 @@ class SparkContext(
   def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
     new UnionRDD(this, Seq(first) ++ rest)
 
+  /** Get an RDD that has no partitions or elements. */
+  def emptyRDD[T: ClassTag] = new EmptyRDD[T](this)
+
   // Methods for creating shared variables
 
   /**
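For reference, the new emptyRDD method gives a typed, zero-partition RDD, which works as an identity when folding a possibly-empty sequence of RDDs. A sketch assuming sc is an existing SparkContext:

    import org.apache.spark.rdd.RDD

    val rdds: Seq[RDD[Int]] = Seq()               // hypothetical, possibly empty input
    val zero: RDD[Int] = sc.emptyRDD[Int]         // no partitions, no elements
    val combined = rdds.foldLeft(zero)(_ union _) // safe even when rdds is empty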
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -35,13 +35,14 @@ import org.apache.spark.storage._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
  * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
  * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
  * objects needs to have the right SparkEnv set. You can get the current environment with
  * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
  */
-private[spark] class SparkEnv (
+class SparkEnv (
     val executorId: String,
     val actorSystem: ActorSystem,
     val serializer: Serializer,
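For reference, with SparkEnv public the thread-local environment can be read directly. A sketch assuming a SparkContext was already created on the calling thread:

    import org.apache.spark.SparkEnv

    val env = SparkEnv.get                                        // thread-local lookup
    val bytes = env.serializer.newInstance().serialize("payload") // uses the configured serializer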
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -22,9 +22,9 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 
 /**
- * An RDD that has no partitions and no elements..
+ * An RDD that has no partitions and no elements.
  */
-private[spark] class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
+class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
 
   override def getPartitions: Array[Partition] = Array.empty
 
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -43,7 +43,7 @@ private[spark] class UnionPartition[T: ClassTag](idx: Int, rdd: RDD[T], splitInd
   }
 }
 
-private[spark] class UnionRDD[T: ClassTag](
+class UnionRDD[T: ClassTag](
     sc: SparkContext,
     @transient var rdds: Seq[RDD[T]])
   extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies
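For reference, a UnionRDD can now be constructed directly, although going through SparkContext.union is equivalent; a sketch assuming sc, a, and b already exist:

    val viaContext = sc.union(a, b)                               // usual entry point
    val direct = new org.apache.spark.rdd.UnionRDD(sc, Seq(a, b)) // now possible directly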
@@ -30,6 +30,7 @@ import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
 
 /**
+ * <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span>
  * Parses and holds information about inputFormat (and files) specified as a parameter.
  */
 class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
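For reference, a sketch of building the locality map that the new SparkContext constructor accepts; the HDFS path is a placeholder, and the return type of computePreferredLocations is assumed to be compatible with the constructor's Map[String, Set[SplitInfo]]:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.scheduler.InputFormatInfo

    // One InputFormatInfo per input source of the application.
    val info = new InputFormatInfo(
      new Configuration(), classOf[TextInputFormat], "hdfs:///data/events") // hypothetical path
    val prefs = InputFormatInfo.computePreferredLocations(Seq(info))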
@@ -21,7 +21,7 @@ import collection.mutable.ArrayBuffer
 
 // information about a specific split instance : handles both split instances.
 // So that we do not need to worry about the differences.
-private[spark]
+/** <span class="badge" style="float: right; background-color: #44751E;">DEVELOPER API</span> */
 class SplitInfo(val inputFormatClazz: Class[_], val hostLocation: String, val path: String,
   val length: Long, val underlyingSplit: Any) {
   override def toString(): String = {
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -21,7 +21,7 @@ import scala.util.Random
 
 import org.apache.spark.util.random.XORShiftRandom
 
-@deprecated("Use Vector from Spark's mllib.linalg package instead.", "1.0.0")
+@deprecated("Use Vectors.dense from Spark's mllib.linalg package instead.", "1.0.0")
 class Vector(val elements: Array[Double]) extends Serializable {
   def length = elements.length
 
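For reference, migrating per the corrected deprecation message is a one-liner (the values are illustrative):

    // Deprecated:
    val v1 = new org.apache.spark.util.Vector(Array(1.0, 2.0, 3.0))
    // Replacement named by the corrected message:
    val v2 = org.apache.spark.mllib.linalg.Vectors.dense(1.0, 2.0, 3.0)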
