diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index ac73288442a74..5d59e00636ee6 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -75,7 +75,9 @@ class TaskMetrics extends Serializable { /** * If this task reads from shuffle output, metrics on getting shuffle data will be collected here */ - var shuffleReadMetrics: Option[ShuffleReadMetrics] = None + private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None + + def shuffleReadMetrics = _shuffleReadMetrics /** * If this task writes to shuffle output, metrics on the written shuffle data will be collected @@ -87,6 +89,22 @@ class TaskMetrics extends Serializable { * Storage statuses of any blocks that have been updated as a result of this task. */ var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None + + /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */ + def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized { + _shuffleReadMetrics match { + case Some(existingMetrics) => + existingMetrics.shuffleFinishTime = math.max( + existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime) + existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime + existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched + existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched + existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched + existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead + case None => + _shuffleReadMetrics = Some(newMetrics) + } + } } private[spark] object TaskMetrics { diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index a932455776e34..3795994cd920f 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -84,7 +84,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks - context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics) + context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics) }) new InterruptibleIterator[T](context, completionIter) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 47eb44b530379..2ff8b25a56d10 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -527,8 +527,9 @@ private[spark] object JsonProtocol { metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long] metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long] metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long] - metrics.shuffleReadMetrics = - Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson) + Utils.jsonOption(json \ "Shuffle Read Metrics").map { shuffleReadMetrics => + metrics.updateShuffleReadMetrics(shuffleReadMetricsFromJson(shuffleReadMetrics)) + } metrics.shuffleWriteMetrics = Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson) metrics.inputMetrics = diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 71f48e295ecca..3b0b8e2f68c97 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -258,8 +258,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers if (stageInfo.rddInfos.exists(_.name == d4.name)) { taskMetrics.shuffleReadMetrics should be ('defined) val sm = taskMetrics.shuffleReadMetrics.get - sm.totalBlocksFetched should be > (0) - sm.localBlocksFetched should be > (0) + sm.totalBlocksFetched should be (128) + sm.localBlocksFetched should be (128) sm.remoteBlocksFetched should be (0) sm.remoteBytesRead should be (0l) } diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index a8556624804bb..b52f81877d557 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -63,7 +63,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc // finish this task, should get updated shuffleRead shuffleReadMetrics.remoteBytesRead = 1000 - taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) + taskMetrics.updateShuffleReadMetrics(shuffleReadMetrics) var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 var task = new ShuffleMapTask(0, null, null, 0, null) @@ -81,8 +81,6 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc assert(listener.stageIdToData.size === 1) // finish this task, should get updated duration - shuffleReadMetrics.remoteBytesRead = 1000 - taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 task = new ShuffleMapTask(0, null, null, 0, null) @@ -91,8 +89,6 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc .shuffleRead === 2000) // finish this task, should get updated duration - shuffleReadMetrics.remoteBytesRead = 1000 - taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics) taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 task = new ShuffleMapTask(0, null, null, 0, null) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 058d31453081a..11f70a6090d24 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -518,7 +518,7 @@ class JsonProtocolSuite extends FunSuite { sr.localBlocksFetched = e sr.fetchWaitTime = a + d sr.remoteBlocksFetched = f - t.shuffleReadMetrics = Some(sr) + t.updateShuffleReadMetrics(sr) } sw.shuffleBytesWritten = a + b + c sw.shuffleWriteTime = b + c + d