Skip to content

Commit

Permalink
Misc style changes in the 'yarn' package.
Browse files Browse the repository at this point in the history
  • Loading branch information
harveyfeng committed Nov 18, 2013
1 parent e2ebc3a commit a98f5a0
Show file tree
Hide file tree
Showing 4 changed files with 376 additions and 273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,50 @@ import java.io.IOException
import java.net.Socket
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.hadoop.yarn.api._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.Utils
import scala.collection.JavaConversions._


class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {

def this(args: ApplicationMasterArguments) = this(args, new Configuration())

private var rpc: YarnRPC = YarnRPC.create(conf)
private var resourceManager: AMRMProtocol = null
private var appAttemptId: ApplicationAttemptId = null
private var userThread: Thread = null
private var resourceManager: AMRMProtocol = _
private var appAttemptId: ApplicationAttemptId = _
private var userThread: Thread = _
private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
private val fs = FileSystem.get(yarnConf)

private var yarnAllocator: YarnAllocationHandler = null
private var isFinished:Boolean = false
private var uiAddress: String = ""
private var yarnAllocator: YarnAllocationHandler = _
private var isFinished: Boolean = false
private var uiAddress: String = _
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true


def run() {
// setup the directories so things go to yarn approved directories rather
// then user specified and /tmp
// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
System.setProperty("spark.local.dir", getLocalDirs())

// use priority 30 as its higher then HDFS. Its same priority as MapReduce is using
// Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)

appAttemptId = getApplicationAttemptId()
Expand All @@ -68,9 +73,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e

// Workaround until hadoop moves to something which has
// https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line)
// ignore result
// ignore result.
// This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times
// Hence args.workerCores = numCore disabled above. Any better option ?
// Hence args.workerCores = numCore disabled above. Any better option?

// Compute number of threads for akka
//val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory()
Expand All @@ -96,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e

waitForSparkContextInitialized()

// do this after spark master is up and SparkContext is created so that we can register UI Url
// Do this after spark master is up and SparkContext is created so that we can register UI Url
val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster()

// Allocate all containers
Expand All @@ -115,12 +120,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.getOrElse(Option(System.getenv("LOCAL_DIRS"))
.getOrElse(""))
.getOrElse(""))

if (localDirs.isEmpty()) {
throw new Exception("Yarn Local dirs can't be empty")
}
return localDirs
localDirs
}

private def getApplicationAttemptId(): ApplicationAttemptId = {
Expand All @@ -129,28 +134,29 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
val containerId = ConverterUtils.toContainerId(containerIdString)
val appAttemptId = containerId.getApplicationAttemptId()
logInfo("ApplicationAttemptId: " + appAttemptId)
return appAttemptId
appAttemptId
}

private def registerWithResourceManager(): AMRMProtocol = {
val rmAddress = NetUtils.createSocketAddr(yarnConf.get(
YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
logInfo("Connecting to ResourceManager at " + rmAddress)
return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
}

private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
logInfo("Registering the ApplicationMaster")
val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
.asInstanceOf[RegisterApplicationMasterRequest]
appMasterRequest.setApplicationAttemptId(appAttemptId)
// Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
// Setting this to master host,port - so that the ApplicationReport at client has some
// sensible info.
// Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
appMasterRequest.setTrackingUrl(uiAddress)
return resourceManager.registerApplicationMaster(appMasterRequest)
resourceManager.registerApplicationMaster(appMasterRequest)
}

private def waitForSparkMaster() {
Expand All @@ -164,21 +170,25 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
try {
val socket = new Socket(driverHost, driverPort.toInt)
socket.close()
logInfo("Driver now available: " + driverHost + ":" + driverPort)
logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
driverUp = true
} catch {
case e: Exception =>
logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying")
Thread.sleep(100)
tries = tries + 1
case e: Exception => {
logWarning("Failed to connect to driver at %s:%s, retrying ...").
format(driverHost, driverPort)
Thread.sleep(100)
tries = tries + 1
}
}
}
}

private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader)
.getMethod("main", classOf[Array[String]])
val mainMethod = Class.forName(
args.userClass,
false /* initialize */,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
override def run() {
var successed = false
Expand All @@ -203,7 +213,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
}
t.start()
return t
t
}

// this need to happen before allocateWorkers
Expand All @@ -225,12 +235,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e

if (null != sparkContext) {
uiAddress = sparkContext.ui.appUIAddress
this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
sparkContext.preferredNodeLocationData)
this.yarnAllocator = YarnAllocationHandler.newAllocator(
yarnConf,
resourceManager,
appAttemptId,
args,
sparkContext.preferredNodeLocationData)
} else {
logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
", numTries = " + numTries)
this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".
format(count * waitTime, numTries))
this.yarnAllocator = YarnAllocationHandler.newAllocator(
yarnConf,
resourceManager,
appAttemptId,
args)
}
}
} finally {
Expand All @@ -246,35 +264,37 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
// If user thread exists, then quit !
userThread.isAlive) {

this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
// Exists the loop if the user thread exits.
while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) {
val numContainersToAllocate = math.max(
args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)
this.yarnAllocator.allocateContainers(numContainersToAllocate)
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
} finally {
// in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
// so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
// In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
logInfo("All workers have launched.")

// Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
// Launch a progress reporter thread, else the app will get killed after expiration
// (def: 10mins) timeout.
// TODO(harvey): Verify the timeout
if (userThread.isAlive) {
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.

// Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// must be <= timeoutInterval/ 2.
// On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
// so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
// Must be <= timeoutInterval/ 2.
// On other hand, also ensure that we are reasonably responsive without causing too many
// requests to RM. So, at least 1 minute or timeoutInterval / 10 - whichever is higher.
val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
launchReporterThread(interval)
}
}

// TODO: We might want to extend this to allocate more containers in case they die !
// TODO: We might want to extend this to allocate more containers in case they die.
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime

Expand All @@ -283,24 +303,25 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
while (userThread.isAlive) {
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingWorkerCount))
yarnAllocator.allocateContainers(missingWorkerCount)
}
else sendProgress()
Thread.sleep(sleepTime)
}
}
}
// setting to daemon status, though this is usually not a good idea.
// Setting to daemon status, though this is usually not a good idea.
t.setDaemon(true)
t.start()
logInfo("Started progress reporter thread - sleep time : " + sleepTime)
return t
t
}

private def sendProgress() {
logDebug("Sending progress")
// simulated with an allocate request with no nodes requested ...
// Simulated with an allocate request with no nodes requested ...
yarnAllocator.allocateContainers(0)
}

Expand All @@ -320,7 +341,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
*/

def finishApplicationMaster(status: FinalApplicationStatus) {

synchronized {
if (isFinished) {
return
Expand All @@ -333,14 +353,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
// set tracking url to empty since we don't have a history server
// Set tracking url to empty since we don't have a history server.
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)

}

/**
* clean up the staging directory.
* Clean up the staging directory.
*/
private def cleanupStagingDir() {
var stagingDirPath: Path = null
Expand All @@ -356,13 +375,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
fs.delete(stagingDirPath, true)
}
} catch {
case e: IOException =>
logError("Failed to cleanup staging dir " + stagingDirPath, e)
case ioe: IOException =>
logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
}
}

// The shutdown hook that runs when a signal is received AND during normal
// close of the JVM.
// The shutdown hook that runs when a signal is received AND during normal close of the JVM.
class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {

def run() {
Expand All @@ -372,15 +390,14 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
}
}

}

object ApplicationMaster {
// number of times to wait for the allocator loop to complete.
// each loop iteration waits for 100ms, so maximum of 3 seconds.
// Number of times to wait for the allocator loop to complete.
// Each loop iteration waits for 100ms, so maximum of 3 seconds.
// This is to ensure that we have reasonable number of containers before we start
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be optimal as more
// containers are available. Might need to handle this better.
// TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
// optimal as more containers are available. Might need to handle this better.
private val ALLOCATOR_LOOP_WAIT_COUNT = 30
def incrementAllocatorLoop(by: Int) {
val count = yarnAllocatorLoop.getAndAdd(by)
Expand All @@ -398,7 +415,8 @@ object ApplicationMaster {
applicationMasters.add(master)
}

val sparkContextRef: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null)
val sparkContextRef: AtomicReference[SparkContext] =
new AtomicReference[SparkContext](null /* initialValue */)
val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)

def sparkContextInitialized(sc: SparkContext): Boolean = {
Expand All @@ -408,27 +426,29 @@ object ApplicationMaster {
sparkContextRef.notifyAll()
}

// Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit
// Should not really have to do this, but it helps yarn to evict resources earlier.
// not to mention, prevent Client declaring failure even though we exit'ed properly.
// Note that this will unfortunately not properly clean up the staging files because it gets called to
// late and the filesystem is already shutdown.
// Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
// System.exit.
// Should not really have to do this, but it helps YARN to evict resources earlier.
// Not to mention, prevent the Client from declaring failure even though we exited properly.
// Note that this will unfortunately not properly clean up the staging files because it gets
// called too late, after the filesystem is already shutdown.
if (modified) {
Runtime.getRuntime().addShutdownHook(new Thread with Logging {
// This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run'
// This is not only logs, but also ensures that log system is initialized for this instance
// when we are actually 'run'-ing.
logInfo("Adding shutdown hook for context " + sc)
override def run() {
logInfo("Invoking sc stop from shutdown hook")
sc.stop()
// best case ...
// Best case ...
for (master <- applicationMasters) {
master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
}
}
} )
}

// Wait for initialization to complete and atleast 'some' nodes can get allocated
// Wait for initialization to complete and atleast 'some' nodes can get allocated.
yarnAllocatorLoop.synchronized {
while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
yarnAllocatorLoop.wait(1000L)
Expand Down
Loading

0 comments on commit a98f5a0

Please sign in to comment.