[SNAP-1190] Reduce partition message overhead from driver to executor (#31)

- DAGScheduler:
  - For common task data (RDD + closure) that is small enough, send it inline with the Task instead of as a broadcast
  - Transiently store the task binary data in Stage so it can be reused when possible
  - Compress the common task bytes to reduce network cost
- Task: new TaskData class to encapsulate the compressed task bytes from above, the uncompressed length,
  and a reference index used when the TaskData is read from a separate list (see the following items
  and the sketch after this list)
- CoarseGrainedClusterMessage: added a new LaunchTasks message to encapsulate multiple
  Task messages destined for the same executor
- CoarseGrainedSchedulerBackend:
  - Create LaunchTasks by grouping messages per executor in an ExecutorTaskGroup
  - The actual TaskData is sent as part of TaskDescription rather than the Task, to easily
    separate out the common portions into a separate list
  - Send the common TaskData as a separate ArrayBuffer of data, with the index into this
    list set in the original task's TaskData
- CoarseGrainedExecutorBackend: handle LaunchTasks by splitting it into individual task launches
- CompressionCodec: added compress/decompress methods for more efficient byte-array compression
- Executor:
  - Set the common decompressed task data back into the Task object
  - Avoid an extra serialization of TaskResult done only to determine the serialization time;
    the time is now calculated inline in the serialization write/writeExternal methods
- TaskMetrics: more generic handling of the DoubleAccumulator case
- Task: handle TaskData during serialization, sending a flag to indicate whether the
  data is inlined or will be received via broadcast
- ResultTask, ShuffleMapTask: delegate handling of TaskData to the parent Task class
- SparkEnv: encapsulate codec creation as a zero-argument function to avoid repeated conf lookups
- SparkContext.clean: avoid checking serializability when a non-default closure serializer is in use
- Test updates for the above
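
The TaskData flow described above can be pictured with a small, self-contained sketch. This is illustrative only: `TaskDataSketch` and its `java.util.zip` codec are stand-ins, not the classes added by this commit; the real change goes through Spark's `CompressionCodec` and its `reference` index points into the shared list sent with a `LaunchTasks` message.

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.{Deflater, Inflater}

// Compressed common task bytes + uncompressed length; reference >= 0 means the bytes live
// in the shared list shipped once per LaunchTasks message instead of in this object.
final case class TaskDataSketch(compressed: Array[Byte], uncompressedLen: Int, reference: Int = -1) {
  def decompress(): Array[Byte] = {
    val inflater = new Inflater()
    inflater.setInput(compressed)
    val out = new Array[Byte](uncompressedLen)
    var off = 0
    while (off < uncompressedLen && !inflater.finished()) {
      off += inflater.inflate(out, off, uncompressedLen - off)
    }
    inflater.end()
    out
  }
}

object TaskDataSketch {
  def compress(input: Array[Byte]): TaskDataSketch = {
    val deflater = new Deflater()
    deflater.setInput(input)
    deflater.finish()
    val bos = new ByteArrayOutputStream(input.length)
    val buf = new Array[Byte](8192)
    while (!deflater.finished()) bos.write(buf, 0, deflater.deflate(buf))
    deflater.end()
    TaskDataSketch(bos.toByteArray, input.length)
  }
}
```

A receiver whose `reference` is non-negative resolves its bytes from the shared list; otherwise it decompresses its own payload, which mirrors the executor-side handling in the diff below.
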
Conflicts:
	core/src/main/scala/org/apache/spark/SparkEnv.scala
	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
	core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
	core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
	core/src/main/scala/org/apache/spark/scheduler/Task.scala
	core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
	core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
	core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
	core/src/main/scala/org/apache/spark/executor/Executor.scala
	core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
	core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
	core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
	core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
	core/src/main/scala/org/apache/spark/scheduler/Task.scala
	core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
	core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
	core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
	core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
	core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
	mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala
	resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
	resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
	resource-managers/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherSuite.scala
Sumedh Wale authored and ymahajan committed Mar 5, 2018
1 parent b500538 commit c874a10
Showing 20 changed files with 423 additions and 119 deletions.
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -72,6 +72,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.status.AppStatusStore
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
@@ -230,6 +231,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _
private var _isDefaultClosureSerializer: Boolean = true

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -446,6 +448,8 @@ class SparkContext(config: SparkConf) extends Logging {
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

_isDefaultClosureSerializer = _env.closureSerializer.isInstanceOf[JavaSerializer]

// If running the REPL, register the repl's output dir with the file server.
_conf.getOption("spark.repl.class.outputDir").foreach { path =>
val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
@@ -2312,7 +2316,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @return the cleaned closure
*/
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable)
ClosureCleaner.clean(f, checkSerializable && _isDefaultClosureSerializer)
f
}

6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -49,6 +49,7 @@ import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.netty.NettyBlockTransferService
@@ -97,6 +98,11 @@ class SparkEnv (

private[spark] var driverTmpDir: Option[String] = None

private val codecCreator = CompressionCodec.codecCreator(conf,
CompressionCodec.getCodecName(conf))

def createCompressionCodec: CompressionCodec = codecCreator()

private[spark] def stop() {

if (!isStopped) {
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -34,7 +34,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.util.{ThreadUtils, Utils}

private[spark] class CoarseGrainedExecutorBackend(
@@ -51,10 +50,6 @@ private[spark] class CoarseGrainedExecutorBackend(
var executor: Executor = null
@volatile var driver: Option[RpcEndpointRef] = None

// If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
// to be changed so that we don't share the serializer instance across threads
private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
@@ -94,14 +89,30 @@ private[spark] class CoarseGrainedExecutorBackend(
logError("Slave registration failed: " + message)
exitExecutor()

case LaunchTask(data) =>
case LaunchTask(taskDesc) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
exitExecutor()
} else {
val taskDesc = TaskDescription.decode(data.value)
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc)
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask, taskDesc.taskData.decompress(env))
}

case LaunchTasks(tasks, taskDataList) =>
if (executor ne null) {
logDebug("Got assigned tasks " + tasks.map(_.taskId).mkString(","))
for (task <- tasks) {
logInfo("Got assigned task " + task.taskId)
val ref = task.taskData.reference
val taskData = if (ref >= 0) taskDataList(ref) else task.taskData
executor.launchTask(this, taskId = task.taskId,
attemptNumber = task.attemptNumber, task.name, task.serializedTask,
taskData.decompress(env))
}
} else {
exitExecutor(1, "Received LaunchTasks command but executor was null")
}

case KillTask(taskId, _, interruptThread, reason) =>
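
The CoarseGrainedSchedulerBackend side of this change (grouping per-executor tasks into one LaunchTasks message with a shared TaskData list) is not shown in this excerpt. The following is only a rough sketch of that grouping idea; `TaskSketch` and `LaunchTasksSketch` are illustrative names, not the commit's actual `ExecutorTaskGroup`.

```scala
import scala.collection.mutable.ArrayBuffer

object LaunchTasksGrouping {
  // taskData holds the compressed common bytes; reference >= 0 means "index into taskDataList".
  final case class TaskSketch(taskId: Long, executorId: String,
      taskData: Array[Byte], reference: Int = -1)
  final case class LaunchTasksSketch(tasks: Seq[TaskSketch], taskDataList: Seq[Array[Byte]])

  def groupByExecutor(tasks: Seq[TaskSketch]): Map[String, LaunchTasksSketch] =
    tasks.groupBy(_.executorId).map { case (execId, execTasks) =>
      val shared = new ArrayBuffer[Array[Byte]]()
      val rewritten = execTasks.map { t =>
        // Ship each distinct common payload once per executor; tasks point to it by index.
        val idx = shared.indexWhere(_ eq t.taskData) match {
          case -1 => shared += t.taskData; shared.length - 1
          case i  => i
        }
        t.copy(taskData = Array.emptyByteArray, reference = idx)
      }
      execId -> LaunchTasksSketch(rewritten, shared.toSeq)
    }
}
```

On the executor side this corresponds to the handler above: `if (ref >= 0) taskDataList(ref) else task.taskData`.
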
18 changes: 8 additions & 10 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -246,7 +246,12 @@ private[spark] class Executor(

class TaskRunner(
execBackend: ExecutorBackend,
private val taskDescription: TaskDescription)
private val taskDescription: TaskDescription,
val taskId: Long,
val attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer,
taskDataBytes: Array[Byte])
extends Runnable {

val taskId = taskDescription.taskId
@@ -404,11 +409,6 @@ private[spark] class Executor(
// If the task has been killed, let's fail it.
task.context.killTaskIfInterrupted()

val resultSer = env.serializer.newInstance()
val beforeSerialization = System.nanoTime()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.nanoTime()

// Deserialization happens in two parts: first, we deserialize a Task object, which
// includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
task.metrics.setExecutorDeserializeTime(math.max(
@@ -421,8 +421,6 @@
task.metrics.setExecutorCpuTime(
(taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime)
task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime)
task.metrics.setResultSerializationTime(math.max(
afterSerialization - beforeSerialization, 0L) / 1000000.0)

// Expose task metrics using the Dropwizard metrics system.
// Update task metrics counters
@@ -467,8 +465,8 @@

// Note: accumulator updates must be collected after TaskMetrics is updated
val accumUpdates = task.collectAccumulatorUpdates()
// TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val directResult = new DirectTaskResult(value, accumUpdates,
Some(task.metrics.resultSerializationTimeMetric))
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit()

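
The extra `resultSer.serialize(value)` pass removed above existed only to measure result-serialization time; the replacement measures the time where serialization actually happens (and, per the diff, feeds it into TaskMetrics via the `resultSerializationTimeMetric` accumulator passed to DirectTaskResult). Below is a minimal sketch of that pattern with illustrative names, not Spark's actual `DirectTaskResult`:

```scala
import java.io.{Externalizable, ObjectInput, ObjectOutput}

// Times its own serialization and reports it through a callback, so no separate
// "serialize once just to time it" pass is needed.
class TimedResult[T](var value: T, var reportMillis: Double => Unit) extends Externalizable {

  def this() = this(null.asInstanceOf[T], _ => ())  // no-arg ctor required by Externalizable

  override def writeExternal(out: ObjectOutput): Unit = {
    val start = System.nanoTime()
    out.writeObject(value.asInstanceOf[AnyRef])
    reportMillis((System.nanoTime() - start) / 1e6)  // elapsed write time in milliseconds
  }

  override def readExternal(in: ObjectInput): Unit = {
    value = in.readObject().asInstanceOf[T]
  }
}
```
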
core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -139,6 +139,7 @@ class TaskMetrics private[spark] () extends Serializable with KryoSerializable {
private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v)
private[spark] def setResultSerializationTime(v: Double): Unit =
_resultSerializationTime.setValue(v)
private[spark] def resultSerializationTimeMetric = _resultSerializationTime
private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v)
private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v)
private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v)
@@ -338,7 +339,12 @@ private[spark] object TaskMetrics extends Logging {
} else {
tm.nameToAccums.get(name).foreach {
case l: LongAccumulator => l.setValue(value.asInstanceOf[Long])
case d => d.asInstanceOf[DoubleAccumulator].setValue(value.asInstanceOf[Double])
case d: DoubleAccumulator => value match {
case v: Double => d.setValue(v)
case _ => d.setValue(value.asInstanceOf[Long])
}
case o => throw new UnsupportedOperationException(
s"Unexpected accumulator $o for TaskMetrics")
}
}
}
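
To see why the extra `DoubleAccumulator` match above is needed: an accumulator update arrives untyped, and a boxed `Long` cannot be cast directly to `Double`, so the old unconditional `asInstanceOf[Double]` would fail at runtime for long-valued updates. An illustrative snippet (assuming the public `DoubleAccumulator.setValue` used in the hunk above):

```scala
import org.apache.spark.util.DoubleAccumulator

val d = new DoubleAccumulator
val update: Any = 42L                       // e.g. a metric update recorded as a Long
// d.setValue(update.asInstanceOf[Double])  // ClassCastException: java.lang.Long -> java.lang.Double
update match {                              // the generic handling added in this commit
  case v: Double => d.setValue(v)
  case v: Long   => d.setValue(v.toDouble)
}
```
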
74 changes: 62 additions & 12 deletions core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -21,8 +21,8 @@ import java.io._
import java.util.Locale

import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
import com.ning.compress.lzf.{LZFDecoder, LZFEncoder, LZFInputStream, LZFOutputStream}
import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory}
import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}

import org.apache.spark.SparkConf
@@ -43,6 +43,11 @@ trait CompressionCodec {
def compressedOutputStream(s: OutputStream): OutputStream

def compressedInputStream(s: InputStream): InputStream

def compress(input: Array[Byte], inputLen: Int): Array[Byte]

def decompress(input: Array[Byte], inputOffset: Int, inputLen: Int,
outputLen: Int): Array[Byte]
}

private[spark] object CompressionCodec {
@@ -69,16 +74,32 @@ private[spark] object CompressionCodec {
}

def createCodec(conf: SparkConf, codecName: String): CompressionCodec = {
val codecClass =
shortCompressionCodecNames.getOrElse(codecName.toLowerCase(Locale.ROOT), codecName)
val codec = try {
codecCreator(conf, codecName)()
}

def codecCreator(conf: SparkConf, codecName: String): () => CompressionCodec = {
if (codecName == DEFAULT_COMPRESSION_CODEC) {
return () => new LZ4CompressionCodec(conf)
}
val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName)
try {
val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf])
Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec])
() => {
try {
ctor.newInstance(conf).asInstanceOf[CompressionCodec]
} catch {
case e: IllegalArgumentException => throw fail(codecName)
}
}
} catch {
case _: ClassNotFoundException | _: IllegalArgumentException => None
case e: ClassNotFoundException => throw fail(codecName)
case e: NoSuchMethodException => throw fail(codecName)
}
codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " +
s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC"))
}

private def fail(codecName: String): IllegalArgumentException = {
new IllegalArgumentException(s"Codec [$codecName] is not available. " +
s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC")
}

/**
@@ -117,9 +138,16 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
new LZ4BlockOutputStream(s, blockSize)
}

override def compressedInputStream(s: InputStream): InputStream = {
val disableConcatenationOfByteStream = false
new LZ4BlockInputStream(s, disableConcatenationOfByteStream)
override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s)

override def compress(input: Array[Byte], inputLen: Int): Array[Byte] = {
LZ4Factory.fastestInstance().fastCompressor().compress(input, 0, inputLen)
}

override def decompress(input: Array[Byte], inputOffset: Int, inputLen: Int,
outputLen: Int): Array[Byte] = {
LZ4Factory.fastestInstance().fastDecompressor().decompress(input,
inputOffset, outputLen)
}
}

@@ -140,6 +168,17 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
}

override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)

override def compress(input: Array[Byte], inputLen: Int): Array[Byte] = {
LZFEncoder.encode(input, 0, inputLen)
}

override def decompress(input: Array[Byte], inputOffset: Int, inputLen: Int,
outputLen: Int): Array[Byte] = {
val output = new Array[Byte](outputLen)
LZFDecoder.decode(input, inputOffset, inputLen, output)
output
}
}


@@ -162,6 +201,17 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
}

override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)

override def compress(input: Array[Byte], inputLen: Int): Array[Byte] = {
Snappy.rawCompress(input, inputLen)
}

override def decompress(input: Array[Byte], inputOffset: Int,
inputLen: Int, outputLen: Int): Array[Byte] = {
val output = new Array[Byte](outputLen)
Snappy.uncompress(input, inputOffset, inputLen, output, 0)
output
}
}

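
A sketch of how the new byte-array methods defined above might be exercised (from code inside the org.apache.spark namespace, since `CompressionCodec` is `private[spark]`). Note that the caller must keep the uncompressed length around for `decompress`, which is exactly what TaskData stores next to the compressed bytes:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.io.CompressionCodec

val conf = new SparkConf()
val codec = CompressionCodec.createCodec(conf, "lz4")

val original: Array[Byte] = Array.tabulate(4096)(i => (i % 32).toByte)
val compressed = codec.compress(original, original.length)
// outputLen is the original uncompressed length carried alongside the compressed bytes.
val restored = codec.decompress(compressed, 0, compressed.length, original.length)
assert(java.util.Arrays.equals(original, restored))
```
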
30 changes: 27 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1020,6 +1020,7 @@ class DAGScheduler(
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
var taskBinaryBytes: Array[Byte] = null
// taskBinaryBytes and partitions are both effected by the checkpoint status. We need
// this synchronization in case another concurrent job is checkpointing this RDD, so we get a
@@ -1035,8 +1036,23 @@

partitions = stage.rdd.partitions
}

taskBinary = sc.broadcast(taskBinaryBytes)
if (bytes == null) stage.taskBinaryBytes = taskBinaryBytes

// use direct byte shipping for small size or if number of partitions is small
val taskBytesLen = taskBinaryBytes.length
if (taskBytesLen <= DAGScheduler.TASK_INLINE_LIMIT ||
partitionsToCompute.length <= DAGScheduler.TASK_INLINE_PARTITION_LIMIT) {
if (stage.taskData.uncompressedLen > 0) {
taskData = stage.taskData
} else {
// compress inline task data (broadcast compresses as per conf)
taskData = new TaskData(env.createCompressionCodec.compress(
taskBinaryBytes, taskBytesLen), taskBytesLen)
stage.taskData = taskData
}
} else {
taskBinary = Some(sc.broadcast(taskBinaryBytes))
}
} catch {
// In the case of a failure during serialization, abort the stage.
case e: NotSerializableException =>
@@ -1545,7 +1561,7 @@ class DAGScheduler(
* Marks a stage as finished and removes it from the list of running stages.
*/
private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
val serviceTime = stage.latestInfo.submissionTime match {
val serviceTime = if (!log.isInfoEnabled) 0L else stage.latestInfo.submissionTime match {
case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
case _ => "Unknown"
}
@@ -1847,4 +1863,12 @@ private[spark] object DAGScheduler {

// Number of consecutive stage attempts allowed before a stage is aborted
val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4

// The maximum size of uncompressed common task bytes (rdd, closure)
// that will be shipped with the task else will be broadcast separately.
val TASK_INLINE_LIMIT = 100 * 1024

// The maximum number of partitions below which common task bytes will be
// shipped with the task else will be broadcast separately.
val TASK_INLINE_PARTITION_LIMIT = 8
}
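
The two constants above encode the shipping decision made in the DAGScheduler hunk earlier: inline the (compressed) common task bytes when either the serialized size or the partition count is small, otherwise broadcast them once and have each task carry only the broadcast handle. A small worked sketch of that predicate:

```scala
val TASK_INLINE_LIMIT = 100 * 1024       // bytes of uncompressed (rdd, closure) data
val TASK_INLINE_PARTITION_LIMIT = 8      // at or below this partition count, always inline

def shipInline(taskBinaryLen: Int, numPartitions: Int): Boolean =
  taskBinaryLen <= TASK_INLINE_LIMIT || numPartitions <= TASK_INLINE_PARTITION_LIMIT

// A 50 KB closure over 200 partitions is inlined; a 2 MB closure over 200 partitions is
// broadcast; a 2 MB closure over 4 partitions is still inlined (broadcast not worth it).
assert(shipInline(50 * 1024, 200))
assert(!shipInline(2 * 1024 * 1024, 200))
assert(shipInline(2 * 1024 * 1024, 4))
```
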