[SPARK-7689] Remove TTL-based metadata cleaning in Spark 2.0 #10534

Closed
wants to merge 13 commits
25 changes: 5 additions & 20 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -18,7 +18,6 @@
package org.apache.spark

import java.io._
import java.util.Arrays
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

@@ -267,8 +266,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
}

/**
* MapOutputTracker for the driver. This uses TimeStampedHashMap to keep track of map
* output information, which allows old output information based on a TTL.
* MapOutputTracker for the driver.
*/
private[spark] class MapOutputTrackerMaster(conf: SparkConf)
extends MapOutputTracker(conf) {
@@ -291,17 +289,10 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
// can be read locally, but may lead to more delay in scheduling if those locations are busy.
private val REDUCER_PREF_LOCS_FRACTION = 0.2

/**
* Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver,
* so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
* Other than these two scenarios, nothing should be dropped from this HashMap.
*/
protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()

// For cleaning up TimeStampedHashMaps
private val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)
// HashMaps for storing mapStatuses and cached serialized statuses in the driver.
// Statuses are dropped only by explicit de-registering.
protected val mapStatuses = new ConcurrentHashMap[Int, Array[MapStatus]]().asScala
private val cachedSerializedStatuses = new ConcurrentHashMap[Int, Array[Byte]]().asScala

def registerShuffle(shuffleId: Int, numMaps: Int) {
if (mapStatuses.put(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
@@ -462,14 +453,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
sendTracker(StopMapOutputTracker)
mapStatuses.clear()
trackerEndpoint = null
metadataCleaner.cancel()
cachedSerializedStatuses.clear()
}

private def cleanup(cleanupTime: Long) {
mapStatuses.clearOldValues(cleanupTime)
cachedSerializedStatuses.clearOldValues(cleanupTime)
}
}

/**
21 changes: 6 additions & 15 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,6 +21,7 @@ import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import java.util.UUID.randomUUID

@@ -32,6 +33,7 @@ import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}
import scala.util.control.NonFatal

import com.google.common.collect.MapMaker
import org.apache.commons.lang.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -199,7 +201,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private var _eventLogDir: Option[URI] = None
private var _eventLogCodec: Option[String] = None
private var _env: SparkEnv = _
private var _metadataCleaner: MetadataCleaner = _
private var _jobProgressListener: JobProgressListener = _
private var _statusTracker: SparkStatusTracker = _
private var _progressBar: Option[ConsoleProgressBar] = None
@@ -271,8 +272,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] val addedJars = HashMap[String, Long]()

// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
private[spark] def metadataCleaner: MetadataCleaner = _metadataCleaner
private[spark] val persistentRdds = {
val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
map.asScala
}
private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener

def statusTracker: SparkStatusTracker = _statusTracker
@@ -439,8 +442,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_conf.set("spark.repl.class.uri", replUri)
}

_metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)

_statusTracker = new SparkStatusTracker(this)

_progressBar =
@@ -1674,11 +1675,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
env.metricsSystem.report()
}
}
if (metadataCleaner != null) {
Utils.tryLogNonFatalError {
metadataCleaner.cancel()
}
}
Utils.tryLogNonFatalError {
_cleaner.foreach(_.stop())
}
@@ -2085,11 +2081,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

/** Called by MetadataCleaner to clean up the persistentRdds map periodically */
private[spark] def cleanup(cleanupTime: Long) {
persistentRdds.clearOldValues(cleanupTime)
}

// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction.
// NOTE: this must be placed at the end of the SparkContext constructor.
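The persistentRdds change above replaces TimeStampedWeakValueHashMap with a Guava MapMaker map using weak values, so an entry disappears on its own once the corresponding RDD has been garbage collected, rather than being swept by a TTL timer. A minimal sketch of that behavior, assuming only that Guava is on the classpath (the plain Object values and the explicit System.gc() call are stand-ins for illustration):

import java.util.concurrent.ConcurrentMap

import scala.collection.JavaConverters._

import com.google.common.collect.MapMaker

object WeakValueMapSketch {
  def main(args: Array[String]): Unit = {
    // Values are held via weak references: once nothing else strongly references
    // a value, its entry becomes eligible for removal by the map itself.
    val map: ConcurrentMap[Int, Object] = new MapMaker().weakValues().makeMap[Int, Object]()
    val view = map.asScala

    var value: Object = new Object
    view.put(1, value)
    println(view.get(1).isDefined) // true while a strong reference to `value` exists

    value = null   // drop the only strong reference
    System.gc()    // a hint only; collection is not guaranteed to happen immediately
    println(view.get(1)) // may print None once the value has actually been collected
  }
}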
core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -17,7 +17,7 @@

package org.apache.spark.shuffle

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

import scala.collection.JavaConverters._

@@ -27,7 +27,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage._
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
import org.apache.spark.util.Utils

/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -63,10 +63,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
val completedMapTasks = new ConcurrentLinkedQueue[Int]()
}

private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]

private val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
private val shuffleStates = new ConcurrentHashMap[ShuffleId, ShuffleState]

/**
* Get a ShuffleWriterGroup for the given map task, which will register it as complete
@@ -75,9 +72,12 @@
def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer,
writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
new ShuffleWriterGroup {
shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
private val shuffleState = shuffleStates(shuffleId)

private val shuffleState: ShuffleState = {
// Note: we do _not_ want to just wrap this java ConcurrentHashMap into a Scala map and use
// .getOrElseUpdate() because that's actually NOT atomic.
shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
shuffleStates.get(shuffleId)
}
val openStartTime = System.nanoTime
val serializerInstance = serializer.newInstance()
val writers: Array[DiskBlockObjectWriter] = {
Expand Down Expand Up @@ -114,7 +114,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)

/** Remove all the blocks / files related to a particular shuffle. */
private def removeShuffleBlocks(shuffleId: ShuffleId): Boolean = {
shuffleStates.get(shuffleId) match {
Option(shuffleStates.get(shuffleId)) match {
case Some(state) =>
for (mapId <- state.completedMapTasks.asScala; reduceId <- 0 until state.numReducers) {
val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId)
@@ -131,11 +131,5 @@
}
}

private def cleanup(cleanupTime: Long) {
shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
}

override def stop() {
metadataCleaner.cancel()
}
override def stop(): Unit = {}
}
63 changes: 14 additions & 49 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,7 +19,9 @@ package org.apache.spark.storage

import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
@@ -75,7 +77,7 @@ private[spark] class BlockManager(

val diskBlockManager = new DiskBlockManager(this, conf)

private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]
private val blockInfo = new ConcurrentHashMap[BlockId, BlockInfo]
Contributor:
Isn't it simpler to keep the Scala map interface? That would minimize changes in the rest of the code in this class.

Contributor (Author):
I was worried about the pitfalls of .getOrElseUpdate not being atomic on ConcurrentHashMaps that had been wrapped into Scala maps.

Contributor:
Good point.

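A minimal, self-contained sketch of the race discussed in this thread: on the Scala versions Spark used at the time, getOrElseUpdate on a ConcurrentHashMap wrapped with .asScala is a plain get-then-put rather than an atomic operation, which is why the patch uses putIfAbsent followed by get instead. The ShuffleState class below is a hypothetical stand-in, not the real Spark class:

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

// Hypothetical stand-in so the sketch compiles on its own.
class ShuffleState(val numReducers: Int)

object PutIfAbsentSketch {
  private val states = new ConcurrentHashMap[Int, ShuffleState]()

  // Not atomic: two racing threads can both miss, both construct a ShuffleState,
  // and end up holding different instances, only one of which remains in the map.
  def racy(shuffleId: Int, numReducers: Int): ShuffleState =
    states.asScala.getOrElseUpdate(shuffleId, new ShuffleState(numReducers))

  // The pattern the patch uses: putIfAbsent installs at most one value per key,
  // and the follow-up get returns whichever instance actually won the race.
  def atomic(shuffleId: Int, numReducers: Int): ShuffleState = {
    states.putIfAbsent(shuffleId, new ShuffleState(numReducers))
    states.get(shuffleId)
  }
}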

private val futureExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
@@ -147,11 +149,6 @@ private[spark] class BlockManager(
private var asyncReregisterTask: Future[Unit] = null
private val asyncReregisterLock = new Object

private val metadataCleaner = new MetadataCleaner(
MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
private val broadcastCleaner = new MetadataCleaner(
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)

// Field related to peer block managers that are necessary for block replication
@volatile private var cachedPeers: Seq[BlockManagerId] = _
private val peerFetchLock = new Object
@@ -232,7 +229,7 @@
*/
private def reportAllBlocks(): Unit = {
logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
for ((blockId, info) <- blockInfo) {
for ((blockId, info) <- blockInfo.asScala) {
val status = getCurrentBlockStatus(blockId, info)
if (!tryToReportBlockStatus(blockId, info, status)) {
logError(s"Failed to report $blockId to master; giving up.")
@@ -313,7 +310,7 @@
* NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
blockInfo.get(blockId).map { info =>
blockInfo.asScala.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
// Assume that block is not in external block store
@@ -327,7 +324,7 @@
* may not know of).
*/
def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
(blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
(blockInfo.asScala.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
}

/**
@@ -439,15 +436,15 @@
}

private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
val info = blockInfo.get(blockId).orNull
val info = blockInfo.get(blockId)
if (info != null) {
info.synchronized {
// Double check to make sure the block is still there. There is a small chance that the
// block has been removed by removeBlock (which also synchronizes on the blockInfo object).
// Note that this only checks metadata tracking. If user intentionally deleted the block
// on disk or from off heap storage without using removeBlock, this conditional check will
// still pass but eventually we will get an exception because we can't find the block.
if (blockInfo.get(blockId).isEmpty) {
if (blockInfo.asScala.get(blockId).isEmpty) {
logWarning(s"Block $blockId had been removed")
return None
}
@@ -731,7 +728,7 @@
val putBlockInfo = {
val tinfo = new BlockInfo(level, tellMaster)
// Do atomically !
val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
val oldBlockOpt = Option(blockInfo.putIfAbsent(blockId, tinfo))
if (oldBlockOpt.isDefined) {
if (oldBlockOpt.get.waitForReady()) {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
@@ -1032,7 +1029,7 @@
data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {

logInfo(s"Dropping block $blockId from memory")
val info = blockInfo.get(blockId).orNull
val info = blockInfo.get(blockId)

// If the block has not already been dropped
if (info != null) {
@@ -1043,7 +1040,7 @@
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
} else if (blockInfo.get(blockId).isEmpty) {
} else if (blockInfo.asScala.get(blockId).isEmpty) {
logWarning(s"Block $blockId was already dropped.")
return None
}
@@ -1095,7 +1092,7 @@
def removeRdd(rddId: Int): Int = {
// TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
logInfo(s"Removing RDD $rddId")
val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
val blocksToRemove = blockInfo.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
blocksToRemove.size
}
@@ -1105,7 +1102,7 @@
*/
def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
logDebug(s"Removing broadcast $broadcastId")
val blocksToRemove = blockInfo.keys.collect {
val blocksToRemove = blockInfo.asScala.keys.collect {
case bid @ BroadcastBlockId(`broadcastId`, _) => bid
}
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
@@ -1117,7 +1114,7 @@
*/
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logDebug(s"Removing block $blockId")
val info = blockInfo.get(blockId).orNull
val info = blockInfo.get(blockId)
if (info != null) {
info.synchronized {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
@@ -1141,36 +1138,6 @@
}
}

private def dropOldNonBroadcastBlocks(cleanupTime: Long): Unit = {
logInfo(s"Dropping non broadcast blocks older than $cleanupTime")
dropOldBlocks(cleanupTime, !_.isBroadcast)
}

private def dropOldBroadcastBlocks(cleanupTime: Long): Unit = {
logInfo(s"Dropping broadcast blocks older than $cleanupTime")
dropOldBlocks(cleanupTime, _.isBroadcast)
}

private def dropOldBlocks(cleanupTime: Long, shouldDrop: (BlockId => Boolean)): Unit = {
val iterator = blockInfo.getEntrySet.iterator
while (iterator.hasNext) {
val entry = iterator.next()
val (id, info, time) = (entry.getKey, entry.getValue.value, entry.getValue.timestamp)
if (time < cleanupTime && shouldDrop(id)) {
info.synchronized {
val level = info.level
if (level.useMemory) { memoryStore.remove(id) }
if (level.useDisk) { diskStore.remove(id) }
if (level.useOffHeap) { externalBlockStore.remove(id) }
iterator.remove()
logInfo(s"Dropped block $id")
}
val status = getCurrentBlockStatus(id, info)
reportBlockStatus(id, info, status)
}
}
}

private def shouldCompress(blockId: BlockId): Boolean = {
blockId match {
case _: ShuffleBlockId => compressShuffle
@@ -1248,8 +1215,6 @@
if (externalBlockStoreInitialized) {
externalBlockStore.clear()
}
metadataCleaner.cancel()
broadcastCleaner.cancel()
futureExecutionContext.shutdownNow()
logInfo("BlockManager stopped")
}