Merge pull request alteryx#41 from pwendell/shuffle-benchmark
Provide Instrumentation for Shuffle Write Performance

Shuffle write performance can have a major impact on the performance of jobs. This patch adds a few pieces of instrumentation related to shuffle writes. They are:

1. A listing of the time each task spends performing blocking writes. This is implemented by accumulating the delay observed across the many individual writes a task performs.
2. An undocumented option `spark.shuffle.sync` which forces shuffle data to be synced to disk. This is necessary for measuring shuffle write performance without the OS buffer cache masking the true cost of the writes.
3. An internal utility which micro-benchmarks write throughput for simulated shuffle outputs.

I'm going to do some performance testing on this to see whether these small timing calls add overhead. From a feature perspective, however, I consider this complete. Any feedback is appreciated.
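
A rough sketch of how these pieces fit together (a sketch only, not part of the patch; `taskMetrics` is a placeholder for a completed task's TaskMetrics object):

import org.apache.spark.util.Utils

// Force shuffle writes to sync to disk so the measured write time reflects real disk cost.
// This must be set before the SparkContext (and its BlockManager) is created.
System.setProperty("spark.shuffle.sync", "true")

// A completed shuffle map task's metrics may carry ShuffleWriteMetrics.
for (s <- taskMetrics.shuffleWriteMetrics) {
  val writeTimeMs = s.shuffleWriteTime / (1000 * 1000)  // shuffleWriteTime is recorded in nanoseconds
  println("wrote %s, blocked %d ms on writes".format(
    Utils.bytesToString(s.shuffleBytesWritten), writeTimeMs))
}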
(cherry picked from commit 35886f3)

Signed-off-by: Patrick Wendell <[email protected]>
pwendell committed Oct 21, 2013
1 parent 37a755c commit 1a50c79
Showing 6 changed files with 141 additions and 4 deletions.
@@ -102,4 +102,9 @@ class ShuffleWriteMetrics extends Serializable {
* Number of bytes written for a shuffle
*/
var shuffleBytesWritten: Long = _

/**
* Time spent blocking on writes to disk or buffer cache, in nanoseconds.
*/
var shuffleWriteTime: Long = _
}
@@ -164,17 +164,20 @@ private[spark] class ShuffleMapTask(

// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
var totalTime = 0L
val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
writer.commit()
writer.close()
val size = writer.size()
totalBytes += size
totalTime += writer.timeWriting()
MapOutputTracker.compressSize(size)
}

// Update shuffle metrics.
val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)

new MapStatus(blockManager.blockManagerId, compressedSizes)
@@ -62,4 +62,9 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
* Size of the valid writes, in bytes.
*/
def size(): Long

/**
* Cumulative time spent performing blocking writes, in ns.
*/
def timeWriting(): Long
}
44 changes: 41 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -45,29 +45,64 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
class DiskBlockObjectWriter(blockId: BlockId, serializer: Serializer, bufferSize: Int)
extends BlockObjectWriter(blockId) {

/** Intercepts write calls and tracks total time spent writing. Not thread safe. */
private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
def timeWriting = _timeWriting
private var _timeWriting = 0L

private def callWithTiming(f: => Unit) = {
val start = System.nanoTime()
f
_timeWriting += (System.nanoTime() - start)
}

def write(i: Int): Unit = callWithTiming(out.write(i))
override def write(b: Array[Byte]) = callWithTiming(out.write(b))
override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
}

private val f: File = createFile(blockId /*, allowAppendExisting */)
private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean

// The file channel, used for repositioning / truncating the file.
private var channel: FileChannel = null
private var bs: OutputStream = null
private var fos: FileOutputStream = null
private var ts: TimeTrackingOutputStream = null
private var objOut: SerializationStream = null
private var lastValidPosition = 0L
private var initialized = false
private var _timeWriting = 0L

override def open(): DiskBlockObjectWriter = {
-  val fos = new FileOutputStream(f, true)
+  fos = new FileOutputStream(f, true)
+  ts = new TimeTrackingOutputStream(fos)
   channel = fos.getChannel()
-  bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(fos, bufferSize))
+  bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
this
}

override def close() {
if (initialized) {
-  objOut.close()
+  if (syncWrites) {
+    // Force outstanding writes to disk and track how long it takes
+    objOut.flush()
+    val start = System.nanoTime()
+    fos.getFD.sync()
+    _timeWriting += System.nanoTime() - start
+    objOut.close()
+  } else {
+    objOut.close()
+  }

_timeWriting += ts.timeWriting

channel = null
bs = null
fos = null
ts = null
objOut = null
}
// Invoke the close callback handler.
@@ -110,6 +145,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
}

override def size(): Long = lastValidPosition

// Only valid if called after close()
override def timeWriting = _timeWriting
}

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
Seq("GC Time") ++
{if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
-  {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+  {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
Seq("Errors")

val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
@@ -169,6 +169,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
}}
{if (shuffleWrite) {
<td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
parent.formatDuration(s.shuffleWriteTime / (1000 * 1000))}.getOrElse("")}</td>
<td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
}}
84 changes: 84 additions & 0 deletions core/src/main/scala/spark/storage/StoragePerfTester.scala
@@ -0,0 +1,84 @@
package org.apache.spark.storage

import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{CountDownLatch, Executors}

import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.SparkContext
import org.apache.spark.util.Utils

/**
 * Utility for micro-benchmarking shuffle write performance.
 *
 * Writes simulated shuffle output from several threads and records the observed throughput.
 */
object StoragePerfTester {
def main(args: Array[String]) = {
/** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))

/** Number of map tasks. All tasks execute concurrently. */
val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)

/** Number of reduce splits for each map task. */
val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)

val recordLength = 1000 // ~1KB records
val totalRecords = dataSizeMb * 1000
val recordsPerMap = totalRecords / numMaps

val writeData = "1" * recordLength
val executor = Executors.newFixedThreadPool(numMaps)

System.setProperty("spark.shuffle.compress", "false")
System.setProperty("spark.shuffle.sync", "true")

// This is only used to instantiate a BlockManager. All thread scheduling is done manually.
val sc = new SparkContext("local[4]", "Write Tester")
val blockManager = sc.env.blockManager

def writeOutputBytes(mapId: Int, total: AtomicLong) = {
val shuffle = blockManager.shuffleBlockManager.forShuffle(1, numOutputSplits,
new KryoSerializer())
val buckets = shuffle.acquireWriters(mapId)
for (i <- 1 to recordsPerMap) {
buckets.writers(i % numOutputSplits).write(writeData)
}
buckets.writers.map {w =>
w.commit()
total.addAndGet(w.size())
w.close()
}

shuffle.releaseWriters(buckets)
}

val start = System.currentTimeMillis()
val latch = new CountDownLatch(numMaps)
val totalBytes = new AtomicLong()
for (task <- 1 to numMaps) {
executor.submit(new Runnable() {
override def run() = {
try {
writeOutputBytes(task, totalBytes)
latch.countDown()
} catch {
case e: Exception =>
println("Exception in child thread: " + e + " " + e.getMessage)
System.exit(1)
}
}
})
}
latch.await()
val end = System.currentTimeMillis()
val time = (end - start) / 1000.0
val bytesPerSecond = totalBytes.get() / time
val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong

System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))

executor.shutdown()
sc.stop()
}
}
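
To put the defaults in perspective, the sizing arithmetic above works out as follows (illustrative only, using the default OUTPUT_DATA=1g, NUM_MAPS=8, NUM_REDUCERS=500; no measured results implied):

val dataSizeMb     = 1024                 // Utils.memoryStringToMb("1g")
val totalRecords   = dataSizeMb * 1000    // 1,024,000 records of ~1,000 bytes each
val recordsPerMap  = totalRecords / 8     // 128,000 records written by each map thread
val filesTotal     = 8 * 500              // 4,000 shuffle files overall
val recordsPerFile = recordsPerMap / 500  // 256 records, about 256,000 bytes per file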
