Support driver pod kubernetes credentials mounting in V2 submission (a…
mccheah authored and foxish committed Jul 24, 2017
1 parent 76c865d commit a6cebcb
Showing 20 changed files with 632 additions and 181 deletions.
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

case class KubernetesCredentials(
oauthTokenBase64: Option[String],
caCertDataBase64: Option[String],
clientKeyDataBase64: Option[String],
clientCertDataBase64: Option[String])
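
As a quick illustration (not part of the change), the base64 fields would be populated the way the submission client further down in this diff does it, using Guava's BaseEncoding; the file path and token value below are made up.

import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.{BaseEncoding, Files}

// Illustrative only: encode submitter-local credential material the same way
// DriverPodKubernetesCredentialsProvider does, leaving absent values as None.
val caCertBase64 = Some(BaseEncoding.base64().encode(
  Files.toByteArray(new File("/tmp/example-ca.pem"))))   // hypothetical path
val oauthTokenBase64 = Some(BaseEncoding.base64().encode(
  "example-token".getBytes(Charsets.UTF_8)))             // hypothetical token

val credentials = KubernetesCredentials(
  oauthTokenBase64 = oauthTokenBase64,
  caCertDataBase64 = caCertBase64,
  clientKeyDataBase64 = None,
  clientCertDataBase64 = None)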
@@ -120,30 +120,42 @@ package object config extends Logging {
private[spark] val KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE =
ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.caCertFile")
.doc("Path on the driver pod's disk containing the CA cert file to use when authenticating" +
" against Kubernetes.")
" against Kubernetes. Typically this is configured by spark-submit from mounting a" +
" secret from the submitting machine into the pod, and hence this configuration is marked" +
" as internal, but this can also be set manually to use a certificate that is mounted" +
" into the driver pod via other means.")
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE =
ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientKeyFile")
.doc("Path on the driver pod's disk containing the client key file to use when" +
" authenticating against Kubernetes.")
" authenticating against Kubernetes. Typically this is configured by spark-submit from" +
" mounting a secret from the submitting machine into the pod, and hence this" +
" configuration is marked as internal, but this can also be set manually to" +
" use a key file that is mounted into the driver pod via other means.")
.internal()
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE =
ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientCertFile")
.doc("Path on the driver pod's disk containing the client cert file to use when" +
" authenticating against Kubernetes.")
" authenticating against Kubernetes. Typically this is configured by spark-submit from" +
" mounting a secret from the submitting machine into the pod, and hence this" +
" configuration is marked as internal, but this can also be set manually to" +
" use a certificate that is mounted into the driver pod via other means.")
.internal()
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN =
ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.oauthTokenFile")
.doc("Path on the driver pod's disk containing the OAuth token file to use when" +
" authenticating against Kubernetes.")
" authenticating against Kubernetes. Typically this is configured by spark-submit from" +
" mounting a secret from the submitting machine into the pod, and hence this" +
" configuration is marked as internal, but this can also be set manually to" +
" use a token that is mounted into the driver pod via other means.")
.internal()
.stringConf
.createOptional
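
To make the "can also be set manually" case in these docs concrete, a minimal sketch (assuming code that can see these private[spark] config entries, e.g. inside Spark's own packages; the mount paths are hypothetical): point the driver at credential files that something other than spark-submit has already placed in the pod.

import org.apache.spark.SparkConf

// Sketch only: the operator has mounted credentials into the driver pod by some
// other means, so the mounted.* configs are set to those existing in-pod paths.
val conf = new SparkConf()
  .set(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE.key, "/etc/my-creds/ca.pem")  // hypothetical path
  .set(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN.key, "/etc/my-creds/token")    // hypothetical path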
@@ -38,6 +38,22 @@ package object constants {
private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets"
private[spark] val SUBMISSION_SSL_KEY_PEM_SECRET_NAME = "spark-submission-server-key-pem"
private[spark] val SUBMISSION_SSL_CERT_PEM_SECRET_NAME = "spark-submission-server-cert-pem"
private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
"/mnt/secrets/spark-kubernetes-credentials"
private[spark] val DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME = "ca-cert"
private[spark] val DRIVER_CREDENTIALS_CA_CERT_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME"
private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME = "client-key"
private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME"
private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME = "client-cert"
private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME"
private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME = "oauth-token"
private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH =
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"


// Default and fixed ports
private[spark] val SUBMISSION_SERVER_PORT = 7077
@@ -14,15 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v1
package org.apache.spark.deploy.kubernetes.submit

import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.{BaseEncoding, Files}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.KubernetesCredentials
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials
import org.apache.spark.internal.config.OptionalConfigEntry

private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) {
@@ -38,15 +39,17 @@ private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf
require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_CERT_FILE).isEmpty,
"Cannot specify both a service account and a driver pod client cert file.")
}
val oauthToken = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN)
val oauthTokenBase64 = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).map { token =>
BaseEncoding.base64().encode(token.getBytes(Charsets.UTF_8))
}
val caCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CA_CERT_FILE,
s"Driver CA cert file provided at %s does not exist or is not a file.")
val clientKeyDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_KEY_FILE,
s"Driver client key file provided at %s does not exist or is not a file.")
val clientCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_CERT_FILE,
s"Driver client cert file provided at %s does not exist or is not a file.")
KubernetesCredentials(
oauthToken = oauthToken,
oauthTokenBase64 = oauthTokenBase64,
caCertDataBase64 = caCertDataBase64,
clientKeyDataBase64 = clientKeyDataBase64,
clientCertDataBase64 = clientCertDataBase64)
@@ -30,11 +30,11 @@ import org.apache.commons.codec.binary.Base64
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.CompressionUtils
import org.apache.spark.deploy.kubernetes.{CompressionUtils, KubernetesCredentials}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils
import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesCredentials, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource}
import org.apache.spark.deploy.kubernetes.submit.{DriverPodKubernetesCredentialsProvider, KubernetesFileUtils}
import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource}
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ShutdownHookManager, Utils}

@@ -48,7 +48,9 @@ private[spark] class Client(
sparkJars: Seq[String],
sparkFiles: Seq[String],
kubernetesClientProvider: SubmissionKubernetesClientProvider,
initContainerComponentsProvider: DriverInitContainerComponentsProvider) extends Logging {
initContainerComponentsProvider: DriverInitContainerComponentsProvider,
kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider)
extends Logging {

private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
.getOrElse(kubernetesAppId)
@@ -133,18 +135,22 @@
.provideInitContainerBootstrap()
.bootstrapInitContainerAndVolumes(driverContainer.getName, basePod)

val driverOwnedResources = Seq(initContainerConfigMap) ++
maybeSubmittedDependenciesSecret.toSeq

val containerLocalizedFilesResolver = initContainerComponentsProvider
.provideContainerLocalizedFilesResolver()
val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars()
val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles()

val executorInitContainerConfiguration = initContainerComponentsProvider
.provideExecutorInitContainerConfiguration()
val resolvedSparkConf = executorInitContainerConfiguration
val sparkConfWithExecutorInit = executorInitContainerConfiguration
.configureSparkConfForExecutorInitContainer(sparkConf)
val credentialsMounter = kubernetesCredentialsMounterProvider
.getDriverPodKubernetesCredentialsMounter()
val credentialsSecret = credentialsMounter.createCredentialsSecret()
val podWithInitContainerAndMountedCreds = credentialsMounter.mountDriverKubernetesCredentials(
podWithInitContainer, driverContainer.getName, credentialsSecret)
val resolvedSparkConf = credentialsMounter.setDriverPodKubernetesCredentialLocations(
sparkConfWithExecutorInit)
if (resolvedSparkJars.nonEmpty) {
resolvedSparkConf.set("spark.jars", resolvedSparkJars.mkString(","))
}
@@ -166,7 +172,7 @@
val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map {
case (confKey, confValue) => s"-D$confKey=$confValue"
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
val resolvedDriverPod = podWithInitContainer.editSpec()
val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec()
.editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName))
.addNewEnv()
.withName(ENV_MOUNTED_CLASSPATH)
@@ -181,6 +187,9 @@
.build()
val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
try {
val driverOwnedResources = Seq(initContainerConfigMap) ++
maybeSubmittedDependenciesSecret.toSeq ++
credentialsSecret.toSeq
val driverPodOwnerReference = new OwnerReferenceBuilder()
.withName(createdDriverPod.getMetadata.getName)
.withApiVersion(createdDriverPod.getApiVersion)
@@ -261,6 +270,8 @@ private[spark] object Client {
val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
sparkConf, kubernetesAppId, sparkJars, sparkFiles)
val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf)
val kubernetesCredentialsMounterProvider =
new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
new Client(
appName,
kubernetesAppId,
@@ -270,6 +281,7 @@
sparkJars,
sparkFiles,
kubernetesClientProvider,
initContainerComponentsProvider).run()
initContainerComponentsProvider,
kubernetesCredentialsMounterProvider).run()
}
}
@@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2

import io.fabric8.kubernetes.api.model.{PodBuilder, Secret, SecretBuilder}
import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.KubernetesCredentials
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.config.OptionalConfigEntry

private[spark] trait DriverPodKubernetesCredentialsMounter {

/**
* Set fields on the Spark configuration that indicate where the driver pod is
* to find its Kubernetes credentials for requesting executors.
*/
def setDriverPodKubernetesCredentialLocations(sparkConf: SparkConf): SparkConf

/**
* Create the Kubernetes secret object that corresponds to the driver's credentials
* that have to be created and mounted into the driver pod. The single Secret
* object contains all of the data entries for the driver pod's Kubernetes
* credentials. Returns empty if no secrets are to be mounted.
*/
def createCredentialsSecret(): Option[Secret]

/**
* Mount any Kubernetes credentials from the submitting machine's disk into the driver pod. The
* secret that is passed in here should have been created by createCredentialsSecret so that
* the implementation does not need to hold state.
*/
def mountDriverKubernetesCredentials(
originalPodSpec: PodBuilder,
driverContainerName: String,
credentialsSecret: Option[Secret]): PodBuilder
}
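
Taken together, the v2 Client earlier in this diff drives these three methods in sequence; a minimal sketch of that composition follows (the helper name and its wiring are illustrative, not part of the change).

import io.fabric8.kubernetes.api.model.PodBuilder
import org.apache.spark.SparkConf

// Illustrative helper: create the secret (if any credentials were provided),
// mount it into the driver pod, then record the mounted file locations in the
// Spark conf that will be handed to the driver.
def applyDriverCredentials(
    mounter: DriverPodKubernetesCredentialsMounter,
    basePod: PodBuilder,
    driverContainerName: String,
    sparkConf: SparkConf): (PodBuilder, SparkConf) = {
  val credentialsSecret = mounter.createCredentialsSecret()
  val podWithCredentials = mounter.mountDriverKubernetesCredentials(
    basePod, driverContainerName, credentialsSecret)
  val resolvedConf = mounter.setDriverPodKubernetesCredentialLocations(sparkConf)
  (podWithCredentials, resolvedConf)
}

The Client also adds the returned Secret (when present) to the driver-owned resources, so Kubernetes garbage-collects it along with the driver pod.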

private[spark] class DriverPodKubernetesCredentialsMounterImpl(
kubernetesAppId: String,
submitterLocalDriverPodKubernetesCredentials: KubernetesCredentials,
maybeUserSpecifiedMountedClientKeyFile: Option[String],
maybeUserSpecifiedMountedClientCertFile: Option[String],
maybeUserSpecifiedMountedOAuthTokenFile: Option[String],
maybeUserSpecifiedMountedCaCertFile: Option[String])
extends DriverPodKubernetesCredentialsMounter {

override def setDriverPodKubernetesCredentialLocations(sparkConf: SparkConf): SparkConf = {
val resolvedMountedClientKeyFile = resolveSecretLocation(
maybeUserSpecifiedMountedClientKeyFile,
submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64,
DRIVER_CREDENTIALS_CLIENT_KEY_PATH)
val resolvedMountedClientCertFile = resolveSecretLocation(
maybeUserSpecifiedMountedClientCertFile,
submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64,
DRIVER_CREDENTIALS_CLIENT_CERT_PATH)
val resolvedMountedCaCertFile = resolveSecretLocation(
maybeUserSpecifiedMountedCaCertFile,
submitterLocalDriverPodKubernetesCredentials.caCertDataBase64,
DRIVER_CREDENTIALS_CA_CERT_PATH)
val resolvedMountedOAuthTokenFile = resolveSecretLocation(
maybeUserSpecifiedMountedOAuthTokenFile,
submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64,
DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)
val sparkConfWithCredentialLocations = sparkConf.clone()
.setOption(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, resolvedMountedCaCertFile)
.setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE, resolvedMountedClientKeyFile)
.setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE, resolvedMountedClientCertFile)
.setOption(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, resolvedMountedOAuthTokenFile)
sparkConfWithCredentialLocations.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ =>
sparkConfWithCredentialLocations.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "<present_but_redacted>")
}
sparkConfWithCredentialLocations.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ =>
sparkConfWithCredentialLocations.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "<present_but_redacted>")
}
sparkConfWithCredentialLocations
}

override def createCredentialsSecret(): Option[Secret] = {
val allSecretData =
resolveSecretData(
maybeUserSpecifiedMountedClientKeyFile,
submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64,
DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++
resolveSecretData(
maybeUserSpecifiedMountedClientCertFile,
submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64,
DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++
resolveSecretData(
maybeUserSpecifiedMountedCaCertFile,
submitterLocalDriverPodKubernetesCredentials.caCertDataBase64,
DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++
resolveSecretData(
maybeUserSpecifiedMountedOAuthTokenFile,
submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64,
DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME)
if (allSecretData.isEmpty) {
None
} else {
Some(new SecretBuilder()
.withNewMetadata().withName(s"$kubernetesAppId-kubernetes-credentials").endMetadata()
.withData(allSecretData.asJava)
.build())
}
}

override def mountDriverKubernetesCredentials(
originalPodSpec: PodBuilder,
driverContainerName: String,
credentialsSecret: Option[Secret]): PodBuilder = {
credentialsSecret.map { secret =>
originalPodSpec.editSpec()
.addNewVolume()
.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
.endVolume()
.editMatchingContainer(new ContainerNameEqualityPredicate(driverContainerName))
.addNewVolumeMount()
.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
.withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
.endVolumeMount()
.endContainer()
.endSpec()
}.getOrElse(originalPodSpec)
}

private def resolveSecretLocation(
mountedUserSpecified: Option[String],
valueMountedFromSubmitter: Option[String],
mountedCanonicalLocation: String): Option[String] = {
mountedUserSpecified.orElse(valueMountedFromSubmitter.map( _ => {
mountedCanonicalLocation
}))
}

private def resolveSecretData(
mountedUserSpecified: Option[String],
valueMountedFromSubmitter: Option[String],
secretName: String): Map[String, String] = {
mountedUserSpecified.map { _ => Map.empty[String, String]}
.getOrElse {
valueMountedFromSubmitter.map { valueBase64 =>
Map(secretName -> valueBase64)
}.getOrElse(Map.empty[String, String])
}
}

private implicit def augmentSparkConf(sparkConf: SparkConf): OptionSettableSparkConf = {
new OptionSettableSparkConf(sparkConf)
}
}

private class OptionSettableSparkConf(sparkConf: SparkConf) {
def setOption[T](configEntry: OptionalConfigEntry[T], option: Option[T]): SparkConf = {
option.map( opt => {
sparkConf.set(configEntry, opt)
}).getOrElse(sparkConf)
}
}
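
A tiny usage sketch of this helper (illustrative, written as if alongside the code above so the private class and the imports at the top of this file are visible): setOption only touches the conf when the Option is defined, which is what lets the mounter skip credential locations that were never resolved.

// Sketch: applies the entry for Some(...), leaves the conf untouched for None.
val conf = new SparkConf(false)
new OptionSettableSparkConf(conf)
  .setOption(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, Some(DRIVER_CREDENTIALS_CA_CERT_PATH))
new OptionSettableSparkConf(conf)
  .setOption(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, None)  // no-op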