
Commit

A few more style fixes in yarn package.
harveyfeng committed Nov 24, 2013
1 parent 9eae80f commit a67ebf4
Showing 3 changed files with 71 additions and 45 deletions.
@@ -176,8 +176,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
driverUp = true
} catch {
case e: Exception => {
logWarning("Failed to connect to driver at %s:%s, retrying ...").
format(driverHost, driverPort)
logWarning("Failed to connect to driver at %s:%s, retrying ...".
format(driverHost, driverPort))
Thread.sleep(100)
tries = tries + 1
}
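The hunk above moves the closing parenthesis so that format is applied to the message string itself rather than to the return value of logWarning. A minimal sketch of the corrected call shape, using a stand-in logger instead of Spark's Logging trait (host and port values are hypothetical):

object FormatPlacementSketch {
  // Stand-in for a logWarning method; it only prints the finished message.
  def logWarning(msg: String): Unit = println("WARN " + msg)

  def main(args: Array[String]): Unit = {
    val driverHost = "driver.example.com"   // hypothetical
    val driverPort = 7077                   // hypothetical
    // format binds to the string literal, so the formatted result is what gets logged.
    logWarning("Failed to connect to driver at %s:%s, retrying ...".format(driverHost, driverPort))
  }
}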
17 changes: 9 additions & 8 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -88,15 +88,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}

def validateArgs() = {
Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
Map(
(System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
(args.userJar == null) -> "Error: You must specify a user jar!",
(args.userClass == null) -> "Error: You must specify a user class!",
(args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
(args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD),
(args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString()))
.foreach { case(cond, errStr) =>
(args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
"greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
(args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
"must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD)
).foreach { case(cond, errStr) =>
if (cond) {
logError(errStr)
args.printUsageAndExit(1)
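The restructured validateArgs above pairs each failing condition with its error message and then walks the collection with foreach. A minimal sketch of the same idiom with hypothetical inputs; it substitutes a Seq of pairs for the Map and plain println for logError/printUsageAndExit, since a Map keyed by Boolean keeps at most one entry per truth value:

object ValidateArgsSketch {
  def main(args: Array[String]): Unit = {
    val userJar: String = null    // hypothetical input
    val numWorkers: Int = 0       // hypothetical input
    Seq(
      (userJar == null) -> "Error: You must specify a user jar!",
      (numWorkers <= 0) -> "Error: You must specify at least 1 worker!"
    ).foreach { case (cond, errStr) =>
      // Report every condition that holds; the real code also logs and exits with usage info.
      if (cond) println(errStr)
    }
  }
}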
@@ -120,7 +121,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
queueInfo.getCurrentCapacity,
queueInfo.getMaximumCapacity,
queueInfo.getApplications.size,
queueInfo.getChildQueues.size)
queueInfo.getChildQueues.size))
}

def verifyClusterResources(app: GetNewApplicationResponse) = {
@@ -242,7 +243,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var localURI = new URI(localPath)
// if not specified assume these are in the local filesystem to keep behavior like Hadoop
if (localURI.getScheme() == null) {
localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString())
localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
}
val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
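The one-character change above drops the empty parentheses from toString; by Scala convention, a parameterless method without side effects is called without parens. A tiny illustration with a hypothetical local path:

object ToStringStyleSketch {
  def main(args: Array[String]): Unit = {
    val qualified = new java.io.File("/tmp/app.jar").toURI.toString  // not .toString()
    println(qualified)  // prints something like file:/tmp/app.jar
  }
}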
@@ -52,8 +52,8 @@ object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
// make it more proactive and decoupled.

// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for more info
// on how we are requesting for containers.
// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
// more info on how we are requesting for containers.
private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val resourceManager: AMRMProtocol,
@@ -105,13 +105,20 @@ private[yarn] class YarnAllocationHandler(
val amResp = allocateWorkerResources(workersToRequest).getAMResponse

val _allocatedContainers = amResp.getAllocatedContainers()
if (_allocatedContainers.size > 0) {


logDebug("Allocated " + _allocatedContainers.size + " containers, current count " +
numWorkersRunning.get() + ", to-be-released " + releasedContainerList +
", pendingReleaseContainers : " + pendingReleaseContainers)
logDebug("Cluster Resources: " + amResp.getAvailableResources)
if (_allocatedContainers.size > 0) {
logDebug("""
Allocated containers: %d
Current worker count: %d
Containers released: %s
Containers to be released: %s
Cluster resources: %s
""".format(
_allocatedContainers.size,
numWorkersRunning.get(),
releasedContainerList,
pendingReleaseContainers,
amResp.getAvailableResources))

val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()

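The hunk above replaces a chain of string concatenations with a single triple-quoted template filled in by format. A compact sketch of that logging style with hypothetical counts (the leading newline and indentation inside the triple-quoted string are kept verbatim, just as in the diff):

object MultiLineLogSketch {
  def main(args: Array[String]): Unit = {
    val allocatedCount = 3   // hypothetical
    val runningCount = 5     // hypothetical
    println("""
      Allocated containers: %d
      Current worker count: %d
      """.format(allocatedCount, runningCount))
  }
}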
@@ -150,9 +157,10 @@ private[yarn] class YarnAllocationHandler(
}
else if (requiredHostCount > 0) {
// Container list has more containers than we need for data locality.
// Split into two : data local container count of (remainingContainers.size - requiredHostCount)
// and rest as remainingContainer
val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
// Split into two : data local container count of (remainingContainers.size -
// requiredHostCount) and rest as remainingContainer
val (dataLocal, remaining) = remainingContainers.splitAt(
remainingContainers.size - requiredHostCount)
dataLocalContainers.put(candidateHost, dataLocal)
// remainingContainers = remaining

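The wrapped splitAt call above keeps the first (remainingContainers.size - requiredHostCount) containers as data-local and carries the rest forward to later rounds. A small sketch of that split with hypothetical container ids:

object SplitAtSketch {
  def main(args: Array[String]): Unit = {
    val remainingContainers = Vector("c1", "c2", "c3", "c4", "c5")  // hypothetical ids
    val requiredHostCount = 2
    // First (size - requiredHostCount) elements are data-local; the rest stay pending.
    val (dataLocal, remaining) = remainingContainers.splitAt(
      remainingContainers.size - requiredHostCount)
    println(dataLocal)   // Vector(c1, c2, c3)
    println(remaining)   // Vector(c4, c5)
  }
}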
@@ -181,8 +189,8 @@ private[yarn] class YarnAllocationHandler(
}
else if (requiredRackCount > 0) {
// container list has more containers than we need for data locality.
// Split into two : data local container count of (remainingContainers.size - requiredRackCount)
// and rest as remainingContainer
// Split into two : data local container count of (remainingContainers.size -
// requiredRackCount) and rest as remainingContainer
val (rackLocal, remaining) = remainingContainers.splitAt(
remainingContainers.size - requiredRackCount)
val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
@@ -216,11 +224,12 @@ private[yarn] class YarnAllocationHandler(
val workerHostname = container.getNodeId.getHost
val containerId = container.getId

assert(container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
assert(
container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))

if (numWorkersRunningNow > maxWorkers) {
logInfo("Ignoring container " + containerId + " at host " + workerHostname +
" .. we already have required number of containers")
logInfo("""Ignoring container %s at host %s, since we already have the required number of
containers for it.""".format(containerId, workerHostname))
releasedContainerList.add(containerId)
// reset counter back to old value.
numWorkersRunning.decrementAndGet()
@@ -245,7 +254,9 @@ private[yarn] class YarnAllocationHandler(

containerSet += containerId
allocatedContainerToHostMap.put(containerId, workerHostname)
if (rack != null) allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
if (rack != null) {
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
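The reformatted update above expands a one-line if into braces; the body itself is a per-rack counter bumped with getOrElse so the first occurrence starts from zero. A short sketch with a hypothetical rack id:

object RackCountSketch {
  def main(args: Array[String]): Unit = {
    val allocatedRackCount = new scala.collection.mutable.HashMap[String, Int]()
    val rack = "rack-1"   // hypothetical rack id, could be null in the real lookup
    if (rack != null) {
      // getOrElse supplies 0 the first time this rack is seen.
      allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
    }
    println(allocatedRackCount("rack-1"))  // 1
  }
}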

new Thread(
Expand All @@ -254,17 +265,23 @@ private[yarn] class YarnAllocationHandler(
).start()
}
}
logDebug("After allocated " + allocatedContainers.size + " containers (orig : " +
_allocatedContainers.size + "), current count " + numWorkersRunning.get() +
", to-be-released " + releasedContainerList + ", pendingReleaseContainers : " + pendingReleaseContainers)
logDebug("""
Finished processing %d containers.
Current number of workers running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
allocatedContainers.size,
numWorkersRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}


val completedContainers = amResp.getCompletedContainersStatuses()
if (completedContainers.size > 0){
logDebug("Completed " + completedContainers.size + " containers, current count " + numWorkersRunning.get() +
", to-be-released " + releasedContainerList + ", pendingReleaseContainers : " + pendingReleaseContainers)

logDebug("Completed %d containers, to-be-released: %s".format(
completedContainers.size, releasedContainerList))
for (completedContainer <- completedContainers){
val containerId = completedContainer.getContainerId

@@ -275,9 +292,10 @@ private[yarn] class YarnAllocationHandler(
else {
// Simply decrement count - next iteration of ReporterThread will take care of allocating.
numWorkersRunning.decrementAndGet()
logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
" httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())

logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,
completedContainer.getState,
completedContainer.getExitStatus()))
// Hadoop 2.2.X added a ContainerExitStatus we should switch to use
// there are some exit status' we shouldn't necessarily count against us, but for
// now I think its ok as none of the containers are expected to exit
@@ -311,9 +329,16 @@ private[yarn] class YarnAllocationHandler(
}
}
}
logDebug("After completed " + completedContainers.size + " containers, current count " +
numWorkersRunning.get() + ", to-be-released " + releasedContainerList +
", pendingReleaseContainers : " + pendingReleaseContainers)
logDebug("""
Finished processing %d completed containers.
Current number of workers running: %d,
releasedContainerList: %s,
pendingReleaseContainers: %s
""".format(
completedContainers.size,
numWorkersRunning.get(),
releasedContainerList,
pendingReleaseContainers))
}
}

@@ -367,7 +392,7 @@ private[yarn] class YarnAllocationHandler(

// default.
if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
logDebug("numWorkers: " + numWorkers + ", host preferences ? " + preferredHostToCount.isEmpty)
logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
resourceRequests = List(
createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
}
@@ -397,7 +422,7 @@ private[yarn] class YarnAllocationHandler(
YarnAllocationHandler.PRIORITY)

val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
hostContainerRequests.size() + rackContainerRequests.size() + 1)
hostContainerRequests.size + rackContainerRequests.size + 1)

containerRequests ++= hostContainerRequests
containerRequests ++= rackContainerRequests
@@ -416,20 +441,20 @@ private[yarn] class YarnAllocationHandler(
req.addAllReleases(releasedContainerList)

if (numWorkers > 0) {
logInfo("Allocating %d worker containers with %d of memory each.").format(numWorkers,
workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
}
else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
}

for (request <- resourceRequests) {
logInfo("ResourceRequest (host : %s, num containers: %d, priority = %d , capability : %s)").
logInfo("ResourceRequest (host : %s, num containers: %d, priority = %s , capability : %s)".
format(
request.getHostName,
request.getNumContainers,
request.getPriority,
request.getCapability)
request.getCapability))
}
resourceManager.allocate(req)
}
