Skip to content

Commit

Permalink
[SPARK-6050] [yarn] Relax matching of vcore count in received contain…
Browse files Browse the repository at this point in the history
…ers.

Some YARN configurations return a vcore count for allocated
containers that does not match the requested resource. That means
Spark would always ignore those containers. So relax the the matching
of the vcore count to allow the Spark jobs to run.

Author: Marcelo Vanzin <[email protected]>

Closes #4818 from vanzin/SPARK-6050 and squashes the following commits:

991c803 [Marcelo Vanzin] Remove config option, standardize on legacy behavior (no vcore matching).
8c9c346 [Marcelo Vanzin] Restrict lax matching to vcores only.
3359692 [Marcelo Vanzin] [SPARK-6050] [yarn] Add config option to do lax resource matching.
  • Loading branch information
Marcelo Vanzin authored and tgravescs committed Mar 2, 2015
1 parent 582e5a2 commit 6b348d9
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,14 @@ private[yarn] class YarnAllocator(
location: String,
containersToUse: ArrayBuffer[Container],
remaining: ArrayBuffer[Container]): Unit = {
// SPARK-6050: certain Yarn configurations return a virtual core count that doesn't match the
// request; for example, capacity scheduler + DefaultResourceCalculator. So match on requested
// memory, but use the asked vcore count for matching, effectively disabling matching on vcore
// count.
val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory,
resource.getVirtualCores)
val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
allocatedContainer.getResource)
matchingResource)

// Match the allocation to a request
if (!matchingRequests.isEmpty) {
Expand All @@ -318,7 +324,7 @@ private[yarn] class YarnAllocator(
assert(container.getResource.getMemory >= resource.getMemory)

logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
executorIdToContainer(executorId) = container
executorIdToContainer(executorId) = container

val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])
Expand Down

0 comments on commit 6b348d9

Please sign in to comment.