diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/scheduler/ResourceInformation.scala
new file mode 100644
index 0000000000000..d255724d22e61
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResourceInformation.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Describes a resource, such as a GPU or an FPGA. This information is provided to
+ * TaskInfo, so that a task can use it to schedule embedded jobs such as MPI that
+ * require additional resource information.
+ *
+ * @param tpe The type of the resource, e.g. "/cpu", "/gpu/k80", "/gpu/p100".
+ * @param id The id of the resource, if provided, such as the id of a GPU card.
+ * @param spec Detailed information about the resource, if provided, such as the GPU spec or
+ *             VRAM size.
+ */
+@DeveloperApi
+case class ResourceInformation(tpe: String, id: String = "N/A", spec: String = "N/A") {
+
+  private[spark] var occupiedByTask: Long = ResourceInformation.UNUSED
+
+  override def toString: String = s"$tpe id: $id and spec: $spec"
+}
+
+private[spark] object ResourceInformation {
+  val UNUSED = -1L
+}
+
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index b25a4bfb501fb..4c8b6937a652b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.scheduler.ResourceInformation
 
 /**
  * Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
@@ -29,10 +30,12 @@ import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
  * @param totalCores The total number of cores available to the executor
  */
 private[cluster] class ExecutorData(
-   val executorEndpoint: RpcEndpointRef,
-   val executorAddress: RpcAddress,
-   override val executorHost: String,
-   var freeCores: Int,
-   override val totalCores: Int,
-   override val logUrlMap: Map[String, String]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
+    val executorEndpoint: RpcEndpointRef,
+    val executorAddress: RpcAddress,
+    override val executorHost: String,
+    var freeCores: Int,
+    override val totalCores: Int,
+    override val logUrlMap: Map[String, String],
+    override val resources: Array[ResourceInformation] = Array.empty)
+  extends ExecutorInfo(executorHost, totalCores, logUrlMap, resources) {
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
index 7f218566146a1..9bbe6db38acc1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.scheduler.ResourceInformation
 
 /**
  * :: DeveloperApi ::
@@ -24,9 +25,10 @@ import org.apache.spark.annotation.DeveloperApi
  */
 @DeveloperApi
 class ExecutorInfo(
-   val executorHost: String,
-   val totalCores: Int,
-   val logUrlMap: Map[String, String]) {
+    val executorHost: String,
+    val totalCores: Int,
+    val logUrlMap: Map[String, String],
+    val resources: Array[ResourceInformation] = Array.empty) {
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
 
@@ -35,12 +37,13 @@ class ExecutorInfo(
       (that canEqual this) &&
         executorHost == that.executorHost &&
         totalCores == that.totalCores &&
-        logUrlMap == that.logUrlMap
+        logUrlMap == that.logUrlMap &&
+        resources.toSet == that.resources.toSet
     case _ => false
   }
 
   override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores, logUrlMap)
+    val state = Seq(executorHost, totalCores, logUrlMap) ++ resources
     state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 50c6461373dee..e5df90244626c 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -463,7 +463,14 @@ private[spark] object JsonProtocol {
   def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
     ("Host" -> executorInfo.executorHost) ~
     ("Total Cores" -> executorInfo.totalCores) ~
-    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
+    ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
+    ("Resources" -> JArray(executorInfo.resources.map(resourceInformationToJson).toList))
+  }
+
+  def resourceInformationToJson(res: ResourceInformation): JValue = {
+    ("Type" -> res.tpe) ~
+    ("ID" -> res.id) ~
+    ("Spec" -> res.spec)
   }
 
   def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = {
@@ -1012,7 +1019,17 @@ private[spark] object JsonProtocol {
     val executorHost = (json \ "Host").extract[String]
     val totalCores = (json \ "Total Cores").extract[Int]
     val logUrls = mapFromJson(json \ "Log Urls").toMap
-    new ExecutorInfo(executorHost, totalCores, logUrls)
+    val resources = jsonOption(json \ "Resources").map { l =>
+      l.extract[List[JValue]].map(resourceInformationFromJson).toArray
+    }.getOrElse(Array.empty)
+    new ExecutorInfo(executorHost, totalCores, logUrls, resources)
+  }
+
+  def resourceInformationFromJson(json: JValue): ResourceInformation = {
+    val tpe = (json \ "Type").extract[String]
+    val id = (json \ "ID").extract[String]
+    val spec = (json \ "Spec").extract[String]
+    ResourceInformation(tpe, id, spec)
   }
 
   def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 74b72d940eeef..a897f79e8828f 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -79,8 +79,10 @@ class JsonProtocolSuite extends SparkFunSuite {
     val applicationStartWithLogs = SparkListenerApplicationStart("The winner of all", Some("appId"),
       42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
     val applicationEnd = SparkListenerApplicationEnd(42L)
+    val resources = Array(ResourceInformation("/gpu/k80", "0"),
+      ResourceInformation("/gpu/p100", "1"))
     val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
-      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
+      new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, resources))
     val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
     val executorBlacklisted = SparkListenerExecutorBlacklisted(executorBlacklistedTime, "exec1", 22)
     val executorUnblacklisted =
@@ -134,7 +136,9 @@ class JsonProtocolSuite extends SparkFunSuite {
     testTaskMetrics(makeTaskMetrics(
       33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
-    testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap))
+    val resources = Array(ResourceInformation("/gpu/k80", "0"),
+      ResourceInformation("/gpu/p100", "1"))
+    testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap, resources))
 
     // StorageLevel
     testStorageLevel(StorageLevel.NONE)
@@ -436,9 +440,18 @@ class JsonProtocolSuite extends SparkFunSuite {
     testAccumValue(Some("anything"), 123, JString("123"))
   }
 
+  test("ExecutorInfo backward compatibility") {
+    val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
+    val resources = Array(ResourceInformation("/gpu/k80", "0"),
+      ResourceInformation("/gpu/p100", "1"))
+    val executorInfo = new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, resources)
+    val oldExecutorInfo = JsonProtocol.executorInfoToJson(executorInfo)
+      .removeField({_._1 == "Resources"})
+    val expectedExecutorInfo = new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, Array.empty)
+    assertEquals(expectedExecutorInfo, JsonProtocol.executorInfoFromJson(oldExecutorInfo))
+  }
 }
-
 
 private[spark] object JsonProtocolSuite extends Assertions {
 
   import InternalAccumulator._
@@ -620,6 +633,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
   private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) {
     assert(info1.executorHost == info2.executorHost)
     assert(info1.totalCores == info2.totalCores)
+    assert(info1.resources.toSet === info2.resources.toSet)
   }
 
   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
@@ -1782,7 +1796,19 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |    "Log Urls" : {
       |      "stderr" : "mystderr",
       |      "stdout" : "mystdout"
-      |    }
+      |    },
+      |    "Resources": [
+      |      {
+      |        "Type": "/gpu/k80",
"0", + | "Spec": "N/A" + | }, + | { + | "Type": "/gpu/p100", + | "ID": "1", + | "Spec": "N/A" + | } + | ] | } |} """.stripMargin