From b60839e56a335d0e30578d1c4cad5b0319d565df Mon Sep 17 00:00:00 2001 From: BlackNiuza Date: Sun, 17 Nov 2013 21:38:57 +0800 Subject: [PATCH 1/9] correct number of tasks in ExecutorsUI --- .../apache/spark/ui/exec/ExecutorsUI.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index 42e9be6e19254..ba198b211db2c 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -100,15 +100,16 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { } def getExecInfo(a: Int): Seq[String] = { - val execId = sc.getExecutorStorageStatus(a).blockManagerId.executorId - val hostPort = sc.getExecutorStorageStatus(a).blockManagerId.hostPort - val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString - val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString - val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString - val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString - val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size).getOrElse(0) - val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0) - val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0) + val status = sc.getExecutorStorageStatus(a) + val execId = status.blockManagerId.executorId + val hostPort = status.blockManagerId.hostPort + val rddBlocks = status.blocks.size.toString + val memUsed = status.memUsed().toString + val maxMem = status.maxMem.toString + val diskUsed = status.diskUsed().toString + val activeTasks = listener.executorToTasksActive.getOrElse(execId, Seq[Long]()).size + val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0) + val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks Seq( From ecfbaf24426948a9c09225190e71bc1148a9944b Mon Sep 17 00:00:00 2001 From: BlackNiuza Date: Mon, 18 Nov 2013 09:51:40 +0800 Subject: [PATCH 2/9] rename "a" to "statusId" --- .../main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index ba198b211db2c..26245a6540f85 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -76,7 +76,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { } - val execInfo = for (b <- 0 until storageStatusList.size) yield getExecInfo(b) + val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId) val execTable = UIUtils.listingTable(execHead, execRow, execInfo) val content = @@ -99,8 +99,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { UIUtils.headerSparkPage(content, sc, "Executors (" + execInfo.size + ")", Executors) } - def getExecInfo(a: Int): Seq[String] = { - val status = sc.getExecutorStorageStatus(a) + def getExecInfo(statusId: Int): Seq[String] = { + val status = sc.getExecutorStorageStatus(statusId) val execId = status.blockManagerId.executorId val hostPort = status.blockManagerId.hostPort val rddBlocks = status.blocks.size.toString From eda05fa43953f601d14853f3416e99e012a1bbba Mon Sep 17 00:00:00 2001 From: "shiyun.wxm" Date: Mon, 18 Nov 2013 13:31:14 +0800 Subject: [PATCH 3/9] use HashSet.empty[Long] instead of Seq[Long] --- core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala index 26245a6540f85..e596690bc3df8 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala @@ -107,7 +107,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { val memUsed = status.memUsed().toString val maxMem = status.maxMem.toString val diskUsed = status.diskUsed().toString - val activeTasks = listener.executorToTasksActive.getOrElse(execId, Seq[Long]()).size + val activeTasks = listener.executorToTasksActive.getOrElse(execId, HashSet.empty[Long]).size val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0) val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks From 13b9bf494b0d1d0e65dc357efe832763127aefd2 Mon Sep 17 00:00:00 2001 From: Matthew Taylor Date: Mon, 18 Nov 2013 06:41:21 +0000 Subject: [PATCH 4/9] PartitionPruningRDD is using index from parent --- .../spark/rdd/PartitionPruningRDD.scala | 6 +- .../spark/PartitionPruningRDDSuite.scala | 70 ++++++++++++++++--- 2 files changed, 63 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala index 165cd412fcfb8..2738a0089409a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala @@ -34,10 +34,12 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo @transient val partitions: Array[Partition] = rdd.partitions.zipWithIndex - .filter(s => partitionFilterFunc(s._2)) + .filter(s => partitionFilterFunc(s._2)).map(_._1).zipWithIndex .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition } - override def getParents(partitionId: Int) = List(partitions(partitionId).index) + override def getParents(partitionId: Int) = { + List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index) + } } diff --git a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala index 21f16ef2c6ece..28e71e835fa2c 100644 --- a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala @@ -19,27 +19,75 @@ package org.apache.spark import org.scalatest.FunSuite import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.{RDD, PartitionPruningRDD} +import org.apache.spark.rdd.{PartitionPruningRDDPartition, RDD, PartitionPruningRDD} class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext { + test("Pruned Partitions inherit locality prefs correctly") { - class TestPartition(i: Int) extends Partition { - def index = i - } + val rdd = new RDD[Int](sc, Nil) { override protected def getPartitions = { Array[Partition]( - new TestPartition(1), - new TestPartition(2), - new TestPartition(3)) + new TestPartition(0, 1), + new TestPartition(1, 1), + new TestPartition(2, 1)) + } + + def compute(split: Partition, context: TaskContext) = { + Iterator() } - def compute(split: Partition, context: TaskContext) = {Iterator()} } - val prunedRDD = PartitionPruningRDD.create(rdd, {x => if (x==2) true else false}) - val p = prunedRDD.partitions(0) - assert(p.index == 2) + val prunedRDD = PartitionPruningRDD.create(rdd, { + x => if (x == 2) true else false + }) assert(prunedRDD.partitions.length == 1) + val p = prunedRDD.partitions(0) + assert(p.index == 0) + assert(p.asInstanceOf[PartitionPruningRDDPartition].parentSplit.index == 2) } + + + test("Pruned Partitions can be merged ") { + + val rdd = new RDD[Int](sc, Nil) { + override protected def getPartitions = { + Array[Partition]( + new TestPartition(0, 4), + new TestPartition(1, 5), + new TestPartition(2, 6)) + } + + def compute(split: Partition, context: TaskContext) = { + List(split.asInstanceOf[TestPartition].testValue).iterator + } + } + val prunedRDD1 = PartitionPruningRDD.create(rdd, { + x => if (x == 0) true else false + }) + + val prunedRDD2 = PartitionPruningRDD.create(rdd, { + x => if (x == 2) true else false + }) + + val merged = prunedRDD1 ++ prunedRDD2 + + assert(merged.count() == 2) + val take = merged.take(2) + + assert(take.apply(0) == 4) + + assert(take.apply(1) == 6) + + + } + } + +class TestPartition(i: Int, value: Int) extends Partition with Serializable { + def index = i + + def testValue = this.value + +} \ No newline at end of file From 50fd8d98c00f7db6aa34183705c9269098c62486 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Mon, 18 Nov 2013 22:51:35 -0800 Subject: [PATCH 5/9] Enable the Broadcast examples to work in a cluster setting Since they rely on println to display results, we need to first collect those results to the driver to have them actually display locally. --- .../org/apache/spark/examples/BroadcastTest.scala | 10 +++++----- .../spark/examples/MultiBroadcastTest.scala | 15 +++++++++------ 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala index 529709c2f9538..a1199809923bc 100644 --- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala @@ -32,13 +32,13 @@ object BroadcastTest { System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory") System.setProperty("spark.broadcast.blockSize", blockSize) - val sc = new SparkContext(args(0), "Broadcast Test 2", + val sc = new SparkContext(args(0), "Broadcast Test", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 - var arr1 = new Array[Int](num) + val arr1 = new Array[Int](num) for (i <- 0 until arr1.length) { arr1(i) = i } @@ -48,9 +48,9 @@ object BroadcastTest { println("===========") val startTime = System.nanoTime val barr1 = sc.broadcast(arr1) - sc.parallelize(1 to 10, slices).foreach { - i => println(barr1.value.size) - } + val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.size) + // Collect the small RDD so we can print the observed sizes locally. + observedSizes.collect().foreach(i => println(i)) println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6)) } diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala index f79f0142b8679..e1afc29f9aca9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala @@ -18,35 +18,38 @@ package org.apache.spark.examples import org.apache.spark.SparkContext +import org.apache.spark.rdd.RDD object MultiBroadcastTest { def main(args: Array[String]) { if (args.length == 0) { - System.err.println("Usage: BroadcastTest [] [numElem]") + System.err.println("Usage: MultiBroadcastTest [] [numElem]") System.exit(1) } - val sc = new SparkContext(args(0), "Broadcast Test", + val sc = new SparkContext(args(0), "Multi-Broadcast Test", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 - var arr1 = new Array[Int](num) + val arr1 = new Array[Int](num) for (i <- 0 until arr1.length) { arr1(i) = i } - var arr2 = new Array[Int](num) + val arr2 = new Array[Int](num) for (i <- 0 until arr2.length) { arr2(i) = i } val barr1 = sc.broadcast(arr1) val barr2 = sc.broadcast(arr2) - sc.parallelize(1 to 10, slices).foreach { - i => println(barr1.value.size + barr2.value.size) + val observedSizes: RDD[(Int, Int)] = sc.parallelize(1 to 10, slices).map { _ => + (barr1.value.size, barr2.value.size) } + // Collect the small RDD so we can print the observed sizes locally. + observedSizes.collect().foreach(i => println(i)) System.exit(0) } From f639b65eabcc8666b74af8f13a37c5fdf7e0185f Mon Sep 17 00:00:00 2001 From: Matthew Taylor Date: Tue, 19 Nov 2013 10:48:48 +0000 Subject: [PATCH 6/9] PartitionPruningRDD is using index from parent(review changes) --- .../apache/spark/rdd/PartitionPruningRDD.scala | 4 ++-- .../{ => rdd}/PartitionPruningRDDSuite.scala | 15 ++++----------- 2 files changed, 6 insertions(+), 13 deletions(-) rename core/src/test/scala/org/apache/spark/{ => rdd}/PartitionPruningRDDSuite.scala (92%) diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala index 2738a0089409a..574dd4233fb27 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala @@ -33,8 +33,8 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo extends NarrowDependency[T](rdd) { @transient - val partitions: Array[Partition] = rdd.partitions.zipWithIndex - .filter(s => partitionFilterFunc(s._2)).map(_._1).zipWithIndex + val partitions: Array[Partition] = rdd.partitions + .filter(s => partitionFilterFunc(s.index)).zipWithIndex .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition } override def getParents(partitionId: Int) = { diff --git a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala similarity index 92% rename from core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala rename to core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala index 28e71e835fa2c..53a7b7c44df1c 100644 --- a/core/src/test/scala/org/apache/spark/PartitionPruningRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PartitionPruningRDDSuite.scala @@ -15,11 +15,10 @@ * limitations under the License. */ -package org.apache.spark +package org.apache.spark.rdd import org.scalatest.FunSuite -import org.apache.spark.SparkContext._ -import org.apache.spark.rdd.{PartitionPruningRDDPartition, RDD, PartitionPruningRDD} +import org.apache.spark.{TaskContext, Partition, SharedSparkContext} class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext { @@ -49,7 +48,7 @@ class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext { } - test("Pruned Partitions can be merged ") { + test("Pruned Partitions can be unioned ") { val rdd = new RDD[Int](sc, Nil) { override protected def getPartitions = { @@ -72,17 +71,11 @@ class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext { }) val merged = prunedRDD1 ++ prunedRDD2 - assert(merged.count() == 2) val take = merged.take(2) - assert(take.apply(0) == 4) - assert(take.apply(1) == 6) - - } - } class TestPartition(i: Int, value: Int) extends Partition with Serializable { @@ -90,4 +83,4 @@ class TestPartition(i: Int, value: Int) extends Partition with Serializable { def testValue = this.value -} \ No newline at end of file +} From 9c934b640f76b17097f2cae87fef30b05ce854b7 Mon Sep 17 00:00:00 2001 From: Henry Saputra Date: Tue, 19 Nov 2013 10:19:03 -0800 Subject: [PATCH 7/9] Remove the semicolons at the end of Scala code to make it more pure Scala code. Also remove unused imports as I found them along the way. Remove return statements when returning value in the Scala code. Passing compile and tests. --- .../spark/deploy/FaultToleranceTest.scala | 28 +++++++++---------- .../org/apache/spark/rdd/CartesianRDD.scala | 2 +- .../org/apache/spark/LocalSparkContext.scala | 2 +- .../org/apache/spark/PartitioningSuite.scala | 10 +++---- .../org/apache/spark/examples/LocalALS.scala | 2 +- .../org/apache/spark/examples/SparkTC.scala | 2 +- .../streaming/examples/ActorWordCount.scala | 2 +- .../streaming/examples/MQTTWordCount.scala | 4 +-- .../apache/spark/streaming/Checkpoint.scala | 2 +- .../api/java/JavaStreamingContext.scala | 7 ++--- .../streaming/dstream/FlumeInputDStream.scala | 4 +-- .../spark/streaming/InputStreamsSuite.scala | 4 +-- .../spark/streaming/TestSuiteBase.scala | 2 +- .../spark/deploy/yarn/ApplicationMaster.scala | 4 ++- .../org/apache/spark/deploy/yarn/Client.scala | 27 ++++++++---------- .../yarn/ClientDistributedCacheManager.scala | 2 +- .../spark/deploy/yarn/WorkerRunnable.scala | 9 +++--- .../deploy/yarn/YarnSparkHadoopUtil.scala | 5 +--- 18 files changed, 56 insertions(+), 62 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index 668032a3a2680..0aa8852649e05 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -1,19 +1,19 @@ /* * - * * Licensed to the Apache Software Foundation (ASF) under one or more - * * contributor license agreements. See the NOTICE file distributed with - * * this work for additional information regarding copyright ownership. - * * The ASF licenses this file to You under the Apache License, Version 2.0 - * * (the "License"); you may not use this file except in compliance with - * * the License. You may obtain a copy of the License at - * * - * * http://www.apache.org/licenses/LICENSE-2.0 - * * - * * Unless required by applicable law or agreed to in writing, software - * * distributed under the License is distributed on an "AS IS" BASIS, - * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * * See the License for the specific language governing permissions and - * * limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 9b0c882481cfc..0de22f0e06e49 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -70,7 +70,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( override def compute(split: Partition, context: TaskContext) = { val currSplit = split.asInstanceOf[CartesianPartition] for (x <- rdd1.iterator(currSplit.s1, context); - y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) + y <- rdd2.iterator(currSplit.s2, context)) yield (x, y) } override def getDependencies: Seq[Dependency[_]] = List( diff --git a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala index 459e257d79a36..8dd5786da6ff5 100644 --- a/core/src/test/scala/org/apache/spark/LocalSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/LocalSparkContext.scala @@ -30,7 +30,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self @transient var sc: SparkContext = _ override def beforeAll() { - InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()); + InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()) super.beforeAll() } diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 7d938917f2650..1374d01774693 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -142,11 +142,11 @@ class PartitioningSuite extends FunSuite with SharedSparkContext { .filter(_ >= 0.0) // Run the partitions, including the consecutive empty ones, through StatCounter - val stats: StatCounter = rdd.stats(); - assert(abs(6.0 - stats.sum) < 0.01); - assert(abs(6.0/2 - rdd.mean) < 0.01); - assert(abs(1.0 - rdd.variance) < 0.01); - assert(abs(1.0 - rdd.stdev) < 0.01); + val stats: StatCounter = rdd.stats() + assert(abs(6.0 - stats.sum) < 0.01) + assert(abs(6.0/2 - rdd.mean) < 0.01) + assert(abs(1.0 - rdd.variance) < 0.01) + assert(abs(1.0 - rdd.stdev) < 0.01) // Add other tests here for classes that should be able to handle empty partitions correctly } diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala index 4af45b2b4a067..83db8b9e26411 100644 --- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala @@ -120,7 +120,7 @@ object LocalALS { System.exit(1) } } - printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS); + printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS) val R = generateR() diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala index 5a7a9d1bd8f74..8543ce0e3285e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTC.scala @@ -65,7 +65,7 @@ object SparkTC { oldCount = nextCount // Perform the join, obtaining an RDD of (y, (z, x)) pairs, // then project the result to obtain the new (x, z) paths. - tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache(); + tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache() nextCount = tc.count() } while (nextCount != oldCount) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index cd3423a07b6b7..af52b7e9a12f1 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -120,7 +120,7 @@ object FeederActor { println("Feeder started as:" + feeder) - actorSystem.awaitTermination(); + actorSystem.awaitTermination() } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index af698a01d5118..ff332a0282129 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -54,12 +54,12 @@ object MQTTPublisher { client.connect() - val msgtopic: MqttTopic = client.getTopic(topic); + val msgtopic: MqttTopic = client.getTopic(topic) val msg: String = "hello mqtt demo for spark streaming" while (true) { val message: MqttMessage = new MqttMessage(String.valueOf(msg).getBytes()) - msgtopic.publish(message); + msgtopic.publish(message) println("Published data. topic: " + msgtopic.getName() + " Message: " + message) } client.disconnect() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index bb9febad38d03..78a2c07204a00 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -94,7 +94,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging { fs.delete(file, false) fs.rename(writeFile, file) - val finishTime = System.currentTimeMillis(); + val finishTime = System.currentTimeMillis() logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + file + "', took " + bytes.length + " bytes and " + (finishTime - startTime) + " milliseconds") return diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index cf30b541e1f92..7f9dab0ef9982 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.api.java -import java.lang.{Long => JLong, Integer => JInt} +import java.lang.{Integer => JInt} import java.io.InputStream import java.util.{Map => JMap, List => JList} @@ -33,10 +33,9 @@ import twitter4j.auth.Authorization import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} -import org.apache.spark.api.java.{JavaPairRDD, JavaRDDLike, JavaSparkContext, JavaRDD} +import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaRDD} import org.apache.spark.streaming._ import org.apache.spark.streaming.dstream._ -import org.apache.spark.streaming.receivers.{ActorReceiver, ReceiverSupervisorStrategy} /** * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic @@ -311,7 +310,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] implicit val cmf: ClassManifest[F] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]] - ssc.fileStream[K, V, F](directory); + ssc.fileStream[K, V, F](directory) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala index 18de772946805..a0189eca043c6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FlumeInputDStream.scala @@ -137,8 +137,8 @@ class FlumeReceiver( protected override def onStart() { val responder = new SpecificResponder( - classOf[AvroSourceProtocol], new FlumeEventServer(this)); - val server = new NettyServer(responder, new InetSocketAddress(host, port)); + classOf[AvroSourceProtocol], new FlumeEventServer(this)) + val server = new NettyServer(responder, new InetSocketAddress(host, port)) blockGenerator.start() server.start() logInfo("Flume receiver started") diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index a559db468a771..7dc82decefd6e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -124,9 +124,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) Thread.sleep(1000) - val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)); + val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort)) val client = SpecificRequestor.getClient( - classOf[AvroSourceProtocol], transceiver); + classOf[AvroSourceProtocol], transceiver) for (i <- 0 until input.size) { val event = new AvroFlumeEvent diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index be140699c2964..8c8c359e6e865 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -251,7 +251,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { Thread.sleep(500) // Give some time for the forgetting old RDDs to complete } catch { - case e: Exception => e.printStackTrace(); throw e; + case e: Exception => {e.printStackTrace(); throw e} } finally { ssc.stop() } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 0e47bd7a10a21..3f6e151d8903d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -21,6 +21,7 @@ import java.io.IOException import java.net.Socket import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.net.NetUtils @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.Utils + import scala.collection.JavaConversions._ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging { @@ -63,7 +65,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) appAttemptId = getApplicationAttemptId() - isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts; + isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts resourceManager = registerWithResourceManager() // Workaround until hadoop moves to something which has diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c38bdd14ec9c5..038de30de5247 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,14 +17,13 @@ package org.apache.spark.deploy.yarn -import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI} +import java.net.{InetAddress, UnknownHostException, URI} import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil} import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.mapred.Master -import org.apache.hadoop.net.NetUtils import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ @@ -40,9 +39,7 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.Map import scala.collection.JavaConversions._ -import org.apache.spark.Logging -import org.apache.spark.util.Utils -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.Logging class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging { @@ -105,7 +102,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // if we have requested more then the clusters max for a single resource then exit. if (args.workerMemory > maxMem) { - logError("the worker size is to large to run on this cluster " + args.workerMemory); + logError("the worker size is to large to run on this cluster " + args.workerMemory) System.exit(1) } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD @@ -142,8 +139,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl var dstHost = dstUri.getHost() if ((srcHost != null) && (dstHost != null)) { try { - srcHost = InetAddress.getByName(srcHost).getCanonicalHostName(); - dstHost = InetAddress.getByName(dstHost).getCanonicalHostName(); + srcHost = InetAddress.getByName(srcHost).getCanonicalHostName() + dstHost = InetAddress.getByName(dstHost).getCanonicalHostName() } catch { case e: UnknownHostException => return false @@ -160,7 +157,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl if (srcUri.getPort() != dstUri.getPort()) { return false } - return true; + return true } /** @@ -172,13 +169,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl replication: Short, setPerms: Boolean = false): Path = { val fs = FileSystem.get(conf) - val remoteFs = originalPath.getFileSystem(conf); + val remoteFs = originalPath.getFileSystem(conf) var newPath = originalPath if (! compareFs(remoteFs, fs)) { newPath = new Path(dstDir, originalPath.getName()) logInfo("Uploading " + originalPath + " to " + newPath) - FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf); - fs.setReplication(newPath, replication); + FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf) + fs.setReplication(newPath, replication) if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION)) } // resolve any symlinks in the URI path so using a "current" symlink @@ -196,7 +193,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Add them as local resources to the AM val fs = FileSystem.get(conf) - val delegTokenRenewer = Master.getMasterPrincipal(conf); + val delegTokenRenewer = Master.getMasterPrincipal(conf) if (UserGroupInformation.isSecurityEnabled()) { if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { logError("Can't get Master Kerberos principal for use as renewer") @@ -208,7 +205,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl if (UserGroupInformation.isSecurityEnabled()) { val dstFs = dst.getFileSystem(conf) - dstFs.addDelegationTokens(delegTokenRenewer, credentials); + dstFs.addDelegationTokens(delegTokenRenewer, credentials) } val localResources = HashMap[String, LocalResource]() FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION)) @@ -273,7 +270,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } - UserGroupInformation.getCurrentUser().addCredentials(credentials); + UserGroupInformation.getCurrentUser().addCredentials(credentials) return localResources } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 674c8f8112b86..268ab950e83a0 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -197,7 +197,7 @@ class ClientDistributedCacheManager() extends Logging { */ def checkPermissionOfOther(fs: FileSystem, path: Path, action: FsAction, statCache: Map[URI, FileStatus]): Boolean = { - val status = getFileStatus(fs, path.toUri(), statCache); + val status = getFileStatus(fs, path.toUri(), statCache) val perms = status.getPermission() val otherAction = perms.getOtherAction() if (otherAction.implies(action)) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 7a66532254c74..fb966b4784819 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import java.security.PrivilegedExceptionAction import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.net.NetUtils import org.apache.hadoop.security.UserGroupInformation @@ -38,7 +38,6 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import org.apache.spark.Logging -import org.apache.spark.util.Utils class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String, slaveId: String, hostname: String, workerMemory: Int, workerCores: Int) @@ -204,8 +203,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S // use doAs and remoteUser here so we can add the container token and not // pollute the current users credentials with all of the individual container tokens - val user = UserGroupInformation.createRemoteUser(container.getId().toString()); - val containerToken = container.getContainerToken(); + val user = UserGroupInformation.createRemoteUser(container.getId().toString()) + val containerToken = container.getContainerToken() if (containerToken != null) { user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress)) } @@ -217,7 +216,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S cmAddress, conf).asInstanceOf[ContainerManager] } }); - return proxy; + proxy } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index ca2f1e2565b9a..2ba2366ead171 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -18,13 +18,10 @@ package org.apache.spark.deploy.yarn import org.apache.spark.deploy.SparkHadoopUtil -import collection.mutable.HashMap import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import java.security.PrivilegedExceptionAction /** * Contains util methods to interact with Hadoop from spark. @@ -40,7 +37,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster override def addCredentials(conf: JobConf) { - val jobCreds = conf.getCredentials(); + val jobCreds = conf.getCredentials() jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) } } From 4093e9393aef95793f2d1d77fd0bbe80c8bb8d11 Mon Sep 17 00:00:00 2001 From: tgravescs Date: Tue, 19 Nov 2013 12:39:26 -0600 Subject: [PATCH 8/9] Impove Spark on Yarn Error handling --- .../CoarseGrainedSchedulerBackend.scala | 1 + .../cluster/SimrSchedulerBackend.scala | 1 - docs/running-on-yarn.md | 2 + .../spark/deploy/yarn/ApplicationMaster.scala | 39 ++++++++++++------- .../org/apache/spark/deploy/yarn/Client.scala | 32 +++++++++------ .../deploy/yarn/YarnAllocationHandler.scala | 16 ++++++-- 6 files changed, 61 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index a45bee536ca9d..d0ba5bf55dcfd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -199,6 +199,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac } override def stop() { + stopExecutors() try { if (driverActor != null) { val future = driverActor.ask(StopDriver)(timeout) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index 0ea35e2b7a311..e000531a26f7e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -62,7 +62,6 @@ private[spark] class SimrSchedulerBackend( val conf = new Configuration() val fs = FileSystem.get(conf) fs.delete(new Path(driverFilePath), false) - super.stopExecutors() super.stop() } } diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 6fd1d0d150306..4056e9c15db2b 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -37,6 +37,8 @@ System Properties: * 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the the spark master and then also the number of tries it waits for the Spark Context to be intialized. Default is 10. * 'spark.yarn.submit.file.replication', the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives. * 'spark.yarn.preserve.staging.files', set to true to preserve the staged files(spark jar, app jar, distributed cache files) at the end of the job rather then delete them. +* 'spark.yarn.scheduler.heartbeat.interval-ms', the interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds. +* 'spark.yarn.max.worker.failures', the maximum number of worker failures before failing the application. Default is the number of workers requested times 2 with minimum of 3. # Launching Spark on YARN diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 0e47bd7a10a21..89b00415daa8e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -52,7 +52,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true - + // default to numWorkers * 2, with minimum of 3 + private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures", + math.max(args.numWorkers * 2, 3).toString()).toInt def run() { // setup the directories so things go to yarn approved directories rather @@ -225,12 +227,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e if (null != sparkContext) { uiAddress = sparkContext.ui.appUIAddress - this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, - sparkContext.preferredNodeLocationData) + this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, + appAttemptId, args, sparkContext.preferredNodeLocationData) } else { logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime + - ", numTries = " + numTries) - this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args) + ", numTries = " + numTries) + this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, + appAttemptId, args) } } } finally { @@ -249,8 +252,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e while(yarnAllocator.getNumWorkersRunning < args.numWorkers && // If user thread exists, then quit ! userThread.isAlive) { - - this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) + if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of worker failures reached") + } + yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) } @@ -266,21 +272,27 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L)) + + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) launchReporterThread(interval) } } - // TODO: We might want to extend this to allocate more containers in case they die ! private def launchReporterThread(_sleepTime: Long): Thread = { val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime val t = new Thread { override def run() { while (userThread.isAlive) { + if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of worker failures reached") + } val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning if (missingWorkerCount > 0) { logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers") @@ -319,7 +331,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } */ - def finishApplicationMaster(status: FinalApplicationStatus) { + def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") { synchronized { if (isFinished) { @@ -333,6 +345,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e .asInstanceOf[FinishApplicationMasterRequest] finishReq.setAppAttemptId(appAttemptId) finishReq.setFinishApplicationStatus(status) + finishReq.setDiagnostics(diagnostics) // set tracking url to empty since we don't have a history server finishReq.setTrackingUrl("") resourceManager.finishApplicationMaster(finishReq) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c38bdd14ec9c5..1078d5b826f67 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -60,6 +60,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short) def run() { + validateArgs() + init(yarnConf) start() logClusterResourceDetails() @@ -84,6 +86,23 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl System.exit(0) } + def validateArgs() = { + Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!", + (args.userJar == null) -> "Error: You must specify a user jar!", + (args.userClass == null) -> "Error: You must specify a user class!", + (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!", + (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> + ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD), + (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> + ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString())) + .foreach { case(cond, errStr) => + if (cond) { + logError(errStr) + args.printUsageAndExit(1) + } + } + } + def getAppStagingDir(appId: ApplicationId): String = { SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR } @@ -97,7 +116,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size + ", queueChildQueueCount=" + queueInfo.getChildQueues.size) } - def verifyClusterResources(app: GetNewApplicationResponse) = { val maxMem = app.getMaximumResourceCapability().getMemory() @@ -215,11 +233,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - if (System.getenv("SPARK_JAR") == null || args.userJar == null) { - logError("Error: You must set SPARK_JAR environment variable and specify a user jar!") - System.exit(1) - } - Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar, Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")) .foreach { case(destName, _localPath) => @@ -334,7 +347,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " - // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same // node, spark gc effects all other containers performance (which can also be other spark containers) @@ -360,11 +372,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl javaCommand = Environment.JAVA_HOME.$() + "/bin/java" } - if (args.userClass == null) { - logError("Error: You must specify a user class!") - System.exit(1) - } - val commands = List[String](javaCommand + " -server " + JAVA_OPTS + @@ -442,6 +449,7 @@ object Client { System.setProperty("SPARK_YARN_MODE", "true") val args = new ClientArguments(argStrings) + new Client(args).run } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 25da9aa917d95..507a0743fd77a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -72,9 +72,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM // Used to generate a unique id per worker private val workerIdCounter = new AtomicInteger() private val lastResponseId = new AtomicInteger() + private val numWorkersFailed = new AtomicInteger() def getNumWorkersRunning: Int = numWorkersRunning.intValue + def getNumWorkersFailed: Int = numWorkersFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) @@ -253,8 +255,16 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM else { // simply decrement count - next iteration of ReporterThread will take care of allocating ! numWorkersRunning.decrementAndGet() - logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState + - " httpaddress: " + completedContainer.getDiagnostics) + logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState + + " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus()) + + // Hadoop 2.2.X added a ContainerExitStatus we should switch to use + // there are some exit status' we shouldn't necessarily count against us, but for + // now I think its ok as none of the containers are expected to exit + if (completedContainer.getExitStatus() != 0) { + logInfo("Container marked as failed: " + containerId) + numWorkersFailed.incrementAndGet() + } } allocatedHostToContainersMap.synchronized { @@ -378,8 +388,6 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM val releasedContainerList = createReleasedContainerList() req.addAllReleases(releasedContainerList) - - if (numWorkers > 0) { logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.") } From 10be58f251b5e883295bd46383c0a9758555f8fc Mon Sep 17 00:00:00 2001 From: Henry Saputra Date: Tue, 19 Nov 2013 16:56:23 -0800 Subject: [PATCH 9/9] Another set of changes to remove unnecessary semicolon (;) from Scala code. Passed the sbt/sbt compile and test --- .../scala/org/apache/spark/network/netty/ShuffleCopier.scala | 2 +- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 4 +++- .../org/apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- .../spark/deploy/yarn/ClientDistributedCacheManager.scala | 2 +- .../scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala | 4 ++-- 7 files changed, 10 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala index 481ff8c3e02d2..b1e1576dadc1a 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala @@ -76,7 +76,7 @@ private[spark] object ShuffleCopier extends Logging { extends FileClientHandler with Logging { override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) { - logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)"); + logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)") resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen)) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index c1c7aa70e6c92..fbd822867fa0d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -133,7 +133,7 @@ private[spark] class StagePage(parent: JobProgressUI) { summary ++

Summary Metrics for {numCompleted} Completed Tasks

++
{summaryTable.getOrElse("No tasks have reported metrics yet.")}
++ -

Tasks

++ taskTable; +

Tasks

++ taskTable headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 78a2c07204a00..9271914eb536a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -124,7 +124,9 @@ class CheckpointWriter(checkpointDir: String) extends Logging { def stop() { synchronized { - if (stopped) return ; + if (stopped) { + return + } stopped = true } executor.shutdown() diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 3f6e151d8903d..997a6dc1ecc3a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -195,7 +195,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e successed = true } finally { logDebug("finishing main") - isLastAMRetry = true; + isLastAMRetry = true if (successed) { ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } else { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 038de30de5247..49a8cfde81fdd 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -351,7 +351,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } // Command for the ApplicationMaster - var javaCommand = "java"; + var javaCommand = "java" val javaHome = System.getenv("JAVA_HOME") if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { javaCommand = Environment.JAVA_HOME.$() + "/bin/java" diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 268ab950e83a0..5f159b073f537 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -201,7 +201,7 @@ class ClientDistributedCacheManager() extends Logging { val perms = status.getPermission() val otherAction = perms.getOtherAction() if (otherAction.implies(action)) { - return true; + return true } return false } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index fb966b4784819..a4d6e1d87d127 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -107,7 +107,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S credentials.writeTokenStorageToStream(dob) ctx.setContainerTokens(ByteBuffer.wrap(dob.getData())) - var javaCommand = "java"; + var javaCommand = "java" val javaHome = System.getenv("JAVA_HOME") if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { javaCommand = Environment.JAVA_HOME.$() + "/bin/java" @@ -215,7 +215,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S return rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager] } - }); + }) proxy }