From 0320a143695e9c0d1715a3b486590e08306b7f02 Mon Sep 17 00:00:00 2001
From: Sumedh Wale
Date: Sat, 3 Dec 2016 17:36:38 +0530
Subject: [PATCH] [SNAP-1190] Reduce partition message overhead from driver to executor (#31)

- DAGScheduler:
  - For small enough common task data (RDD + closure), send it inline with the
    Task instead of via a broadcast
  - Transiently store the task binary data in the Stage so it can be reused
  - Compress the common task bytes to save on network cost
- Task: new TaskData class to encapsulate the compressed task bytes from above,
  the uncompressed length, and a reference index used when the TaskData is read
  from a separate list (see the items below)
- CoarseGrainedClusterMessage: added a new LaunchTasks message to encapsulate
  multiple task messages to the same executor
- CoarseGrainedSchedulerBackend:
  - Create LaunchTasks by grouping messages into an ExecutorTaskGroup per executor
  - The actual TaskData is sent as part of the TaskDescription rather than the
    Task, so the common portions can easily be separated out into their own list
  - Send the common TaskData as a separate ArrayBuffer, with the index into this
    list set in the original task's TaskData
- CoarseGrainedExecutorBackend: handle LaunchTasks by splitting it into
  individual task launches
- CompressionCodec: added byte-array compress/decompress methods for more
  efficient byte array compression
- Executor:
  - Set the common decompressed task data back into the Task object
  - Avoid an additional serialization of the TaskResult just to determine the
    serialization time; the time is now calculated inline in the serialization
    write/writeExternal methods
- TaskMetrics: more generic handling of the DoubleAccumulator case
- Task: during serialization, send a flag indicating whether the task data is
  inlined or will be received via broadcast
- ResultTask, ShuffleMapTask: delegate handling of TaskData to the parent Task class
- SparkEnv: encapsulate codec creation as a zero-argument function to avoid
  repeated conf lookups
- SparkContext.clean: skip the serializability check when a non-default closure
  serializer is in use
- Test updates for the above
---
 .../scala/org/apache/spark/SparkContext.scala | 6 +-
 .../scala/org/apache/spark/SparkEnv.scala | 6 +
 .../CoarseGrainedExecutorBackend.scala | 26 ++--
 .../org/apache/spark/executor/Executor.scala | 26 ++--
 .../spark/executor/MesosExecutorBackend.scala | 2 +-
 .../apache/spark/executor/TaskMetrics.scala | 8 +-
 .../apache/spark/io/CompressionCodec.scala | 69 +++++++--
 .../apache/spark/scheduler/DAGScheduler.scala | 39 ++++-
 .../apache/spark/scheduler/ResultTask.scala | 22 +--
 .../spark/scheduler/ShuffleMapTask.scala | 25 ++--
 .../org/apache/spark/scheduler/Stage.scala | 4 +-
 .../org/apache/spark/scheduler/Task.scala | 102 ++++++++++++-
 .../spark/scheduler/TaskDescription.scala | 5 +-
 .../apache/spark/scheduler/TaskResult.scala | 100 ++++++++-----
 .../spark/scheduler/TaskSetManager.scala | 4 +-
 .../cluster/CoarseGrainedClusterMessage.scala | 60 +++++++-
 .../CoarseGrainedSchedulerBackend.scala | 134 ++++++++++++++----
 .../MesosFineGrainedSchedulerBackend.scala | 3 +-
 .../cluster/mesos/MesosTaskLaunchData.scala | 20 ++-
 .../local/LocalSchedulerBackend.scala | 2 +-
 .../storage/ShuffleBlockFetcherIterator.scala | 4 +-
 .../spark/scheduler/TaskContextSuite.scala | 6 +-
 .../spark/scheduler/TaskSetManagerSuite.scala | 3 +-
 .../mesos/MesosTaskLaunchDataSuite.scala | 6 +-
 .../spark/sql/execution/SparkPlan.scala | 4 +-
 25 files changed, 536 insertions(+), 150 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala
b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1c0b783ac82f5..b35f7bad0b7df 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -73,6 +73,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend +import org.apache.spark.serializer.JavaSerializer import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} @@ -232,6 +233,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli private var _jars: Seq[String] = _ private var _files: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ + private var _isDefaultClosureSerializer: Boolean = true /* ------------------------------------------------------------------------------------- * | Accessors and public fields. These provide access to the internal state of the | @@ -438,6 +440,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env) + _isDefaultClosureSerializer = _env.closureSerializer.isInstanceOf[JavaSerializer] + // If running the REPL, register the repl's output dir with the file server. _conf.getOption("spark.repl.class.outputDir").foreach { path => val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path)) @@ -2054,7 +2058,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * serializable */ private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { - ClosureCleaner.clean(f, checkSerializable) + ClosureCleaner.clean(f, checkSerializable && _isDefaultClosureSerializer) f } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index beb69cbba5efc..287083e3b1f94 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -47,6 +47,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.Logging +import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.network.netty.NettyBlockTransferService @@ -94,6 +95,11 @@ class SparkEnv ( private[spark] var driverTmpDir: Option[String] = None + private val codecCreator = CompressionCodec.codecCreator(conf, + CompressionCodec.getCodecName(conf)) + + def createCompressionCodec: CompressionCodec = codecCreator() + private[spark] def stop() { if (!isStopped) { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 1175fa347ea37..7dad024d2d12a 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -31,9 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import 
org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.internal.Logging import org.apache.spark.rpc._ -import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.serializer.SerializerInstance import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( @@ -50,10 +48,6 @@ private[spark] class CoarseGrainedExecutorBackend( var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None - // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need - // to be changed so that we don't share the serializer instance across threads - private[this] val ser: SerializerInstance = env.closureSerializer.newInstance() - override def onStart() { logInfo("Connecting to driver: " + driverUrl) rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => @@ -91,14 +85,28 @@ private[spark] class CoarseGrainedExecutorBackend( case RegisterExecutorFailed(message) => exitExecutor(1, "Slave registration failed: " + message) - case LaunchTask(data) => + case LaunchTask(taskDesc) => if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") } else { - val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, - taskDesc.name, taskDesc.serializedTask) + taskDesc.name, taskDesc.serializedTask, taskDesc.taskData.decompress(env)) + } + + case LaunchTasks(tasks, taskDataList) => + if (executor ne null) { + logDebug("Got assigned tasks " + tasks.map(_.taskId).mkString(",")) + for (task <- tasks) { + logInfo("Got assigned task " + task.taskId) + val ref = task.taskData.reference + val taskData = if (ref >= 0) taskDataList(ref) else task.taskData + executor.launchTask(this, taskId = task.taskId, + attemptNumber = task.attemptNumber, task.name, task.serializedTask, + taskData.decompress(env)) + } + } else { + exitExecutor(1, "Received LaunchTasks command but executor was null") } case KillTask(taskId, _, interruptThread) => diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 0d416e8e798aa..204b24a075bd1 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -140,9 +140,10 @@ private[spark] class Executor( taskId: Long, attemptNumber: Int, taskName: String, - serializedTask: ByteBuffer): Unit = { - val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, - serializedTask) + serializedTask: ByteBuffer, + taskDataBytes: Array[Byte]): Unit = { + val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, + taskName, serializedTask, taskDataBytes) runningTasks.put(taskId, tr) threadPool.execute(tr) } @@ -189,7 +190,8 @@ private[spark] class Executor( val taskId: Long, val attemptNumber: Int, taskName: String, - serializedTask: ByteBuffer) + serializedTask: ByteBuffer, + taskDataBytes: Array[Byte]) extends Runnable { /** Whether this task has been killed. 
*/ @@ -251,6 +253,7 @@ private[spark] class Executor( updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) + task.taskDataBytes = taskDataBytes task.localProperties = taskProps task.setTaskMemoryManager(taskMemoryManager) @@ -308,11 +311,6 @@ private[spark] class Executor( throw new TaskKilledException } - val resultSer = env.serializer.newInstance() - val beforeSerialization = System.nanoTime() - val valueBytes = resultSer.serialize(value) - val afterSerialization = System.nanoTime() - // Deserialization happens in two parts: first, we deserialize a Task object, which // includes the Partition. Second, Task.run() deserializes the RDD and function to be run. task.metrics.setExecutorDeserializeTime(math.max( @@ -321,13 +319,13 @@ private[spark] class Executor( task.metrics.setExecutorRunTime(math.max( taskFinish - taskStart - task.executorDeserializeTime, 0L) / 1000000.0) task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) - task.metrics.setResultSerializationTime(math.max( - afterSerialization - beforeSerialization, 0L) / 1000000.0) - // Note: accumulator updates must be collected after TaskMetrics is updated + // Now resultSerializationTime is evaluated directly inside the + // serialization write methods and added to final serialized bytes + // to avoid double serialization of Task (for timing then TaskResult). val accumUpdates = task.collectAccumulatorUpdates() - // TODO: do not serialize value twice - val directResult = new DirectTaskResult(valueBytes, accumUpdates) + val directResult = new DirectTaskResult(value, accumUpdates, + Some(task.metrics.resultSerializationTimeMetric)) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index 680cfb733e9e6..442ea2088b61c 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -90,7 +90,7 @@ private[spark] class MesosExecutorBackend } else { SparkHadoopUtil.get.runAsSparkUser { () => executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber, - taskInfo.getName, taskData.serializedTask) + taskInfo.getName, taskData.serializedTask, taskData.taskData.decompress()) } } } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 6c4725bb5df0f..cfd33f8fe0051 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -117,6 +117,7 @@ class TaskMetrics private[spark] () extends Serializable with KryoSerializable { private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v) private[spark] def setResultSerializationTime(v: Double): Unit = _resultSerializationTime.setValue(v) + private[spark] def resultSerializationTimeMetric = _resultSerializationTime private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v) private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v) private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v) @@ -316,7 +317,12 @@ private[spark] object TaskMetrics extends Logging { } else { tm.nameToAccums.get(name).foreach { case l: 
LongAccumulator => l.setValue(value.asInstanceOf[Long]) - case d => d.asInstanceOf[DoubleAccumulator].setValue(value.asInstanceOf[Double]) + case d: DoubleAccumulator => value match { + case v: Double => d.setValue(v) + case _ => d.setValue(value.asInstanceOf[Long]) + } + case o => throw new UnsupportedOperationException( + s"Unexpected accumulator $o for TaskMetrics") } } } diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index ae014becef755..1bf63fc6a490d 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -19,8 +19,8 @@ package org.apache.spark.io import java.io._ -import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} -import net.jpountz.lz4.LZ4BlockOutputStream +import com.ning.compress.lzf.{LZFDecoder, LZFEncoder, LZFInputStream, LZFOutputStream} +import net.jpountz.lz4.{LZ4BlockOutputStream, LZ4Factory} import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf @@ -42,6 +42,11 @@ trait CompressionCodec { def compressedOutputStream(s: OutputStream): OutputStream def compressedInputStream(s: InputStream): InputStream + + def compress(input: Array[Byte], inputLen: Int): Array[Byte] + + def decompress(input: Array[Byte], inputOffset: Int, inputLen: Int, + outputLen: Int): Array[Byte] } private[spark] object CompressionCodec { @@ -67,16 +72,32 @@ private[spark] object CompressionCodec { } def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { + codecCreator(conf, codecName)() + } + + def codecCreator(conf: SparkConf, codecName: String): () => CompressionCodec = { + if (codecName == DEFAULT_COMPRESSION_CODEC) { + return () => new LZ4CompressionCodec(conf) + } val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName) - val codec = try { + try { val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf]) - Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec]) + () => { + try { + ctor.newInstance(conf).asInstanceOf[CompressionCodec] + } catch { + case e: IllegalArgumentException => throw fail(codecName) + } + } } catch { - case e: ClassNotFoundException => None - case e: IllegalArgumentException => None + case e: ClassNotFoundException => throw fail(codecName) + case e: NoSuchMethodException => throw fail(codecName) } - codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " + - s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC")) + } + + private def fail(codecName: String): IllegalArgumentException = { + new IllegalArgumentException(s"Codec [$codecName] is not available. 
" + + s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC") } /** @@ -116,6 +137,16 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { } override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s) + + override def compress(input: Array[Byte], inputLen: Int): Array[Byte] = { + LZ4Factory.fastestInstance().fastCompressor().compress(input, 0, inputLen) + } + + override def decompress(input: Array[Byte], inputOffset: Int, inputLen: Int, + outputLen: Int): Array[Byte] = { + LZ4Factory.fastestInstance().fastDecompressor().decompress(input, + inputOffset, outputLen) + } } @@ -135,6 +166,17 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { } override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s) + + override def compress(input: Array[Byte], inputLen: Int): Array[Byte] = { + LZFEncoder.encode(input, 0, inputLen) + } + + override def decompress(input: Array[Byte], inputOffset: Int, inputLen: Int, + outputLen: Int): Array[Byte] = { + val output = new Array[Byte](outputLen) + LZFDecoder.decode(input, inputOffset, inputLen, output) + output + } } @@ -157,6 +199,17 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { } override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) + + override def compress(input: Array[Byte], inputLen: Int): Array[Byte] = { + Snappy.rawCompress(input, inputLen) + } + + override def decompress(input: Array[Byte], inputOffset: Int, + inputLen: Int, outputLen: Int): Array[Byte] = { + val output = new Array[Byte](outputLen) + Snappy.uncompress(input, inputOffset, inputLen, output, 0) + output + } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index e7e2ff1718f22..1c2bf868d4151 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -997,19 +997,36 @@ class DAGScheduler( // task gets a different copy of the RDD. This provides stronger isolation between tasks that // might modify state of objects referenced in their closures. This is necessary in Hadoop // where the JobConf/Configuration object is not thread-safe. - var taskBinary: Broadcast[Array[Byte]] = null + var taskBinary: Option[Broadcast[Array[Byte]]] = None + var taskData: TaskData = TaskData.EMPTY try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). 
- val taskBinaryBytes: Array[Byte] = stage match { + val bytes = stage.taskBinaryBytes + val taskBinaryBytes: Array[Byte] = if (bytes != null) bytes else stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } - - taskBinary = sc.broadcast(taskBinaryBytes) + if (bytes == null) stage.taskBinaryBytes = taskBinaryBytes + + // use direct byte shipping for small size or if number of partitions is small + val taskBytesLen = taskBinaryBytes.length + if (taskBytesLen <= DAGScheduler.TASK_INLINE_LIMIT || + partitionsToCompute.length <= DAGScheduler.TASK_INLINE_PARTITION_LIMIT) { + if (stage.taskData.uncompressedLen > 0) { + taskData = stage.taskData + } else { + // compress inline task data (broadcast compresses as per conf) + taskData = new TaskData(env.createCompressionCodec.compress( + taskBinaryBytes, taskBytesLen), taskBytesLen) + stage.taskData = taskData + } + } else { + taskBinary = Some(sc.broadcast(taskBinaryBytes)) + } } catch { // In the case of a failure during serialization, abort the stage. case e: NotSerializableException => @@ -1030,7 +1047,7 @@ class DAGScheduler( partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = stage.rdd.partitions(id) - new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, + new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskData, taskBinary, part, locs, stage.latestInfo.taskMetrics, properties) } @@ -1040,7 +1057,7 @@ class DAGScheduler( val p: Int = stage.partitions(id) val part = stage.rdd.partitions(p) val locs = taskIdToLocations(id) - new ResultTask(stage.id, stage.latestInfo.attemptId, + new ResultTask(stage.id, stage.latestInfo.attemptId, taskData, taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics) } } @@ -1400,7 +1417,7 @@ class DAGScheduler( * Marks a stage as finished and removes it from the list of running stages. */ private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = { - val serviceTime = stage.latestInfo.submissionTime match { + val serviceTime = if (!log.isInfoEnabled) 0L else stage.latestInfo.submissionTime match { case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0) case _ => "Unknown" } @@ -1691,4 +1708,12 @@ private[spark] object DAGScheduler { // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one // as more failure events come in val RESUBMIT_TIMEOUT = 200 + + // The maximum size of uncompressed common task bytes (rdd, closure) + // that will be shipped with the task else will be broadcast separately. + val TASK_INLINE_LIMIT = 100 * 1024 + + // The maximum number of partitions below which common task bytes will be + // shipped with the task else will be broadcast separately. 
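// A condensed sketch (a hypothetical helper, not code from the patch) of the
// inline-versus-broadcast decision made in submitMissingTasks above. Stage-level
// caching of taskBinaryBytes/taskData and error handling are omitted; the
// TASK_INLINE_* constants are the ones defined in object DAGScheduler by this patch.
def chooseTaskShipping(
    sc: SparkContext,
    env: SparkEnv,
    taskBinaryBytes: Array[Byte],
    numPartitions: Int): (TaskData, Option[org.apache.spark.broadcast.Broadcast[Array[Byte]]]) = {
  val len = taskBinaryBytes.length
  if (len <= DAGScheduler.TASK_INLINE_LIMIT ||
      numPartitions <= DAGScheduler.TASK_INLINE_PARTITION_LIMIT) {
    // small binary or few partitions: compress once and ship inline with each task
    (new TaskData(env.createCompressionCodec.compress(taskBinaryBytes, len), len), None)
  } else {
    // large binary across many partitions: keep the existing broadcast path
    (TaskData.EMPTY, Some(sc.broadcast(taskBinaryBytes)))
  }
}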
+ val TASK_INLINE_PARTITION_LIMIT = 8 } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 6193ab2b09d7d..17e1757ec1abd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -21,7 +21,7 @@ import java.io._ import java.nio.ByteBuffer import java.util.Properties -import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark._ @@ -36,7 +36,9 @@ import org.apache.spark.rdd.RDD * * @param stageId id of the stage this task belongs to * @param stageAttemptId attempt id of the stage this task belongs to - * @param taskBinary broadcasted version of the serialized RDD and the function to apply on each + * @param _taskData if serialized RDD and function are small, then it is compressed + * and sent with its original decompressed size + * @param _taskBinary broadcasted version of the serialized RDD and the function to apply on each * partition of the given RDD. Once deserialized, the type should be * (RDD[T], (TaskContext, Iterator[T]) => U). * @param partition partition of the RDD this task is associated with @@ -49,14 +51,16 @@ import org.apache.spark.rdd.RDD private[spark] class ResultTask[T, U]( stageId: Int, stageAttemptId: Int, - private var taskBinary: Broadcast[Array[Byte]], + _taskData: TaskData, + _taskBinary: Option[Broadcast[Array[Byte]]], private var partition: Partition, locs: Seq[TaskLocation], private var _outputId: Int, localProperties: Properties, metrics: TaskMetrics) - extends Task[U](stageId, stageAttemptId, partition.index, metrics, localProperties) - with Serializable { + extends Task[U](stageId, stageAttemptId, partition.index, + _taskData, _taskBinary, metrics, localProperties) + with Serializable with KryoSerializable { final def outputId: Int = _outputId @@ -69,7 +73,7 @@ private[spark] class ResultTask[T, U]( val deserializeStartTime = System.nanoTime() val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( - ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) + ByteBuffer.wrap(getTaskBytes), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = math.max(System.nanoTime() - deserializeStartTime, 0L) func(context, rdd.iterator(partition, context)) @@ -81,15 +85,13 @@ private[spark] class ResultTask[T, U]( override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")" override def write(kryo: Kryo, output: Output): Unit = { - super.write(kryo, output) - kryo.writeClassAndObject(output, taskBinary) + super.writeKryo(kryo, output) kryo.writeClassAndObject(output, partition) output.writeInt(_outputId) } override def read(kryo: Kryo, input: Input): Unit = { - super.read(kryo, input) - taskBinary = kryo.readClassAndObject(input).asInstanceOf[Broadcast[Array[Byte]]] + super.readKryo(kryo, input) partition = kryo.readClassAndObject(input).asInstanceOf[Partition] _outputId = input.readInt() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 2123d31915694..3e6bf66788885 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -22,7 +22,7 @@ 
import java.util.Properties import scala.language.existentials -import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark._ @@ -40,7 +40,9 @@ import org.apache.spark.shuffle.ShuffleWriter * * @param stageId id of the stage this task belongs to * @param stageAttemptId attempt id of the stage this task belongs to - * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized, + * @param _taskData if serialized RDD and function are small, then it is compressed + * and sent with its original decompressed size + * @param _taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized, * the type should be (RDD[_], ShuffleDependency[_, _, _]). * @param partition partition of the RDD this task is associated with * @param locs preferred task execution locations for locality scheduling @@ -50,17 +52,20 @@ import org.apache.spark.shuffle.ShuffleWriter private[spark] class ShuffleMapTask( stageId: Int, stageAttemptId: Int, - private var taskBinary: Broadcast[Array[Byte]], + _taskData: TaskData, + _taskBinary: Option[Broadcast[Array[Byte]]], private var partition: Partition, @transient private var locs: Seq[TaskLocation], metrics: TaskMetrics, localProperties: Properties) - extends Task[MapStatus](stageId, stageAttemptId, partition.index, metrics, localProperties) - with Logging { + extends Task[MapStatus](stageId, stageAttemptId, partition.index, + _taskData, _taskBinary, metrics, localProperties) + with KryoSerializable with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. */ def this(partitionId: Int) { - this(0, 0, null, new Partition { override def index: Int = 0 }, null, null, new Properties) + this(0, 0, TaskData.EMPTY, null, new Partition { override def index: Int = 0 }, + null, null, new Properties) } @transient private val preferredLocs: Seq[TaskLocation] = { @@ -72,7 +77,7 @@ private[spark] class ShuffleMapTask( val deserializeStartTime = System.nanoTime() val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( - ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) + ByteBuffer.wrap(getTaskBytes), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = math.max(System.nanoTime() - deserializeStartTime, 0L) var writer: ShuffleWriter[Any, Any] = null @@ -100,14 +105,12 @@ private[spark] class ShuffleMapTask( override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, partitionId) override def write(kryo: Kryo, output: Output): Unit = { - super.write(kryo, output) - kryo.writeClassAndObject(output, taskBinary) + super.writeKryo(kryo, output) kryo.writeClassAndObject(output, partition) } override def read(kryo: Kryo, input: Input): Unit = { - super.read(kryo, input) - taskBinary = kryo.readClassAndObject(input).asInstanceOf[Broadcast[Array[Byte]]] + super.readKryo(kryo, input) partition = kryo.readClassAndObject(input).asInstanceOf[Partition] } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 2f972b064b477..fd824c4b2ff06 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -60,7 +60,9 @@ private[scheduler] abstract class Stage( val numTasks: Int, val parents: List[Stage], val 
firstJobId: Int, - val callSite: CallSite) + val callSite: CallSite, + @transient private[scheduler] var taskBinaryBytes: Array[Byte] = null, + @transient private[scheduler] var taskData: TaskData = TaskData.EMPTY) extends Logging { val numPartitions = rdd.partitions.length diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 3fed924359d3e..42d5c69f2f41e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -24,10 +24,11 @@ import java.util.Properties import scala.collection.mutable import scala.collection.mutable.HashMap -import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark._ +import org.apache.spark.broadcast.Broadcast import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.{MemoryMode, TaskMemoryManager} import org.apache.spark.metrics.MetricsSystem @@ -55,10 +56,11 @@ private[spark] abstract class Task[T]( private var _stageId: Int, private var _stageAttemptId: Int, private var _partitionId: Int, + @transient private[spark] var taskData: TaskData = TaskData.EMPTY, // The default value is only used in tests. + protected var taskBinary: Option[Broadcast[Array[Byte]]] = None, private var _metrics: TaskMetrics = TaskMetrics.registered, - @transient var localProperties: Properties = new Properties) extends Serializable - with KryoSerializable { + @transient var localProperties: Properties = new Properties) extends Serializable { final def stageId: Int = _stageId @@ -68,6 +70,13 @@ private[spark] abstract class Task[T]( final def metrics: TaskMetrics = _metrics + @transient private[spark] var taskDataBytes: Array[Byte] = _ + + protected final def getTaskBytes: Array[Byte] = { + val bytes = taskDataBytes + if ((bytes ne null) && bytes.length > 0) bytes else taskBinary.get.value + } + /** * Called by [[org.apache.spark.executor.Executor]] to run this task. 
* @@ -197,21 +206,36 @@ private[spark] abstract class Task[T]( } } - override def write(kryo: Kryo, output: Output): Unit = { + protected def writeKryo(kryo: Kryo, output: Output): Unit = { output.writeInt(_stageId) output.writeVarInt(_stageAttemptId, true) output.writeVarInt(_partitionId, true) output.writeLong(epoch) output.writeLong(_executorDeserializeTime) + if ((taskData ne null) && taskData.uncompressedLen > 0) { + // actual bytes will be shipped in TaskDescription + output.writeBoolean(true) + } else { + output.writeBoolean(false) + kryo.writeClassAndObject(output, taskBinary.get) + } _metrics.write(kryo, output) } - override def read(kryo: Kryo, input: Input): Unit = { + def readKryo(kryo: Kryo, input: Input): Unit = { _stageId = input.readInt() _stageAttemptId = input.readVarInt(true) _partitionId = input.readVarInt(true) epoch = input.readLong() _executorDeserializeTime = input.readLong() + // actual bytes are shipped in TaskDescription + taskData = TaskData.EMPTY + if (input.readBoolean()) { + taskBinary = None + } else { + taskBinary = Some(kryo.readClassAndObject(input) + .asInstanceOf[Broadcast[Array[Byte]]]) + } _metrics = new TaskMetrics _metrics.read(kryo, input) } @@ -302,3 +326,71 @@ private[spark] object Task { (taskFiles, taskJars, taskProps, subBuffer) } } + +private[spark] final class TaskData private(var compressedBytes: Array[Byte], + var uncompressedLen: Int, var reference: Int) extends Serializable { + + def this(compressedBytes: Array[Byte], uncompressedLen: Int) = + this(compressedBytes, uncompressedLen, TaskData.NO_REF) + + @transient private var decompressed: Array[Byte] = _ + + /** decompress the common task data if present */ + def decompress(env: SparkEnv = SparkEnv.get): Array[Byte] = { + if (uncompressedLen > 0) { + if (decompressed eq null) { + decompressed = env.createCompressionCodec.decompress(compressedBytes, + 0, compressedBytes.length, uncompressedLen) + } + decompressed + } else TaskData.EMPTY_BYTES + } + + override def hashCode(): Int = java.util.Arrays.hashCode(compressedBytes) + + override def equals(obj: Any): Boolean = obj match { + case d: TaskData => + uncompressedLen == d.uncompressedLen && + reference == d.reference && + java.util.Arrays.equals(compressedBytes, d.compressedBytes) + case _ => false + } +} + +private[spark] object TaskData { + + private val NO_REF: Int = -1 + private val EMPTY_BYTES: Array[Byte] = Array.empty[Byte] + private val FIRST: TaskData = new TaskData(EMPTY_BYTES, 0, 0) + val EMPTY: TaskData = new TaskData(EMPTY_BYTES, 0, -2) + + def apply(reference: Int): TaskData = { + if (reference == 0) FIRST + else if (reference > 0) new TaskData(EMPTY_BYTES, 0, reference) + else EMPTY + } + + def write(data: TaskData, output: Output): Unit = Utils.tryOrIOException { + if (data.reference != NO_REF) { + output.writeVarInt(data.reference, false) + } else { + val bytes = data.compressedBytes + assert(bytes != null) + output.writeVarInt(NO_REF, false) + output.writeVarInt(data.uncompressedLen, true) + output.writeVarInt(bytes.length, true) + output.writeBytes(bytes) + } + } + + def read(input: Input): TaskData = Utils.tryOrIOException { + val reference = input.readVarInt(false) + if (reference != NO_REF) { + TaskData(reference) + } else { + val uncompressedLen = input.readVarInt(true) + val bytesLen = input.readVarInt(true) + new TaskData(input.readBytes(bytesLen), uncompressedLen) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index b974ce30d4ccd..74afc7efb418f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -34,7 +34,8 @@ private[spark] class TaskDescription( private var _executorId: String, private var _name: String, private var _index: Int, // Index within this task's TaskSet - @transient private var _serializedTask: ByteBuffer) + @transient private var _serializedTask: ByteBuffer, + private[spark] var taskData: TaskData = TaskData.EMPTY) extends Serializable with KryoSerializable { def taskId: Long = _taskId @@ -58,6 +59,7 @@ private[spark] class TaskDescription( output.writeInt(_index) output.writeInt(_serializedTask.remaining()) Utils.writeByteBuffer(_serializedTask, output) + TaskData.write(taskData, output) } override def read(kryo: Kryo, input: Input): Unit = { @@ -68,6 +70,7 @@ private[spark] class TaskDescription( _index = input.readInt() val len = input.readInt() _serializedTask = ByteBuffer.wrap(input.readBytes(len)) + taskData = TaskData.read(input) } override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 80f2bf41224b5..8cbdf8d145438 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -18,13 +18,14 @@ package org.apache.spark.scheduler import java.io._ -import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer -import org.apache.spark.SparkEnv +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark.storage.BlockId -import org.apache.spark.util.{AccumulatorV2, Utils} +import org.apache.spark.util.{AccumulatorV2, DoubleAccumulator, Utils} // Task result. Also contains updates to accumulator variables. private[spark] sealed trait TaskResult[T] @@ -35,27 +36,32 @@ private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int) /** A TaskResult that contains the task's return value and accumulator updates. 
*/ private[spark] class DirectTaskResult[T]( - var valueBytes: ByteBuffer, - var accumUpdates: Seq[AccumulatorV2[_, _]]) - extends TaskResult[T] with Externalizable { - - private var valueObjectDeserialized = false - private var valueObject: T = _ + private var _value: Any, + var accumUpdates: Seq[AccumulatorV2[_, _]], + private val serializationTimeMetric: Option[DoubleAccumulator] = None) + extends TaskResult[T] with Externalizable with KryoSerializable { - def this() = this(null.asInstanceOf[ByteBuffer], null) + def this() = this(null, null) override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { - out.writeInt(valueBytes.remaining) - Utils.writeByteBuffer(valueBytes, out) - out.writeInt(accumUpdates.size) - accumUpdates.foreach(out.writeObject) + serializationTimeMetric match { + case Some(timeMetric) => + val start = System.nanoTime() + out.writeObject(_value) + out.writeInt(accumUpdates.size + 1) + accumUpdates.foreach(out.writeObject) + val end = System.nanoTime() + timeMetric.setValue(math.max(end - start, 0L) / 1000000.0) + out.writeObject(timeMetric) + case None => + out.writeObject(_value) + out.writeInt(accumUpdates.size) + accumUpdates.foreach(out.writeObject) + } } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - val blen = in.readInt() - val byteVal = new Array[Byte](blen) - in.readFully(byteVal) - valueBytes = ByteBuffer.wrap(byteVal) + _value = in.readObject() val numUpdates = in.readInt if (numUpdates == 0) { @@ -67,26 +73,50 @@ private[spark] class DirectTaskResult[T]( } accumUpdates = _accumUpdates } - valueObjectDeserialized = false } - /** - * When `value()` is called at the first time, it needs to deserialize `valueObject` from - * `valueBytes`. It may cost dozens of seconds for a large instance. So when calling `value` at - * the first time, the caller should avoid to block other threads. - * - * After the first time, `value()` is trivial and just returns the deserialized `valueObject`. - */ - def value(): T = { - if (valueObjectDeserialized) { - valueObject + override def write(kryo: Kryo, output: Output): Unit = Utils.tryOrIOException { + serializationTimeMetric match { + case Some(timeMetric) => + val start = System.nanoTime() + kryo.writeClassAndObject(output, _value) + output.writeVarInt(accumUpdates.size, true) + output.writeBoolean(true) // indicates additional timeMetric + accumUpdates.foreach(kryo.writeClassAndObject(output, _)) + val end = System.nanoTime() + timeMetric.setValue(math.max(end - start, 0L) / 1000000.0) + timeMetric.write(kryo, output) + case None => + kryo.writeClassAndObject(output, _value) + output.writeVarInt(accumUpdates.size, true) + output.writeBoolean(false) // indicates no timeMetric + accumUpdates.foreach(kryo.writeClassAndObject(output, _)) + } + } + + override def read(kryo: Kryo, input: Input): Unit = Utils.tryOrIOException { + _value = kryo.readClassAndObject(input) + + var numUpdates = input.readVarInt(true) + val hasTimeMetric = input.readBoolean() + if (numUpdates == 0 && !hasTimeMetric) { + accumUpdates = null } else { - // This should not run when holding a lock because it may cost dozens of seconds for a large - // value. 
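// A hedged sketch of the inline-timing approach used by the new
// DirectTaskResult.writeExternal/write above: the value is serialized exactly
// once, and the elapsed time is recorded into the DoubleAccumulator passed in
// by the executor. writeWithTiming is a hypothetical standalone helper.
def writeWithTiming(
    out: java.io.ObjectOutput,
    value: Any,
    timeMetric: org.apache.spark.util.DoubleAccumulator): Unit = {
  val start = System.nanoTime()
  out.writeObject(value)
  // nanoseconds to milliseconds, clamped at zero against clock adjustments
  timeMetric.setValue(math.max(System.nanoTime() - start, 0L) / 1000000.0)
  out.writeObject(timeMetric)
}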
- val resultSer = SparkEnv.get.serializer.newInstance() - valueObject = resultSer.deserialize(valueBytes) - valueObjectDeserialized = true - valueObject + val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]]( + if (hasTimeMetric) numUpdates + 1 else numUpdates) + while (numUpdates > 0) { + _accumUpdates += kryo.readClassAndObject(input) + .asInstanceOf[AccumulatorV2[_, _]] + numUpdates -= 1 + } + if (hasTimeMetric) { + val timeMetric = new DoubleAccumulator + timeMetric.read(kryo, input) + _accumUpdates += timeMetric + } + accumUpdates = _accumUpdates } } + + def value(): T = _value.asInstanceOf[T] } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 7afb026764da5..6983a3ece0e95 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -524,7 +524,7 @@ private[spark] class TaskSetManager( sched.dagScheduler.taskStarted(task, info) return Some(new TaskDescription(_taskId = taskId, _attemptNumber = attemptNum, execId, - taskName, index, serializedTask)) + taskName, index, serializedTask, task.taskData)) case _ => } } @@ -954,5 +954,5 @@ private[spark] class TaskSetManager( private[spark] object TaskSetManager { // The user will be warned if any stages contain a task that has a serialized size greater than // this. - val TASK_SIZE_TO_WARN_KB = 100 + val TASK_SIZE_TO_WARN_KB = 128 } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 38ddc94ad3639..c44eaee5a7050 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -19,12 +19,14 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer +import scala.collection.mutable + import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark.TaskState.TaskState import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.ExecutorLossReason +import org.apache.spark.scheduler.{ExecutorLossReason, TaskData, TaskDescription} import org.apache.spark.util.{SerializableBuffer, Utils} private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable @@ -36,7 +38,61 @@ private[spark] object CoarseGrainedClusterMessages { case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage // Driver to executors - case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage + case class LaunchTask(private var task: TaskDescription) + extends CoarseGrainedClusterMessage with KryoSerializable { + + override def write(kryo: Kryo, output: Output): Unit = { + task.write(kryo, output) + } + + override def read(kryo: Kryo, input: Input): Unit = { + task = new TaskDescription(0L, 0, null, null, 0, null) + task.read(kryo, input) + } + } + + case class LaunchTasks(private var tasks: mutable.ArrayBuffer[TaskDescription], + private var taskDataList: mutable.ArrayBuffer[TaskData]) + extends CoarseGrainedClusterMessage with KryoSerializable { + + override def write(kryo: Kryo, output: Output): Unit = Utils.tryOrIOException { + val tasks = this.tasks + val numTasks = tasks.length + output.writeVarInt(numTasks, true) + var i = 0 + while (i 
< numTasks) { + tasks(i).write(kryo, output) + i += 1 + } + val taskDataList = this.taskDataList + val numData = taskDataList.length + output.writeVarInt(numData, true) + i = 0 + while (i < numData) { + TaskData.write(taskDataList(i), output) + i += 1 + } + } + + override def read(kryo: Kryo, input: Input): Unit = Utils.tryOrIOException { + var numTasks = input.readVarInt(true) + val tasks = new mutable.ArrayBuffer[TaskDescription](numTasks) + while (numTasks > 0) { + val task = new TaskDescription(0, 0, null, null, 0, null) + task.read(kryo, input) + tasks += task + numTasks -= 1 + } + var numData = input.readVarInt(true) + val taskDataList = new mutable.ArrayBuffer[TaskData](numData) + while (numData > 0) { + taskDataList += TaskData.read(input) + numData -= 1 + } + this.tasks = tasks + this.taskDataList = taskDataList + } + } case class KillTask(taskId: Long, executor: String, interruptThread: Boolean) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index cccc107c250fc..440333cb42f59 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -41,15 +41,15 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Future -import scala.concurrent.duration.Duration -import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState} +import org.apache.spark.{ExecutorAllocationClient, SparkException, TaskState} import org.apache.spark.internal.Logging import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME -import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils} +import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils} +import org.apache.spark.util.collection.OpenHashMap /** * A scheduler backend that waits for coarse-grained executors to connect. @@ -116,11 +116,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Executors that have been lost, but for which we don't yet know the real exit reason. 
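// A simplified sketch of how common task data is shared per executor: the driver
// keeps one copy per distinct TaskData in taskDataList and replaces duplicates
// with an index, which the executor resolves back (mirroring the LaunchTasks
// handler in CoarseGrainedExecutorBackend). These helpers are hypothetical and
// assume spark-internal access, since TaskData and TaskDescription are
// private[spark]; the patch's actual grouping logic lives in ExecutorTaskGroup below.
def dedupTaskData(
    tasks: Seq[TaskDescription],
    taskDataList: scala.collection.mutable.ArrayBuffer[TaskData]): Unit = {
  for (task <- tasks if task.taskData.uncompressedLen > 0) {
    var i = taskDataList.indexWhere(_ eq task.taskData)
    if (i < 0) {
      taskDataList += task.taskData
      i = taskDataList.length - 1
    }
    task.taskData = TaskData(i) // ship only an index into the shared list
  }
}

def resolveTaskData(task: TaskDescription, taskDataList: Seq[TaskData]): TaskData = {
  val ref = task.taskData.reference
  if (ref >= 0) taskDataList(ref) else task.taskData
}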
protected val executorsPendingLossReason = new HashSet[String] - // If this DriverEndpoint is changed to support multiple threads, - // then this may need to be changed so that we don't share the serializer - // instance across threads - private val ser = SparkEnv.get.closureSerializer.newInstance() - protected val addressToExecutorId = new HashMap[RpcAddress, String] private val reviveThread = @@ -268,33 +263,67 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp !executorsPendingLossReason.contains(executorId) } - // Launch tasks returned by a set of resource offers - private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { - for (task <- tasks.flatten) { - val serializedTask = ser.serialize(task) - if (serializedTask.limit >= maxRpcMessageSize) { - scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => - try { - var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + + protected def checkTaskSizeLimit(task: TaskDescription, taskSize: Int): Boolean = { + if (taskSize > maxRpcMessageSize) { + scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => + try { + var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + "spark.rpc.message.maxSize (%d bytes). Consider increasing " + "spark.rpc.message.maxSize or using broadcast variables for large values." - msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize) - taskSetMgr.abort(msg) - } catch { - case e: Exception => logError("Exception in error callback", e) - } + msg = msg.format(task.taskId, task.index, taskSize, maxRpcMessageSize) + taskSetMgr.abort(msg) + } catch { + case e: Exception => logError("Exception in error callback", e) } } - else { - val executorData = executorDataMap(task.executorId) - executorData.freeCores -= scheduler.CPUS_PER_TASK - - logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + - s"${executorData.executorHost}.") + false + } else true + } - executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) + // Launch tasks returned by a set of resource offers + protected def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = { + val executorTaskGroupMap = new OpenHashMap[String, ExecutorTaskGroup](8) + for (taskSet <- tasks) { + for (task <- taskSet) { + val taskLimit = task.serializedTask.limit + val taskSize = taskLimit + task.taskData.compressedBytes.length + if (checkTaskSizeLimit(task, taskSize)) { + // group tasks per executor as long as message limit is not breached + executorTaskGroupMap.changeValue(task.executorId, { + val executorData = executorDataMap(task.executorId) + val executorTaskGroup = new ExecutorTaskGroup(executorData, taskSize) + executorTaskGroup.taskGroup += task + executorTaskGroup.taskDataList += task.taskData + // add reference to first index in taskDataList + task.taskData = TaskData(0) + executorTaskGroup + }, { executorTaskGroup => + // group into existing if size fits in the max allowed + if (!executorTaskGroup.addTask(task, taskLimit, maxRpcMessageSize)) { + // send this task separately + val executorData = executorTaskGroup.executorData + executorData.freeCores -= scheduler.CPUS_PER_TASK + logInfo(s"Launching task ${task.taskId} on executor id: " + + s"${task.executorId} hostname: ${executorData.executorHost}.") + + executorData.executorEndpoint.send(LaunchTask(task)) + } + executorTaskGroup + }) + } } } + // send the accumulated task groups per executor + 
executorTaskGroupMap.foreach { case (executorId, executorTaskGroup) => + val taskGroup = executorTaskGroup.taskGroup + val executorData = executorTaskGroup.executorData + + executorData.freeCores -= (scheduler.CPUS_PER_TASK * taskGroup.length) + logDebug(s"Launching tasks ${taskGroup.map(_.taskId).mkString(",")} on " + + s"executor id: $executorId hostname: ${executorData.executorHost}.") + executorData.executorEndpoint.send(LaunchTasks(taskGroup, + executorTaskGroup.taskDataList)) + } } // Remove a disconnected slave from the cluster @@ -626,3 +655,52 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private[spark] object CoarseGrainedSchedulerBackend { val ENDPOINT_NAME = "CoarseGrainedScheduler" } + +private[spark] final class ExecutorTaskGroup( + private[cluster] var executorData: ExecutorData, + private var groupSize: Int = 0) { + + private[cluster] val taskGroup = new ArrayBuffer[TaskDescription](2) + // field to carry around common task data + private[cluster] val taskDataList = new ArrayBuffer[TaskData](2) + + def addTask(task: TaskDescription, taskLimit: Int, limit: Int): Boolean = { + val newGroupSize = groupSize + taskLimit + if (newGroupSize > limit) return false + + groupSize = newGroupSize + // linear search is best since there cannot be many different + // tasks in a single taskSet + if (task.taskData.uncompressedLen == 0 || + findOrAddTaskData(task, taskDataList, limit)) { + taskGroup += task + true + } else { + // task rejected from group + groupSize -= taskLimit + false + } + } + + private def findOrAddTaskData(task: TaskDescription, + taskDataList: ArrayBuffer[TaskData], limit: Int): Boolean = { + val data = task.taskData + val numData = taskDataList.length + var i = 0 + while (i < numData) { + if (taskDataList(i) eq data) { + // add reference to index `i` in taskDataList + task.taskData = TaskData(i) + return true + } + i += 1 + } + val newGroupSize = groupSize + data.compressedBytes.length + if (newGroupSize <= limit) { + groupSize = newGroupSize + taskDataList += data + task.taskData = TaskData(numData) + true + } else false + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index e08dc3b5957bb..467ac8111b2fc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -350,7 +350,8 @@ private[spark] class MesosFineGrainedSchedulerBackend( .setExecutor(executorInfo) .setName(task.name) .addAllResources(cpuResources.asJava) - .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString) + .setData(MesosTaskLaunchData(task.serializedTask, task.taskData, + task.attemptNumber).toByteString) .build() (taskInfo, finalResources.asJava) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala index 8370b61145e45..bcc30a3a19c1a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala @@ -22,17 +22,26 @@ import java.nio.ByteBuffer import org.apache.mesos.protobuf.ByteString import org.apache.spark.internal.Logging +import 
org.apache.spark.scheduler.TaskData /** * Wrapper for serializing the data sent when launching Mesos tasks. */ private[spark] case class MesosTaskLaunchData( serializedTask: ByteBuffer, + taskData: TaskData, attemptNumber: Int) extends Logging { def toByteString: ByteString = { - val dataBuffer = ByteBuffer.allocate(4 + serializedTask.limit) + val compressedBytes = taskData.compressedBytes + val dataLen = compressedBytes.length + val dataBuffer = ByteBuffer.allocate(12 + serializedTask.limit + dataLen) dataBuffer.putInt(attemptNumber) + dataBuffer.putInt(dataLen) + if (dataLen > 0) { + dataBuffer.putInt(taskData.uncompressedLen) + dataBuffer.put(compressedBytes) + } dataBuffer.put(serializedTask) dataBuffer.rewind logDebug(s"ByteBuffer size: [${dataBuffer.remaining}]") @@ -45,7 +54,14 @@ private[spark] object MesosTaskLaunchData extends Logging { val byteBuffer = byteString.asReadOnlyByteBuffer() logDebug(s"ByteBuffer size: [${byteBuffer.remaining}]") val attemptNumber = byteBuffer.getInt // updates the position by 4 bytes + val dataLen = byteBuffer.getInt + val taskData = if (dataLen > 0) { + val uncompressedLen = byteBuffer.getInt + val compressedBytes = new Array[Byte](dataLen) + byteBuffer.get(compressedBytes) + new TaskData(compressedBytes, uncompressedLen) + } else TaskData.EMPTY val serializedTask = byteBuffer.slice() // subsequence starting at the current position - MesosTaskLaunchData(serializedTask, attemptNumber) + MesosTaskLaunchData(serializedTask, taskData, attemptNumber) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index e386052814039..7c8ebcf3c3089 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -85,7 +85,7 @@ private[spark] class LocalEndpoint( for (task <- scheduler.resourceOffers(offers).flatten) { freeCores -= scheduler.CPUS_PER_TASK executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, - task.name, task.serializedTask) + task.name, task.serializedTask, task.taskData.decompress()) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 75cbb00050134..13c30220f1be0 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -180,9 +180,7 @@ final class ShuffleBlockFetcherIterator( remainingBlocks -= blockId results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf, remainingBlocks.isEmpty)) - if (isDebugEnabled) { - logDebug("remainingBlocks: " + remainingBlocks) - } + if (isDebugEnabled) logDebug("remainingBlocks: " + remainingBlocks) } } if (isTraceEnabled) logTrace("Got remote block " + blockId + " after " + diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala index 9eda79ace18d0..283a287b01887 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala @@ -62,7 +62,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark val func = (c: TaskContext, i: 
Iterator[String]) => i.next() val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func)))) val task = new ResultTask[String, String]( - 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics) + 0, 0, TaskData.EMPTY, Some(taskBinary), rdd.partitions(0), Seq.empty, 0, + new Properties, new TaskMetrics) intercept[RuntimeException] { task.run(0, 0, null) } @@ -83,7 +84,8 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark val func = (c: TaskContext, i: Iterator[String]) => i.next() val taskBinary = sc.broadcast(JavaUtils.bufferToArray(closureSerializer.serialize((rdd, func)))) val task = new ResultTask[String, String]( - 0, 0, taskBinary, rdd.partitions(0), Seq.empty, 0, new Properties, new TaskMetrics) + 0, 0, TaskData.EMPTY, Some(taskBinary), rdd.partitions(0), Seq.empty, 0, + new Properties, new TaskMetrics) intercept[RuntimeException] { task.run(0, 0, null) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index b98f945bac253..5fc55c3bfb40f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -794,7 +794,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg private def createTaskResult( id: Int, accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = { - val valueSer = SparkEnv.get.serializer.newInstance() - new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates) + new DirectTaskResult[Int](id, accumUpdates) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala index 5a81bb335fdb7..51ffbd63df1c2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala @@ -20,17 +20,21 @@ package org.apache.spark.scheduler.cluster.mesos import java.nio.ByteBuffer import org.apache.spark.SparkFunSuite +import org.apache.spark.scheduler.TaskData class MesosTaskLaunchDataSuite extends SparkFunSuite { test("serialize and deserialize data must be same") { val serializedTask = ByteBuffer.allocate(40) + val taskBytes = Range(0, 100).map(_.toByte).toArray + val taskData = new TaskData(taskBytes, 200) (Range(100, 110).map(serializedTask.putInt(_))) serializedTask.rewind val attemptNumber = 100 - val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString + val byteString = MesosTaskLaunchData(serializedTask, taskData, attemptNumber).toByteString serializedTask.rewind val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString) assert(mesosTaskLaunchData.attemptNumber == attemptNumber) assert(mesosTaskLaunchData.serializedTask.equals(serializedTask)) + assert(mesosTaskLaunchData.taskData == taskData) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index fa40414bcea86..d0a44014e1b63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -240,7 +240,7 @@ abstract class SparkPlan extends 
QueryPlan[SparkPlan] with Logging with Serializ execute().mapPartitionsInternal { iter => var count = 0 val buffer = new Array[Byte](4 << 10) // 4K - val codec = CompressionCodec.createCodec(SparkEnv.get.conf) + val codec = SparkEnv.get.createCompressionCodec val bos = new ByteArrayOutputStream() val out = new DataOutputStream(codec.compressedOutputStream(bos)) while (iter.hasNext && (n < 0 || count < n)) { @@ -262,7 +262,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ private def decodeUnsafeRows(bytes: Array[Byte]): Iterator[InternalRow] = { val nFields = schema.length - val codec = CompressionCodec.createCodec(SparkEnv.get.conf) + val codec = SparkEnv.get.createCompressionCodec val bis = new ByteArrayInputStream(bytes) val ins = new DataInputStream(codec.compressedInputStream(bis))
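// A small usage sketch of the byte-array codec API introduced in
// CompressionCodec above, together with the cached codec creator on SparkEnv;
// the 4 KB payload is illustrative only.
val codec = SparkEnv.get.createCompressionCodec // reuses the cached creator, no repeated conf lookup
val input: Array[Byte] = Array.fill(4096)(42.toByte)
val compressed = codec.compress(input, input.length)
val restored = codec.decompress(compressed, 0, compressed.length, input.length)
assert(java.util.Arrays.equals(input, restored))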