diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index bbdc9158d8e2b..21650671df146 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -33,6 +33,7 @@ import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} import scala.util.control.NonFatal +import com.google.common.collect.MapMaker import org.apache.commons.lang.SerializationUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -221,7 +222,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private var _eventLogDir: Option[URI] = None private var _eventLogCodec: Option[String] = None private var _env: SparkEnv = _ - private var _metadataCleaner: MetadataCleaner = _ private var _jobProgressListener: JobProgressListener = _ private var _statusTracker: SparkStatusTracker = _ private var _progressBar: Option[ConsoleProgressBar] = None @@ -295,8 +295,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs - private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]] - private[spark] def metadataCleaner: MetadataCleaner = _metadataCleaner + private[spark] val persistentRdds = new MapMaker().weakValues().makeMap[Int, RDD[_]]().asScala private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener def statusTracker: SparkStatusTracker = _statusTracker @@ -463,8 +462,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _conf.set("spark.repl.class.uri", replUri) } - _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf) - _statusTracker = new SparkStatusTracker(this) _progressBar = @@ -1721,11 +1718,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli env.metricsSystem.report() } } - if (metadataCleaner != null) { - Utils.tryLogNonFatalError { - metadataCleaner.cancel() - } - } Utils.tryLogNonFatalError { _cleaner.foreach(_.stop()) } @@ -2193,11 +2185,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - /** Called by MetadataCleaner to clean up the persistentRdds map periodically */ - private[spark] def cleanup(cleanupTime: Long) { - persistentRdds.clearOldValues(cleanupTime) - } - // In order to prevent multiple SparkContexts from being active at the same time, mark this // context as having finished construction. // NOTE: this must be placed at the end of the SparkContext constructor. 
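Note on the SparkContext change above: persistentRdds now uses a Guava weak-values map instead of the TTL-based TimeStampedWeakValueHashMap, so unreferenced RDDs fall out of the map through garbage collection rather than the periodic MetadataCleaner timer. A minimal, self-contained sketch of that behaviour, assuming only Guava and the Scala/Java converters on the classpath (the Int/Object types here are placeholders for the real Int/RDD[_] pairing):

import scala.collection.JavaConverters._
import com.google.common.collect.MapMaker

object WeakValuesMapSketch {
  def main(args: Array[String]): Unit = {
    // Same construction as the new persistentRdds map: values are held through
    // weak references, so an entry becomes collectable once nothing else holds
    // a strong reference to its value.
    val map = new MapMaker().weakValues().makeMap[Int, Object]().asScala

    var value: Object = new Object
    map(1) = value
    assert(map.get(1).isDefined)

    // Drop the only strong reference and hint the collector. No timer or
    // spark.cleaner.ttl-style TTL is involved; the entry simply disappears
    // once the value has been garbage-collected (GC timing is best-effort).
    value = null
    System.gc()
    Thread.sleep(100)
    println(s"entry still present after GC hint: ${map.get(1).isDefined}")
  }
}

Guava purges cleared entries lazily during later map operations, which is why no explicit cleanup hook replaces the removed SparkContext.cleanup method.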
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala index cc5f933393adf..ed877aa0a7759 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala @@ -26,7 +26,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.serializer.Serializer import org.apache.spark.storage._ -import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} +import org.apache.spark.util.Utils import org.apache.spark.{Logging, SparkConf, SparkEnv} /** A group of writers for a ShuffleMapTask, one writer per reducer. */ @@ -63,10 +63,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) val completedMapTasks = new ConcurrentLinkedQueue[Int]() } - private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState] - - private val metadataCleaner = - new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf) + private val shuffleStates = new scala.collection.mutable.HashMap[ShuffleId, ShuffleState] /** * Get a ShuffleWriterGroup for the given map task, which will register it as complete @@ -75,9 +72,8 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer, writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = { new ShuffleWriterGroup { - shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers)) - private val shuffleState = shuffleStates(shuffleId) - + private val shuffleState = + shuffleStates.getOrElseUpdate(shuffleId, new ShuffleState(numReducers)) val openStartTime = System.nanoTime val serializerInstance = serializer.newInstance() val writers: Array[DiskBlockObjectWriter] = { @@ -131,11 +127,5 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) } } - private def cleanup(cleanupTime: Long) { - shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId)) - } - - override def stop() { - metadataCleaner.cancel() - } + override def stop(): Unit = {} } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index ff64c0bf0b818..0dcef7827410f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -19,7 +19,9 @@ package org.apache.spark.storage import java.io._ import java.nio.{ByteBuffer, MappedByteBuffer} +import java.util.concurrent.ConcurrentHashMap +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, Future} @@ -75,7 +77,7 @@ private[spark] class BlockManager( val diskBlockManager = new DiskBlockManager(this, conf) - private val blockInfo = new HashMap[BlockId, BlockInfo] + private val blockInfo = new ConcurrentHashMap[BlockId, BlockInfo] private val futureExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128)) @@ -227,7 +229,7 @@ private[spark] class BlockManager( */ private def reportAllBlocks(): Unit = { logInfo(s"Reporting ${blockInfo.size} blocks to the master.") - for 
((blockId, info) <- blockInfo) { + for ((blockId, info) <- blockInfo.asScala) { val status = getCurrentBlockStatus(blockId, info) if (!tryToReportBlockStatus(blockId, info, status)) { logError(s"Failed to report $blockId to master; giving up.") @@ -308,7 +310,7 @@ private[spark] class BlockManager( * NOTE: This is mainly for testing, and it doesn't fetch information from external block store. */ def getStatus(blockId: BlockId): Option[BlockStatus] = { - blockInfo.get(blockId).map { info => + blockInfo.asScala.get(blockId).map { info => val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L // Assume that block is not in external block store @@ -322,7 +324,7 @@ private[spark] class BlockManager( * may not know of). */ def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = { - (blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq + (blockInfo.asScala.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq } /** @@ -434,7 +436,7 @@ private[spark] class BlockManager( } private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = { - val info = blockInfo.get(blockId).orNull + val info = blockInfo.get(blockId) if (info != null) { info.synchronized { // Double check to make sure the block is still there. There is a small chance that the @@ -442,7 +444,7 @@ private[spark] class BlockManager( // Note that this only checks metadata tracking. If user intentionally deleted the block // on disk or from off heap storage without using removeBlock, this conditional check will // still pass but eventually we will get an exception because we can't find the block. - if (blockInfo.get(blockId).isEmpty) { + if (blockInfo.asScala.get(blockId).isEmpty) { logWarning(s"Block $blockId had been removed") return None } @@ -726,7 +728,7 @@ private[spark] class BlockManager( val putBlockInfo = { val tinfo = new BlockInfo(level, tellMaster) // Do atomically ! - val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo) + val oldBlockOpt = Option(blockInfo.putIfAbsent(blockId, tinfo)) if (oldBlockOpt.isDefined) { if (oldBlockOpt.get.waitForReady()) { logWarning(s"Block $blockId already exists on this machine; not re-adding it") @@ -1027,7 +1029,7 @@ private[spark] class BlockManager( data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = { logInfo(s"Dropping block $blockId from memory") - val info = blockInfo.get(blockId).orNull + val info = blockInfo.get(blockId) // If the block has not already been dropped if (info != null) { @@ -1038,7 +1040,7 @@ private[spark] class BlockManager( // If we get here, the block write failed. logWarning(s"Block $blockId was marked as failure. Nothing to drop") return None - } else if (blockInfo.get(blockId).isEmpty) { + } else if (blockInfo.asScala.get(blockId).isEmpty) { logWarning(s"Block $blockId was already dropped.") return None } @@ -1090,7 +1092,7 @@ private[spark] class BlockManager( def removeRdd(rddId: Int): Int = { // TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks. 
logInfo(s"Removing RDD $rddId") - val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId) + val blocksToRemove = blockInfo.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId) blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) } blocksToRemove.size } @@ -1100,7 +1102,7 @@ private[spark] class BlockManager( */ def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = { logDebug(s"Removing broadcast $broadcastId") - val blocksToRemove = blockInfo.keys.collect { + val blocksToRemove = blockInfo.asScala.keys.collect { case bid @ BroadcastBlockId(`broadcastId`, _) => bid } blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) } @@ -1112,7 +1114,7 @@ private[spark] class BlockManager( */ def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = { logDebug(s"Removing block $blockId") - val info = blockInfo.get(blockId).orNull + val info = blockInfo.get(blockId) if (info != null) { info.synchronized { // Removals are idempotent in disk store and memory store. At worst, we get a warning. diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala deleted file mode 100644 index 06920c4acdad5..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.util.{Timer, TimerTask} - -import org.apache.spark.{Logging, SparkConf} - -/** - * Runs a timer task to periodically clean up metadata (e.g. 
old files or hashtable entries) - */ -private[spark] class MetadataCleaner( - cleanerType: MetadataCleanerType.MetadataCleanerType, - cleanupFunc: (Long) => Unit, - conf: SparkConf) - extends Logging -{ - val name = cleanerType.toString - - private val delaySeconds = MetadataCleaner.getDelaySeconds(conf, cleanerType) - private val periodSeconds = math.max(10, delaySeconds / 10) - private val timer = new Timer(name + " cleanup timer", true) - - - private val task = new TimerTask { - override def run() { - try { - cleanupFunc(System.currentTimeMillis() - (delaySeconds * 1000)) - logInfo("Ran metadata cleaner for " + name) - } catch { - case e: Exception => logError("Error running cleanup task for " + name, e) - } - } - } - - if (delaySeconds > 0) { - logDebug( - "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " + - "and period of " + periodSeconds + " secs") - timer.schedule(task, delaySeconds * 1000, periodSeconds * 1000) - } - - def cancel() { - timer.cancel() - } -} - -private[spark] object MetadataCleanerType extends Enumeration { - - val SPARK_CONTEXT, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value - - type MetadataCleanerType = Value - - def systemProperty(which: MetadataCleanerType.MetadataCleanerType): String = { - "spark.cleaner.ttl." + which.toString - } -} - -// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the -// initialization of StreamingContext. It's okay for users trying to configure stuff themselves. -private[spark] object MetadataCleaner { - def getDelaySeconds(conf: SparkConf): Int = { - conf.getTimeAsSeconds("spark.cleaner.ttl", "-1").toInt - } - - def getDelaySeconds( - conf: SparkConf, - cleanerType: MetadataCleanerType.MetadataCleanerType): Int = { - conf.get(MetadataCleanerType.systemProperty(cleanerType), getDelaySeconds(conf).toString).toInt - } - - def setDelaySeconds( - conf: SparkConf, - cleanerType: MetadataCleanerType.MetadataCleanerType, - delay: Int) { - conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString) - } - - /** - * Set the default delay time (in seconds). - * @param conf SparkConf instance - * @param delay default delay time to set - * @param resetAll whether to reset all to default - */ - def setDelaySeconds(conf: SparkConf, delay: Int, resetAll: Boolean = true) { - conf.set("spark.cleaner.ttl", delay.toString) - if (resetAll) { - for (cleanerType <- MetadataCleanerType.values) { - System.clearProperty(MetadataCleanerType.systemProperty(cleanerType)) - } - } - } -} - diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala deleted file mode 100644 index 310c0c109416c..0000000000000 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedWeakValueHashMap.scala +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util - -import java.lang.ref.WeakReference -import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.mutable -import scala.language.implicitConversions - -import org.apache.spark.Logging - -/** - * A wrapper of TimeStampedHashMap that ensures the values are weakly referenced and timestamped. - * - * If the value is garbage collected and the weak reference is null, get() will return a - * non-existent value. These entries are removed from the map periodically (every N inserts), as - * their values are no longer strongly reachable. Further, key-value pairs whose timestamps are - * older than a particular threshold can be removed using the clearOldValues method. - * - * TimeStampedWeakValueHashMap exposes a scala.collection.mutable.Map interface, which allows it - * to be a drop-in replacement for Scala HashMaps. Internally, it uses a Java ConcurrentHashMap, - * so all operations on this HashMap are thread-safe. - * - * @param updateTimeStampOnGet Whether timestamp of a pair will be updated when it is accessed. - */ -private[spark] class TimeStampedWeakValueHashMap[A, B](updateTimeStampOnGet: Boolean = false) - extends mutable.Map[A, B]() with Logging { - - import TimeStampedWeakValueHashMap._ - - private val internalMap = new TimeStampedHashMap[A, WeakReference[B]](updateTimeStampOnGet) - private val insertCount = new AtomicInteger(0) - - /** Return a map consisting only of entries whose values are still strongly reachable. */ - private def nonNullReferenceMap = internalMap.filter { case (_, ref) => ref.get != null } - - def get(key: A): Option[B] = internalMap.get(key) - - def iterator: Iterator[(A, B)] = nonNullReferenceMap.iterator - - override def + [B1 >: B](kv: (A, B1)): mutable.Map[A, B1] = { - val newMap = new TimeStampedWeakValueHashMap[A, B1] - val oldMap = nonNullReferenceMap.asInstanceOf[mutable.Map[A, WeakReference[B1]]] - newMap.internalMap.putAll(oldMap.toMap) - newMap.internalMap += kv - newMap - } - - override def - (key: A): mutable.Map[A, B] = { - val newMap = new TimeStampedWeakValueHashMap[A, B] - newMap.internalMap.putAll(nonNullReferenceMap.toMap) - newMap.internalMap -= key - newMap - } - - override def += (kv: (A, B)): this.type = { - internalMap += kv - if (insertCount.incrementAndGet() % CLEAR_NULL_VALUES_INTERVAL == 0) { - clearNullValues() - } - this - } - - override def -= (key: A): this.type = { - internalMap -= key - this - } - - override def update(key: A, value: B): Unit = this += ((key, value)) - - override def apply(key: A): B = internalMap.apply(key) - - override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = nonNullReferenceMap.filter(p) - - override def empty: mutable.Map[A, B] = new TimeStampedWeakValueHashMap[A, B]() - - override def size: Int = internalMap.size - - override def foreach[U](f: ((A, B)) => U): Unit = nonNullReferenceMap.foreach(f) - - def putIfAbsent(key: A, value: B): Option[B] = internalMap.putIfAbsent(key, value) - - def toMap: Map[A, B] = iterator.toMap - - /** Remove old key-value pairs with timestamps earlier than `threshTime`. 
*/ - def clearOldValues(threshTime: Long): Unit = internalMap.clearOldValues(threshTime) - - /** Remove entries with values that are no longer strongly reachable. */ - def clearNullValues() { - val it = internalMap.getEntrySet.iterator - while (it.hasNext) { - val entry = it.next() - if (entry.getValue.value.get == null) { - logDebug("Removing key " + entry.getKey + " because it is no longer strongly reachable.") - it.remove() - } - } - } - - // For testing - - def getTimestamp(key: A): Option[Long] = { - internalMap.getTimeStampedValue(key).map(_.timestamp) - } - - def getReference(key: A): Option[WeakReference[B]] = { - internalMap.getTimeStampedValue(key).map(_.value) - } -} - -/** - * Helper methods for converting to and from WeakReferences. - */ -private object TimeStampedWeakValueHashMap { - - // Number of inserts after which entries with null references are removed - val CLEAR_NULL_VALUES_INTERVAL = 100 - - /* Implicit conversion methods to WeakReferences. */ - - implicit def toWeakReference[V](v: V): WeakReference[V] = new WeakReference[V](v) - - implicit def toWeakReferenceTuple[K, V](kv: (K, V)): (K, WeakReference[V]) = { - kv match { case (k, v) => (k, toWeakReference(v)) } - } - - implicit def toWeakReferenceFunction[K, V, R](p: ((K, V)) => R): ((K, WeakReference[V])) => R = { - (kv: (K, WeakReference[V])) => p(kv) - } - - /* Implicit conversion methods from WeakReferences. */ - - implicit def fromWeakReference[V](ref: WeakReference[V]): V = ref.get - - implicit def fromWeakReferenceOption[V](v: Option[WeakReference[V]]): Option[V] = { - v match { - case Some(ref) => Option(fromWeakReference(ref)) - case None => None - } - } - - implicit def fromWeakReferenceTuple[K, V](kv: (K, WeakReference[V])): (K, V) = { - kv match { case (k, v) => (k, fromWeakReference(v)) } - } - - implicit def fromWeakReferenceIterator[K, V]( - it: Iterator[(K, WeakReference[V])]): Iterator[(K, V)] = { - it.map(fromWeakReferenceTuple) - } - - implicit def fromWeakReferenceMap[K, V]( - map: mutable.Map[K, WeakReference[V]]) : mutable.Map[K, V] = { - mutable.Map(map.mapValues(fromWeakReference).toSeq: _*) - } -} diff --git a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala index 9b3169026cda3..25fc15dd54d04 100644 --- a/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/TimeStampedHashMapSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.util -import java.lang.ref.WeakReference - import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.Random @@ -34,10 +32,6 @@ class TimeStampedHashMapSuite extends SparkFunSuite { testMap(new TimeStampedHashMap[String, String]()) testMapThreadSafety(new TimeStampedHashMap[String, String]()) - // Test TimeStampedWeakValueHashMap basic functionality - testMap(new TimeStampedWeakValueHashMap[String, String]()) - testMapThreadSafety(new TimeStampedWeakValueHashMap[String, String]()) - test("TimeStampedHashMap - clearing by timestamp") { // clearing by insertion time val map = new TimeStampedHashMap[String, String](updateTimeStampOnGet = false) @@ -68,86 +62,6 @@ class TimeStampedHashMapSuite extends SparkFunSuite { assert(map1.get("k2").isDefined) } - test("TimeStampedWeakValueHashMap - clearing by timestamp") { - // clearing by insertion time - val map = new TimeStampedWeakValueHashMap[String, String](updateTimeStampOnGet = false) - map("k1") = "v1" - assert(map("k1") === 
"v1") - Thread.sleep(10) - val threshTime = System.currentTimeMillis - assert(map.getTimestamp("k1").isDefined) - assert(map.getTimestamp("k1").get < threshTime) - map.clearOldValues(threshTime) - assert(map.get("k1") === None) - - // clearing by modification time - val map1 = new TimeStampedWeakValueHashMap[String, String](updateTimeStampOnGet = true) - map1("k1") = "v1" - map1("k2") = "v2" - assert(map1("k1") === "v1") - Thread.sleep(10) - val threshTime1 = System.currentTimeMillis - Thread.sleep(10) - assert(map1("k2") === "v2") // access k2 to update its access time to > threshTime - assert(map1.getTimestamp("k1").isDefined) - assert(map1.getTimestamp("k1").get < threshTime1) - assert(map1.getTimestamp("k2").isDefined) - assert(map1.getTimestamp("k2").get >= threshTime1) - map1.clearOldValues(threshTime1) // should only clear k1 - assert(map1.get("k1") === None) - assert(map1.get("k2").isDefined) - } - - test("TimeStampedWeakValueHashMap - clearing weak references") { - var strongRef = new Object - val weakRef = new WeakReference(strongRef) - val map = new TimeStampedWeakValueHashMap[String, Object] - map("k1") = strongRef - map("k2") = "v2" - map("k3") = "v3" - val isEquals = map("k1") == strongRef - assert(isEquals) - - // clear strong reference to "k1" - strongRef = null - val startTime = System.currentTimeMillis - System.gc() // Make a best effort to run the garbage collection. It *usually* runs GC. - System.runFinalization() // Make a best effort to call finalizer on all cleaned objects. - while(System.currentTimeMillis - startTime < 10000 && weakRef.get != null) { - System.gc() - System.runFinalization() - Thread.sleep(100) - } - assert(map.getReference("k1").isDefined) - val ref = map.getReference("k1").get - assert(ref.get === null) - assert(map.get("k1") === None) - - // operations should only display non-null entries - assert(map.iterator.forall { case (k, v) => k != "k1" }) - assert(map.filter { case (k, v) => k != "k2" }.size === 1) - assert(map.filter { case (k, v) => k != "k2" }.head._1 === "k3") - assert(map.toMap.size === 2) - assert(map.toMap.forall { case (k, v) => k != "k1" }) - val buffer = new ArrayBuffer[String] - map.foreach { case (k, v) => buffer += v.toString } - assert(buffer.size === 2) - assert(buffer.forall(_ != "k1")) - val plusMap = map + (("k4", "v4")) - assert(plusMap.size === 3) - assert(plusMap.forall { case (k, v) => k != "k1" }) - val minusMap = map - "k2" - assert(minusMap.size === 1) - assert(minusMap.head._1 == "k3") - - // clear null values - should only clear k1 - map.clearNullValues() - assert(map.getReference("k1") === None) - assert(map.get("k1") === None) - assert(map.get("k2").isDefined) - assert(map.get("k2").get === "v2") - } - /** Test basic operations of a Scala mutable Map. */ def testMap(hashMapConstructor: => mutable.Map[String, String]) { def newMap() = hashMapConstructor diff --git a/docs/configuration.md b/docs/configuration.md index a9ef37a9b1cd9..2341852d1b18b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -830,17 +830,6 @@ Apart from these, the following properties are also available, and may be useful Which broadcast implementation to use. - - spark.cleaner.ttl - (infinite) - - Duration (seconds) of how long Spark will remember any metadata (stages generated, tasks - generated, etc.). Periodic cleanups will ensure that metadata older than this duration will be - forgotten. This is useful for running Spark for many hours / days (for example, running 24/7 in - case of Spark Streaming applications). 
Note that any RDD that persists in memory for more than
-    this duration will be cleared as well.
-  </td>
-</tr>
 <tr>
   <td><code>spark.executor.cores</code></td>
   <td>1 in YARN mode, all the available cores on the worker in standalone mode.</td>
@@ -1454,7 +1443,7 @@ Apart from these, the following properties are also available, and may be useful
     A comma separated list of ciphers. The specified ciphers must be supported by JVM. The reference
     list of protocols one can find on
-    this
+    <a href="https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https">this</a>
     page.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index d0046afdeb447..98e1112d2b3d9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{SparkException, SparkConf, Logging}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.{MetadataCleaner, Utils}
+import org.apache.spark.util.Utils
 import org.apache.spark.streaming.scheduler.JobGenerator
@@ -41,7 +41,6 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDir = ssc.checkpointDir
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
-  val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConfPairs = ssc.conf.getAll

   def createSparkConf(): SparkConf = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 91a43e14a8b1b..c59348a89d34f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -32,7 +32,7 @@ import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext.rddToFileName
 import org.apache.spark.streaming.scheduler.Job
 import org.apache.spark.streaming.ui.UIUtils
-import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
+import org.apache.spark.util.{CallSite, Utils}

 /**
  * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -271,18 +271,6 @@ abstract class DStream[T: ClassTag] (
       checkpointDuration + "). Please set it to higher than " + checkpointDuration + "."
     )

-    val metadataCleanerDelay = MetadataCleaner.getDelaySeconds(ssc.conf)
-    logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
-    require(
-      metadataCleanerDelay < 0 || rememberDuration.milliseconds < metadataCleanerDelay * 1000,
-      "It seems you are doing some DStream window operation or setting a checkpoint interval " +
-        "which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
-        "than " + rememberDuration.milliseconds / 1000 + " seconds. But Spark's metadata cleanup" +
-        "delay is set to " + metadataCleanerDelay + " seconds, which is not sufficient. Please " +
-        "set the Java cleaner delay to more than " +
-        math.ceil(rememberDuration.milliseconds / 1000.0).toInt + " seconds."
-    )
-
     dependencies.foreach(_.validateAtStart())

     logInfo("Slide time = " + slideDuration)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 860fac29c0ee0..f756ea2f3e71a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -81,6 +81,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo

   test("from conf with settings") {
     val myConf = SparkContext.updatedConf(new SparkConf(false), master, appName)
+    // TODO(josh): Update these examples to use a different configuration.
     myConf.set("spark.cleaner.ttl", "10s")
     ssc = new StreamingContext(myConf, batchDuration)
     assert(ssc.conf.getTimeAsSeconds("spark.cleaner.ttl", "-1") === 10)
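For reference, a small sketch of the idioms the BlockManager changes above rely on once blockInfo becomes a java.util.concurrent.ConcurrentHashMap: get and putIfAbsent return null rather than None, so call sites wrap results in Option(...), and Scala-style traversal goes through the .asScala view. The String key/value types below are stand-ins for the real BlockId/BlockInfo pairing, chosen only to keep the example self-contained.

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

object BlockInfoMapSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for blockInfo (ConcurrentHashMap[BlockId, BlockInfo] in the patch).
    val blockInfo = new ConcurrentHashMap[String, String]()

    // putIfAbsent returns the previous value or null; Option(...) converts
    // that into the Option the surrounding code expects.
    assert(Option(blockInfo.putIfAbsent("rdd_0_0", "info-a")).isEmpty)
    assert(Option(blockInfo.putIfAbsent("rdd_0_0", "info-b")) == Some("info-a"))

    // get likewise returns null rather than None, hence the `if (info != null)`
    // checks in doGetLocal, dropFromMemory and removeBlock.
    val info = blockInfo.get("rdd_0_0")
    assert(info != null)

    // Iteration and key filtering go through the .asScala view, mirroring
    // reportAllBlocks, getMatchingBlockIds and removeRdd.
    val rddBlocks = blockInfo.asScala.keys.filter(_.startsWith("rdd_")).toSeq
    println(s"tracked blocks: $rddBlocks")
  }
}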