From 4e8701a77dff729c4e8e0ad39c16e2717c2c32fe Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Fri, 30 Apr 2021 11:39:18 -0700
Subject: [PATCH] [SPARK-35280][K8S] Promote KubernetesUtils to DeveloperApi

### What changes were proposed in this pull request?

Since SPARK-22757, `KubernetesUtils` has been used as an important utility class by all K8s modules and `ExternalClusterManager`s. This PR aims to promote `KubernetesUtils` to `DeveloperApi` so that it can be maintained officially in a backward-compatible way from Apache Spark 3.2.0.

### Why are the changes needed?

Apache Spark 3.1.1 made the `Kubernetes` module GA and provides an extensible external cluster manager framework. To implement an `ExternalClusterManager` for a K8s environment, the `KubernetesUtils` class is crucial and needs to be stable. By promoting it to a K8s developer API, we can maintain it in a more sustainable way and give better, more stable functionality to K8s users.

In this PR, each `Since` annotation records the version of the function's most recent signature change, because these functions become public at Apache Spark 3.2.0.

| Version | Function Name |
|-|-|
| 2.3.0 | parsePrefixedKeyValuePairs |
| 2.3.0 | requireNandDefined |
| 2.4.0 | parseMasterUrl |
| 3.0.0 | requireBothOrNeitherDefined |
| 3.0.0 | requireSecondIfFirstIsDefined |
| 3.0.0 | selectSparkContainer |
| 3.0.0 | formatPairsBundle |
| 3.0.0 | formatPodState |
| 3.0.0 | containersDescription |
| 3.0.0 | containerStatusDescription |
| 3.0.0 | formatTime |
| 3.0.0 | uniqueID |
| 3.0.0 | buildResourcesQuantities |
| 3.0.0 | uploadAndTransformFileUris |
| 3.0.0 | uploadFileUri |
| 3.0.0 | buildPodWithServiceAccount |
| 3.0.0 | isLocalAndResolvable |
| 3.1.1 | renameMainAppResource |
| 3.1.1 | addOwnerReference |
| 3.2.0 | loadPodFromTemplate |

### Does this PR introduce _any_ user-facing change?

Yes, but these are new API additions.

### How was this patch tested?

Pass the CIs.

Closes #32406 from dongjoon-hyun/SPARK-35280.

Authored-by: Dongjoon Hyun
Signed-off-by: Dongjoon Hyun
---
 .../spark/deploy/k8s/KubernetesUtils.scala    | 30 ++++++++++++++++++-
 1 file changed, 29 insertions(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 6bc31c2a0e682..0c8d9646a2b4e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -29,6 +29,7 @@ import org.apache.commons.codec.binary.Hex
 import org.apache.hadoop.fs.{FileSystem, Path}

 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
 import org.apache.spark.internal.Logging
@@ -38,7 +39,14 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
 import org.apache.spark.util.DependencyUtils.downloadFile
 import org.apache.spark.util.Utils.getHadoopFileSystem

-private[spark] object KubernetesUtils extends Logging {
+/**
+ * :: DeveloperApi ::
+ *
+ * A utility class used for K8s operations internally and for implementing ExternalClusterManagers.
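+ *
+ * A sketch of typical usage from an `ExternalClusterManager` (the label prefix
+ * below is one plausible example, not the only supported value):
+ * {{{
+ *   val conf = new SparkConf()
+ *     .set("spark.kubernetes.driver.label.team", "data-eng")
+ *   // Returns Map("team" -> "data-eng"); keys have the prefix stripped.
+ *   val labels = KubernetesUtils.parsePrefixedKeyValuePairs(
+ *     conf, "spark.kubernetes.driver.label.")
+ * }}}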
+ */
+@Unstable
+@DeveloperApi
+object KubernetesUtils extends Logging {
   private val systemClock = new SystemClock()
   private lazy val RNG = new SecureRandom()

@@ -51,12 +59,14 @@ private[spark] object KubernetesUtils extends Logging {
    * @param prefix the given property name prefix
    * @return a Map storing the configuration property keys and values
    */
+  @Since("2.3.0")
   def parsePrefixedKeyValuePairs(
       sparkConf: SparkConf,
       prefix: String): Map[String, String] = {
     sparkConf.getAllWithPrefix(prefix).toMap
   }

+  @Since("3.0.0")
   def requireBothOrNeitherDefined(
       opt1: Option[_],
       opt2: Option[_],
@@ -66,6 +76,7 @@ private[spark] object KubernetesUtils extends Logging {
     requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
   }

+  @Since("3.0.0")
   def requireSecondIfFirstIsDefined(
       opt1: Option[_],
       opt2: Option[_],
@@ -75,11 +86,13 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }

+  @Since("2.3.0")
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
     opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
   }

+  @Since("3.2.0")
   def loadPodFromTemplate(
       kubernetesClient: KubernetesClient,
       templateFileName: String,
@@ -99,6 +112,7 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }

+  @Since("3.0.0")
   def selectSparkContainer(pod: Pod, containerName: Option[String]): SparkPod = {
     def selectNamedContainer(
       containers: List[Container], name: String): Option[(Container, List[Container])] =
@@ -125,8 +139,10 @@ private[spark] object KubernetesUtils extends Logging {
     }.getOrElse(SparkPod(pod, new ContainerBuilder().build()))
   }

+  @Since("2.4.0")
   def parseMasterUrl(url: String): String = url.substring("k8s://".length)

+  @Since("3.0.0")
   def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = {
     // Use more loggable format if value is null or empty
     val indentStr = "\t" * indent
@@ -141,6 +157,7 @@ private[spark] object KubernetesUtils extends Logging {
    * @param pod Pod
    * @return Human readable pod state
    */
+  @Since("3.0.0")
   def formatPodState(pod: Pod): String = {
     val details = Seq[(String, String)](
       // pod metadata
@@ -164,6 +181,7 @@ private[spark] object KubernetesUtils extends Logging {
     formatPairsBundle(details)
   }

+  @Since("3.0.0")
   def containersDescription(p: Pod, indent: Int = 1): String = {
     p.getStatus.getContainerStatuses.asScala.map { status =>
       Seq(
@@ -173,6 +191,7 @@ private[spark] object KubernetesUtils extends Logging {
     }.map(p => formatPairsBundle(p, indent)).mkString("\n\n")
   }

+  @Since("3.0.0")
   def containerStatusDescription(containerStatus: ContainerStatus)
     : Seq[(String, String)] = {
     val state = containerStatus.getState
@@ -200,6 +219,7 @@ private[spark] object KubernetesUtils extends Logging {
     }.getOrElse(Seq(("container state", "N/A")))
   }

+  @Since("3.0.0")
   def formatTime(time: String): String = {
     if (time != null) time else "N/A"
   }
@@ -212,6 +232,7 @@ private[spark] object KubernetesUtils extends Logging {
    * This avoids using a UUID for uniqueness (too long), and relying solely on the current time
    * (not unique enough).
    */
+  @Since("3.0.0")
   def uniqueID(clock: Clock = systemClock): String = {
     val random = new Array[Byte](3)
     synchronized {
@@ -228,6 +249,7 @@ private[spark] object KubernetesUtils extends Logging {
    * It assumes we can use the Kubernetes device plugin format: vendor-domain/resource.
    * It returns a set with a tuple of vendor-domain/resource and Quantity for each resource.
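+ *
+ * For example (hypothetical settings), calling this with `componentName`
+ * `spark.driver` and a conf containing `spark.driver.resource.gpu.vendor=nvidia.com`
+ * and `spark.driver.resource.gpu.amount=2` would map `nvidia.com/gpu` to a
+ * Quantity of `2`.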
*/ + @Since("3.0.0") def buildResourcesQuantities( componentName: String, sparkConf: SparkConf): Map[String, Quantity] = { @@ -247,6 +269,7 @@ private[spark] object KubernetesUtils extends Logging { /** * Upload files and modify their uris */ + @Since("3.0.0") def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None) : Iterable[String] = { fileUris.map { uri => @@ -261,11 +284,13 @@ private[spark] object KubernetesUtils extends Logging { } } + @Since("3.0.0") def isLocalAndResolvable(resource: String): Boolean = { resource != SparkLauncher.NO_RESOURCE && isLocalDependency(Utils.resolveURI(resource)) } + @Since("3.1.1") def renameMainAppResource( resource: String, conf: Option[SparkConf] = None, @@ -281,6 +306,7 @@ private[spark] object KubernetesUtils extends Logging { } } + @Since("3.0.0") def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = { conf match { case Some(sConf) => @@ -325,6 +351,7 @@ private[spark] object KubernetesUtils extends Logging { } } + @Since("3.0.0") def buildPodWithServiceAccount(serviceAccount: Option[String], pod: SparkPod): Option[Pod] = { serviceAccount.map { account => new PodBuilder(pod.pod) @@ -338,6 +365,7 @@ private[spark] object KubernetesUtils extends Logging { // Add a OwnerReference to the given resources making the pod an owner of them so when // the pod is deleted, the resources are garbage collected. + @Since("3.1.1") def addOwnerReference(pod: Pod, resources: Seq[HasMetadata]): Unit = { if (pod != null) { val reference = new OwnerReferenceBuilder()
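
For illustration, a minimal sketch of calling the newly public `addOwnerReference` from user code; the fabric8 `ConfigMapBuilder` usage and the ConfigMap name are assumptions chosen for this example, not taken from the patch:

import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata, Pod}
import org.apache.spark.deploy.k8s.KubernetesUtils

object OwnerReferenceSketch {
  // Make the driver pod the owner of a ConfigMap so that deleting the pod
  // lets Kubernetes garbage-collect the ConfigMap as well.
  def ownConfigMap(driverPod: Pod): Seq[HasMetadata] = {
    val configMap = new ConfigMapBuilder()
      .withNewMetadata()
        .withName("spark-driver-conf") // hypothetical name
      .endMetadata()
      .build()
    KubernetesUtils.addOwnerReference(driverPod, Seq(configMap))
    Seq(configMap)
  }
}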