From c874a10c6cf6f9fe07cee2fb76a87dd172c974e7 Mon Sep 17 00:00:00 2001
From: Sumedh Wale
Date: Sat, 3 Dec 2016 17:36:38 +0530
Subject: [PATCH] [SNAP-1190] Reduce partition message overhead from driver to
 executor (#31)

- DAGScheduler:
  - For small enough common task data (RDD + closure), send it inline with the
    Task instead of as a broadcast (this decision is sketched after the diff
    below)
  - Transiently store the task binary data in Stage so it can be reused
  - Compress the common task bytes to save on network cost
- Task: new TaskData class to encapsulate the compressed task bytes from
  above, the uncompressed length, and a reference index used when the TaskData
  is read from a separate list (see the following points)
- CoarseGrainedClusterMessage: added a new LaunchTasks message to encapsulate
  multiple Task messages to the same executor
- CoarseGrainedSchedulerBackend:
  - Create LaunchTasks by grouping messages into an ExecutorTaskGroup per
    executor (the grouping is sketched after the diff below)
  - The actual TaskData is sent as part of TaskDescription rather than the
    Task, to easily separate out the common portions into a separate list
  - Send the common TaskData as a separate ArrayBuffer of data, with the index
    into this list set in the original task's TaskData
- CoarseGrainedExecutorBackend: handle LaunchTasks by splitting it into
  individual task launches
- CompressionCodec: added byte-array compress/decompress methods for more
  efficient byte-array compression
- Executor:
  - Set the common decompressed task data back into the Task object
  - Avoid an additional serialization of TaskResult just to determine the
    serialization time; instead, calculate the time inline during the
    serialization write/writeExternal methods
- TaskMetrics: more generic handling of the DoubleAccumulator case
- Task: handle TaskData during serialization by sending a flag that indicates
  whether the data is inlined or will be received via broadcast
- ResultTask, ShuffleMapTask: delegate handling of TaskData to the parent Task
  class
- SparkEnv: encapsulate codec creation as a zero-arg function to avoid
  repeated conf lookups
- SparkContext.clean: avoid checking serializability when a non-default
  closure serializer is in use
- Test updates for the above

Conflicts:
    core/src/main/scala/org/apache/spark/SparkEnv.scala
    core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
    core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
    core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
    core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
    core/src/main/scala/org/apache/spark/scheduler/Task.scala
    core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
    core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
    core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Conflicts:
    core/src/main/scala/org/apache/spark/SparkContext.scala
    core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
    core/src/main/scala/org/apache/spark/executor/Executor.scala
    core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
    core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
    core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
    core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
    core/src/main/scala/org/apache/spark/scheduler/Task.scala
    core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
    core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
    core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
    core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchData.scala resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala resource-managers/mesos/src/test/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherSuite.scala --- .../scala/org/apache/spark/SparkContext.scala | 6 +- .../scala/org/apache/spark/SparkEnv.scala | 6 + .../CoarseGrainedExecutorBackend.scala | 27 +++-- .../org/apache/spark/executor/Executor.scala | 18 ++- .../apache/spark/executor/TaskMetrics.scala | 8 +- .../apache/spark/io/CompressionCodec.scala | 74 ++++++++++-- .../apache/spark/scheduler/DAGScheduler.scala | 30 ++++- .../apache/spark/scheduler/ResultTask.scala | 23 ++-- .../spark/scheduler/ShuffleMapTask.scala | 26 +++-- .../org/apache/spark/scheduler/Stage.scala | 4 +- .../org/apache/spark/scheduler/Task.scala | 33 +++++- .../spark/scheduler/TaskDescription.scala | 5 +- .../apache/spark/scheduler/TaskResult.scala | 101 +++++++++++------ .../spark/scheduler/TaskSetManager.scala | 2 +- .../cluster/CoarseGrainedClusterMessage.scala | 60 +++++++++- .../CoarseGrainedSchedulerBackend.scala | 106 ++++++++++++++++-- .../local/LocalSchedulerBackend.scala | 3 +- .../spark/scheduler/TaskSetManagerSuite.scala | 3 +- .../spark/executor/MesosExecutorBackend.scala | 3 +- .../spark/sql/execution/SparkPlan.scala | 4 +- 20 files changed, 423 insertions(+), 119 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3a0c093587157..004bfe9756c4c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -72,6 +72,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend} import org.apache.spark.scheduler.local.LocalSchedulerBackend import org.apache.spark.status.AppStatusStore +import org.apache.spark.serializer.JavaSerializer import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump import org.apache.spark.ui.{ConsoleProgressBar, SparkUI} @@ -230,6 +231,7 @@ class SparkContext(config: SparkConf) extends Logging { private var _files: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ private var _statusStore: AppStatusStore = _ + private var _isDefaultClosureSerializer: Boolean = true /* ------------------------------------------------------------------------------------- * | Accessors and public fields. These provide access to the internal state of the | @@ -446,6 +448,8 @@ class SparkContext(config: SparkConf) extends Logging { _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env) + _isDefaultClosureSerializer = _env.closureSerializer.isInstanceOf[JavaSerializer] + // If running the REPL, register the repl's output dir with the file server. 
_conf.getOption("spark.repl.class.outputDir").foreach { path => val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path)) @@ -2312,7 +2316,7 @@ class SparkContext(config: SparkConf) extends Logging { * @return the cleaned closure */ private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { - ClosureCleaner.clean(f, checkSerializable) + ClosureCleaner.clean(f, checkSerializable && _isDefaultClosureSerializer) f } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 8a9d8a9292b34..97c0f5bcc2c7f 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -49,6 +49,7 @@ import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ +import org.apache.spark.io.CompressionCodec import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.network.netty.NettyBlockTransferService @@ -97,6 +98,11 @@ class SparkEnv ( private[spark] var driverTmpDir: Option[String] = None + private val codecCreator = CompressionCodec.codecCreator(conf, + CompressionCodec.getCodecName(conf)) + + def createCompressionCodec: CompressionCodec = codecCreator() + private[spark] def stop() { if (!isStopped) { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index fab79c8904c07..f0fdbcefb80cc 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -34,7 +34,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.rpc._ import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.serializer.SerializerInstance import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( @@ -51,10 +50,6 @@ private[spark] class CoarseGrainedExecutorBackend( var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None - // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need - // to be changed so that we don't share the serializer instance across threads - private[this] val ser: SerializerInstance = env.closureSerializer.newInstance() - override def onStart() { logInfo("Connecting to driver: " + driverUrl) rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => @@ -94,14 +89,30 @@ private[spark] class CoarseGrainedExecutorBackend( logError("Slave registration failed: " + message) exitExecutor() - case LaunchTask(data) => + case LaunchTask(taskDesc) => if (executor == null) { logError("Received LaunchTask command but executor was null") exitExecutor() } else { - val taskDesc = TaskDescription.decode(data.value) + val taskDesc = ser.deserialize[TaskDescription](data.value) logInfo("Got assigned task " + taskDesc.taskId) - executor.launchTask(this, taskDesc) + executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber, + taskDesc.name, taskDesc.serializedTask, taskDesc.taskData.decompress(env)) + } + + case LaunchTasks(tasks, 
taskDataList) => + if (executor ne null) { + logDebug("Got assigned tasks " + tasks.map(_.taskId).mkString(",")) + for (task <- tasks) { + logInfo("Got assigned task " + task.taskId) + val ref = task.taskData.reference + val taskData = if (ref >= 0) taskDataList(ref) else task.taskData + executor.launchTask(this, taskId = task.taskId, + attemptNumber = task.attemptNumber, task.name, task.serializedTask, + taskData.decompress(env)) + } + } else { + exitExecutor(1, "Received LaunchTasks command but executor was null") } case KillTask(taskId, _, interruptThread, reason) => diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index f8cf7a0943584..e724f824d15c4 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -246,7 +246,12 @@ private[spark] class Executor( class TaskRunner( execBackend: ExecutorBackend, - private val taskDescription: TaskDescription) + private val taskDescription: TaskDescription, + val taskId: Long, + val attemptNumber: Int, + taskName: String, + serializedTask: ByteBuffer, + taskDataBytes: Array[Byte]) extends Runnable { val taskId = taskDescription.taskId @@ -404,11 +409,6 @@ private[spark] class Executor( // If the task has been killed, let's fail it. task.context.killTaskIfInterrupted() - val resultSer = env.serializer.newInstance() - val beforeSerialization = System.nanoTime() - val valueBytes = resultSer.serialize(value) - val afterSerialization = System.nanoTime() - // Deserialization happens in two parts: first, we deserialize a Task object, which // includes the Partition. Second, Task.run() deserializes the RDD and function to be run. task.metrics.setExecutorDeserializeTime(math.max( @@ -421,8 +421,6 @@ private[spark] class Executor( task.metrics.setExecutorCpuTime( (taskFinishCpu - taskStartCpu) - task.executorDeserializeCpuTime) task.metrics.setJvmGCTime(computeTotalGcTime() - startGCTime) - task.metrics.setResultSerializationTime(math.max( - afterSerialization - beforeSerialization, 0L) / 1000000.0) // Expose task metrics using the Dropwizard metrics system. 
// Update task metrics counters @@ -467,8 +465,8 @@ private[spark] class Executor( // Note: accumulator updates must be collected after TaskMetrics is updated val accumUpdates = task.collectAccumulatorUpdates() - // TODO: do not serialize value twice - val directResult = new DirectTaskResult(valueBytes, accumUpdates) + val directResult = new DirectTaskResult(value, accumUpdates, + Some(task.metrics.resultSerializationTimeMetric)) val serializedDirectResult = ser.serialize(directResult) val resultSize = serializedDirectResult.limit() diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 3facda3df95bc..445ca611118cf 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -139,6 +139,7 @@ class TaskMetrics private[spark] () extends Serializable with KryoSerializable { private[spark] def setJvmGCTime(v: Long): Unit = _jvmGCTime.setValue(v) private[spark] def setResultSerializationTime(v: Double): Unit = _resultSerializationTime.setValue(v) + private[spark] def resultSerializationTimeMetric = _resultSerializationTime private[spark] def incMemoryBytesSpilled(v: Long): Unit = _memoryBytesSpilled.add(v) private[spark] def incDiskBytesSpilled(v: Long): Unit = _diskBytesSpilled.add(v) private[spark] def incPeakExecutionMemory(v: Long): Unit = _peakExecutionMemory.add(v) @@ -338,7 +339,12 @@ private[spark] object TaskMetrics extends Logging { } else { tm.nameToAccums.get(name).foreach { case l: LongAccumulator => l.setValue(value.asInstanceOf[Long]) - case d => d.asInstanceOf[DoubleAccumulator].setValue(value.asInstanceOf[Double]) + case d: DoubleAccumulator => value match { + case v: Double => d.setValue(v) + case _ => d.setValue(value.asInstanceOf[Long]) + } + case o => throw new UnsupportedOperationException( + s"Unexpected accumulator $o for TaskMetrics") } } } diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 7722db56ee297..22c663686d6ef 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -21,8 +21,8 @@ import java.io._ import java.util.Locale import com.github.luben.zstd.{ZstdInputStream, ZstdOutputStream} -import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} -import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} +import com.ning.compress.lzf.{LZFDecoder, LZFEncoder, LZFInputStream, LZFOutputStream} +import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream, LZ4Factory} import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf @@ -43,6 +43,11 @@ trait CompressionCodec { def compressedOutputStream(s: OutputStream): OutputStream def compressedInputStream(s: InputStream): InputStream + + def compress(input: Array[Byte], inputLen: Int): Array[Byte] + + def decompress(input: Array[Byte], inputOffset: Int, inputLen: Int, + outputLen: Int): Array[Byte] } private[spark] object CompressionCodec { @@ -69,16 +74,32 @@ private[spark] object CompressionCodec { } def createCodec(conf: SparkConf, codecName: String): CompressionCodec = { - val codecClass = - shortCompressionCodecNames.getOrElse(codecName.toLowerCase(Locale.ROOT), codecName) - val codec = try { + codecCreator(conf, codecName)() + } + + def codecCreator(conf: SparkConf, codecName: String): () 
=> CompressionCodec = { + if (codecName == DEFAULT_COMPRESSION_CODEC) { + return () => new LZ4CompressionCodec(conf) + } + val codecClass = shortCompressionCodecNames.getOrElse(codecName.toLowerCase, codecName) + try { val ctor = Utils.classForName(codecClass).getConstructor(classOf[SparkConf]) - Some(ctor.newInstance(conf).asInstanceOf[CompressionCodec]) + () => { + try { + ctor.newInstance(conf).asInstanceOf[CompressionCodec] + } catch { + case e: IllegalArgumentException => throw fail(codecName) + } + } } catch { - case _: ClassNotFoundException | _: IllegalArgumentException => None + case e: ClassNotFoundException => throw fail(codecName) + case e: NoSuchMethodException => throw fail(codecName) } - codec.getOrElse(throw new IllegalArgumentException(s"Codec [$codecName] is not available. " + - s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC")) + } + + private def fail(codecName: String): IllegalArgumentException = { + new IllegalArgumentException(s"Codec [$codecName] is not available. " + + s"Consider setting $configKey=$FALLBACK_COMPRESSION_CODEC") } /** @@ -117,9 +138,16 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { new LZ4BlockOutputStream(s, blockSize) } - override def compressedInputStream(s: InputStream): InputStream = { - val disableConcatenationOfByteStream = false - new LZ4BlockInputStream(s, disableConcatenationOfByteStream) + override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s) + + override def compress(input: Array[Byte], inputLen: Int): Array[Byte] = { + LZ4Factory.fastestInstance().fastCompressor().compress(input, 0, inputLen) + } + + override def decompress(input: Array[Byte], inputOffset: Int, inputLen: Int, + outputLen: Int): Array[Byte] = { + LZ4Factory.fastestInstance().fastDecompressor().decompress(input, + inputOffset, outputLen) } } @@ -140,6 +168,17 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { } override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s) + + override def compress(input: Array[Byte], inputLen: Int): Array[Byte] = { + LZFEncoder.encode(input, 0, inputLen) + } + + override def decompress(input: Array[Byte], inputOffset: Int, inputLen: Int, + outputLen: Int): Array[Byte] = { + val output = new Array[Byte](outputLen) + LZFDecoder.decode(input, inputOffset, inputLen, output) + output + } } @@ -162,6 +201,17 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { } override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s) + + override def compress(input: Array[Byte], inputLen: Int): Array[Byte] = { + Snappy.rawCompress(input, inputLen) + } + + override def decompress(input: Array[Byte], inputOffset: Int, + inputLen: Int, outputLen: Int): Array[Byte] = { + val output = new Array[Byte](outputLen) + Snappy.uncompress(input, inputOffset, inputLen, output, 0) + output + } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 8c46a84323392..396dbd46946a4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1020,6 +1020,7 @@ class DAGScheduler( try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). 
+<<<<<<< HEAD var taskBinaryBytes: Array[Byte] = null // taskBinaryBytes and partitions are both effected by the checkpoint status. We need // this synchronization in case another concurrent job is checkpointing this RDD, so we get a @@ -1035,8 +1036,23 @@ class DAGScheduler( partitions = stage.rdd.partitions } - - taskBinary = sc.broadcast(taskBinaryBytes) + if (bytes == null) stage.taskBinaryBytes = taskBinaryBytes + + // use direct byte shipping for small size or if number of partitions is small + val taskBytesLen = taskBinaryBytes.length + if (taskBytesLen <= DAGScheduler.TASK_INLINE_LIMIT || + partitionsToCompute.length <= DAGScheduler.TASK_INLINE_PARTITION_LIMIT) { + if (stage.taskData.uncompressedLen > 0) { + taskData = stage.taskData + } else { + // compress inline task data (broadcast compresses as per conf) + taskData = new TaskData(env.createCompressionCodec.compress( + taskBinaryBytes, taskBytesLen), taskBytesLen) + stage.taskData = taskData + } + } else { + taskBinary = Some(sc.broadcast(taskBinaryBytes)) + } } catch { // In the case of a failure during serialization, abort the stage. case e: NotSerializableException => @@ -1545,7 +1561,7 @@ class DAGScheduler( * Marks a stage as finished and removes it from the list of running stages. */ private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = { - val serviceTime = stage.latestInfo.submissionTime match { + val serviceTime = if (!log.isInfoEnabled) 0L else stage.latestInfo.submissionTime match { case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0) case _ => "Unknown" } @@ -1847,4 +1863,12 @@ private[spark] object DAGScheduler { // Number of consecutive stage attempts allowed before a stage is aborted val DEFAULT_MAX_CONSECUTIVE_STAGE_ATTEMPTS = 4 + + // The maximum size of uncompressed common task bytes (rdd, closure) + // that will be shipped with the task else will be broadcast separately. + val TASK_INLINE_LIMIT = 100 * 1024 + + // The maximum number of partitions below which common task bytes will be + // shipped with the task else will be broadcast separately. + val TASK_INLINE_PARTITION_LIMIT = 8 } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala index 86f188dbb6022..f94aad960e41a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala @@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory import java.nio.ByteBuffer import java.util.Properties -import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark._ @@ -36,7 +36,9 @@ import org.apache.spark.rdd.RDD * * @param stageId id of the stage this task belongs to * @param stageAttemptId attempt id of the stage this task belongs to - * @param taskBinary broadcasted version of the serialized RDD and the function to apply on each + * @param _taskData if serialized RDD and function are small, then it is compressed + * and sent with its original decompressed size + * @param _taskBinary broadcasted version of the serialized RDD and the function to apply on each * partition of the given RDD. Once deserialized, the type should be * (RDD[T], (TaskContext, Iterator[T]) => U). 
* @param partition partition of the RDD this task is associated with @@ -55,7 +57,8 @@ import org.apache.spark.rdd.RDD private[spark] class ResultTask[T, U]( stageId: Int, stageAttemptId: Int, - private var taskBinary: Broadcast[Array[Byte]], + _taskData: TaskData, + _taskBinary: Option[Broadcast[Array[Byte]]], private var partition: Partition, locs: Seq[TaskLocation], private var _outputId: Int, @@ -64,9 +67,9 @@ private[spark] class ResultTask[T, U]( jobId: Option[Int] = None, appId: Option[String] = None, appAttemptId: Option[String] = None) - extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics, - jobId, appId, appAttemptId) - with Serializable { + extends Task[U](stageId, stageAttemptId, partition.index, _taskData, + _taskBinary, metrics, localProperties, jobId, appId, appAttemptId) + with Serializable with KryoSerializable { final def outputId: Int = _outputId @@ -83,7 +86,7 @@ private[spark] class ResultTask[T, U]( } else 0L val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( - ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) + ByteBuffer.wrap(getTaskBytes), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = math.max(System.nanoTime() - deserializeStartTime, 0L) _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime @@ -98,15 +101,13 @@ private[spark] class ResultTask[T, U]( override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")" override def write(kryo: Kryo, output: Output): Unit = { - super.write(kryo, output) - kryo.writeClassAndObject(output, taskBinary) + super.writeKryo(kryo, output) kryo.writeClassAndObject(output, partition) output.writeInt(_outputId) } override def read(kryo: Kryo, input: Input): Unit = { - super.read(kryo, input) - taskBinary = kryo.readClassAndObject(input).asInstanceOf[Broadcast[Array[Byte]]] + super.readKryo(kryo, input) partition = kryo.readClassAndObject(input).asInstanceOf[Partition] _outputId = input.readInt() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index c5e757024ed45..46e03c033f36c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -23,7 +23,7 @@ import java.util.Properties import scala.language.existentials -import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark._ @@ -40,7 +40,9 @@ import org.apache.spark.shuffle.ShuffleWriter * * @param stageId id of the stage this task belongs to * @param stageAttemptId attempt id of the stage this task belongs to - * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized, + * @param _taskData if serialized RDD and function are small, then it is compressed + * and sent with its original decompressed size + * @param _taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized, * the type should be (RDD[_], ShuffleDependency[_, _, _]). 
* @param partition partition of the RDD this task is associated with * @param locs preferred task execution locations for locality scheduling @@ -56,7 +58,8 @@ import org.apache.spark.shuffle.ShuffleWriter private[spark] class ShuffleMapTask( stageId: Int, stageAttemptId: Int, - private var taskBinary: Broadcast[Array[Byte]], + _taskData: TaskData, + _taskBinary: Option[Broadcast[Array[Byte]]], private var partition: Partition, @transient private var locs: Seq[TaskLocation], localProperties: Properties, @@ -64,13 +67,14 @@ private[spark] class ShuffleMapTask( jobId: Option[Int] = None, appId: Option[String] = None, appAttemptId: Option[String] = None) - extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties, - serializedTaskMetrics, jobId, appId, appAttemptId) - with Logging { + extends Task[MapStatus](stageId, stageAttemptId, partition.index, _taskData, + _taskBinary, metrics, localProperties, jobId, appId, appAttemptId) +with KryoSerializable with Logging { /** A constructor used only in test suites. This does not require passing in an RDD. */ def this(partitionId: Int) { - this(0, 0, null, new Partition { override def index: Int = 0 }, null, new Properties, null) + this(0, 0, TaskData.EMPTY, null, new Partition { override def index: Int = 0 }, + null, null, new Properties) } @transient private val preferredLocs: Seq[TaskLocation] = { @@ -86,7 +90,7 @@ private[spark] class ShuffleMapTask( } else 0L val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( - ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) + ByteBuffer.wrap(getTaskBytes), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = math.max(System.nanoTime() - deserializeStartTime, 0L) _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime @@ -117,14 +121,12 @@ private[spark] class ShuffleMapTask( override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, partitionId) override def write(kryo: Kryo, output: Output): Unit = { - super.write(kryo, output) - kryo.writeClassAndObject(output, taskBinary) + super.writeKryo(kryo, output) kryo.writeClassAndObject(output, partition) } override def read(kryo: Kryo, input: Input): Unit = { - super.read(kryo, input) - taskBinary = kryo.readClassAndObject(input).asInstanceOf[Broadcast[Array[Byte]]] + super.readKryo(kryo, input) partition = kryo.readClassAndObject(input).asInstanceOf[Partition] } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index 290fd073caf27..65af6bdd50334 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -59,7 +59,9 @@ private[scheduler] abstract class Stage( val numTasks: Int, val parents: List[Stage], val firstJobId: Int, - val callSite: CallSite) + val callSite: CallSite, + @transient private[scheduler] var taskBinaryBytes: Array[Byte] = null, + @transient private[scheduler] var taskData: TaskData = TaskData.EMPTY) extends Logging { val numPartitions = rdd.partitions.length diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index 867bca3b84de2..67153e58096cf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -23,10 +23,11 @@ import java.util.Properties import scala.collection.mutable import scala.collection.mutable.HashMap -import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.{KryoSerializable, Kryo} import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark._ +import org.apache.spark.broadcast.Broadcast import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.config.APP_CALLER_CONTEXT import org.apache.spark.memory.{MemoryMode, TaskMemoryManager} @@ -61,7 +62,9 @@ private[spark] abstract class Task[T]( private var _stageId: Int, private var _stageAttemptId: Int, private var _partitionId: Int, + @transient private[spark] var taskData: TaskData = TaskData.EMPTY, // The default value is only used in tests. + protected var taskBinary: Option[Broadcast[Array[Byte]]] = None, private var _metrics: TaskMetrics = TaskMetrics.registered, @transient var localProperties: Properties = new Properties, // The default value is only used in tests. @@ -78,11 +81,16 @@ private[spark] abstract class Task[T]( final def partitionId: Int = _partitionId - final def metrics: TaskMetrics = _metrics - @transient lazy val metrics: TaskMetrics = SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics)) + @transient private[spark] var taskDataBytes: Array[Byte] = _ + + protected final def getTaskBytes: Array[Byte] = { + val bytes = taskDataBytes + if ((bytes ne null) && bytes.length > 0) bytes else taskBinary.get.value + } + /** * Called by [[org.apache.spark.executor.Executor]] to run this task. * @@ -235,21 +243,36 @@ private[spark] abstract class Task[T]( } } - override def write(kryo: Kryo, output: Output): Unit = { + protected def writeKryo(kryo: Kryo, output: Output): Unit = { output.writeInt(_stageId) output.writeVarInt(_stageAttemptId, true) output.writeVarInt(_partitionId, true) output.writeLong(epoch) output.writeLong(_executorDeserializeTime) + if ((taskData ne null) && taskData.uncompressedLen > 0) { + // actual bytes will be shipped in TaskDescription + output.writeBoolean(true) + } else { + output.writeBoolean(false) + kryo.writeClassAndObject(output, taskBinary.get) + } _metrics.write(kryo, output) } - override def read(kryo: Kryo, input: Input): Unit = { + def readKryo(kryo: Kryo, input: Input): Unit = { _stageId = input.readInt() _stageAttemptId = input.readVarInt(true) _partitionId = input.readVarInt(true) epoch = input.readLong() _executorDeserializeTime = input.readLong() + // actual bytes are shipped in TaskDescription + taskData = TaskData.EMPTY + if (input.readBoolean()) { + taskBinary = None + } else { + taskBinary = Some(kryo.readClassAndObject(input) + .asInstanceOf[Broadcast[Array[Byte]]]) + } _metrics = new TaskMetrics _metrics.read(kryo, input) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 583898e12145f..fb86409cc5a89 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -52,7 +52,8 @@ private[spark] class TaskDescription( private var _executorId: String, private var _name: String, private var _index: Int, // Index within this task's TaskSet - @transient private var _serializedTask: ByteBuffer) + @transient private var _serializedTask: ByteBuffer, + private[spark] var taskData: TaskData = 
TaskData.EMPTY) extends Serializable with KryoSerializable { def taskId: Long = _taskId @@ -76,6 +77,7 @@ private[spark] class TaskDescription( output.writeInt(_index) output.writeInt(_serializedTask.remaining()) Utils.writeByteBuffer(_serializedTask, output) + TaskData.write(taskData, output) } override def read(kryo: Kryo, input: Input): Unit = { @@ -86,6 +88,7 @@ private[spark] class TaskDescription( _index = input.readInt() val len = input.readInt() _serializedTask = ByteBuffer.wrap(input.readBytes(len)) + taskData = TaskData.read(input) } dataOut.writeLong(taskDescription.taskId) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index 836769e1723d5..5b564366439a8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -18,14 +18,14 @@ package org.apache.spark.scheduler import java.io._ -import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer -import org.apache.spark.SparkEnv -import org.apache.spark.serializer.SerializerInstance +import com.esotericsoftware.kryo.{Kryo, KryoSerializable} +import com.esotericsoftware.kryo.io.{Input, Output} + import org.apache.spark.storage.BlockId -import org.apache.spark.util.{AccumulatorV2, Utils} +import org.apache.spark.util.{AccumulatorV2, DoubleAccumulator, Utils} // Task result. Also contains updates to accumulator variables. private[spark] sealed trait TaskResult[T] @@ -36,27 +36,32 @@ private[spark] case class IndirectTaskResult[T](blockId: BlockId, size: Int) /** A TaskResult that contains the task's return value and accumulator updates. */ private[spark] class DirectTaskResult[T]( - var valueBytes: ByteBuffer, - var accumUpdates: Seq[AccumulatorV2[_, _]]) - extends TaskResult[T] with Externalizable { - - private var valueObjectDeserialized = false - private var valueObject: T = _ + private var _value: Any, + var accumUpdates: Seq[AccumulatorV2[_, _]], + private val serializationTimeMetric: Option[DoubleAccumulator] = None) + extends TaskResult[T] with Externalizable with KryoSerializable { - def this() = this(null.asInstanceOf[ByteBuffer], null) + def this() = this(null, null) override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { - out.writeInt(valueBytes.remaining) - Utils.writeByteBuffer(valueBytes, out) - out.writeInt(accumUpdates.size) - accumUpdates.foreach(out.writeObject) + serializationTimeMetric match { + case Some(timeMetric) => + val start = System.nanoTime() + out.writeObject(_value) + out.writeInt(accumUpdates.size + 1) + accumUpdates.foreach(out.writeObject) + val end = System.nanoTime() + timeMetric.setValue(math.max(end - start, 0L) / 1000000.0) + out.writeObject(timeMetric) + case None => + out.writeObject(_value) + out.writeInt(accumUpdates.size) + accumUpdates.foreach(out.writeObject) + } } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - val blen = in.readInt() - val byteVal = new Array[Byte](blen) - in.readFully(byteVal) - valueBytes = ByteBuffer.wrap(byteVal) + _value = in.readObject() val numUpdates = in.readInt if (numUpdates == 0) { @@ -68,26 +73,50 @@ private[spark] class DirectTaskResult[T]( } accumUpdates = _accumUpdates } - valueObjectDeserialized = false } - /** - * When `value()` is called at the first time, it needs to deserialize `valueObject` from - * `valueBytes`. It may cost dozens of seconds for a large instance. 
So when calling `value` at - * the first time, the caller should avoid to block other threads. - * - * After the first time, `value()` is trivial and just returns the deserialized `valueObject`. - */ - def value(resultSer: SerializerInstance = null): T = { - if (valueObjectDeserialized) { - valueObject + override def write(kryo: Kryo, output: Output): Unit = Utils.tryOrIOException { + serializationTimeMetric match { + case Some(timeMetric) => + val start = System.nanoTime() + kryo.writeClassAndObject(output, _value) + output.writeVarInt(accumUpdates.size, true) + output.writeBoolean(true) // indicates additional timeMetric + accumUpdates.foreach(kryo.writeClassAndObject(output, _)) + val end = System.nanoTime() + timeMetric.setValue(math.max(end - start, 0L) / 1000000.0) + timeMetric.write(kryo, output) + case None => + kryo.writeClassAndObject(output, _value) + output.writeVarInt(accumUpdates.size, true) + output.writeBoolean(false) // indicates no timeMetric + accumUpdates.foreach(kryo.writeClassAndObject(output, _)) + } + } + + override def read(kryo: Kryo, input: Input): Unit = Utils.tryOrIOException { + _value = kryo.readClassAndObject(input) + + var numUpdates = input.readVarInt(true) + val hasTimeMetric = input.readBoolean() + if (numUpdates == 0 && !hasTimeMetric) { + accumUpdates = null } else { - // This should not run when holding a lock because it may cost dozens of seconds for a large - // value - val ser = if (resultSer == null) SparkEnv.get.serializer.newInstance() else resultSer - valueObject = ser.deserialize(valueBytes) - valueObjectDeserialized = true - valueObject + val _accumUpdates = new ArrayBuffer[AccumulatorV2[_, _]]( + if (hasTimeMetric) numUpdates + 1 else numUpdates) + while (numUpdates > 0) { + _accumUpdates += kryo.readClassAndObject(input) + .asInstanceOf[AccumulatorV2[_, _]] + numUpdates -= 1 + } + if (hasTimeMetric) { + val timeMetric = new DoubleAccumulator + timeMetric.read(kryo, input) + _accumUpdates += timeMetric + } + accumUpdates = _accumUpdates } } + + def value(): T = _value.asInstanceOf[T] } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 3519669e96fe6..25029ec9f48b5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1085,5 +1085,5 @@ private[spark] class TaskSetManager( private[spark] object TaskSetManager { // The user will be warned if any stages contain a task that has a serialized size greater than // this. 
- val TASK_SIZE_TO_WARN_KB = 100 + val TASK_SIZE_TO_WARN_KB = 128 } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index fc5115c57fb03..a22593c66ab52 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -19,12 +19,14 @@ package org.apache.spark.scheduler.cluster import java.nio.ByteBuffer +import scala.collection.mutable + import com.esotericsoftware.kryo.{Kryo, KryoSerializable} import com.esotericsoftware.kryo.io.{Input, Output} import org.apache.spark.TaskState.TaskState import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.ExecutorLossReason +import org.apache.spark.scheduler.{ExecutorLossReason, TaskData, TaskDescription} import org.apache.spark.util.{SerializableBuffer, Utils} private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable @@ -42,7 +44,61 @@ private[spark] object CoarseGrainedClusterMessages { case object RetrieveLastAllocatedExecutorId extends CoarseGrainedClusterMessage // Driver to executors - case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage + case class LaunchTask(private var task: TaskDescription) + extends CoarseGrainedClusterMessage with KryoSerializable { + + override def write(kryo: Kryo, output: Output): Unit = { + task.write(kryo, output) + } + + override def read(kryo: Kryo, input: Input): Unit = { + task = new TaskDescription(0L, 0, null, null, 0, null) + task.read(kryo, input) + } + } + + case class LaunchTasks(private var tasks: mutable.ArrayBuffer[TaskDescription], + private var taskDataList: mutable.ArrayBuffer[TaskData]) + extends CoarseGrainedClusterMessage with KryoSerializable { + + override def write(kryo: Kryo, output: Output): Unit = Utils.tryOrIOException { + val tasks = this.tasks + val numTasks = tasks.length + output.writeVarInt(numTasks, true) + var i = 0 + while (i < numTasks) { + tasks(i).write(kryo, output) + i += 1 + } + val taskDataList = this.taskDataList + val numData = taskDataList.length + output.writeVarInt(numData, true) + i = 0 + while (i < numData) { + TaskData.write(taskDataList(i), output) + i += 1 + } + } + + override def read(kryo: Kryo, input: Input): Unit = Utils.tryOrIOException { + var numTasks = input.readVarInt(true) + val tasks = new mutable.ArrayBuffer[TaskDescription](numTasks) + while (numTasks > 0) { + val task = new TaskDescription(0, 0, null, null, 0, null) + task.read(kryo, input) + tasks += task + numTasks -= 1 + } + var numData = input.readVarInt(true) + val taskDataList = new mutable.ArrayBuffer[TaskData](numData) + while (numData > 0) { + taskDataList += TaskData.read(input) + numData -= 1 + } + this.tasks = tasks + this.taskDataList = taskDataList + } + } case class KillTask(taskId: Long, executor: String, interruptThread: Boolean, reason: String) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7ffae9b68cb0e..76877fb691334 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -42,13 +42,14 @@ import 
javax.annotation.concurrent.GuardedBy import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Future -import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState} +import org.apache.spark.{SparkEnv, ExecutorAllocationClient, SparkException, TaskState} import org.apache.spark.internal.Logging import org.apache.spark.rpc._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME -import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils, Utils} +import org.apache.spark.util.{RpcUtils, ThreadUtils, Utils} +import org.apache.spark.util.collection.OpenHashMap /** * A scheduler backend that waits for coarse-grained executors to connect. @@ -324,16 +325,54 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } } } - else { - val executorData = executorDataMap(task.executorId) - executorData.freeCores -= scheduler.CPUS_PER_TASK - - logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + - s"${executorData.executorHost}.") + false + } else true + } - executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) + // Launch tasks returned by a set of resource offers + protected def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = { + val executorTaskGroupMap = new OpenHashMap[String, ExecutorTaskGroup](8) + for (taskSet <- tasks) { + for (task <- taskSet) { + val taskLimit = task.serializedTask.limit + val taskSize = taskLimit + task.taskData.compressedBytes.length + if (checkTaskSizeLimit(task, taskSize)) { + // group tasks per executor as long as message limit is not breached + executorTaskGroupMap.changeValue(task.executorId, { + val executorData = executorDataMap(task.executorId) + val executorTaskGroup = new ExecutorTaskGroup(executorData, taskSize) + executorTaskGroup.taskGroup += task + executorTaskGroup.taskDataList += task.taskData + // add reference to first index in taskDataList + task.taskData = TaskData(0) + executorTaskGroup + }, { executorTaskGroup => + // group into existing if size fits in the max allowed + if (!executorTaskGroup.addTask(task, taskLimit, maxRpcMessageSize)) { + // send this task separately + val executorData = executorTaskGroup.executorData + executorData.freeCores -= scheduler.CPUS_PER_TASK + logInfo(s"Launching task ${task.taskId} on executor id: " + + s"${task.executorId} hostname: ${executorData.executorHost}.") + + executorData.executorEndpoint.send(LaunchTask(task)) + } + executorTaskGroup + }) + } } } + // send the accumulated task groups per executor + executorTaskGroupMap.foreach { case (executorId, executorTaskGroup) => + val taskGroup = executorTaskGroup.taskGroup + val executorData = executorTaskGroup.executorData + + executorData.freeCores -= (scheduler.CPUS_PER_TASK * taskGroup.length) + logDebug(s"Launching tasks ${taskGroup.map(_.taskId).mkString(",")} on " + + s"executor id: $executorId hostname: ${executorData.executorHost}.") + executorData.executorEndpoint.send(LaunchTasks(taskGroup, + executorTaskGroup.taskDataList)) + } } // Remove a disconnected slave from the cluster @@ -705,3 +744,52 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private[spark] object CoarseGrainedSchedulerBackend { val ENDPOINT_NAME = "CoarseGrainedScheduler" } + +private[spark] final class ExecutorTaskGroup( + private[cluster] var 
executorData: ExecutorData, + private var groupSize: Int = 0) { + + private[cluster] val taskGroup = new ArrayBuffer[TaskDescription](2) + // field to carry around common task data + private[cluster] val taskDataList = new ArrayBuffer[TaskData](2) + + def addTask(task: TaskDescription, taskLimit: Int, limit: Int): Boolean = { + val newGroupSize = groupSize + taskLimit + if (newGroupSize > limit) return false + + groupSize = newGroupSize + // linear search is best since there cannot be many different + // tasks in a single taskSet + if (task.taskData.uncompressedLen == 0 || + findOrAddTaskData(task, taskDataList, limit)) { + taskGroup += task + true + } else { + // task rejected from group + groupSize -= taskLimit + false + } + } + + private def findOrAddTaskData(task: TaskDescription, + taskDataList: ArrayBuffer[TaskData], limit: Int): Boolean = { + val data = task.taskData + val numData = taskDataList.length + var i = 0 + while (i < numData) { + if (taskDataList(i) eq data) { + // add reference to index `i` in taskDataList + task.taskData = TaskData(i) + return true + } + i += 1 + } + val newGroupSize = groupSize + data.compressedBytes.length + if (newGroupSize <= limit) { + groupSize = newGroupSize + taskDataList += data + task.taskData = TaskData(numData) + true + } else false + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala index 4c614c5c0f602..83198c0d7c10d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala @@ -84,7 +84,8 @@ private[spark] class LocalEndpoint( val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores)) for (task <- scheduler.resourceOffers(offers).flatten) { freeCores -= scheduler.CPUS_PER_TASK - executor.launchTask(executorBackend, task) + executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber, + task.name, task.serializedTask, task.taskData.decompress()) } } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 2ce81ae27daf6..92bc89722b872 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -1365,7 +1365,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg private def createTaskResult( id: Int, accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = { - val valueSer = SparkEnv.get.serializer.newInstance() - new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates) + new DirectTaskResult[Int](id, accumUpdates) } } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index 61bfa27a84fd8..6c61540fe92b6 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -89,7 +89,8 @@ private[spark] class MesosExecutorBackend logError("Received launchTask but executor was null") } else { SparkHadoopUtil.get.runAsSparkUser { () => - executor.launchTask(this, taskDescription) + 
executor.launchTask(this, taskId = taskId, attemptNumber = taskData.attemptNumber, + taskInfo.getName, taskData.serializedTask, taskData.taskData.decompress()) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 398758a3331b4..b237960deddc6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -247,7 +247,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ execute().mapPartitionsInternal { iter => var count = 0 val buffer = new Array[Byte](4 << 10) // 4K - val codec = CompressionCodec.createCodec(SparkEnv.get.conf) + val codec = SparkEnv.get.createCompressionCodec val bos = new ByteArrayOutputStream() val out = new DataOutputStream(codec.compressedOutputStream(bos)) while (iter.hasNext && (n < 0 || count < n)) { @@ -269,7 +269,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ private def decodeUnsafeRows(bytes: Array[Byte]): Iterator[InternalRow] = { val nFields = schema.length - val codec = CompressionCodec.createCodec(SparkEnv.get.conf) + val codec = SparkEnv.get.createCompressionCodec val bis = new ByteArrayInputStream(bytes) val ins = new DataInputStream(codec.compressedInputStream(bis))
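
Below is an illustrative Scala sketch, not part of the patch above, of the DAGScheduler change described in the commit message: compress the serialized (rdd, closure) and ship it inline with the task when it is small or the stage has few partitions, and otherwise keep broadcasting it as stock Spark does. The JDK Deflater/Inflater is used as a stand-in for Spark's configured CompressionCodec, and the names TaskDataSketch, TaskData, packTaskBinary and the two threshold constants are assumptions that only mirror the patch, not its actual code.

// Illustrative sketch only, not part of the patch: JDK Deflater/Inflater stands in
// for Spark's configured CompressionCodec; names and thresholds merely mirror the patch.
import java.io.ByteArrayOutputStream
import java.util.zip.{Deflater, Inflater}

object TaskDataSketch {
  // mirrors DAGScheduler.TASK_INLINE_LIMIT / TASK_INLINE_PARTITION_LIMIT from the patch
  val TaskInlineLimit: Int = 100 * 1024
  val TaskInlinePartitionLimit: Int = 8

  // compressed bytes plus the original length needed to size the output buffer on decompression
  final case class TaskData(compressedBytes: Array[Byte], uncompressedLen: Int, reference: Int = -1)

  def compress(input: Array[Byte]): Array[Byte] = {
    val deflater = new Deflater(Deflater.BEST_SPEED)
    deflater.setInput(input)
    deflater.finish()
    val out = new ByteArrayOutputStream(math.max(64, input.length / 2))
    val buf = new Array[Byte](8192)
    while (!deflater.finished()) out.write(buf, 0, deflater.deflate(buf))
    deflater.end()
    out.toByteArray
  }

  def decompress(data: TaskData): Array[Byte] = {
    val inflater = new Inflater()
    inflater.setInput(data.compressedBytes)
    val out = new Array[Byte](data.uncompressedLen)
    var off = 0
    while (!inflater.finished() && off < out.length) {
      off += inflater.inflate(out, off, out.length - off)
    }
    inflater.end()
    out
  }

  // Left: compressed TaskData to inline with the TaskDescription; Right: bytes the caller
  // would still hand to sc.broadcast() as in stock Spark.
  def packTaskBinary(taskBinaryBytes: Array[Byte], numPartitions: Int): Either[TaskData, Array[Byte]] = {
    if (taskBinaryBytes.length <= TaskInlineLimit || numPartitions <= TaskInlinePartitionLimit) {
      Left(TaskData(compress(taskBinaryBytes), taskBinaryBytes.length))
    } else {
      Right(taskBinaryBytes)
    }
  }
}

Carrying uncompressedLen alongside the compressed bytes is what lets the receiving executor allocate an exact output buffer on decompression, which is why the patch's TaskData stores it as well.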
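
A second illustrative sketch, again not part of the patch, for the CoarseGrainedSchedulerBackend change: tasks headed for the same executor are accumulated into one LaunchTasks-style message as long as the combined serialized size stays below the RPC message limit, and anything that does not fit starts a message of its own. Tasks are reduced to (executorId, serializedSizeInBytes) pairs for brevity; LaunchTasksSketch and groupByExecutor are hypothetical names.

// Illustrative sketch only, not part of the patch: batches per-executor task sizes
// under an RPC message size cap, in the spirit of ExecutorTaskGroup/LaunchTasks.
import scala.collection.mutable

object LaunchTasksSketch {
  // Returns, per executor, batches of task sizes where each batch would become one message.
  def groupByExecutor(tasks: Seq[(String, Int)], maxRpcMessageSize: Int): Map[String, Seq[Seq[Int]]] = {
    val groups = mutable.Map.empty[String, mutable.ArrayBuffer[mutable.ArrayBuffer[Int]]]
    for ((executorId, size) <- tasks) {
      val batches = groups.getOrElseUpdate(executorId, mutable.ArrayBuffer.empty)
      if (batches.nonEmpty && batches.last.sum + size <= maxRpcMessageSize) {
        batches.last += size // fits into the batch being accumulated for this executor
      } else {
        batches += mutable.ArrayBuffer(size) // first task, or too large: starts its own message
      }
    }
    groups.map { case (id, batches) => id -> batches.map(_.toSeq).toSeq }.toMap
  }
}

The real ExecutorTaskGroup additionally de-duplicates the shared TaskData by replacing repeats with an index into a common list sent once per LaunchTasks message; this size-only sketch omits that step.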