[SPARK-35280][K8S] Promote KubernetesUtils to DeveloperApi
### What changes were proposed in this pull request?

Since SPARK-22757, `KubernetesUtils` has been used as an important utility class by all K8s modules and `ExternalClusterManager`s. This PR aims to promote `KubernetesUtils` to `DeveloperApi` so that it can be maintained officially, in a backward-compatible way, from Apache Spark 3.2.0 onward.

### Why are the changes needed?

Apache Spark 3.1.1 made the `Kubernetes` module generally available and provides an extensible external cluster manager framework. To implement an `ExternalClusterManager` for a K8s environment, the `KubernetesUtils` class is crucial and needs to be stable. By promoting it to a subset of the K8s developer API, we can maintain it in a more sustainable way and offer better, more stable functionality to K8s users.

In this PR, the `Since` annotations denote the version of each function's most recent signature change, because these functions become public at Apache Spark 3.2.0. A sketch of downstream usage follows the table.

| Version | Function Name |
|-|-|
| 2.3.0 | parsePrefixedKeyValuePairs |
| 2.3.0 | requireNandDefined |
| 2.4.0 | parseMasterUrl |
| 3.0.0 | requireBothOrNeitherDefined |
| 3.0.0 | requireSecondIfFirstIsDefined |
| 3.0.0 | selectSparkContainer |
| 3.0.0 | formatPairsBundle |
| 3.0.0 | formatPodState |
| 3.0.0 | containersDescription |
| 3.0.0 | containerStatusDescription |
| 3.0.0 | formatTime |
| 3.0.0 | uniqueID |
| 3.0.0 | buildResourcesQuantities |
| 3.0.0 | uploadAndTransformFileUris |
| 3.0.0 | uploadFileUri |
| 3.0.0 | buildPodWithServiceAccount |
| 3.0.0 | isLocalAndResolvable |
| 3.1.1 | renameMainAppResource |
| 3.1.1 | addOwnerReference |
| 3.2.0 | loadPodFromTemplate |
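
For illustration, here is a minimal sketch of how a third-party `ExternalClusterManager` implementation might call a few of the promoted utilities once they become public. This is illustrative only: the object name, configuration keys, and values below are hypothetical and not part of this PR.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.KubernetesUtils

object KubernetesUtilsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical configuration for a custom K8s-backed cluster manager.
    val conf = new SparkConf()
      .set("spark.kubernetes.driver.label.team", "data-eng")
      .set("spark.kubernetes.driver.label.env", "staging")

    // parseMasterUrl strips the "k8s://" scheme, leaving the API server address.
    val apiServer = KubernetesUtils.parseMasterUrl("k8s://https://1.2.3.4:6443")

    // parsePrefixedKeyValuePairs collects every key under a prefix (prefix stripped).
    val labels = KubernetesUtils.parsePrefixedKeyValuePairs(
      conf, "spark.kubernetes.driver.label.")

    // uniqueID combines a timestamp with random bytes for short resource-name suffixes.
    val suffix = KubernetesUtils.uniqueID()

    println(s"server=$apiServer labels=$labels suffix=$suffix")
  }
}
```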

### Does this PR introduce _any_ user-facing change?

Yes, but these are new API additions.

### How was this patch tested?

Pass the CIs.

Closes #32406 from dongjoon-hyun/SPARK-35280.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun committed Apr 30, 2021
1 parent 39889df commit 4e8701a
Showing 1 changed file with 29 additions and 1 deletion.
@@ -29,6 +29,7 @@ import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
import org.apache.spark.internal.Logging
@@ -38,7 +39,14 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
import org.apache.spark.util.DependencyUtils.downloadFile
import org.apache.spark.util.Utils.getHadoopFileSystem

private[spark] object KubernetesUtils extends Logging {
/**
 * :: DeveloperApi ::
 *
 * A utility class used for K8s operations internally and for implementing ExternalClusterManagers.
 */
@Unstable
@DeveloperApi
object KubernetesUtils extends Logging {

  private val systemClock = new SystemClock()
  private lazy val RNG = new SecureRandom()
@@ -51,12 +59,14 @@ private[spark] object KubernetesUtils extends Logging {
   * @param prefix the given property name prefix
   * @return a Map storing the configuration property keys and values
   */
  @Since("2.3.0")
  def parsePrefixedKeyValuePairs(
      sparkConf: SparkConf,
      prefix: String): Map[String, String] = {
    sparkConf.getAllWithPrefix(prefix).toMap
  }

  @Since("3.0.0")
  def requireBothOrNeitherDefined(
      opt1: Option[_],
      opt2: Option[_],
@@ -66,6 +76,7 @@
    requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
  }

  @Since("3.0.0")
  def requireSecondIfFirstIsDefined(
      opt1: Option[_],
      opt2: Option[_],
@@ -75,11 +86,13 @@
    }
  }

  @Since("2.3.0")
  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
    opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
    opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
  }

  @Since("3.2.0")
  def loadPodFromTemplate(
      kubernetesClient: KubernetesClient,
      templateFileName: String,
@@ -99,6 +112,7 @@
    }
  }

  @Since("3.0.0")
  def selectSparkContainer(pod: Pod, containerName: Option[String]): SparkPod = {
    def selectNamedContainer(
        containers: List[Container], name: String): Option[(Container, List[Container])] =
@@ -125,8 +139,10 @@
    }.getOrElse(SparkPod(pod, new ContainerBuilder().build()))
  }

  @Since("2.4.0")
  def parseMasterUrl(url: String): String = url.substring("k8s://".length)

  @Since("3.0.0")
  def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = {
    // Use more loggable format if value is null or empty
    val indentStr = "\t" * indent
@@ -141,6 +157,7 @@
   * @param pod Pod
   * @return Human readable pod state
   */
  @Since("3.0.0")
  def formatPodState(pod: Pod): String = {
    val details = Seq[(String, String)](
      // pod metadata
@@ -164,6 +181,7 @@
    formatPairsBundle(details)
  }

  @Since("3.0.0")
  def containersDescription(p: Pod, indent: Int = 1): String = {
    p.getStatus.getContainerStatuses.asScala.map { status =>
      Seq(
@@ -173,6 +191,7 @@
    }.map(p => formatPairsBundle(p, indent)).mkString("\n\n")
  }

  @Since("3.0.0")
  def containerStatusDescription(containerStatus: ContainerStatus)
    : Seq[(String, String)] = {
    val state = containerStatus.getState
@@ -200,6 +219,7 @@
    }.getOrElse(Seq(("container state", "N/A")))
  }

  @Since("3.0.0")
  def formatTime(time: String): String = {
    if (time != null) time else "N/A"
  }
@@ -212,6 +232,7 @@
   * This avoids using a UUID for uniqueness (too long), and relying solely on the current time
   * (not unique enough).
   */
  @Since("3.0.0")
  def uniqueID(clock: Clock = systemClock): String = {
    val random = new Array[Byte](3)
    synchronized {
@@ -228,6 +249,7 @@
   * It assumes we can use the Kubernetes device plugin format: vendor-domain/resource.
   * It returns a set with a tuple of vendor-domain/resource and Quantity for each resource.
   */
  @Since("3.0.0")
  def buildResourcesQuantities(
      componentName: String,
      sparkConf: SparkConf): Map[String, Quantity] = {
@@ -247,6 +269,7 @@
  /**
   * Upload files and modify their uris
   */
  @Since("3.0.0")
  def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None)
    : Iterable[String] = {
    fileUris.map { uri =>
@@ -261,11 +284,13 @@
    }
  }

  @Since("3.0.0")
  def isLocalAndResolvable(resource: String): Boolean = {
    resource != SparkLauncher.NO_RESOURCE &&
      isLocalDependency(Utils.resolveURI(resource))
  }

  @Since("3.1.1")
  def renameMainAppResource(
      resource: String,
      conf: Option[SparkConf] = None,
@@ -281,6 +306,7 @@
    }
  }

  @Since("3.0.0")
  def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
    conf match {
      case Some(sConf) =>
@@ -325,6 +351,7 @@ }
    }
  }

  @Since("3.0.0")
  def buildPodWithServiceAccount(serviceAccount: Option[String], pod: SparkPod): Option[Pod] = {
    serviceAccount.map { account =>
      new PodBuilder(pod.pod)
@@ -338,6 +365,7 @@

  // Add a OwnerReference to the given resources making the pod an owner of them so when
  // the pod is deleted, the resources are garbage collected.
  @Since("3.1.1")
  def addOwnerReference(pod: Pod, resources: Seq[HasMetadata]): Unit = {
    if (pod != null) {
      val reference = new OwnerReferenceBuilder()
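
As a usage note rather than part of this commit: with `addOwnerReference` now public, a downstream K8s integration could tie auxiliary resources to the driver pod roughly as sketched below. The object, the helper, and the ConfigMap name are hypothetical.

```scala
import io.fabric8.kubernetes.api.model.{ConfigMapBuilder, HasMetadata, Pod}
import org.apache.spark.deploy.k8s.KubernetesUtils

object OwnerReferenceSketch {
  // Hypothetical helper: make the driver pod the owner of an auxiliary ConfigMap,
  // so that deleting the pod garbage-collects the ConfigMap as well.
  def attachConfigMap(driverPod: Pod): HasMetadata = {
    val configMap = new ConfigMapBuilder()
      .withNewMetadata()
        .withName("spark-extra-conf") // illustrative name
      .endMetadata()
      .build()
    KubernetesUtils.addOwnerReference(driverPod, Seq(configMap))
    configMap
  }
}
```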
