
Commit

All of the rest of the changes.
JoshRosen committed Dec 31, 2015
1 parent 3940e97 commit 98b732a
Showing 10 changed files with 25 additions and 435 deletions.
17 changes: 2 additions & 15 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -33,6 +33,7 @@ import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}
import scala.util.control.NonFatal

+import com.google.common.collect.MapMaker
import org.apache.commons.lang.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -221,7 +222,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private var _eventLogDir: Option[URI] = None
private var _eventLogCodec: Option[String] = None
private var _env: SparkEnv = _
-private var _metadataCleaner: MetadataCleaner = _
private var _jobProgressListener: JobProgressListener = _
private var _statusTracker: SparkStatusTracker = _
private var _progressBar: Option[ConsoleProgressBar] = None
@@ -295,8 +295,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] val addedJars = HashMap[String, Long]()

// Keeps track of all persisted RDDs
-private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
-private[spark] def metadataCleaner: MetadataCleaner = _metadataCleaner
+private[spark] val persistentRdds = new MapMaker().weakValues().makeMap[Int, RDD[_]]().asScala
private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener

def statusTracker: SparkStatusTracker = _statusTracker
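
Note: the replacement map relies on Guava's weak-value semantics. Once an RDD is no longer strongly referenced anywhere, its entry can be collected by the GC automatically, so no periodic cleaner thread is needed. A minimal sketch of that behavior (the demo object and names are mine, and GC timing is best-effort, so the final lookup is only likely, not guaranteed, to be empty):

```scala
import com.google.common.collect.MapMaker
import scala.collection.JavaConverters._

object WeakValuesDemo {
  def main(args: Array[String]): Unit = {
    // A concurrent map whose values are held via weak references: entries
    // whose values become unreachable are treated as absent after GC.
    val cache = new MapMaker().weakValues().makeMap[Int, Object]().asScala

    var value: Object = new Object
    cache(1) = value
    println(cache.get(1).isDefined) // true while `value` is strongly held

    value = null // drop the only strong reference
    System.gc()  // request (but not force) a collection
    Thread.sleep(100)
    println(cache.get(1)) // likely None once the weak value is cleared
  }
}
```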
@@ -463,8 +462,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_conf.set("spark.repl.class.uri", replUri)
}

-_metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
-
_statusTracker = new SparkStatusTracker(this)

_progressBar =
@@ -1721,11 +1718,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
env.metricsSystem.report()
}
}
-if (metadataCleaner != null) {
-  Utils.tryLogNonFatalError {
-    metadataCleaner.cancel()
-  }
-}
Utils.tryLogNonFatalError {
_cleaner.foreach(_.stop())
}
@@ -2193,11 +2185,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
}

-/** Called by MetadataCleaner to clean up the persistentRdds map periodically */
-private[spark] def cleanup(cleanupTime: Long) {
-  persistentRdds.clearOldValues(cleanupTime)
-}
-
// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction.
// NOTE: this must be placed at the end of the SparkContext constructor.
20 changes: 5 additions & 15 deletions core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala
@@ -26,7 +26,7 @@ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage._
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkConf, SparkEnv}

/** A group of writers for a ShuffleMapTask, one writer per reducer. */
@@ -63,10 +63,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
val completedMapTasks = new ConcurrentLinkedQueue[Int]()
}

-private val shuffleStates = new TimeStampedHashMap[ShuffleId, ShuffleState]
-
-private val metadataCleaner =
-  new MetadataCleaner(MetadataCleanerType.SHUFFLE_BLOCK_MANAGER, this.cleanup, conf)
+private val shuffleStates = new scala.collection.mutable.HashMap[ShuffleId, ShuffleState]

/**
* Get a ShuffleWriterGroup for the given map task, which will register it as complete
@@ -75,9 +72,8 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
def forMapTask(shuffleId: Int, mapId: Int, numReducers: Int, serializer: Serializer,
writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
new ShuffleWriterGroup {
-shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numReducers))
-private val shuffleState = shuffleStates(shuffleId)
-
+private val shuffleState =
+  shuffleStates.getOrElseUpdate(shuffleId, new ShuffleState(numReducers))
val openStartTime = System.nanoTime
val serializerInstance = serializer.newInstance()
val writers: Array[DiskBlockObjectWriter] = {
@@ -131,11 +127,5 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
}
}

-private def cleanup(cleanupTime: Long) {
-  shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
-}
-
-override def stop() {
-  metadataCleaner.cancel()
-}
+override def stop(): Unit = {}
}
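
With the TTL map gone, `shuffleStates` becomes a plain `mutable.HashMap`, and the old putIfAbsent-then-lookup pair collapses into a single `getOrElseUpdate`. A small sketch of the idiom; note that `mutable.HashMap` is not thread-safe on its own, so this assumes external synchronization or single-threaded access:

```scala
import scala.collection.mutable

// getOrElseUpdate returns the existing value for a key, or evaluates the
// default, inserts it, and returns it when the key is missing.
val states = new mutable.HashMap[Int, String]
val a = states.getOrElseUpdate(1, "state-1") // key missing: inserts "state-1"
val b = states.getOrElseUpdate(1, "state-2") // key present: default not used
assert(a == "state-1" && b == "state-1")
```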
26 changes: 14 additions & 12 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,7 +19,9 @@ package org.apache.spark.storage

import java.io._
import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.ConcurrentHashMap

+import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -75,7 +77,7 @@ private[spark] class BlockManager(

val diskBlockManager = new DiskBlockManager(this, conf)

-private val blockInfo = new HashMap[BlockId, BlockInfo]
+private val blockInfo = new ConcurrentHashMap[BlockId, BlockInfo]

private val futureExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
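
`blockInfo` switches from a Scala `HashMap` to a Java `ConcurrentHashMap`. Point reads below keep the Java API, which returns `null` on a miss (hence the dropped `.orNull`), while iteration and Option-style lookups go through an `.asScala` view of the same underlying map. A sketch of the two access styles (the `Info` type and keys are illustrative):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

final case class Info(size: Long)

object AccessStyles {
  def main(args: Array[String]): Unit = {
    val map = new ConcurrentHashMap[String, Info]()
    map.put("block-1", Info(42L))

    // Java-style point read: null on a miss, so guard explicitly.
    val direct = map.get("block-2")
    if (direct != null) println(direct.size)

    // The asScala view wraps the same map (no copy) and gives
    // Option-returning get plus idiomatic Scala iteration.
    val view = map.asScala
    println(view.get("block-1").map(_.size)) // Some(42)
    for ((id, info) <- view) println(s"$id -> ${info.size}")
  }
}
```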
@@ -227,7 +229,7 @@ private[spark] class BlockManager(
*/
private def reportAllBlocks(): Unit = {
logInfo(s"Reporting ${blockInfo.size} blocks to the master.")
-for ((blockId, info) <- blockInfo) {
+for ((blockId, info) <- blockInfo.asScala) {
val status = getCurrentBlockStatus(blockId, info)
if (!tryToReportBlockStatus(blockId, info, status)) {
logError(s"Failed to report $blockId to master; giving up.")
@@ -308,7 +310,7 @@ private[spark] class BlockManager(
* NOTE: This is mainly for testing, and it doesn't fetch information from external block store.
*/
def getStatus(blockId: BlockId): Option[BlockStatus] = {
-blockInfo.get(blockId).map { info =>
+blockInfo.asScala.get(blockId).map { info =>
val memSize = if (memoryStore.contains(blockId)) memoryStore.getSize(blockId) else 0L
val diskSize = if (diskStore.contains(blockId)) diskStore.getSize(blockId) else 0L
// Assume that block is not in external block store
@@ -322,7 +324,7 @@ private[spark] class BlockManager(
* may not know of).
*/
def getMatchingBlockIds(filter: BlockId => Boolean): Seq[BlockId] = {
-(blockInfo.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
+(blockInfo.asScala.keys ++ diskBlockManager.getAllBlocks()).filter(filter).toSeq
}

/**
@@ -434,15 +436,15 @@
}

private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
-val info = blockInfo.get(blockId).orNull
+val info = blockInfo.get(blockId)
if (info != null) {
info.synchronized {
// Double check to make sure the block is still there. There is a small chance that the
// block has been removed by removeBlock (which also synchronizes on the blockInfo object).
// Note that this only checks metadata tracking. If user intentionally deleted the block
// on disk or from off heap storage without using removeBlock, this conditional check will
// still pass but eventually we will get an exception because we can't find the block.
-if (blockInfo.get(blockId).isEmpty) {
+if (blockInfo.asScala.get(blockId).isEmpty) {
logWarning(s"Block $blockId had been removed")
return None
}
@@ -726,7 +728,7 @@ private[spark] class BlockManager(
val putBlockInfo = {
val tinfo = new BlockInfo(level, tellMaster)
// Do atomically !
-val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
+val oldBlockOpt = Option(blockInfo.putIfAbsent(blockId, tinfo))
if (oldBlockOpt.isDefined) {
if (oldBlockOpt.get.waitForReady()) {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
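
`ConcurrentHashMap.putIfAbsent` returns the previous value, or `null` when the key was absent; wrapping the result in `Option(...)` recovers the `Option` that the old Scala map's `putIfAbsent` produced. A quick demonstration:

```scala
import java.util.concurrent.ConcurrentHashMap

object PutIfAbsentDemo {
  def main(args: Array[String]): Unit = {
    val m = new ConcurrentHashMap[String, String]()

    // Option(null) is None, so a successful insert maps to None and a
    // refused insert maps to Some(previousValue).
    val first = Option(m.putIfAbsent("k", "v1"))
    val second = Option(m.putIfAbsent("k", "v2"))

    println(first)      // None: "v1" was inserted
    println(second)     // Some(v1): insert refused, old value returned
    println(m.get("k")) // v1
  }
}
```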
@@ -1027,7 +1029,7 @@ private[spark] class BlockManager(
data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {

logInfo(s"Dropping block $blockId from memory")
-val info = blockInfo.get(blockId).orNull
+val info = blockInfo.get(blockId)

// If the block has not already been dropped
if (info != null) {
@@ -1038,7 +1040,7 @@ private[spark] class BlockManager(
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
-} else if (blockInfo.get(blockId).isEmpty) {
+} else if (blockInfo.asScala.get(blockId).isEmpty) {
logWarning(s"Block $blockId was already dropped.")
return None
}
Expand Down Expand Up @@ -1090,7 +1092,7 @@ private[spark] class BlockManager(
def removeRdd(rddId: Int): Int = {
// TODO: Avoid a linear scan by creating another mapping of RDD.id to blocks.
logInfo(s"Removing RDD $rddId")
-val blocksToRemove = blockInfo.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
+val blocksToRemove = blockInfo.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId)
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster = false) }
blocksToRemove.size
}
@@ -1100,7 +1102,7 @@ private[spark] class BlockManager(
*/
def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
logDebug(s"Removing broadcast $broadcastId")
-val blocksToRemove = blockInfo.keys.collect {
+val blocksToRemove = blockInfo.asScala.keys.collect {
case bid @ BroadcastBlockId(`broadcastId`, _) => bid
}
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
@@ -1112,7 +1114,7 @@ private[spark] class BlockManager(
*/
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logDebug(s"Removing block $blockId")
-val info = blockInfo.get(blockId).orNull
+val info = blockInfo.get(blockId)
if (info != null) {
info.synchronized {
// Removals are idempotent in disk store and memory store. At worst, we get a warning.
109 changes: 0 additions & 109 deletions core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala

This file was deleted.
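
For context, `MetadataCleaner` was a timer-driven TTL cleaner: it periodically handed a cutoff timestamp to a callback such as `SparkContext.cleanup` above, which then evicted entries older than the cutoff. A rough sketch of that pattern, reconstructed from the call sites in this diff rather than from the deleted file itself:

```scala
import java.util.{Timer, TimerTask}

// Periodically invokes `cleanup` with a cutoff timestamp; the callee is
// expected to drop entries last touched before that cutoff.
class PeriodicCleaner(periodMs: Long, ttlMs: Long, cleanup: Long => Unit) {
  private val timer = new Timer("metadata-cleaner", true) // daemon thread

  timer.schedule(new TimerTask {
    override def run(): Unit = cleanup(System.currentTimeMillis() - ttlMs)
  }, periodMs, periodMs)

  def cancel(): Unit = timer.cancel()
}
```

In this commit, weak references (`persistentRdds`) and explicit removal at the natural call sites (`blockInfo`, `shuffleStates`) take over that job, so the timer machinery can go away entirely.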

