
Merge branch 'master' into SPARK-4924
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
Marcelo Vanzin committed Feb 26, 2015
2 parents 00505f9 + 8942b52 commit b1d86b0
Showing 208 changed files with 5,955 additions and 3,582 deletions.
9 changes: 9 additions & 0 deletions conf/metrics.properties.template
@@ -122,6 +122,15 @@

#worker.sink.csv.unit=minutes

# Enable Slf4jSink for all instances by class name
#*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink

# Polling period for Slf4jSink
#*.sink.slf4j.period=1

#*.sink.slf4j.unit=minutes


# Enable jvm source for instance master, worker, driver and executor
#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource

36 changes: 28 additions & 8 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -23,6 +23,7 @@ import java.lang.ThreadLocal

import scala.collection.generic.Growable
import scala.collection.mutable.Map
import scala.ref.WeakReference
import scala.reflect.ClassTag

import org.apache.spark.serializer.JavaSerializer
@@ -280,10 +281,12 @@ object AccumulatorParam {
// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private[spark] object Accumulators {
// TODO: Use soft references? => need to make readObject work properly then
val originals = Map[Long, Accumulable[_, _]]()
val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
override protected def initialValue() = Map[Long, Accumulable[_, _]]()
// Store a WeakReference instead of a strong reference so that accumulators can be
// garbage collected during long-running jobs, releasing their memory
type WeakAcc = WeakReference[Accumulable[_, _]]
val originals = Map[Long, WeakAcc]()
val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
override protected def initialValue() = Map[Long, WeakAcc]()
}
var lastId: Long = 0

@@ -294,9 +297,9 @@

def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
if (original) {
originals(a.id) = a
originals(a.id) = new WeakAcc(a)
} else {
localAccums.get()(a.id) = a
localAccums.get()(a.id) = new WeakAcc(a)
}
}

@@ -307,11 +310,22 @@
}
}

def remove(accId: Long) {
synchronized {
originals.remove(accId)
}
}

// Get the values of the local accumulators for the current thread (by ID)
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.get) {
ret(id) = accum.localValue
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
ret(id) = accum.get match {
case Some(values) => values.localValue
case None => None
}
}
return ret
}
@@ -320,7 +334,13 @@
def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match {
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
}
}
}
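
As an aside (not part of this commit): the hunks above switch the driver-side registry from strong references to scala.ref.WeakReference, so the registry alone no longer keeps every accumulator alive, and every read has to tolerate an already-collected referent. A minimal standalone sketch of that pattern, with hypothetical names (SimpleRegistry, lookup):

    import scala.collection.mutable
    import scala.ref.WeakReference

    // Illustrative weak-reference registry: holding only a WeakReference means
    // the registry by itself never prevents garbage collection of its values.
    object SimpleRegistry {
      private val entries = mutable.Map[Long, WeakReference[AnyRef]]()

      def register(id: Long, value: AnyRef): Unit = synchronized {
        entries(id) = new WeakReference(value)
      }

      def lookup(id: Long): Option[AnyRef] = synchronized {
        // WeakReference.get returns None once the referent has been collected,
        // mirroring the `accum.get match { ... }` checks above.
        entries.get(id).flatMap(_.get)
      }

      def remove(id: Long): Unit = synchronized { entries.remove(id) }
    }
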
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -32,6 +32,7 @@ private sealed trait CleanupTask
private case class CleanRDD(rddId: Int) extends CleanupTask
private case class CleanShuffle(shuffleId: Int) extends CleanupTask
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
private case class CleanAccum(accId: Long) extends CleanupTask

/**
* A WeakReference associated with a CleanupTask.
@@ -114,6 +115,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
registerForCleanup(rdd, CleanRDD(rdd.id))
}

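/** Register an Accumulable for cleanup when it is garbage collected. */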
def registerAccumulatorForCleanup(a: Accumulable[_, _]): Unit = {
registerForCleanup(a, CleanAccum(a.id))
}

/** Register a ShuffleDependency for cleanup when it is garbage collected. */
def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]) {
registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
@@ -145,6 +150,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
}
}
} catch {
@@ -190,6 +197,18 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}

/** Perform accumulator cleanup. */
def doCleanupAccum(accId: Long, blocking: Boolean) {
try {
logDebug("Cleaning accumulator " + accId)
Accumulators.remove(accId)
listeners.foreach(_.accumCleaned(accId))
logInfo("Cleaned accumulator " + accId)
} catch {
case e: Exception => logError("Error cleaning accumulator " + accId, e)
}
}

private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
@@ -206,4 +225,5 @@ private[spark] trait CleanerListener {
def rddCleaned(rddId: Int)
def shuffleCleaned(shuffleId: Int)
def broadcastCleaned(broadcastId: Long)
def accumCleaned(accId: Long)
}
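
For orientation (not shown in this hunk), the cleaner dispatches these doCleanupXxx calls from a daemon thread that drains a reference queue of weak references to the tracked objects; CleanAccum simply routes accumulators through that same machinery. The self-contained sketch below illustrates only the general JVM weak-reference plus ReferenceQueue pattern, with hypothetical names (TrackedRef, CleanupDemo), not the cleaner's actual implementation:

    import java.lang.ref.{ReferenceQueue, WeakReference}

    import scala.collection.mutable

    // Illustrative weak-reference + reference-queue cleanup.
    object CleanupDemo {
      // The referent is already gone by the time a reference is dequeued,
      // so the reference itself carries the id of the resource to clean.
      class TrackedRef(obj: AnyRef, val resourceId: Long, queue: ReferenceQueue[AnyRef])
        extends WeakReference[AnyRef](obj, queue)

      def main(args: Array[String]): Unit = {
        val queue = new ReferenceQueue[AnyRef]()
        val liveRefs = mutable.Set[TrackedRef]()   // keeps the reference objects themselves reachable

        var payload: AnyRef = new Array[Byte](1 << 20)
        liveRefs += new TrackedRef(payload, 42L, queue)

        payload = null   // drop the only strong reference to the payload
        System.gc()      // request (but not guarantee) a collection

        // A real cleaner would poll in a loop on a daemon thread.
        Option(queue.remove(1000)) match {
          case Some(r: TrackedRef) =>
            liveRefs -= r
            println(s"clean up resource ${r.resourceId}")
          case _ =>
            println("nothing collected yet")
        }
      }
    }
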
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark
import scala.collection.mutable

import org.apache.spark.scheduler._
import org.apache.spark.util.{SystemClock, Clock}

/**
* An agent that dynamically allocates and removes executors based on the workload.
@@ -123,7 +124,7 @@ private[spark] class ExecutorAllocationManager(
private val intervalMillis: Long = 100

// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock
private var clock: Clock = new SystemClock()

// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener
@@ -588,28 +589,3 @@ private[spark] class ExecutorAllocationManager(
private object ExecutorAllocationManager {
val NOT_SET = Long.MaxValue
}

/**
* An abstract clock for measuring elapsed time.
*/
private trait Clock {
def getTimeMillis: Long
}

/**
* A clock backed by a monotonically increasing time source.
* The time returned by this clock does not correspond to any notion of wall-clock time.
*/
private class RealClock extends Clock {
override def getTimeMillis: Long = System.nanoTime / (1000 * 1000)
}

/**
* A clock that allows the caller to customize the time.
* This is used mainly for testing.
*/
private class TestClock(startTimeMillis: Long) extends Clock {
private var time: Long = startTimeMillis
override def getTimeMillis: Long = time
def tick(ms: Long): Unit = { time += ms }
}
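
The hunk above deletes the manager's private Clock, RealClock and TestClock in favor of the shared org.apache.spark.util.{SystemClock, Clock} imported at the top of the file. The point of an injectable clock is that time-based logic receives the clock as a dependency, so tests can substitute a controllable one. A generic, hypothetical sketch of the idea (the names below are illustrative, not Spark's):

    // Illustrative clock injection for testable, time-based logic.
    trait Clock {
      def getTimeMillis(): Long
    }

    class SystemClock extends Clock {
      override def getTimeMillis(): Long = System.currentTimeMillis()
    }

    // Test-only clock: time advances only when the test says so.
    class ManualClock(private var now: Long = 0L) extends Clock {
      override def getTimeMillis(): Long = now
      def advance(ms: Long): Unit = { now += ms }
    }

    // Component under test: permits an action at most once per `intervalMs`.
    class Throttler(clock: Clock, intervalMs: Long) {
      private var lastFired: Option[Long] = None
      def maybeFire(): Boolean = {
        val now = clock.getTimeMillis()
        if (lastFired.forall(now - _ >= intervalMs)) { lastFired = Some(now); true } else false
      }
    }

In a test, new Throttler(new ManualClock(), 100) fires once, then refuses until the clock is advanced by at least 100 ms; with SystemClock the same code runs against wall-clock time.
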
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -362,7 +362,13 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.files.userClassPathFirst", "spark.executor.userClassPathFirst",
"1.3"),
DeprecatedConfig("spark.yarn.user.classpath.first", null, "1.3",
"Use spark.{driver,executor}.userClassPathFirst instead."))
"Use spark.{driver,executor}.userClassPathFirst instead."),
DeprecatedConfig("spark.history.fs.updateInterval",
"spark.history.fs.update.interval.seconds",
"1.3", "Use spark.history.fs.update.interval.seconds instead"),
DeprecatedConfig("spark.history.updateInterval",
"spark.history.fs.update.interval.seconds",
"1.3", "Use spark.history.fs.update.interval.seconds instead"))
configs.map { x => (x.oldName, x) }.toMap
}
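
The new entries route the retired history-server keys to spark.history.fs.update.interval.seconds and attach a deprecation message. As a rough, hypothetical sketch of how such a translation table can be applied (this is not SparkConf's actual code; it uses Option where the entries above pass null for "no replacement"):

    // Illustrative deprecated-key translation; not SparkConf's implementation.
    case class DeprecatedConfig(
        oldName: String,
        newName: Option[String],   // None means "removed with no replacement"
        version: String,
        message: String)

    object ConfigTranslator {
      private val deprecated: Map[String, DeprecatedConfig] = Seq(
        DeprecatedConfig("spark.history.updateInterval",
          Some("spark.history.fs.update.interval.seconds"), "1.3",
          "Use spark.history.fs.update.interval.seconds instead")
      ).map(c => c.oldName -> c).toMap

      // Resolve a user-supplied key, warning when a deprecated name is seen.
      def translate(key: String): String = deprecated.get(key) match {
        case Some(cfg) =>
          Console.err.println(s"Warning: ${cfg.oldName} is deprecated as of ${cfg.version}. ${cfg.message}")
          cfg.newName.getOrElse(key)
        case None => key
      }
    }

Here translate("spark.history.updateInterval") warns and returns "spark.history.fs.update.interval.seconds"; unknown keys pass through unchanged.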

30 changes: 23 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -548,6 +548,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
* to parallelize and before the first action on the RDD, the resultant RDD will reflect the
* modified collection. Pass a copy of the argument to avoid this.
* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
*/
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
assertNotStopped()
@@ -984,15 +986,21 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* values to using the `+=` method. Only the driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param)
{
val acc = new Accumulator(initialValue, param)
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
* in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
* driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
new Accumulator(initialValue, param, Some(name))
val acc = new Accumulator(initialValue, param, Some(name))
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}

/**
@@ -1001,8 +1009,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) =
new Accumulable(initialValue, param)
def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) = {
val acc = new Accumulable(initialValue, param)
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
@@ -1011,8 +1022,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @tparam R accumulator result type
* @tparam T type that can be added to the accumulator
*/
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) =
new Accumulable(initialValue, param, Some(name))
def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) = {
val acc = new Accumulable(initialValue, param, Some(name))
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}

/**
* Create an accumulator from a "mutable collection" type.
@@ -1023,7 +1037,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
(initialValue: R): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R,T]
new Accumulable(initialValue, param)
val acc = new Accumulable(initialValue, param)
cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}
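
With each factory method above now calling cleaner.foreach(_.registerAccumulatorForCleanup(acc)), accumulators created through SparkContext are also tracked by the ContextCleaner (when one exists), so their driver-side registry entries can be released once they become unreachable. Driver-side usage is unchanged; a small example against the standard Spark 1.x API (the app name, master and variable names are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    object AccumulatorExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("acc-demo").setMaster("local[2]"))
        // The driver creates the accumulator; it is now also registered for cleanup.
        val errorCount = sc.accumulator(0, "errorCount")

        sc.parallelize(Seq("ok", "bad", "ok", "bad")).foreach { record =>
          if (record == "bad") errorCount += 1   // tasks can only add to it
        }

        println(s"errors: ${errorCount.value}")  // only the driver reads the value
        sc.stop()
      }
    }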

13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -219,14 +219,13 @@ private[spark] class PythonRDD(
val oldBids = PythonRDD.getWorkerBroadcasts(worker)
val newBids = broadcastVars.map(_.id).toSet
// number of different broadcasts
val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size
val toRemove = oldBids.diff(newBids)
val cnt = toRemove.size + newBids.diff(oldBids).size
dataOut.writeInt(cnt)
for (bid <- oldBids) {
if (!newBids.contains(bid)) {
// remove the broadcast from worker
dataOut.writeLong(- bid - 1) // bid >= 0
oldBids.remove(bid)
}
for (bid <- toRemove) {
// remove the broadcast from worker
dataOut.writeLong(- bid - 1) // bid >= 0
oldBids.remove(bid)
}
for (broadcast <- broadcastVars) {
if (!oldBids.contains(broadcast.id)) {
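
The rewritten loop above computes the set of broadcast ids to drop once, via oldBids.diff(newBids), and iterates over that set directly instead of re-testing membership inside the loop. A hypothetical standalone sketch of the same bookkeeping (the names are illustrative):

    import scala.collection.mutable

    // Illustrative reconciliation of the broadcasts a worker already holds
    // against the broadcasts the next task needs.
    object BroadcastSync {
      def plan(workerHas: mutable.Set[Long], taskNeeds: Set[Long]): (Set[Long], Set[Long]) = {
        val toRemove = workerHas.toSet.diff(taskNeeds)   // held but no longer needed
        val toAdd    = taskNeeds.diff(workerHas.toSet)   // needed but not yet held
        toRemove.foreach(workerHas.remove)               // forget what gets dropped
        workerHas ++= toAdd                              // remember what gets shipped
        (toRemove, toAdd)
      }
    }

    // plan(mutable.Set(1L, 2L), Set(2L, 3L)) returns (Set(1L), Set(3L))
    // and leaves the worker's set as Set(2L, 3L).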