Merge branch 'master' into SPARK-4924
Conflicts:
	bin/spark-submit
	bin/spark-submit2.cmd
Marcelo Vanzin committed Jan 10, 2015
2 parents f26556b + 8782eb9 commit fc6a3e2
Showing 21 changed files with 343 additions and 82 deletions.
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi

/**
* Exception thrown when a task cannot be serialized.
*/
private[spark] class TaskNotSerializableException(error: Throwable) extends Exception(error)
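
For orientation, a condensed, illustrative sketch (not part of this file) of how the rest of this commit uses the new exception: TaskSetManager.resourceOffer wraps a serialization failure in it and aborts the task set, while TaskSchedulerImpl catches it so that one bad task set does not stop the others from being scheduled. The helper object and function-valued parameters below are hypothetical stand-ins for the real scheduler plumbing.

package org.apache.spark

import scala.util.control.NonFatal

// Hypothetical helpers mirroring the pattern used further down in this diff.
private[spark] object TaskNotSerializableSketch {
  // Producer side (cf. TaskSetManager.resourceOffer): wrap any serialization
  // failure in the typed exception so callers can react to it specifically.
  def serializeOrFail[T](task: T, serialize: T => Array[Byte]): Array[Byte] =
    try serialize(task)
    catch {
      case NonFatal(e) =>
        // Retrying would fail the same way, so abort via a typed exception.
        throw new TaskNotSerializableException(e)
    }

  // Consumer side (cf. TaskSchedulerImpl.resourceOfferSingleTaskSet): skip the
  // offending task set but keep the scheduler alive for the remaining ones.
  def offerSafely(offer: () => Unit): Boolean =
    try { offer(); true }
    catch { case _: TaskNotSerializableException => false }
}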
@@ -392,7 +392,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.""".stripMargin
| working directory of each executor.
""".stripMargin
)
SparkSubmit.exitFn()
}
20 changes: 0 additions & 20 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -866,26 +866,6 @@ class DAGScheduler(
}

if (tasks.size > 0) {
// Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception
// down the road, where we have several different implementations for local scheduler and
// cluster schedulers.
//
// We've already serialized RDDs and closures in taskBinary, but here we check for all other
// objects such as Partition.
try {
closureSerializer.serialize(tasks.head)
} catch {
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString)
runningStages -= stage
return
case NonFatal(e) => // Other exceptions, such as IllegalArgumentException from Kryo.
abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
runningStages -= stage
return
}

logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingTasks ++= tasks
logDebug("New pending tasks: " + stage.pendingTasks)
@@ -31,6 +31,7 @@ import scala.util.Random
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.util.Utils
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
@@ -209,6 +210,40 @@ private[spark] class TaskSchedulerImpl(
.format(manager.taskSet.id, manager.parent.name))
}

private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
try {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
} catch {
case e: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
// Do not offer resources for this task, but don't throw an error to allow other
// task sets to be submitted.
return launchedTask
}
}
}
return launchedTask
}

/**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
@@ -251,23 +286,8 @@
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
}
}
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
}

@@ -18,12 +18,14 @@
package org.apache.spark.scheduler

import java.io.NotSerializableException
import java.nio.ByteBuffer
import java.util.Arrays

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.math.{min, max}
import scala.util.control.NonFatal

import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
@@ -417,6 +419,7 @@ private[spark] class TaskSetManager(
* @param host the host Id of the offered resource
* @param maxLocality the maximum locality we want to schedule the tasks at
*/
@throws[TaskNotSerializableException]
def resourceOffer(
execId: String,
host: String,
@@ -456,10 +459,17 @@
}
// Serialize and return the task
val startTime = clock.getTime()
// We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
// we assume the task can be serialized without exceptions.
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val serializedTask: ByteBuffer = try {
Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
} catch {
// If the task cannot be serialized, then there's no point to re-attempt the task,
// as it will always fail. So just abort the whole task-set.
case NonFatal(e) =>
val msg = s"Failed to serialize task $taskId, not attempting to retry it."
logError(msg, e)
abort(s"$msg Exception during serialization: $e")
throw new TaskNotSerializableException(e)
}
if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
@@ -30,7 +30,7 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
var conf = new SparkConf(false)

override def beforeAll() {
_sc = new SparkContext("local", "test", conf)
_sc = new SparkContext("local[4]", "test", conf)
super.beforeAll()
}

21 changes: 21 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -17,6 +17,10 @@

package org.apache.spark.rdd

import java.io.{ObjectInputStream, ObjectOutputStream, IOException}

import com.esotericsoftware.kryo.KryoException

import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
@@ -887,6 +891,23 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(ancestors6.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 3)
}

test("task serialization exception should not hang scheduler") {
class BadSerializable extends Serializable {
@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream): Unit = throw new KryoException("Bad serialization")

@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = {}
}
// Note that in the original bug this test verifies, SPARK-4349, the job would only hang if there
// were more threads in the SparkContext than objects in this sequence.
intercept[Throwable] {
sc.parallelize(Seq(new BadSerializable, new BadSerializable)).collect
}
// Check that the context has not crashed
sc.parallelize(1 to 100).map(x => x*2).collect
}

/** A contrived RDD that allows the manual addition of dependencies after creation. */
private class CyclicalDependencyRDD[T: ClassTag] extends RDD[T](sc, Nil) {
private val mutableDependencies: ArrayBuffer[Dependency[_]] = ArrayBuffer.empty
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import java.io.{ObjectInputStream, ObjectOutputStream, IOException}

import org.apache.spark.TaskContext

/**
* A Task implementation that fails to serialize.
*/
private[spark] class NotSerializableFakeTask(myId: Int, stageId: Int) extends Task[Array[Byte]](stageId, 0) {
override def runTask(context: TaskContext): Array[Byte] = Array.empty[Byte]
override def preferredLocations: Seq[TaskLocation] = Seq[TaskLocation]()

@throws(classOf[IOException])
private def writeObject(out: ObjectOutputStream): Unit = {
if (stageId == 0) {
throw new IllegalStateException("Cannot serialize")
}
}

@throws(classOf[IOException])
private def readObject(in: ObjectInputStream): Unit = {}
}
@@ -100,4 +100,34 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
assert(1 === taskDescriptions.length)
assert("executor0" === taskDescriptions(0).executorId)
}

test("Scheduler does not crash when tasks are not serializable") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskCpus = 2

sc.conf.set("spark.task.cpus", taskCpus.toString)
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
val dagScheduler = new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
val numFreeCores = 1
taskScheduler.setDAGScheduler(dagScheduler)
var taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(0 === taskDescriptions.length)

// Now check that we can still submit tasks.
// Even if one task set contains unserializable tasks, the other task sets should still be processed without error.
taskScheduler.submitTasks(taskSet)
taskScheduler.submitTasks(FakeTask.createTaskSet(1))
taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
}

}
@@ -17,6 +17,7 @@

package org.apache.spark.scheduler

import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
import java.util.Random

import scala.collection.mutable.ArrayBuffer
@@ -563,6 +564,19 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.emittedTaskSizeWarning)
}

test("Not serializable exception thrown if the task cannot be serialized") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))

val taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)

intercept[TaskNotSerializableException] {
manager.resourceOffer("exec1", "host1", ANY)
}
assert(manager.isZombie)
}

test("abort the job if total size of results is too large") {
val conf = new SparkConf().set("spark.driver.maxResultSize", "2m")
sc = new SparkContext("local", "test", conf)
2 changes: 1 addition & 1 deletion docs/_config.yml
@@ -17,6 +17,6 @@ SPARK_VERSION: 1.3.0-SNAPSHOT
SPARK_VERSION_SHORT: 1.3.0
SCALA_BINARY_VERSION: "2.10"
SCALA_VERSION: "2.10.4"
MESOS_VERSION: 0.18.1
MESOS_VERSION: 0.21.0
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark
19 changes: 17 additions & 2 deletions docs/running-on-yarn.md
@@ -21,6 +21,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.yarn.am.memory</code></td>
<td>512m</td>
<td>
Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>).
In cluster mode, use <code>spark.driver.memory</code> instead.
</td>
</tr>
<tr>
<td><code>spark.yarn.am.waitTime</code></td>
<td>100000</td>
@@ -90,7 +98,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
<td><code>spark.yarn.driver.memoryOverhead</code></td>
<td>driverMemory * 0.07, with minimum of 384 </td>
<td>
The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
The amount of off heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
</td>
</tr>
<tr>
<td><code>spark.yarn.am.memoryOverhead</code></td>
<td>AM memory * 0.07, with minimum of 384 </td>
<td>
Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the Application Master in client mode.
</td>
</tr>
<tr>
@@ -145,7 +160,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
<td><code>spark.yarn.am.extraJavaOptions</code></td>
<td>(none)</td>
<td>
A string of extra JVM options to pass to the Yarn ApplicationMaster in client mode.
A string of extra JVM options to pass to the YARN Application Master in client mode.
In cluster mode, use spark.driver.extraJavaOptions instead.
</td>
</tr>
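
For illustration only (not part of this diff), the two new client-mode Application Master options documented above could be set programmatically before the SparkContext is created. The master URL, application name, and values below are assumptions; in cluster mode, the spark.driver.* equivalents apply instead.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: sizing the YARN Application Master in client mode using the options
// described in the table above. Values and application name are placeholders.
val conf = new SparkConf()
  .setMaster("yarn-client")                     // client mode; use spark.driver.* settings in cluster mode
  .setAppName("am-sizing-example")              // hypothetical application name
  .set("spark.yarn.am.memory", "1g")            // JVM memory string, e.g. 512m or 2g
  .set("spark.yarn.am.memoryOverhead", "384")   // off-heap overhead, in megabytes
val sc = new SparkContext(conf)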