
Commit

Merge branch 'master' into yarn-cleanup
Conflicts:
	yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
	yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
	yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
	yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
harveyfeng committed Nov 21, 2013
2 parents a98f5a0 + 2fead51 commit 9eae80f
Showing 30 changed files with 241 additions and 161 deletions.
@@ -1,19 +1,19 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one or more
* * contributor license agreements. See the NOTICE file distributed with
* * this work for additional information regarding copyright ownership.
* * The ASF licenses this file to You under the Apache License, Version 2.0
* * (the "License"); you may not use this file except in compliance with
* * the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

@@ -76,7 +76,7 @@ private[spark] object ShuffleCopier extends Logging {
extends FileClientHandler with Logging {

override def handle(ctx: ChannelHandlerContext, in: ByteBuf, header: FileHeader) {
logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)");
logDebug("Received Block: " + header.blockId + " (" + header.fileLen + "B)")
resultCollectCallBack(header.blockId, header.fileLen.toLong, in.readBytes(header.fileLen))
}

@@ -70,7 +70,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
override def compute(split: Partition, context: TaskContext) = {
val currSplit = split.asInstanceOf[CartesianPartition]
for (x <- rdd1.iterator(currSplit.s1, context);
y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
}

override def getDependencies: Seq[Dependency[_]] = List(
@@ -33,11 +33,13 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
extends NarrowDependency[T](rdd) {

@transient
val partitions: Array[Partition] = rdd.partitions.zipWithIndex
.filter(s => partitionFilterFunc(s._2))
val partitions: Array[Partition] = rdd.partitions
.filter(s => partitionFilterFunc(s.index)).zipWithIndex
.map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }

override def getParents(partitionId: Int) = List(partitions(partitionId).index)
override def getParents(partitionId: Int) = {
List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
}
}


@@ -199,6 +199,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
}

override def stop() {
stopExecutors()
try {
if (driverActor != null) {
val future = driverActor.ask(StopDriver)(timeout)
@@ -62,7 +62,6 @@ private[spark] class SimrSchedulerBackend(
val conf = new Configuration()
val fs = FileSystem.get(conf)
fs.delete(new Path(driverFilePath), false)
super.stopExecutors()
super.stop()
}
}
23 changes: 12 additions & 11 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -76,7 +76,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
</tr>
}

val execInfo = for (b <- 0 until storageStatusList.size) yield getExecInfo(b)
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execTable = UIUtils.listingTable(execHead, execRow, execInfo)

val content =
@@ -99,16 +99,17 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
UIUtils.headerSparkPage(content, sc, "Executors (" + execInfo.size + ")", Executors)
}

def getExecInfo(a: Int): Seq[String] = {
val execId = sc.getExecutorStorageStatus(a).blockManagerId.executorId
val hostPort = sc.getExecutorStorageStatus(a).blockManagerId.hostPort
val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size).getOrElse(0)
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0)
def getExecInfo(statusId: Int): Seq[String] = {
val status = sc.getExecutorStorageStatus(statusId)
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
val rddBlocks = status.blocks.size.toString
val memUsed = status.memUsed().toString
val maxMem = status.maxMem.toString
val diskUsed = status.diskUsed().toString
val activeTasks = listener.executorToTasksActive.getOrElse(execId, HashSet.empty[Long]).size
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks

Seq(
@@ -133,7 +133,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
summary ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Tasks</h4> ++ taskTable;
<h4>Tasks</h4> ++ taskTable

headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)
}
@@ -30,7 +30,7 @@ trait LocalSparkContext extends BeforeAndAfterEach with BeforeAndAfterAll { self
@transient var sc: SparkContext = _

override def beforeAll() {
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory())
super.beforeAll()
}


This file was deleted.

10 changes: 5 additions & 5 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -142,11 +142,11 @@ class PartitioningSuite extends FunSuite with SharedSparkContext {
.filter(_ >= 0.0)

// Run the partitions, including the consecutive empty ones, through StatCounter
val stats: StatCounter = rdd.stats();
assert(abs(6.0 - stats.sum) < 0.01);
assert(abs(6.0/2 - rdd.mean) < 0.01);
assert(abs(1.0 - rdd.variance) < 0.01);
assert(abs(1.0 - rdd.stdev) < 0.01);
val stats: StatCounter = rdd.stats()
assert(abs(6.0 - stats.sum) < 0.01)
assert(abs(6.0/2 - rdd.mean) < 0.01)
assert(abs(1.0 - rdd.variance) < 0.01)
assert(abs(1.0 - rdd.stdev) < 0.01)

// Add other tests here for classes that should be able to handle empty partitions correctly
}
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.rdd

import org.scalatest.FunSuite
import org.apache.spark.{TaskContext, Partition, SharedSparkContext}


class PartitionPruningRDDSuite extends FunSuite with SharedSparkContext {


test("Pruned Partitions inherit locality prefs correctly") {

val rdd = new RDD[Int](sc, Nil) {
override protected def getPartitions = {
Array[Partition](
new TestPartition(0, 1),
new TestPartition(1, 1),
new TestPartition(2, 1))
}

def compute(split: Partition, context: TaskContext) = {
Iterator()
}
}
val prunedRDD = PartitionPruningRDD.create(rdd, {
x => if (x == 2) true else false
})
assert(prunedRDD.partitions.length == 1)
val p = prunedRDD.partitions(0)
assert(p.index == 0)
assert(p.asInstanceOf[PartitionPruningRDDPartition].parentSplit.index == 2)
}


test("Pruned Partitions can be unioned ") {

val rdd = new RDD[Int](sc, Nil) {
override protected def getPartitions = {
Array[Partition](
new TestPartition(0, 4),
new TestPartition(1, 5),
new TestPartition(2, 6))
}

def compute(split: Partition, context: TaskContext) = {
List(split.asInstanceOf[TestPartition].testValue).iterator
}
}
val prunedRDD1 = PartitionPruningRDD.create(rdd, {
x => if (x == 0) true else false
})

val prunedRDD2 = PartitionPruningRDD.create(rdd, {
x => if (x == 2) true else false
})

val merged = prunedRDD1 ++ prunedRDD2
assert(merged.count() == 2)
val take = merged.take(2)
assert(take.apply(0) == 4)
assert(take.apply(1) == 6)
}
}

class TestPartition(i: Int, value: Int) extends Partition with Serializable {
def index = i

def testValue = this.value

}
2 changes: 2 additions & 0 deletions docs/running-on-yarn.md
@@ -37,6 +37,8 @@ System Properties:
* 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the Spark master, and then also the number of tries it waits for the SparkContext to be initialized. Default is 10.
* 'spark.yarn.submit.file.replication', the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
* 'spark.yarn.preserve.staging.files', set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than deleting them.
* 'spark.yarn.scheduler.heartbeat.interval-ms', the interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds.
* 'spark.yarn.max.worker.failures', the maximum number of worker failures before failing the application. Default is the number of workers requested times 2 with minimum of 3.
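
As a minimal illustrative sketch: the tunables above are documented as system properties, so they could be set from application code before the SparkContext is created. The object name, the chosen values, and the "yarn-standalone" master string below are arbitrary placeholders, not part of the documented interface:

import org.apache.spark.SparkContext

object YarnTuningSketch {
  def main(args: Array[String]) {
    // Heartbeat from the application master to the YARN ResourceManager every 5000 ms (the documented default).
    System.setProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000")
    // Fail the application after at most 6 worker failures (placeholder value).
    System.setProperty("spark.yarn.max.worker.failures", "6")
    // Keep the staged files (Spark jar, app jar, distributed cache files) after the job ends.
    System.setProperty("spark.yarn.preserve.staging.files", "true")

    val sc = new SparkContext("yarn-standalone", "YARN Tuning Sketch")
    // ... run the job ...
    sc.stop()
  }
}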

# Launching Spark on YARN

@@ -32,13 +32,13 @@ object BroadcastTest {
System.setProperty("spark.broadcast.factory", "org.apache.spark.broadcast." + bcName + "BroadcastFactory")
System.setProperty("spark.broadcast.blockSize", blockSize)

val sc = new SparkContext(args(0), "Broadcast Test 2",
val sc = new SparkContext(args(0), "Broadcast Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000

var arr1 = new Array[Int](num)
val arr1 = new Array[Int](num)
for (i <- 0 until arr1.length) {
arr1(i) = i
}
@@ -48,9 +48,9 @@
println("===========")
val startTime = System.nanoTime
val barr1 = sc.broadcast(arr1)
sc.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size)
}
val observedSizes = sc.parallelize(1 to 10, slices).map(_ => barr1.value.size)
// Collect the small RDD so we can print the observed sizes locally.
observedSizes.collect().foreach(i => println(i))
println("Iteration %d took %.0f milliseconds".format(i, (System.nanoTime - startTime) / 1E6))
}

@@ -120,7 +120,7 @@ object LocalALS {
System.exit(1)
}
}
printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)

val R = generateR()

@@ -18,35 +18,38 @@
package org.apache.spark.examples

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object MultiBroadcastTest {
def main(args: Array[String]) {
if (args.length == 0) {
System.err.println("Usage: BroadcastTest <master> [<slices>] [numElem]")
System.err.println("Usage: MultiBroadcastTest <master> [<slices>] [numElem]")
System.exit(1)
}

val sc = new SparkContext(args(0), "Broadcast Test",
val sc = new SparkContext(args(0), "Multi-Broadcast Test",
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))

val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000

var arr1 = new Array[Int](num)
val arr1 = new Array[Int](num)
for (i <- 0 until arr1.length) {
arr1(i) = i
}

var arr2 = new Array[Int](num)
val arr2 = new Array[Int](num)
for (i <- 0 until arr2.length) {
arr2(i) = i
}

val barr1 = sc.broadcast(arr1)
val barr2 = sc.broadcast(arr2)
sc.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size + barr2.value.size)
val observedSizes: RDD[(Int, Int)] = sc.parallelize(1 to 10, slices).map { _ =>
(barr1.value.size, barr2.value.size)
}
// Collect the small RDD so we can print the observed sizes locally.
observedSizes.collect().foreach(i => println(i))

System.exit(0)
}
@@ -65,7 +65,7 @@ object SparkTC {
oldCount = nextCount
// Perform the join, obtaining an RDD of (y, (z, x)) pairs,
// then project the result to obtain the new (x, z) paths.
tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache();
tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct().cache()
nextCount = tc.count()
} while (nextCount != oldCount)

@@ -120,7 +120,7 @@ object FeederActor {

println("Feeder started as:" + feeder)

actorSystem.awaitTermination();
actorSystem.awaitTermination()
}
}

