Skip to content

Commit

Permalink
Changes based on PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Mar 25, 2014
1 parent f2881fd commit 620eca3
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 26 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* their cache of map output locations if this happens.
*/
protected var epoch: Long = 0
protected val epochLock = new java.lang.Object
protected val epochLock = new AnyRef

/** Remembers which map output locations are currently being fetched on a worker */
private val fetching = new HashSet[Int]
Expand Down Expand Up @@ -305,7 +305,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
cachedSerializedStatuses.clear()
}

protected def cleanup(cleanupTime: Long) {
private def cleanup(cleanupTime: Long) {
mapStatuses.clearOldValues(cleanupTime)
cachedSerializedStatuses.clearOldValues(cleanupTime)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act
private def removeShuffle(shuffleId: Int) {
// Nothing to do in the BlockManagerMasterActor data structures
val removeMsg = RemoveShuffle(shuffleId)
blockManagerInfo.values.map { bm =>
blockManagerInfo.values.foreach { bm =>
bm.slaveActor ! removeMsg
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ import scala.reflect.ClassTag
private[spark] class BoundedHashMap[A, B](bound: Int, useLRU: Boolean)
extends WrappedJavaHashMap[A, B, A, B] with SynchronizedMap[A, B] {

protected[util] val internalJavaMap = Collections.synchronizedMap(new LinkedHashMap[A, B](
private[util] val internalJavaMap = Collections.synchronizedMap(new LinkedHashMap[A, B](
bound / 8, (0.75).toFloat, useLRU) {
override protected def removeEldestEntry(eldest: JMapEntry[A, B]): Boolean = {
size() > bound
}
})

protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
new BoundedHashMap[K1, V1](bound, useLRU)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ private[util] case class TimeStampedValue[T](timestamp: Long, value: T)
private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
extends WrappedJavaHashMap[A, B, A, TimeStampedValue[B]] with Logging {

protected[util] val internalJavaMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()
private[util] val internalJavaMap = new ConcurrentHashMap[A, TimeStampedValue[B]]()

protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
new TimeStampedHashMap[K1, V1]()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ private[spark] class TimeStampedWeakValueHashMap[A, B]()
/** Counter for counting the number of inserts */
private val insertCounts = new AtomicInteger(0)

protected[util] val internalJavaMap: util.Map[A, TimeStampedWeakValue[B]] = {
private[util] val internalJavaMap: util.Map[A, TimeStampedWeakValue[B]] = {
new ConcurrentHashMap[A, TimeStampedWeakValue[B]]()
}

protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
new TimeStampedWeakValueHashMap[K1, V1]()
}

Expand All @@ -68,15 +68,12 @@ private[spark] class TimeStampedWeakValueHashMap[A, B]()
}

override def get(key: A): Option[B] = {
Option(internalJavaMap.get(key)) match {
case Some(weakValue) =>
val value = weakValue.weakValue.get
if (value == null) {
internalJavaMap.remove(key)
}
Option(value)
case None =>
None
Option(internalJavaMap.get(key)).flatMap { weakValue =>
val value = weakValue.weakValue.get
if (value == null) {
internalJavaMap.remove(key)
}
Option(value)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ private[spark] abstract class WrappedJavaHashMap[K, V, IK, IV] extends Map[K, V]

/* Methods that must be defined. */

/** Internal Java HashMap that is being wrapped. */
protected[util] val internalJavaMap: JMap[IK, IV]
/**
* Internal Java HashMap that is being wrapped.
* Scoped private[util] so that rest of Spark code cannot
* directly access the internal map.
*/
private[util] val internalJavaMap: JMap[IK, IV]

/** Method to get a new instance of the internal Java HashMap. */
protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _]
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _]

/*
Methods that convert between internal and external types. These implementations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.spark.util

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Random

import java.util
import java.lang.ref.WeakReference

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import scala.util.Random

import org.scalatest.FunSuite

class WrappedJavaHashMapSuite extends FunSuite {
Expand Down Expand Up @@ -203,9 +203,9 @@ class WrappedJavaHashMapSuite extends FunSuite {
}

class TestMap[A, B] extends WrappedJavaHashMap[A, B, A, B] {
protected[util] val internalJavaMap: util.Map[A, B] = new util.HashMap[A, B]()
private[util] val internalJavaMap: util.Map[A, B] = new util.HashMap[A, B]()

protected[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
private[util] def newInstance[K1, V1](): WrappedJavaHashMap[K1, V1, _, _] = {
new TestMap[K1, V1]
}
}

0 comments on commit 620eca3

Please sign in to comment.