diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index 668032a3a2680..0aa8852649e05 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -1,19 +1,19 @@
/*
*
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index 481ff8c3e02d2..b1e1576dadc1a 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -76,7 +76,7 @@ private[spark] object ShuffleCopier extends Logging {
extends FileClientHandler with Logging {
override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) {
- logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)");
+ logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)")
resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen))
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 9b0c882481cfc..0de22f0e06e49 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -70,7 +70,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
override def compute(split: Partition, context: TaskContext) = {
val currSplit = split.asInstanceOf[CartesianPartition]
for (x <- rdd1.iterator(currSplit.s1, context);
- y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+ y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}
override def getDependencies: Seq[Dependency[_]] = List(
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index 165cd412fcfb8..574dd4233fb27 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -33,11 +33,13 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
extends NarrowDependency[T](rdd) {
@transient
- val partitions: Array[Partition] = rdd.partitions.zipWithIndex
- .filter(s => partitionFilterFunc(s._2))
+ val partitions: Array[Partition] = rdd.partitions
+ .filter(s => partitionFilterFunc(s.index)).zipWithIndex
.map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
- override def getParents(partitionId: Int) = List(partitions(partitionId).index)
+ override def getParents(partitionId: Int) = {
+ List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a45bee536ca9d..d0ba5bf55dcfd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -199,6 +199,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
}
override def stop() {
+ stopExecutors()
try {
if (driverActor != null) {
val future = driverActor.ask(StopDriver)(timeout)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index 0ea35e2b7a311..e000531a26f7e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -62,7 +62,6 @@ private[spark] class SimrSchedulerBackend(
val conf = new Configuration()
val fs = FileSystem.get(conf)
fs.delete(new Path(driverFilePath), false)
- super.stopExecutors()
super.stop()
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index 42e9be6e19254..e596690bc3df8 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -76,7 +76,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
- val execInfo = for (b <- 0 until storageStatusList.size) yield getExecInfo(b)
+ val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execTable = UIUtils.listingTable(execHead, execRow, execInfo)
val content =
@@ -99,16 +99,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
UIUtils.headerSparkPage(content, sc, "Executors (" + execInfo.size + ")", Executors)
}
- def getExecInfo(a: Int): Seq[String] = {
- val execId = sc.getExecutorStorageStatus(a).blockManagerId.executorId
- val hostPort = sc.getExecutorStorageStatus(a).blockManagerId.hostPort
- val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString
- val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
- val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
- val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
- val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size).getOrElse(0)
- val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0)
- val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0)
+ def getExecInfo(statusId: Int): Seq[String] = {
+ val status = sc.getExecutorStorageStatus(statusId)
+ val execId = status.blockManagerId.executorId
+ val hostPort = status.blockManagerId.hostPort
+ val rddBlocks = status.blocks.size.toString
+ val memUsed = status.memUsed().toString
+ val maxMem = status.maxMem.toString
+ val diskUsed = status.diskUsed().toString
+ val activeTasks = listener.executorToTasksActive.getOrElse(execId, HashSet.empty[Long]).size
+ val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
+ val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
Seq(
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index c1c7aa70e6c92..fbd822867fa0d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -133,7 +133,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
summary ++
Summary Metrics for {numCompleted} Completed Tasks
++
{summaryTable.getOrElse("No tasks have reported metrics yet.")}
++
- Tasks
++ taskTable;
+ Tasks
++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
}
diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
index 459e257d79a36..8dd5786da6ff5 100644
--- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala
@@ -30,7 +30,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
@transient var sc: SparkContext = _
override def beforeAll() {
- InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
+ InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
super.beforeAll()
}
diff --git a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
deleted file mode 100644
index 21f16ef2c6ece..0000000000000
--- a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.scalatest.FunSuite
-import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.{RDD, PartitionPruningRDD}
-
-
-class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
-
- test("Pruned Partitions inherit locality prefs correctly") {
- class TestPartition(i: Int) extends Partition {
- def index = i
- }
- val rdd = new RDD[Int](sc, Nil) {
- override protected def getPartitions = {
- Array[Partition](
- new TestPartition(1),
- new TestPartition(2),
- new TestPartition(3))
- }
- def compute(split: Partition, context: TaskContext) = {Iterator()}
- }
- val prunedRDD = PartitionPruningRDD.create(rdd, {x => if (x==2) true else false})
- val p = prunedRDD.partitions(0)
- assert(p.index == 2)
- assert(prunedRDD.partitions.length == 1)
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7d938917f2650..1374d01774693 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -142,11 +142,11 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
.filter(_ >= 0.0)
// Run the partitions, including the consecutive empty ones, through StatCounter
- val stats: StatCounter = rdd.stats();
- assert(abs(6.0 - stats.sum) < 0.01);
- assert(abs(6.0/2 - rdd.mean) < 0.01);
- assert(abs(1.0 - rdd.variance) < 0.01);
- assert(abs(1.0 - rdd.stdev) < 0.01);
+ val stats: StatCounter = rdd.stats()
+ assert(abs(6.0 - stats.sum) < 0.01)
+ assert(abs(6.0/2 - rdd.mean) < 0.01)
+ assert(abs(1.0 - rdd.variance) < 0.01)
+ assert(abs(1.0 - rdd.stdev) < 0.01)
// Add other tests here for classes that should be able to handle empty partitions correctly
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
new file mode 100644
index 0000000000000..53a7b7c44df1c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.scalatest.FunSuite
+import org.apache.spark.{TaskContext, Partition, SharedSparkContext}
+
+
+class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {
+
+
+ test("Pruned Partitions inherit locality prefs correctly") {
+
+ val rdd = new RDD[Int](sc, Nil) {
+ override protected def getPartitions = {
+ Array[Partition](
+ new TestPartition(0, 1),
+ new TestPartition(1, 1),
+ new TestPartition(2, 1))
+ }
+
+ def compute(split: Partition, context: TaskContext) = {
+ Iterator()
+ }
+ }
+ val prunedRDD = PartitionPruningRDD.create(rdd, {
+ x => if (x == 2) true else false
+ })
+ assert(prunedRDD.partitions.length == 1)
+ val p = prunedRDD.partitions(0)
+ assert(p.index == 0)
+ assert(p.asInstanceOf[PartitionPruningRDDPartition].parentSplit.index == 2)
+ }
+
+
+ test("Pruned Partitions can be unioned ") {
+
+ val rdd = new RDD[Int](sc, Nil) {
+ override protected def getPartitions = {
+ Array[Partition](
+ new TestPartition(0, 4),
+ new TestPartition(1, 5),
+ new TestPartition(2, 6))
+ }
+
+ def compute(split: Partition, context: TaskContext) = {
+ List(split.asInstanceOf[TestPartition].testValue).iterator
+ }
+ }
+ val prunedRDD1 = PartitionPruningRDD.create(rdd, {
+ x => if (x == 0) true else false
+ })
+
+ val prunedRDD2 = PartitionPruningRDD.create(rdd, {
+ x => if (x == 2) true else false
+ })
+
+ val merged = prunedRDD1 ++ prunedRDD2
+ assert(merged.count() == 2)
+ val take = merged.take(2)
+ assert(take.apply(0) == 4)
+ assert(take.apply(1) == 6)
+ }
+}
+
+class TestPartition(i: Int, value: Int) extends Partition with Serializable {
+ def index = i
+
+ def testValue = this.value
+
+}
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 6fd1d0d150306..4056e9c15db2b 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -37,6 +37,8 @@ System Properties:
* 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the the spark master and then also the number of tries it waits for the Spark Context to be intialized. Default is 10.
* 'spark.yarn.submit.file.replication', the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
* 'spark.yarn.preserve.staging.files', set to true to preserve the staged files(spark jar, app jar, distributed cache files) at the end of the job rather then delete them.
+* 'spark.yarn.scheduler.heartbeat.interval-ms', the interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds.
+* 'spark.yarn.max.worker.failures', the maximum number of worker failures before failing the application. Default is the number of workers requested times 2 with minimum of 3.
# Launching Spark on YARN
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index 529709c2f9538..a1199809923bc 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -32,13 +32,13 @@ object BroadcastTest {
System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory")
System.setProperty("spark.broadcast.blockSize", blockSize)
- val sc = new SparkContext(args(0), "Broadcast Test 2",
+ val sc = new SparkContext(args(0), "Broadcast Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
- var arr1 = new Array[Int](num)
+ val arr1 = new Array[Int](num)
for (i <- 0 until arr1.length) {
arr1(i) = i
}
@@ -48,9 +48,9 @@ object BroadcastTest {
println("===========")
val startTime = System.nanoTime
val barr1 = sc.broadcast(arr1)
- sc.parallelize(1 to 10, slices).foreach {
- i => println(barr1.value.size)
- }
+ val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.size)
+ // Collect the small RDD so we can print the observed sizes locally.
+ observedSizes.collect().foreach(i => println(i))
println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
index 4af45b2b4a067..83db8b9e26411 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
@@ -120,7 +120,7 @@ object LocalALS {
System.exit(1)
}
}
- printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
+ printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
val R = generateR()
diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
index f79f0142b8679..e1afc29f9aca9 100644
--- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
@@ -18,35 +18,38 @@
package org.apache.spark.examples
import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
object MultiBroadcastTest {
def main(args: Array[String]) {
if (args.length == 0) {
- System.err.println("Usage: BroadcastTest [] [numElem]")
+ System.err.println("Usage: MultiBroadcastTest [] [numElem]")
System.exit(1)
}
- val sc = new SparkContext(args(0), "Broadcast Test",
+ val sc = new SparkContext(args(0), "Multi-Broadcast Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
- var arr1 = new Array[Int](num)
+ val arr1 = new Array[Int](num)
for (i <- 0 until arr1.length) {
arr1(i) = i
}
- var arr2 = new Array[Int](num)
+ val arr2 = new Array[Int](num)
for (i <- 0 until arr2.length) {
arr2(i) = i
}
val barr1 = sc.broadcast(arr1)
val barr2 = sc.broadcast(arr2)
- sc.parallelize(1 to 10, slices).foreach {
- i => println(barr1.value.size + barr2.value.size)
+ val observedSizes: RDD[(Int, Int)] = sc.parallelize(1 to 10, slices).map { _ =>
+ (barr1.value.size, barr2.value.size)
}
+ // Collect the small RDD so we can print the observed sizes locally.
+ observedSizes.collect().foreach(i => println(i))
System.exit(0)
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
index 5a7a9d1bd8f74..8543ce0e3285e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala
@@ -65,7 +65,7 @@ object SparkTC {
oldCount = nextCount
// Perform the join, obtaining an RDD of (y, (z, x)) pairs,
// then project the result to obtain the new (x, z) paths.
- tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache();
+ tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
nextCount = tc.count()
} while (nextCount != oldCount)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index cd3423a07b6b7..af52b7e9a12f1 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -120,7 +120,7 @@ object FeederActor {
println("Feeder started as:" + feeder)
- actorSystem.awaitTermination();
+ actorSystem.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index af698a01d5118..ff332a0282129 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -54,12 +54,12 @@ object MQTTPublisher {
client.connect()
- val msgtopic: MqttTopic = client.getTopic(topic);
+ val msgtopic: MqttTopic = client.getTopic(topic)
val msg: String = "hello mqtt demo for spark streaming"
while (true) {
val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes())
- msgtopic.publish(message);
+ msgtopic.publish(message)
println("Published data. topic: " + msgtopic.getName() + " Message: " + message)
}
client.disconnect()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index bb9febad38d03..9271914eb536a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -94,7 +94,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
fs.delete(file, false)
fs.rename(writeFile, file)
- val finishTime = System.currentTimeMillis();
+ val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds")
return
@@ -124,7 +124,9 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
def stop() {
synchronized {
- if (stopped) return ;
+ if (stopped) {
+ return
+ }
stopped = true
}
executor.shutdown()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index cf30b541e1f92..7f9dab0ef9982 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Long => JLong, Integer => JInt}
+import java.lang.{Integer => JInt}
import java.io.InputStream
import java.util.{Map => JMap, List => JList}
@@ -33,10 +33,9 @@ import twitter4j.auth.Authorization
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
-import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD}
+import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream._
-import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy}
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@@ -311,7 +310,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
implicit val cmf: ClassManifest[F] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]]
- ssc.fileStream[K, V, F](directory);
+ ssc.fileStream[K, V, F](directory)
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
index 18de772946805..a0189eca043c6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala
@@ -137,8 +137,8 @@ class FlumeReceiver(
protected override def onStart() {
val responder = new SpecificResponder(
- classOf[AvroSourceProtocol], new FlumeEventServer(this));
- val server = new NettyServer(responder, new InetSocketAddress(host, port));
+ classOf[AvroSourceProtocol], new FlumeEventServer(this))
+ val server = new NettyServer(responder, new InetSocketAddress(host, port))
blockGenerator.start()
server.start()
logInfo("Flume receiver started")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index a559db468a771..7dc82decefd6e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -124,9 +124,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
- val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort));
+ val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
val client = SpecificRequestor.getClient(
- classOf[AvroSourceProtocol], transceiver);
+ classOf[AvroSourceProtocol], transceiver)
for (i <- 0 until input.size) {
val event = new AvroFlumeEvent
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index be140699c2964..8c8c359e6e865 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -251,7 +251,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
Thread.sleep(500) // Give some time for the forgetting old RDDs to complete
} catch {
- case e: Exception => e.printStackTrace(); throw e;
+ case e: Exception => {e.printStackTrace(); throw e}
} finally {
ssc.stop()
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e4f3d3ef64189..9c43a7287d6ee 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -57,7 +57,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
-
+ // default to numWorkers * 2, with minimum of 3
+ private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+ math.max(args.numWorkers * 2, 3).toString()).toInt
def run() {
// Setup the directories so things go to yarn approved directories rather
@@ -68,7 +70,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
appAttemptId = getApplicationAttemptId()
- isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts;
+ isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
resourceManager = registerWithResourceManager()
// Workaround until hadoop moves to something which has
@@ -203,7 +205,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
successed = true
} finally {
logDebug("finishing main")
- isLastAMRetry = true;
+ isLastAMRetry = true
if (successed) {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
} else {
@@ -267,9 +269,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Exists the loop if the user thread exits.
while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
- val numContainersToAllocate = math.max(
- args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)
- this.yarnAllocator.allocateContainers(numContainersToAllocate)
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
+ yarnAllocator.allocateContainers(
+ math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
@@ -286,21 +291,28 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
if (userThread.isAlive) {
// Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // Must be <= timeoutInterval/ 2.
- // On other hand, also ensure that we are reasonably responsive without causing too many
- // requests to RM. So, at least 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+
+ // we want to be reasonably responsive without causing too many requests to RM.
+ val schedulerInterval =
+ System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
+ // must be <= timeoutInterval / 2.
+ val interval = math.min(timeoutInterval / 2, schedulerInterval)
+
launchReporterThread(interval)
}
}
- // TODO: We might want to extend this to allocate more containers in case they die.
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
val t = new Thread {
override def run() {
while (userThread.isAlive) {
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
@@ -340,7 +352,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
*/
- def finishApplicationMaster(status: FinalApplicationStatus) {
+ def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
synchronized {
if (isFinished) {
return
@@ -353,6 +365,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
// Set tracking url to empty since we don't have a history server.
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 08699cc5f8fc1..68527fbdc7d24 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
@@ -27,6 +27,7 @@ import scala.collection.mutable.Map
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
@@ -60,6 +61,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
def run() {
+ validateArgs()
+
init(yarnConf)
start()
logClusterResourceDetails()
@@ -84,6 +87,23 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
System.exit(0)
}
+ def validateArgs() = {
+ Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+ (args.userJar == null) -> "Error: You must specify a user jar!",
+ (args.userClass == null) -> "Error: You must specify a user class!",
+ (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+ (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+ ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+ (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+ ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString()))
+ .foreach { case(cond, errStr) =>
+ if (cond) {
+ logError(errStr)
+ args.printUsageAndExit(1)
+ }
+ }
+ }
+
def getAppStagingDir(appId: ApplicationId): String = {
SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
}
@@ -103,14 +123,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
queueInfo.getChildQueues.size)
}
-
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
// If we have requested more then the clusters max for a single resource then exit.
if (args.workerMemory > maxMem) {
- logError("the worker size is to large to run on this cluster " + args.workerMemory);
+ logError("the worker size is to large to run on this cluster " + args.workerMemory)
System.exit(1)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@@ -145,8 +164,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var dstHost = dstUri.getHost()
if ((srcHost != null) && (dstHost != null)) {
try {
- srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
- dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+ srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+ dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
} catch {
case e: UnknownHostException =>
return false
@@ -163,7 +182,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
if (srcUri.getPort() != dstUri.getPort()) {
return false
}
- return true;
+ return true
}
/** Copy the file into HDFS if needed. */
@@ -173,13 +192,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
replication: Short,
setPerms: Boolean = false): Path = {
val fs = FileSystem.get(conf)
- val remoteFs = originalPath.getFileSystem(conf);
+ val remoteFs = originalPath.getFileSystem(conf)
var newPath = originalPath
if (! compareFs(remoteFs, fs)) {
newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
- FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf);
- fs.setReplication(newPath, replication);
+ FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+ fs.setReplication(newPath, replication)
if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
}
// Resolve any symlinks in the URI path so using a "current" symlink to point to a specific
@@ -196,7 +215,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// local resources to the AM.
val fs = FileSystem.get(conf)
- val delegTokenRenewer = Master.getMasterPrincipal(conf);
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
if (UserGroupInformation.isSecurityEnabled()) {
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
logError("Can't get Master Kerberos principal for use as renewer")
@@ -208,18 +227,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
- dstFs.addDelegationTokens(delegTokenRenewer, credentials);
+ dstFs.addDelegationTokens(delegTokenRenewer, credentials)
}
val localResources = HashMap[String, LocalResource]()
FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
- logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
- System.exit(1)
- }
-
Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
@@ -273,7 +287,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
- UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ UserGroupInformation.getCurrentUser().addCredentials(credentials)
return localResources
}
@@ -359,18 +373,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
- // Command for the ApplicationMaster.
- var javaCommand = "java";
+ // Command for the ApplicationMaster
+ var javaCommand = "java"
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}
- if (args.userClass == null) {
- logError("Error: You must specify a user class!")
- System.exit(1)
- }
-
val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
@@ -448,6 +457,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val args = new ClientArguments(argStrings)
+
new Client(args).run
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 674c8f8112b86..5f159b073f537 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -197,11 +197,11 @@ class ClientDistributedCacheManager() extends Logging {
*/
def checkPermissionOfOther(fs: FileSystem, path: Path,
action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
- val status = getFileStatus(fs, path.toUri(), statCache);
+ val status = getFileStatus(fs, path.toUri(), statCache)
val perms = status.getPermission()
val otherAction = perms.getOtherAction()
if (otherAction.implies(action)) {
- return true;
+ return true
}
return false
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index d9eabf3bae5f0..6a90cc51cfbaf 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
import org.apache.spark.Logging
-import org.apache.spark.util.Utils
class WorkerRunnable(
@@ -119,7 +118,7 @@ class WorkerRunnable(
credentials.writeTokenStorageToStream(dob)
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- var javaCommand = "java";
+ var javaCommand = "java"
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
@@ -217,8 +216,8 @@ class WorkerRunnable(
// Use doAs and remoteUser here so we can add the container token and not pollute the current
// users credentials with all of the individual container tokens
- val user = UserGroupInformation.createRemoteUser(container.getId().toString());
- val containerToken = container.getContainerToken();
+ val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+ val containerToken = container.getContainerToken()
if (containerToken != null) {
user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
}
@@ -229,8 +228,8 @@ class WorkerRunnable(
return rpc.getProxy(classOf[ContainerManager],
cmAddress, conf).asInstanceOf[ContainerManager]
}
- });
- return proxy;
+ })
+ proxy
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index a9fbc27613903..2a08255bf300e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -87,9 +87,12 @@ private[yarn] class YarnAllocationHandler(
// Used to generate a unique id per worker
private val workerIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
+ private val numWorkersFailed = new AtomicInteger()
def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ def getNumWorkersFailed: Int = numWorkersFailed.intValue
+
def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
}
@@ -272,8 +275,16 @@ private[yarn] class YarnAllocationHandler(
else {
// Simply decrement count - next iteration of ReporterThread will take care of allocating.
numWorkersRunning.decrementAndGet()
- logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
- " httpaddress: " + completedContainer.getDiagnostics)
+ logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
+ " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())
+
+ // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+ // there are some exit status' we shouldn't necessarily count against us, but for
+ // now I think its ok as none of the containers are expected to exit
+ if (completedContainer.getExitStatus() != 0) {
+ logInfo("Container marked as failed: " + containerId)
+ numWorkersFailed.incrementAndGet()
+ }
}
allocatedHostToContainersMap.synchronized {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index ca2f1e2565b9a..2ba2366ead171 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,13 +18,10 @@
package org.apache.spark.deploy.yarn
import org.apache.spark.deploy.SparkHadoopUtil
-import collection.mutable.HashMap
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import java.security.PrivilegedExceptionAction
/**
* Contains util methods to interact with Hadoop from spark.
@@ -40,7 +37,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
override def addCredentials(conf: JobConf) {
- val jobCreds = conf.getCredentials();
+ val jobCreds = conf.getCredentials()
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
}