Record accurate size of blocks in MapStatus when it's above threshold.
jinxing committed May 19, 2017
1 parent 69bb771 commit bfea9f5
Showing 4 changed files with 144 additions and 7 deletions.
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -278,4 +278,22 @@ package object config {
"spark.io.compression.codec.")
.booleanConf
.createWithDefault(false)

private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
ConfigBuilder("spark.shuffle.accurateBlockThreshold")
.doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
"record the size accurately if it's above this config and " +
"spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. This helps to prevent" +
" OOM by avoiding underestimating shuffle block size when fetch shuffle blocks.")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(100 * 1024 * 1024)

private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE =
ConfigBuilder("spark.shuffle.accurateBlockThresholdByTimesAverage")
.doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
"record the size accurately if it's above this config * averageSize and " +
"spark.shuffle.accurateBlockThreshold. This helps to prevent OOM by avoiding " +
"underestimating shuffle block size when fetch shuffle blocks.")
.intConf
.createWithDefault(2)
}
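
To make the interaction between the two settings concrete, here is a small, self-contained sketch (not part of the change; the object name and block sizes are invented for illustration) of how the effective cutoff is the larger of the absolute threshold and the multiplier times the average non-empty block size:

// Illustrative only: mirrors how the patch combines the two settings.
object AccurateBlockThresholdExample {
  def main(args: Array[String]): Unit = {
    val accurateBlockThreshold = 100L * 1024 * 1024  // spark.shuffle.accurateBlockThreshold (bytes)
    val timesAverage = 2                             // spark.shuffle.accurateBlockThresholdByTimesAverage
    // 1999 blocks of 1 MiB plus one skewed 300 MiB block.
    val blockSizes = Array.fill(1999)(1L * 1024 * 1024) :+ (300L * 1024 * 1024)
    val avgSize = blockSizes.filter(_ > 0).sum / blockSizes.count(_ > 0)
    val threshold = math.max(accurateBlockThreshold, timesAverage * avgSize)
    // Only the 300 MiB block exceeds the threshold, so only its size would be kept exactly;
    // every other block would be reported as avgSize.
    val accurateBlocks = blockSizes.zipWithIndex.collect { case (s, i) if s > threshold => i -> s }
    println(s"avgSize=$avgSize threshold=$threshold accurateBlocks=${accurateBlocks.toSeq}")
  }
}

With the default values the absolute threshold usually dominates, so the per-block bookkeeping is only paid for genuinely outsized blocks.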
57 changes: 51 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,13 @@ package org.apache.spark.scheduler

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.roaringbitmap.RoaringBitmap

import org.apache.spark.SparkEnv
import org.apache.spark.internal.config
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils

@@ -121,48 +126,69 @@ private[spark] class CompressedMapStatus(
}

/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
* plus a bitmap for tracking which blocks are empty.
* A [[MapStatus]] implementation that stores the accurate size of huge blocks, which are larger
* than both spark.shuffle.accurateBlockThreshold and
* spark.shuffle.accurateBlockThresholdByTimesAverage * averageSize. It stores the
* average size of other non-empty blocks, plus a bitmap for tracking which blocks are empty.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
* @param emptyBlocks a bitmap tracking which blocks are empty
* @param avgSize average size of the non-empty blocks
* @param hugeBlockSizes sizes of huge blocks by their reduceId.
*/
private[spark] class HighlyCompressedMapStatus private (
private[this] var loc: BlockManagerId,
private[this] var numNonEmptyBlocks: Int,
private[this] var emptyBlocks: RoaringBitmap,
private[this] var avgSize: Long)
private[this] var avgSize: Long,
@transient private var hugeBlockSizes: Map[Int, Byte])
extends MapStatus with Externalizable {

// loc could be null when the default constructor is called during deserialization
require(loc == null || avgSize > 0 || numNonEmptyBlocks == 0,
"Average size can only be zero for map stages that produced no output")

protected def this() = this(null, -1, null, -1) // For deserialization only
protected def this() = this(null, -1, null, -1, null) // For deserialization only

override def location: BlockManagerId = loc

override def getSizeForBlock(reduceId: Int): Long = {
assert(hugeBlockSizes != null)
if (emptyBlocks.contains(reduceId)) {
0
} else {
avgSize
hugeBlockSizes.get(reduceId) match {
case Some(size) => MapStatus.decompressSize(size)
case None => avgSize
}
}
}

override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
out.writeInt(hugeBlockSizes.size)
hugeBlockSizes.foreach { kv =>
out.writeInt(kv._1)
out.writeByte(kv._2)
}
}

override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
avgSize = in.readLong()
val count = in.readInt()
val hugeBlockSizesArray = mutable.ArrayBuffer[Tuple2[Int, Byte]]()
(0 until count).foreach { _ =>
val block = in.readInt()
val size = in.readByte()
hugeBlockSizesArray += Tuple2(block, size)
}
hugeBlockSizes = hugeBlockSizesArray.toMap
}
}
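
Each huge block is stored as a single byte via MapStatus.compressSize and read back with MapStatus.decompressSize. A rough sketch of that log-scale encoding follows (my paraphrase for illustration; the exact constants live in the MapStatus companion object). It shows why the round trip does not under-report a recorded block:

object SizeEncodingSketch {
  // Assumed log base 1.1, mirroring MapStatus.compressSize / decompressSize.
  def compress(size: Long): Byte =
    if (size == 0) 0
    else if (size <= 1L) 1
    else math.min(255, math.ceil(math.log(size) / math.log(1.1)).toInt).toByte

  def decompress(compressed: Byte): Long =
    if (compressed == 0) 0 else math.pow(1.1, compressed & 0xFF).toLong

  def main(args: Array[String]): Unit = {
    // A 300 MiB block fits in one byte, and decoding rounds up,
    // so a recorded huge block is not under-reported.
    val real = 300L * 1024 * 1024
    assert(decompress(compress(real)) >= real)
    println(s"$real -> ${compress(real)} -> ${decompress(compress(real))}")
  }
}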

@@ -193,8 +219,27 @@ private[spark] object HighlyCompressedMapStatus {
} else {
0
}
val threshold1 = Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
.getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
val threshold2 = avgSize * Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE))
.getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.defaultValue.get)
val threshold = math.max(threshold1, threshold2)
val hugeBlockSizesArray = ArrayBuffer[Tuple2[Int, Byte]]()
if (numNonEmptyBlocks > 0) {
i = 0
while (i < totalNumBlocks) {
if (uncompressedSizes(i) > threshold) {
hugeBlockSizesArray += Tuple2(i, MapStatus.compressSize(uncompressedSizes(i)))
}
i += 1
}
}
emptyBlocks.trim()
emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize,
hugeBlockSizesArray.toMap)
}
}
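
A hedged usage sketch of the new behavior (the sizes are invented, and this only compiles from code inside the org.apache.spark namespace, such as the test suite below, since these classes are private[spark]): with more than 2000 blocks MapStatus.apply picks HighlyCompressedMapStatus, and with the default thresholds only the skewed block is tracked exactly.

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId

// 2000 small blocks plus one 1 GiB outlier (2001 blocks triggers the
// highly compressed representation).
val sizes = Array.fill[Long](2000)(10L * 1024) :+ (1024L * 1024 * 1024)
val status = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)

// The outlier is no longer reported as the tiny average...
assert(status.getSizeForBlock(2000) >= sizes(2000))
// ...while ordinary blocks still return the compact average size.
println(status.getSizeForBlock(0))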
core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -17,11 +17,15 @@

package org.apache.spark.scheduler

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import scala.util.Random

import org.mockito.Mockito._
import org.roaringbitmap.RoaringBitmap

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.internal.config
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage.BlockManagerId

@@ -128,4 +132,54 @@ class MapStatusSuite extends SparkFunSuite {
assert(size1 === size2)
assert(!success)
}

test("When SHUFFLE_ACCURATE_BLOCK_THRESHOLD is 0, blocks which are bigger than " +
"SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE * averageSize should not be " +
"underestimated.") {
val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "0")
.set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.key, "2")
val env = mock(classOf[SparkEnv])
doReturn(conf).when(env).conf
SparkEnv.set(env)
// Value of element in sizes is equal to the corresponding index when index >= 1000.
val sizes = Array.concat(Array.fill[Long](1000)(1L), (1000L to 2000L).toArray)
val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
val arrayStream = new ByteArrayOutputStream(102400)
val objectOutputStream = new ObjectOutputStream(arrayStream)
assert(status1.isInstanceOf[HighlyCompressedMapStatus])
objectOutputStream.writeObject(status1)
objectOutputStream.flush()
val array = arrayStream.toByteArray
val objectInput = new ObjectInputStream(new ByteArrayInputStream(array))
val status2 = objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]
val avg = sizes.sum / 2001
((2 * avg + 1) to 2000).foreach {
case part =>
assert(status2.getSizeForBlock(part.toInt) >= sizes(part.toInt))
}
}

test("When SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE is 0, blocks which are bigger than" +
" SHUFFLE_ACCURATE_BLOCK_THRESHOLD should not be underestimated.")
{
val conf = new SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.key, "1000")
.set(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD_BY_TIMES_AVERAGE.key, "0")
val env = mock(classOf[SparkEnv])
doReturn(conf).when(env).conf
SparkEnv.set(env)
// Value of element in sizes is equal to the corresponding index.
val sizes = (0L to 2000L).toArray
val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes)
val arrayStream = new ByteArrayOutputStream(102400)
val objectOutputStream = new ObjectOutputStream(arrayStream)
assert(status1.isInstanceOf[HighlyCompressedMapStatus])
objectOutputStream.writeObject(status1)
objectOutputStream.flush()
val array = arrayStream.toByteArray
val objectInput = new ObjectInputStream(new ByteArrayInputStream(array))
val status2 = objectInput.readObject().asInstanceOf[HighlyCompressedMapStatus]
(1001 to 2000).foreach {
case part => assert(status2.getSizeForBlock(part) >= sizes(part))
}
}
}
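
For reference, a minimal sketch of setting the two properties from application code (the values here are arbitrary; both settings are documented in the configuration table below):

import org.apache.spark.SparkConf

// Record a block's size exactly once it exceeds 128 MiB or 3x the average
// block size, whichever cutoff is larger.
val conf = new SparkConf()
  .set("spark.shuffle.accurateBlockThreshold", (128L * 1024 * 1024).toString)
  .set("spark.shuffle.accurateBlockThresholdByTimesAverage", "3")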
20 changes: 20 additions & 0 deletions docs/configuration.md
@@ -612,6 +612,26 @@ Apart from these, the following properties are also available, and may be useful
<code>spark.io.compression.codec</code>.
</td>
</tr>
<tr>
<td><code>spark.shuffle.accurateBlockThreshold</code></td>
<td>100 * 1024 * 1024</td>
<td>
When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the
size accurately if it's above both this config and
<code>spark.shuffle.accurateBlockThresholdByTimesAverage</code> * averageSize. This helps to
prevent OOM by avoiding underestimating shuffle block sizes when fetching shuffle blocks.
</td>
</tr>
<tr>
<td><code>spark.shuffle.accurateBlockThresholdByTimesAverage</code></td>
<td>2</td>
<td>
When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will record the
size accurately if it's above both this config * averageSize and
<code>spark.shuffle.accurateBlockThreshold</code>. This helps to prevent OOM by avoiding
underestimating shuffle block sizes when fetching shuffle blocks.
</td>
</tr>
<tr>
<td><code>spark.io.encryption.enabled</code></td>
<td>false</td>
