Skip to content

Commit

Permalink
[SPARK-2571] Correctly report shuffle read metrics.
Browse files Browse the repository at this point in the history
Currently, shuffle read metrics are incorrectly reported when stages have multiple shuffle dependencies (they are set to be the metrics from just one of the shuffle dependencies, rather than the accumulated metrics from all of the shuffle dependencies).  This fixes that problem, and should probably be back-ported to the 0.9 branch.

Thanks ryanra for discovering this problem!

cc rxin andrewor14

Author: Kay Ousterhout <[email protected]>

Closes apache#1476 from kayousterhout/join_bug and squashes the following commits:

0203a16 [Kay Ousterhout] Fix broken unit tests.
f463c2e [Kay Ousterhout] [SPARK-2571] Correctly report shuffle read metrics.
  • Loading branch information
kayousterhout committed Jul 18, 2014
1 parent 7f17208 commit 7b971b9
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 12 deletions.
20 changes: 19 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ class TaskMetrics extends Serializable {
/**
* If this task reads from shuffle output, metrics on getting shuffle data will be collected here
*/
var shuffleReadMetrics: Option[ShuffleReadMetrics] = None
private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None

def shuffleReadMetrics = _shuffleReadMetrics

/**
* If this task writes to shuffle output, metrics on the written shuffle data will be collected
Expand All @@ -87,6 +89,22 @@ class TaskMetrics extends Serializable {
* Storage statuses of any blocks that have been updated as a result of this task.
*/
var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None

/** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */
def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized {
_shuffleReadMetrics match {
case Some(existingMetrics) =>
existingMetrics.shuffleFinishTime = math.max(
existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime)
existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime
existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched
existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched
existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched
existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead
case None =>
_shuffleReadMetrics = Some(newMetrics)
}
}
}

private[spark] object TaskMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics)
context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics)
})

new InterruptibleIterator[T](context, completionIter)
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,9 @@ private[spark] object JsonProtocol {
metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
metrics.shuffleReadMetrics =
Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
Utils.jsonOption(json \ "Shuffle Read Metrics").map { shuffleReadMetrics =>
metrics.updateShuffleReadMetrics(shuffleReadMetricsFromJson(shuffleReadMetrics))
}
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
metrics.inputMetrics =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
taskMetrics.shuffleReadMetrics should be ('defined)
val sm = taskMetrics.shuffleReadMetrics.get
sm.totalBlocksFetched should be > (0)
sm.localBlocksFetched should be > (0)
sm.totalBlocksFetched should be (128)
sm.localBlocksFetched should be (128)
sm.remoteBlocksFetched should be (0)
sm.remoteBytesRead should be (0l)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc

// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskMetrics.updateShuffleReadMetrics(shuffleReadMetrics)
var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
var task = new ShuffleMapTask(0, null, null, 0, null)
Expand All @@ -81,8 +81,6 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
assert(listener.stageIdToData.size === 1)

// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
Expand All @@ -91,8 +89,6 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
.shuffleRead === 2000)

// finish this task, should get updated duration
shuffleReadMetrics.remoteBytesRead = 1000
taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ class JsonProtocolSuite extends FunSuite {
sr.localBlocksFetched = e
sr.fetchWaitTime = a + d
sr.remoteBlocksFetched = f
t.shuffleReadMetrics = Some(sr)
t.updateShuffleReadMetrics(sr)
}
sw.shuffleBytesWritten = a + b + c
sw.shuffleWriteTime = b + c + d
Expand Down

0 comments on commit 7b971b9

Please sign in to comment.