[SPARK-24615-NG2] ResourceInformation API design #14

Open · wants to merge 1 commit into base: SPARK-24615-NG1

Changes from all commits

core/src/main/scala/org/apache/spark/scheduler/ResourceInformation.scala
@@ -0,0 +1,44 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import org.apache.spark.annotation.DeveloperApi

/**
 * :: DeveloperApi ::
 * Developer API describing a resource, such as a GPU or FPGA. This information
 * is provided to TaskInfo, so that a task can use it to schedule embedded jobs,
 * such as MPI, that require additional resource information.
 *
 * @param tpe The type of the resource, e.g. "/cpu", "/gpu/k80", "/gpu/p100".
 * @param id The id of the resource, if provided, such as the id of a GPU card.
 * @param spec Detailed information about the resource, if provided, such as the GPU spec
 *             or VRAM size.
 */
@DeveloperApi
case class ResourceInformation(tpe: String, id: String = "N/A", spec: String = "N/A") {

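  // Id of the task currently occupying this resource; ResourceInformation.UNUSED (-1L) means unoccupied.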
  private[spark] var occupiedByTask: Long = ResourceInformation.UNUSED

  override def toString: String = s"$tpe id: $id and spec: $spec"
}

private[spark] object ResourceInformation {
  val UNUSED = -1L
}
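
For illustration, a minimal sketch of how a consumer might construct and print these descriptors. The resource values below are hypothetical and not part of this diff:

import org.apache.spark.scheduler.ResourceInformation

// Two hypothetical GPU descriptors; `spec` falls back to "N/A" when omitted.
val gpus = Array(
  ResourceInformation("/gpu/k80", id = "0", spec = "12GB VRAM"),
  ResourceInformation("/gpu/k80", id = "1"))

// toString prints e.g. "/gpu/k80 id: 0 and spec: 12GB VRAM".
gpus.foreach(println)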

core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -18,6 +18,7 @@
package org.apache.spark.scheduler.cluster

import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.scheduler.ResourceInformation

/**
 * Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
@@ -29,10 +30,12 @@ import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 * @param totalCores The total number of cores available to the executor
 */
private[cluster] class ExecutorData(
-    val executorEndpoint: RpcEndpointRef,
-    val executorAddress: RpcAddress,
-    override val executorHost: String,
-    var freeCores: Int,
-    override val totalCores: Int,
-    override val logUrlMap: Map[String, String]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
+    val executorEndpoint: RpcEndpointRef,
+    val executorAddress: RpcAddress,
+    override val executorHost: String,
+    var freeCores: Int,
+    override val totalCores: Int,
+    override val logUrlMap: Map[String, String],
+    override val resources: Array[ResourceInformation] = Array.empty)
+  extends ExecutorInfo(executorHost, totalCores, logUrlMap, resources) {
+}
core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -17,16 +17,18 @@
package org.apache.spark.scheduler.cluster

import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.scheduler.ResourceInformation

/**
 * :: DeveloperApi ::
 * Stores information about an executor to pass from the scheduler to SparkListeners.
 */
@DeveloperApi
class ExecutorInfo(
-    val executorHost: String,
-    val totalCores: Int,
-    val logUrlMap: Map[String, String]) {
+    val executorHost: String,
+    val totalCores: Int,
+    val logUrlMap: Map[String, String],
+    val resources: Array[ResourceInformation] = Array.empty) {

def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]

@@ -35,12 +37,13 @@ class ExecutorInfo(
      (that canEqual this) &&
        executorHost == that.executorHost &&
        totalCores == that.totalCores &&
-        logUrlMap == that.logUrlMap
+        logUrlMap == that.logUrlMap &&
+        resources.toSet == that.resources.toSet
    case _ => false
  }

  override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores, logUrlMap)
+    val state = Seq(executorHost, totalCores, logUrlMap) ++ resources
    state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
  }
}
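
Note that `equals` compares `resources.toSet`, so array order does not affect equality. A minimal sketch of that behavior; the host name, core count, and resources are placeholders:

import org.apache.spark.scheduler.ResourceInformation
import org.apache.spark.scheduler.cluster.ExecutorInfo

val k80 = ResourceInformation("/gpu/k80", "0")
val p100 = ResourceInformation("/gpu/p100", "1")

// Same resources in a different order still compare equal,
// because equals compares resources.toSet.
val a = new ExecutorInfo("host1", 4, Map.empty, Array(k80, p100))
val b = new ExecutorInfo("host1", 4, Map.empty, Array(p100, k80))
assert(a == b)

One caveat worth noting: `hashCode` folds over `resources` in array order, so `a` and `b` above are equal yet may hash differently, which technically breaks the equals/hashCode contract.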
21 changes: 19 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -463,7 +463,14 @@
  def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
    ("Host" -> executorInfo.executorHost) ~
    ("Total Cores" -> executorInfo.totalCores) ~
-    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
+    ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
+    ("Resources" -> JArray(executorInfo.resources.map(resourceInformationToJson).toList))
  }

+  def resourceInformationToJson(res: ResourceInformation): JValue = {
+    ("Type" -> res.tpe) ~
+    ("ID" -> res.id) ~
+    ("Spec" -> res.spec)
+  }

  def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = {
@@ -1012,7 +1019,17 @@
    val executorHost = (json \ "Host").extract[String]
    val totalCores = (json \ "Total Cores").extract[Int]
    val logUrls = mapFromJson(json \ "Log Urls").toMap
-    new ExecutorInfo(executorHost, totalCores, logUrls)
+    val resources = jsonOption(json \ "Resources").map { l =>
+      l.extract[List[JValue]].map(resourceInformationFromJson).toArray
+    }.getOrElse(Array.empty)
+    new ExecutorInfo(executorHost, totalCores, logUrls, resources)
  }

+  def resourceInformationFromJson(json: JValue): ResourceInformation = {
+    val tpe = (json \ "Type").extract[String]
+    val id = (json \ "ID").extract[String]
+    val spec = (json \ "Spec").extract[String]
+    ResourceInformation(tpe, id, spec)
+  }

  def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
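As a sanity check, a descriptor should survive a round trip through these two helpers. A minimal sketch, assuming it runs in a scope with `private[spark]` access to `JsonProtocol`; the resource values are arbitrary:

import org.apache.spark.scheduler.ResourceInformation
import org.apache.spark.util.JsonProtocol

val res = ResourceInformation("/gpu/p100", id = "1", spec = "16GB VRAM")
val json = JsonProtocol.resourceInformationToJson(res)

// Case-class equality makes the round trip easy to verify.
assert(JsonProtocol.resourceInformationFromJson(json) == res)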
34 changes: 30 additions & 4 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -79,8 +79,10 @@
    val applicationStartWithLogs = SparkListenerApplicationStart("The winner of all", Some("appId"),
      42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
    val applicationEnd = SparkListenerApplicationEnd(42L)
+    val resources = Array(ResourceInformation("/gpu/k80", "0"),
+      ResourceInformation("/gpu/p100", "1"))
    val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
-      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
+      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, resources))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
val executorBlacklisted = SparkListenerExecutorBlacklisted(executorBlacklistedTime, "exec1", 22)
val executorUnblacklisted =
@@ -134,7 +136,9 @@
    testTaskMetrics(makeTaskMetrics(
      33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
    testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
-    testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap))
+    val resources = Array(ResourceInformation("/gpu/k80", "0"),
+      ResourceInformation("/gpu/p100", "1"))
+    testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap, resources))

// StorageLevel
testStorageLevel(StorageLevel.NONE)
@@ -436,9 +440,18 @@
    testAccumValue(Some("anything"), 123, JString("123"))
  }

+  test("ExecutorInfo backward compatibility") {
+    val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
+    val resources = Array(ResourceInformation("/gpu/k80", "0"),
+      ResourceInformation("/gpu/p100", "1"))
+    val executorInfo = new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, resources)
+    val oldExecutorInfo = JsonProtocol.executorInfoToJson(executorInfo)
+      .removeField({ _._1 == "Resources" })
+    val expectedExecutorInfo = new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, Array.empty)
+    assertEquals(expectedExecutorInfo, JsonProtocol.executorInfoFromJson(oldExecutorInfo))
+  }
}


private[spark] object JsonProtocolSuite extends Assertions {
import InternalAccumulator._

@@ -620,6 +633,7 @@
  private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) {
    assert(info1.executorHost == info2.executorHost)
    assert(info1.totalCores == info2.totalCores)
+    assert(info1.resources.toSet === info2.resources.toSet)
  }

private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
@@ -1782,7 +1796,19 @@
| "Log Urls" : {
| "stderr" : "mystderr",
| "stdout" : "mystdout"
| }
| },
| "Resources": [
| {
| "Type": "/gpu/k80",
| "ID": "0",
| "Spec": "N/A"
| },
| {
| "Type": "/gpu/p100",
| "ID": "1",
| "Spec": "N/A"
| }
| ]
| }
|}
""".stripMargin
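The `jsonOption(json \ "Resources").map(...).getOrElse(Array.empty)` fallback in `executorInfoFromJson` is what the backward-compatibility test above exercises: an event-log payload written before the "Resources" field existed simply deserializes to an empty resource array. A sketch of that behavior; the JSON literal is illustrative, and `private[spark]` access to `JsonProtocol` is assumed:

import org.json4s.jackson.JsonMethods.parse
import org.apache.spark.util.JsonProtocol

// An executor-info payload written before "Resources" existed.
val legacy = parse("""{"Host": "oldhost", "Total Cores": 8, "Log Urls": {}}""")

// The missing field falls back to the default: an empty resource array.
val info = JsonProtocol.executorInfoFromJson(legacy)
assert(info.resources.isEmpty)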