diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesCredentials.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesCredentials.scala new file mode 100644 index 0000000000000..aba94e6969529 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesCredentials.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes + +case class KubernetesCredentials( + oauthTokenBase64: Option[String], + caCertDataBase64: Option[String], + clientKeyDataBase64: Option[String], + clientCertDataBase64: Option[String]) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala index f0a39fe359227..45e5a46a26258 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala @@ -120,14 +120,20 @@ package object config extends Logging { private[spark] val KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE = ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.caCertFile") .doc("Path on the driver pod's disk containing the CA cert file to use when authenticating" + - " against Kubernetes.") + " against Kubernetes. Typically this is configured by spark-submit, which mounts a" + " secret from the submitting machine into the pod, and hence this configuration is marked" + " as internal, but this can also be set manually to use a certificate that is mounted" + " into the driver pod via other means.") .stringConf .createOptional private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE = ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientKeyFile") .doc("Path on the driver pod's disk containing the client key file to use when" + - " authenticating against Kubernetes.") + " authenticating against Kubernetes. 
Typically this is configured by spark-submit, which" + " mounts a secret from the submitting machine into the pod, and hence this" + " configuration is marked as internal, but this can also be set manually to" + " use a key file that is mounted into the driver pod via other means.") .internal() .stringConf .createOptional @@ -135,7 +141,10 @@ package object config extends Logging { private[spark] val KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE = ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.clientCertFile") .doc("Path on the driver pod's disk containing the client cert file to use when" + - " authenticating against Kubernetes.") + " authenticating against Kubernetes. Typically this is configured by spark-submit, which" + " mounts a secret from the submitting machine into the pod, and hence this" + " configuration is marked as internal, but this can also be set manually to" + " use a certificate that is mounted into the driver pod via other means.") .internal() .stringConf .createOptional @@ -143,7 +152,10 @@ package object config extends Logging { private[spark] val KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN = ConfigBuilder(s"$APISERVER_DRIVER_CONF_PREFIX.mounted.oauthTokenFile") .doc("Path on the driver pod's disk containing the OAuth token file to use when" + - " authenticating against Kubernetes.") + " authenticating against Kubernetes. Typically this is configured by spark-submit, which" + " mounts a secret from the submitting machine into the pod, and hence this" + " configuration is marked as internal, but this can also be set manually to" + " use a token that is mounted into the driver pod via other means.") .internal() .stringConf .createOptional diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index 4c4f7b9fc3b23..8d0965078aaa8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -38,6 +38,22 @@ package object constants { private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets" private[spark] val SUBMISSION_SSL_KEY_PEM_SECRET_NAME = "spark-submission-server-key-pem" private[spark] val SUBMISSION_SSL_CERT_PEM_SECRET_NAME = "spark-submission-server-cert-pem" + private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR = + "/mnt/secrets/spark-kubernetes-credentials" + private[spark] val DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME = "ca-cert" + private[spark] val DRIVER_CREDENTIALS_CA_CERT_PATH = + s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME" + private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME = "client-key" + private[spark] val DRIVER_CREDENTIALS_CLIENT_KEY_PATH = + s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME" + private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME = "client-cert" + private[spark] val DRIVER_CREDENTIALS_CLIENT_CERT_PATH = + s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME" + private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME = "oauth-token" + private[spark] val DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH = + s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME" + private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = 
"kubernetes-credentials" + // Default and fixed ports private[spark] val SUBMISSION_SERVER_PORT = 7077 diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverPodKubernetesCredentialsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsProvider.scala similarity index 88% rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverPodKubernetesCredentialsProvider.scala rename to resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsProvider.scala index 112226dbe3fc1..404741520c059 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/DriverPodKubernetesCredentialsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesCredentialsProvider.scala @@ -14,15 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.deploy.kubernetes.submit.v1 +package org.apache.spark.deploy.kubernetes.submit import java.io.File +import com.google.common.base.Charsets import com.google.common.io.{BaseEncoding, Files} import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.KubernetesCredentials import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials import org.apache.spark.internal.config.OptionalConfigEntry private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) { @@ -38,7 +39,9 @@ private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_CERT_FILE).isEmpty, "Cannot specify both a service account and a driver pod client cert file.") } - val oauthToken = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN) + val oauthTokenBase64 = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).map { token => + BaseEncoding.base64().encode(token.getBytes(Charsets.UTF_8)) + } val caCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CA_CERT_FILE, s"Driver CA cert file provided at %s does not exist or is not a file.") val clientKeyDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_KEY_FILE, @@ -46,7 +49,7 @@ private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf val clientCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_CERT_FILE, s"Driver client cert file provided at %s does not exist or is not a file.") KubernetesCredentials( - oauthToken = oauthToken, + oauthTokenBase64 = oauthTokenBase64, caCertDataBase64 = caCertDataBase64, clientKeyDataBase64 = clientKeyDataBase64, clientCertDataBase64 = clientCertDataBase64) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala index 8f1e356bec8ca..fa3c97c6957b5 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala @@ -30,11 +30,11 @@ import org.apache.commons.codec.binary.Base64 import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, 
SparkException} -import org.apache.spark.deploy.kubernetes.CompressionUtils +import org.apache.spark.deploy.kubernetes.{CompressionUtils, KubernetesCredentials} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils -import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesCredentials, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource} +import org.apache.spark.deploy.kubernetes.submit.{DriverPodKubernetesCredentialsProvider, KubernetesFileUtils} +import org.apache.spark.deploy.rest.kubernetes.v1.{AppResource, ContainerAppResource, HttpClientUtil, KubernetesCreateSubmissionRequest, KubernetesSparkRestApi, RemoteAppResource, UploadedAppResource} import org.apache.spark.internal.Logging import org.apache.spark.util.{ShutdownHookManager, Utils} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala index a403a91840bd6..da08e17dee85b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/Client.scala @@ -48,7 +48,9 @@ private[spark] class Client( sparkJars: Seq[String], sparkFiles: Seq[String], kubernetesClientProvider: SubmissionKubernetesClientProvider, - initContainerComponentsProvider: DriverInitContainerComponentsProvider) extends Logging { + initContainerComponentsProvider: DriverInitContainerComponentsProvider, + kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider) + extends Logging { private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME) .getOrElse(kubernetesAppId) @@ -133,9 +135,6 @@ private[spark] class Client( .provideInitContainerBootstrap() .bootstrapInitContainerAndVolumes(driverContainer.getName, basePod) - val driverOwnedResources = Seq(initContainerConfigMap) ++ - maybeSubmittedDependenciesSecret.toSeq - val containerLocalizedFilesResolver = initContainerComponentsProvider .provideContainerLocalizedFilesResolver() val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars() @@ -143,8 +142,15 @@ private[spark] class Client( val executorInitContainerConfiguration = initContainerComponentsProvider .provideExecutorInitContainerConfiguration() - val resolvedSparkConf = executorInitContainerConfiguration + val sparkConfWithExecutorInit = executorInitContainerConfiguration .configureSparkConfForExecutorInitContainer(sparkConf) + val credentialsMounter = kubernetesCredentialsMounterProvider + .getDriverPodKubernetesCredentialsMounter() + val credentialsSecret = credentialsMounter.createCredentialsSecret() + val podWithInitContainerAndMountedCreds = credentialsMounter.mountDriverKubernetesCredentials( + podWithInitContainer, driverContainer.getName, credentialsSecret) + val resolvedSparkConf = credentialsMounter.setDriverPodKubernetesCredentialLocations( + sparkConfWithExecutorInit) if (resolvedSparkJars.nonEmpty) { resolvedSparkConf.set("spark.jars", resolvedSparkJars.mkString(",")) } @@ -166,7 +172,7 @@ private[spark] class Client( val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map { case (confKey, confValue) => s"-D$confKey=$confValue" 
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("") - val resolvedDriverPod = podWithInitContainer.editSpec() + val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec() .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName)) .addNewEnv() .withName(ENV_MOUNTED_CLASSPATH) @@ -181,6 +187,9 @@ private[spark] class Client( .build() val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod) try { + val driverOwnedResources = Seq(initContainerConfigMap) ++ + maybeSubmittedDependenciesSecret.toSeq ++ + credentialsSecret.toSeq val driverPodOwnerReference = new OwnerReferenceBuilder() .withName(createdDriverPod.getMetadata.getName) .withApiVersion(createdDriverPod.getApiVersion) @@ -261,6 +270,8 @@ private[spark] object Client { val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl( sparkConf, kubernetesAppId, sparkJars, sparkFiles) val kubernetesClientProvider = new SubmissionKubernetesClientProviderImpl(sparkConf) + val kubernetesCredentialsMounterProvider = + new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId) new Client( appName, kubernetesAppId, @@ -270,6 +281,7 @@ private[spark] object Client { sparkJars, sparkFiles, kubernetesClientProvider, - initContainerComponentsProvider).run() + initContainerComponentsProvider, + kubernetesCredentialsMounterProvider).run() } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverPodKubernetesCredentialsMounter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverPodKubernetesCredentialsMounter.scala new file mode 100644 index 0000000000000..9759669335774 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverPodKubernetesCredentialsMounter.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.v2 + +import io.fabric8.kubernetes.api.model.{PodBuilder, Secret, SecretBuilder} +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.KubernetesCredentials +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.internal.config.OptionalConfigEntry + +private[spark] trait DriverPodKubernetesCredentialsMounter { + + /** + * Set fields on the Spark configuration that indicate where the driver pod is + * to find its Kubernetes credentials for requesting executors. 
+ */ + def setDriverPodKubernetesCredentialLocations(sparkConf: SparkConf): SparkConf + + /** + * Create the Kubernetes secret object that corresponds to the driver's credentials + * that have to be created and mounted into the driver pod. The single Secret + * object contains all of the data entries for the driver pod's Kubernetes + * credentials. Returns None if no secrets are to be mounted. + */ + def createCredentialsSecret(): Option[Secret] + + /** + * Mount any Kubernetes credentials from the submitting machine's disk into the driver pod. The + * secret that is passed in here should have been created from createCredentialsSecret so that + * the implementation does not need to hold state. + */ + def mountDriverKubernetesCredentials( + originalPodSpec: PodBuilder, + driverContainerName: String, + credentialsSecret: Option[Secret]): PodBuilder +} + +private[spark] class DriverPodKubernetesCredentialsMounterImpl( + kubernetesAppId: String, + submitterLocalDriverPodKubernetesCredentials: KubernetesCredentials, + maybeUserSpecifiedMountedClientKeyFile: Option[String], + maybeUserSpecifiedMountedClientCertFile: Option[String], + maybeUserSpecifiedMountedOAuthTokenFile: Option[String], + maybeUserSpecifiedMountedCaCertFile: Option[String]) + extends DriverPodKubernetesCredentialsMounter { + + override def setDriverPodKubernetesCredentialLocations(sparkConf: SparkConf): SparkConf = { + val resolvedMountedClientKeyFile = resolveSecretLocation( + maybeUserSpecifiedMountedClientKeyFile, + submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64, + DRIVER_CREDENTIALS_CLIENT_KEY_PATH) + val resolvedMountedClientCertFile = resolveSecretLocation( + maybeUserSpecifiedMountedClientCertFile, + submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64, + DRIVER_CREDENTIALS_CLIENT_CERT_PATH) + val resolvedMountedCaCertFile = resolveSecretLocation( + maybeUserSpecifiedMountedCaCertFile, + submitterLocalDriverPodKubernetesCredentials.caCertDataBase64, + DRIVER_CREDENTIALS_CA_CERT_PATH) + val resolvedMountedOAuthTokenFile = resolveSecretLocation( + maybeUserSpecifiedMountedOAuthTokenFile, + submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64, + DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH) + val sparkConfWithCredentialLocations = sparkConf.clone() + .setOption(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, resolvedMountedCaCertFile) + .setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE, resolvedMountedClientKeyFile) + .setOption(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE, resolvedMountedClientCertFile) + .setOption(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, resolvedMountedOAuthTokenFile) + sparkConfWithCredentialLocations.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ => + sparkConfWithCredentialLocations.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "") + } + sparkConfWithCredentialLocations.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ => + sparkConfWithCredentialLocations.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "") + } + sparkConfWithCredentialLocations + } + + override def createCredentialsSecret(): Option[Secret] = { + val allSecretData = + resolveSecretData( + maybeUserSpecifiedMountedClientKeyFile, + submitterLocalDriverPodKubernetesCredentials.clientKeyDataBase64, + DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME) ++ + resolveSecretData( + maybeUserSpecifiedMountedClientCertFile, + submitterLocalDriverPodKubernetesCredentials.clientCertDataBase64, + DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME) ++ + resolveSecretData( + maybeUserSpecifiedMountedCaCertFile, + 
submitterLocalDriverPodKubernetesCredentials.caCertDataBase64, + DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME) ++ + resolveSecretData( + maybeUserSpecifiedMountedOAuthTokenFile, + submitterLocalDriverPodKubernetesCredentials.oauthTokenBase64, + DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME) + if (allSecretData.isEmpty) { + None + } else { + Some(new SecretBuilder() + .withNewMetadata().withName(s"$kubernetesAppId-kubernetes-credentials").endMetadata() + .withData(allSecretData.asJava) + .build()) + } + } + + override def mountDriverKubernetesCredentials( + originalPodSpec: PodBuilder, + driverContainerName: String, + credentialsSecret: Option[Secret]): PodBuilder = { + credentialsSecret.map { secret => + originalPodSpec.editSpec() + .addNewVolume() + .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME) + .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret() + .endVolume() + .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainerName)) + .addNewVolumeMount() + .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME) + .withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR) + .endVolumeMount() + .endContainer() + .endSpec() + }.getOrElse(originalPodSpec) + } + + private def resolveSecretLocation( + mountedUserSpecified: Option[String], + valueMountedFromSubmitter: Option[String], + mountedCanonicalLocation: String): Option[String] = { + mountedUserSpecified.orElse(valueMountedFromSubmitter.map( _ => { + mountedCanonicalLocation + })) + } + + private def resolveSecretData( + mountedUserSpecified: Option[String], + valueMountedFromSubmitter: Option[String], + secretName: String): Map[String, String] = { + mountedUserSpecified.map { _ => Map.empty[String, String]} + .getOrElse { + valueMountedFromSubmitter.map { valueBase64 => + Map(secretName -> valueBase64) + }.getOrElse(Map.empty[String, String]) + } + } + + private implicit def augmentSparkConf(sparkConf: SparkConf): OptionSettableSparkConf = { + new OptionSettableSparkConf(sparkConf) + } +} + +private class OptionSettableSparkConf(sparkConf: SparkConf) { + def setOption[T](configEntry: OptionalConfigEntry[T], option: Option[T]): SparkConf = { + option.map( opt => { + sparkConf.set(configEntry, opt) + }).getOrElse(sparkConf) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverPodKubernetesCredentialsMounterProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverPodKubernetesCredentialsMounterProvider.scala new file mode 100644 index 0000000000000..e981c54d23a9d --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverPodKubernetesCredentialsMounterProvider.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.v2 + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.submit.DriverPodKubernetesCredentialsProvider + +private[spark] trait DriverPodKubernetesCredentialsMounterProvider { + + def getDriverPodKubernetesCredentialsMounter() + : DriverPodKubernetesCredentialsMounter +} + +private[spark] class DriverPodKubernetesCredentialsMounterProviderImpl( + sparkConf: SparkConf, + kubernetesAppId: String) + extends DriverPodKubernetesCredentialsMounterProvider { + + override def getDriverPodKubernetesCredentialsMounter() + : DriverPodKubernetesCredentialsMounter = { + val submitterLocalDriverPodKubernetesCredentials = + new DriverPodKubernetesCredentialsProvider(sparkConf).get() + new DriverPodKubernetesCredentialsMounterImpl( + kubernetesAppId, + submitterLocalDriverPodKubernetesCredentials, + sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE), + sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE), + sparkConf.get(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN), + sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE)) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyUploaderImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyUploaderImpl.scala index f22759d463cb7..5f98facfb691f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyUploaderImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v2/SubmittedDependencyUploaderImpl.scala @@ -25,9 +25,8 @@ import okhttp3.RequestBody import retrofit2.Call import org.apache.spark.{SparkException, SSLOptions} -import org.apache.spark.deploy.kubernetes.CompressionUtils +import org.apache.spark.deploy.kubernetes.{CompressionUtils, KubernetesCredentials} import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils -import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials import org.apache.spark.deploy.rest.kubernetes.v2.{ResourceStagingServiceRetrofit, RetrofitClientFactory} import org.apache.spark.util.Utils diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesRestProtocolMessages.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesRestProtocolMessages.scala index cd1f9dcdf5879..bdd4a85da8f85 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesRestProtocolMessages.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesRestProtocolMessages.scala @@ -19,15 +19,10 @@ package org.apache.spark.deploy.rest.kubernetes.v1 import com.fasterxml.jackson.annotation.{JsonIgnore, JsonSubTypes, JsonTypeInfo} import org.apache.spark.SPARK_VERSION +import org.apache.spark.deploy.kubernetes.KubernetesCredentials import org.apache.spark.deploy.rest.{SubmitRestProtocolRequest, SubmitRestProtocolResponse} import org.apache.spark.util.Utils -case class KubernetesCredentials( - oauthToken: Option[String], - caCertDataBase64: Option[String], - clientKeyDataBase64: Option[String], - clientCertDataBase64: 
Option[String]) - case class KubernetesCreateSubmissionRequest( appResource: AppResource, mainClass: String, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala index 7847ba2546594..52ca3ef956a79 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v1/KubernetesSparkRestServer.scala @@ -33,7 +33,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SecurityManager, SPARK_VERSION => sparkVersion, SparkConf, SparkException, SSLOptions} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.kubernetes.CompressionUtils +import org.apache.spark.deploy.kubernetes.{CompressionUtils, KubernetesCredentials} import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.submit.KubernetesFileUtils import org.apache.spark.deploy.rest._ @@ -306,7 +306,10 @@ private[spark] class KubernetesSparkRestServer( + resolvedDirectory.getAbsolutePath) } val oauthTokenFile = writeRawStringCredentialAndGetConf("oauth-token.txt", resolvedDirectory, - KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, kubernetesCredentials.oauthToken) + KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN, + kubernetesCredentials.oauthTokenBase64.map { base64 => + new String(BaseEncoding.base64().decode(base64), Charsets.UTF_8) + }) val caCertFile = writeBase64CredentialAndGetConf("ca.crt", resolvedDirectory, KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE, kubernetesCredentials.caCertDataBase64) val clientKeyFile = writeBase64CredentialAndGetConf("key.key", resolvedDirectory, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala index b7c6c4fb913da..5dbe55b72bd8b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingService.scala @@ -22,8 +22,8 @@ import javax.ws.rs.core.{MediaType, StreamingOutput} import org.glassfish.jersey.media.multipart.FormDataParam +import org.apache.spark.deploy.kubernetes.KubernetesCredentials import org.apache.spark.deploy.kubernetes.submit.v2.SubmittedResourceIdAndSecret -import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials /** * Service that receives application data that can be retrieved later on. 
This is primarily used diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala index 3dfa83c85e6dd..34c3192ae6780 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImpl.scala @@ -26,8 +26,8 @@ import com.google.common.io.{BaseEncoding, ByteStreams, Files} import scala.collection.concurrent.TrieMap import org.apache.spark.SparkException +import org.apache.spark.deploy.kubernetes.KubernetesCredentials import org.apache.spark.deploy.kubernetes.submit.v2.SubmittedResourceIdAndSecret -import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials import org.apache.spark.internal.Logging import org.apache.spark.util.Utils diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/DriverPodKubernetesClientProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/DriverPodKubernetesClientProvider.scala index b8c2b0c91bbeb..50f2c218c22c4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/DriverPodKubernetesClientProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/DriverPodKubernetesClientProvider.scala @@ -21,10 +21,13 @@ import java.io.File import com.google.common.base.Charsets import com.google.common.io.Files import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} +import io.fabric8.kubernetes.client.utils.HttpClientUtils +import okhttp3.Dispatcher import org.apache.spark.SparkConf import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.util.ThreadUtils private[spark] class DriverPodKubernetesClientProvider(sparkConf: SparkConf, namespace: String) { private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) @@ -78,6 +81,17 @@ private[spark] class DriverPodKubernetesClientProvider(sparkConf: SparkConf, nam } serviceAccountConfigBuilder } - new DefaultKubernetesClient(configBuilder.build) + // Disable the websocket ping thread, which is not a daemon thread, so that the + // driver's main thread can shut down upon errors. Otherwise, the driver + // will hang indefinitely. + val config = configBuilder + .withWebsocketPingInterval(0) + .build() + val httpClient = HttpClientUtils.createHttpClient(config).newBuilder() + // Use a Dispatcher with a custom executor service that creates daemon threads. The default + // executor service used by Dispatcher creates non-daemon threads. 
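+ // Daemon threads do not keep the driver JVM alive once the main thread has exited.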
+ .dispatcher(new Dispatcher(ThreadUtils.newDaemonCachedThreadPool("spark-on-k8s"))) + .build() + new DefaultKubernetesClient(httpClient, config) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala deleted file mode 100644 index 31c6eda77d058..0000000000000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClientBuilder.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.scheduler.cluster.kubernetes - -import java.io.File - -import com.google.common.base.Charsets -import com.google.common.io.Files -import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient} -import io.fabric8.kubernetes.client.utils.HttpClientUtils -import okhttp3.Dispatcher - -import org.apache.spark.SparkConf -import org.apache.spark.deploy.kubernetes.config._ -import org.apache.spark.deploy.kubernetes.constants._ -import org.apache.spark.util.ThreadUtils - -private[spark] class KubernetesClientBuilder(sparkConf: SparkConf, namespace: String) { - private val SERVICE_ACCOUNT_TOKEN = new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH) - private val SERVICE_ACCOUNT_CA_CERT = new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) - private val oauthTokenFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN) - private val caCertFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE) - private val clientKeyFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE) - private val clientCertFile = sparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE) - - /** - * Creates a {@link KubernetesClient}, expecting to be from within the context of a pod. When - * doing so, service account token files can be picked up from canonical locations. 
- */ - def buildFromWithinPod(): DefaultKubernetesClient = { - val baseClientConfigBuilder = new ConfigBuilder() - .withApiVersion("v1") - .withMasterUrl(KUBERNETES_MASTER_INTERNAL_URL) - .withNamespace(namespace) - - val configBuilder = oauthTokenFile - .orElse(caCertFile) - .orElse(clientKeyFile) - .orElse(clientCertFile) - .map { _ => - var mountedAuthConfigBuilder = baseClientConfigBuilder - oauthTokenFile.foreach { tokenFilePath => - val tokenFile = new File(tokenFilePath) - mountedAuthConfigBuilder = mountedAuthConfigBuilder - .withOauthToken(Files.toString(tokenFile, Charsets.UTF_8)) - } - caCertFile.foreach { caFile => - mountedAuthConfigBuilder = mountedAuthConfigBuilder.withCaCertFile(caFile) - } - clientKeyFile.foreach { keyFile => - mountedAuthConfigBuilder = mountedAuthConfigBuilder.withClientKeyFile(keyFile) - } - clientCertFile.foreach { certFile => - mountedAuthConfigBuilder = mountedAuthConfigBuilder.withClientCertFile(certFile) - } - mountedAuthConfigBuilder - }.getOrElse { - var serviceAccountConfigBuilder = baseClientConfigBuilder - if (SERVICE_ACCOUNT_CA_CERT.isFile) { - serviceAccountConfigBuilder = serviceAccountConfigBuilder.withCaCertFile( - SERVICE_ACCOUNT_CA_CERT.getAbsolutePath) - } - - if (SERVICE_ACCOUNT_TOKEN.isFile) { - serviceAccountConfigBuilder = serviceAccountConfigBuilder.withOauthToken( - Files.toString(SERVICE_ACCOUNT_TOKEN, Charsets.UTF_8)) - } - serviceAccountConfigBuilder - } - // Disable the ping thread that is not daemon, in order to allow - // the driver main thread to shut down upon errors. Otherwise, the driver - // will hang indefinitely. - val config = configBuilder - .withWebsocketPingInterval(0) - .build() - val httpClient = HttpClientUtils.createHttpClient(config).newBuilder() - // Use a Dispatcher with a custom executor service that creates daemon threads. The default - // executor service used by Dispatcher creates non-daemon threads. 
- .dispatcher(new Dispatcher(ThreadUtils.newDaemonCachedThreadPool("spark-on-k8s"))) - .build() - new DefaultKubernetesClient(httpClient, config) - } -} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala index 4dc1e2e44980a..f0282dbb6d31a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/ClientV2Suite.scala @@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, DoneablePod import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource} import org.hamcrest.{BaseMatcher, Description} -import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} +import org.mockito.{AdditionalAnswers, ArgumentCaptor, Mock, MockitoAnnotations} import org.mockito.Matchers.{any, anyVararg, argThat, eq => mockitoEq} import org.mockito.Mockito.{times, verify, when} import org.mockito.invocation.InvocationOnMock @@ -37,7 +37,6 @@ import org.apache.spark.deploy.kubernetes.config._ import org.apache.spark.deploy.kubernetes.constants._ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { - private val JARS_RESOURCE = SubmittedResourceIdAndSecret("jarsId", "jarsSecret") private val FILES_RESOURCE = SubmittedResourceIdAndSecret("filesId", "filesSecret") private val SUBMITTED_RESOURCES = SubmittedResources(JARS_RESOURCE, FILES_RESOURCE) @@ -53,9 +52,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { SPARK_APP_NAME_LABEL -> APP_NAME) private val CUSTOM_ANNOTATION_KEY = "customAnnotation" private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue" - private val SECRET_NAME = "secret" - private val SECRET_KEY = "secret-key" - private val SECRET_DATA = "secret-data" + private val INIT_CONTAINER_SECRET_NAME = "init-container-secret" + private val INIT_CONTAINER_SECRET_DATA = Map("secret-key" -> "secret-data") private val MAIN_CLASS = "org.apache.spark.examples.SparkPi" private val APP_ARGS = Array("3", "20") private val SPARK_JARS = Seq( @@ -70,22 +68,21 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { "hdfs://localhost:9000/app/files/file1.txt", "file:///var/data/spark-files/file2.txt") private val INIT_CONTAINER_SECRET = new SecretBuilder() .withNewMetadata() - .withName(SECRET_NAME) + .withName(INIT_CONTAINER_SECRET_NAME) .endMetadata() - .addToData(SECRET_KEY, SECRET_DATA) + .withData(INIT_CONTAINER_SECRET_DATA.asJava) .build() - private val CONFIG_MAP_NAME = "config-map" - private val CONFIG_MAP_KEY = "config-map-key" - private val CONFIG_MAP_DATA = "config-map-data" private val CUSTOM_JAVA_OPTION_KEY = "myappoption" private val CUSTOM_JAVA_OPTION_VALUE = "myappoptionvalue" private val DRIVER_JAVA_OPTIONS = s"-D$CUSTOM_JAVA_OPTION_KEY=$CUSTOM_JAVA_OPTION_VALUE" private val DRIVER_EXTRA_CLASSPATH = "/var/data/spark-app-custom/custom-jar.jar" + private val CONFIG_MAP_NAME = "config-map" + private val CONFIG_MAP_DATA = Map("config-map-key" -> "config-map-data") private val INIT_CONTAINER_CONFIG_MAP = new ConfigMapBuilder() .withNewMetadata() .withName(CONFIG_MAP_NAME) .endMetadata() - .addToData(CONFIG_MAP_KEY, CONFIG_MAP_DATA) + .withData(CONFIG_MAP_DATA.asJava) 
.build() private val CUSTOM_DRIVER_IMAGE = "spark-custom-driver:latest" private val DRIVER_MEMORY_MB = 512 @@ -104,6 +101,17 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { private val DRIVER_POD_UID = "driver-pod-uid" private val DRIVER_POD_KIND = "pod" private val DRIVER_POD_API_VERSION = "v1" + private val CREDENTIALS_SECRET_NAME = "credentials-secret" + private val CREDENTIALS_SECRET_DATA = Map("credentials-secret-key" -> "credentials-secret-value") + private val CREDENTIALS_SECRET = new SecretBuilder() + .withNewMetadata() + .withName(CREDENTIALS_SECRET_NAME) + .endMetadata() + .withData(CREDENTIALS_SECRET_DATA.asJava) + .build() + private val CREDENTIALS_SET_CONF = "spark.kubernetes.driverCredentials.provided" + private val CREDENTIALS_SET_ANNOTATION = "credentials-set" + @Mock private var initContainerConfigMapBuilder: SparkInitContainerConfigMapBuilder = _ @Mock @@ -128,6 +136,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { HasMetadata, java.lang.Boolean] @Mock private var resourceListOps: ResourceListOps = _ + @Mock + private var credentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider = _ + @Mock + private var credentialsMounter: DriverPodKubernetesCredentialsMounter = _ before { MockitoAnnotations.initMocks(this) @@ -174,9 +186,12 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { when(executorInitContainerConfiguration.configureSparkConfForExecutorInitContainer(SPARK_CONF)) .thenReturn(SPARK_CONF_WITH_EXECUTOR_INIT_CONF) when(kubernetesClient.resourceList(anyVararg[HasMetadata]())).thenReturn(resourceListOps) + when(credentialsMounterProvider.getDriverPodKubernetesCredentialsMounter()) + .thenReturn(credentialsMounter) } test("Run with dependency uploader") { + expectationsForNoMountedCredentials() when(initContainerComponentsProvider .provideInitContainerSubmittedDependencyUploader(ALL_EXPECTED_LABELS)) .thenReturn(Some(submittedDependencyUploader)) @@ -194,8 +209,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { verifyCreatedResourcesHaveOwnerReferences(createdResources) assert(createdResources.exists { case secret: Secret => - val expectedSecretData = Map(SECRET_KEY -> SECRET_DATA) - secret.getMetadata.getName == SECRET_NAME && secret.getData.asScala == expectedSecretData + secret.getMetadata.getName == INIT_CONTAINER_SECRET_NAME && + secret.getData.asScala == INIT_CONTAINER_SECRET_DATA case _ => false }) verifyConfigMapWasCreated(createdResources) @@ -208,15 +223,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { } test("Run without dependency uploader") { - when(initContainerComponentsProvider - .provideInitContainerSubmittedDependencyUploader(ALL_EXPECTED_LABELS)) - .thenReturn(None) - when(initContainerComponentsProvider - .provideSubmittedDependenciesSecretBuilder(None)) - .thenReturn(None) - when(initContainerComponentsProvider - .provideInitContainerConfigMapBuilder(None)) - .thenReturn(initContainerConfigMapBuilder) + expectationsForNoMountedCredentials() + expectationsForNoDependencyUploader() runAndVerifyDriverPodHasCorrectProperties() val resourceListArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) verify(kubernetesClient).resourceList(resourceListArgumentCaptor.capture()) @@ -232,6 +240,65 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .provideSubmittedDependenciesSecretBuilder(None) } + test("Run with mounted credentials") { + expectationsForNoDependencyUploader() + 
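// Stub the credentials mounter to emit a secret and to tag both the pod and the Spark conf, + // letting the test verify that the Client threads both through. +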
when(credentialsMounter.createCredentialsSecret()).thenReturn(Some(CREDENTIALS_SECRET)) + when(credentialsMounter.mountDriverKubernetesCredentials( + any(), mockitoEq(DRIVER_CONTAINER_NAME), mockitoEq(Some(CREDENTIALS_SECRET)))) + .thenAnswer(new Answer[PodBuilder] { + override def answer(invocation: InvocationOnMock): PodBuilder = { + invocation.getArgumentAt(0, classOf[PodBuilder]).editMetadata() + .addToAnnotations(CREDENTIALS_SET_ANNOTATION, TRUE) + .endMetadata() + } + }) + when(credentialsMounter.setDriverPodKubernetesCredentialLocations(any())) + .thenAnswer(new Answer[SparkConf] { + override def answer(invocation: InvocationOnMock): SparkConf = { + invocation.getArgumentAt(0, classOf[SparkConf]).clone().set(CREDENTIALS_SET_CONF, TRUE) + } + }) + runAndVerifyPodMatchesPredicate { p => + Option(p) + .filter(pod => containerHasCorrectJvmOptions(pod, _(CREDENTIALS_SET_CONF) == TRUE)) + .exists { pod => + pod.getMetadata.getAnnotations.asScala(CREDENTIALS_SET_ANNOTATION) == TRUE + } + } + val resourceListArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) + verify(kubernetesClient).resourceList(resourceListArgumentCaptor.capture()) + val createdResources = resourceListArgumentCaptor.getAllValues.asScala + assert(createdResources.size === 2) + verifyCreatedResourcesHaveOwnerReferences(createdResources) + assert(createdResources.exists { + case secret: Secret => + secret.getMetadata.getName == CREDENTIALS_SECRET_NAME && + secret.getData.asScala == CREDENTIALS_SECRET_DATA + case _ => false + }) + } + + private def expectationsForNoDependencyUploader(): Unit = { + when(initContainerComponentsProvider + .provideInitContainerSubmittedDependencyUploader(ALL_EXPECTED_LABELS)) + .thenReturn(None) + when(initContainerComponentsProvider + .provideSubmittedDependenciesSecretBuilder(None)) + .thenReturn(None) + when(initContainerComponentsProvider + .provideInitContainerConfigMapBuilder(None)) + .thenReturn(initContainerConfigMapBuilder) + } + + private def expectationsForNoMountedCredentials(): Unit = { + when(credentialsMounter.setDriverPodKubernetesCredentialLocations(any())) + .thenAnswer(AdditionalAnswers.returnsFirstArg()) + when(credentialsMounter.createCredentialsSecret()).thenReturn(None) + when(credentialsMounter.mountDriverKubernetesCredentials( + any(), mockitoEq(DRIVER_CONTAINER_NAME), mockitoEq(None))) + .thenAnswer(AdditionalAnswers.returnsFirstArg()) + } + private def verifyCreatedResourcesHaveOwnerReferences( createdResources: mutable.Buffer[HasMetadata]): Unit = { assert(createdResources.forall { resource => @@ -248,14 +315,36 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { private def verifyConfigMapWasCreated(createdResources: mutable.Buffer[HasMetadata]): Unit = { assert(createdResources.exists { case configMap: ConfigMap => - val expectedConfigMapData = Map(CONFIG_MAP_KEY -> CONFIG_MAP_DATA) configMap.getMetadata.getName == CONFIG_MAP_NAME && - configMap.getData.asScala == expectedConfigMapData + configMap.getData.asScala == CONFIG_MAP_DATA case _ => false }) } private def runAndVerifyDriverPodHasCorrectProperties(): Unit = { + val expectedOptions = SPARK_CONF.getAll + .filterNot(_._1 == org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS.key) + .toMap ++ + Map( + "spark.app.id" -> APP_ID, + KUBERNETES_DRIVER_POD_NAME.key -> APP_ID, + EXECUTOR_INIT_CONF_KEY -> TRUE, + CUSTOM_JAVA_OPTION_KEY -> CUSTOM_JAVA_OPTION_VALUE, + "spark.jars" -> RESOLVED_SPARK_JARS.mkString(","), + "spark.files" -> RESOLVED_SPARK_FILES.mkString(",")) + 
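// The driver container's -D java options must match this map exactly. +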
runAndVerifyPodMatchesPredicate { p => + Option(p) + .filter(_.getMetadata.getName == APP_ID) + .filter(podHasCorrectAnnotations) + .filter(_.getMetadata.getLabels.asScala == ALL_EXPECTED_LABELS) + .filter(containerHasCorrectBasicContainerConfiguration) + .filter(containerHasCorrectBasicEnvs) + .filter(containerHasCorrectMountedClasspath) + .exists(pod => containerHasCorrectJvmOptions(pod, _ == expectedOptions)) + } + } + + private def runAndVerifyPodMatchesPredicate(pred: (Pod => Boolean)): Unit = { new Client( APP_NAME, APP_ID, @@ -265,49 +354,31 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { SPARK_JARS, SPARK_FILES, kubernetesClientProvider, - initContainerComponentsProvider).run() + initContainerComponentsProvider, + credentialsMounterProvider).run() val podMatcher = new BaseMatcher[Pod] { override def matches(o: scala.Any): Boolean = { o match { - case p: Pod => - Option(p) - .filter(_.getMetadata.getName == APP_ID) - .filter(podHasCorrectAnnotations) - .filter(_.getMetadata.getLabels.asScala == ALL_EXPECTED_LABELS) - .filter(containerHasCorrectBasicContainerConfiguration) - .filter(containerHasCorrectBasicEnvs) - .filter(containerHasCorrectMountedClasspath) - .exists(containerHasCorrectJvmOptions) - case _ => - false + case p: Pod => pred(p) + case _ => false } } - override def describeTo(description: Description): Unit = {} } verify(podOps).create(argThat(podMatcher)) } - private def containerHasCorrectJvmOptions(pod: Pod): Boolean = { + private def containerHasCorrectJvmOptions( + pod: Pod, optionsCorrectnessPredicate: (Map[String, String] => Boolean)): Boolean = { val driverContainer = pod.getSpec.getContainers.asScala.head val envs = driverContainer.getEnv.asScala.map(env => (env.getName, env.getValue)) envs.toMap.get(ENV_DRIVER_JAVA_OPTS).exists { javaOptions => val splitOptions = javaOptions.split(" ") - val expectedOptions = SPARK_CONF.getAll - .filterNot(_._1 == org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS.key) - .toMap ++ - Map( - "spark.app.id" -> APP_ID, - KUBERNETES_DRIVER_POD_NAME.key -> APP_ID, - EXECUTOR_INIT_CONF_KEY -> TRUE, - CUSTOM_JAVA_OPTION_KEY -> CUSTOM_JAVA_OPTION_VALUE, - "spark.jars" -> RESOLVED_SPARK_JARS.mkString(","), - "spark.files" -> RESOLVED_SPARK_FILES.mkString(",")) splitOptions.forall(_.startsWith("-D")) && - splitOptions.map { option => + optionsCorrectnessPredicate(splitOptions.map { option => val withoutPrefix = option.substring(2) (withoutPrefix.split("=", 2)(0), withoutPrefix.split("=", 2)(1)) - }.toMap == expectedOptions + }.toMap) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverPodKubernetesCredentialsMounterSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverPodKubernetesCredentialsMounterSuite.scala new file mode 100644 index 0000000000000..d4413076fb092 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/v2/DriverPodKubernetesCredentialsMounterSuite.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit.v2 + +import io.fabric8.kubernetes.api.model.{PodBuilder, SecretBuilder} +import org.scalatest.prop.TableDrivenPropertyChecks +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ +import org.apache.spark.deploy.kubernetes.constants._ +import org.apache.spark.deploy.kubernetes.KubernetesCredentials + +class DriverPodKubernetesCredentialsMounterSuite + extends SparkFunSuite with TableDrivenPropertyChecks { + + private val CLIENT_KEY_DATA = "client-key-data" + private val CLIENT_CERT_DATA = "client-cert-data" + private val OAUTH_TOKEN_DATA = "oauth-token" + private val CA_CERT_DATA = "ca-cert-data" + private val SUBMITTER_LOCAL_DRIVER_KUBERNETES_CREDENTIALS = KubernetesCredentials( + caCertDataBase64 = Some(CA_CERT_DATA), + clientKeyDataBase64 = Some(CLIENT_KEY_DATA), + clientCertDataBase64 = Some(CLIENT_CERT_DATA), + oauthTokenBase64 = Some(OAUTH_TOKEN_DATA)) + private val APP_ID = "app-id" + private val USER_SPECIFIED_CLIENT_KEY_FILE = Some("/var/data/client-key.pem") + private val USER_SPECIFIED_CLIENT_CERT_FILE = Some("/var/data/client-cert.pem") + private val USER_SPECIFIED_OAUTH_TOKEN_FILE = Some("/var/data/token.txt") + private val USER_SPECIFIED_CA_CERT_FILE = Some("/var/data/ca.pem") + + // Different configurations of credentials mounters + private val credentialsMounterWithPreMountedFiles = + new DriverPodKubernetesCredentialsMounterImpl( + kubernetesAppId = APP_ID, + submitterLocalDriverPodKubernetesCredentials = SUBMITTER_LOCAL_DRIVER_KUBERNETES_CREDENTIALS, + maybeUserSpecifiedMountedClientKeyFile = USER_SPECIFIED_CLIENT_KEY_FILE, + maybeUserSpecifiedMountedClientCertFile = USER_SPECIFIED_CLIENT_CERT_FILE, + maybeUserSpecifiedMountedOAuthTokenFile = USER_SPECIFIED_OAUTH_TOKEN_FILE, + maybeUserSpecifiedMountedCaCertFile = USER_SPECIFIED_CA_CERT_FILE) + private val credentialsMounterWithoutPreMountedFiles = + new DriverPodKubernetesCredentialsMounterImpl( + kubernetesAppId = APP_ID, + submitterLocalDriverPodKubernetesCredentials = SUBMITTER_LOCAL_DRIVER_KUBERNETES_CREDENTIALS, + maybeUserSpecifiedMountedClientKeyFile = None, + maybeUserSpecifiedMountedClientCertFile = None, + maybeUserSpecifiedMountedOAuthTokenFile = None, + maybeUserSpecifiedMountedCaCertFile = None) + private val credentialsMounterWithoutAnyDriverCredentials = + new DriverPodKubernetesCredentialsMounterImpl( + APP_ID, KubernetesCredentials(None, None, None, None), None, None, None, None) + + // Test matrices + private val TEST_MATRIX_EXPECTED_SPARK_CONFS = Table( + ("Credentials Mounter Implementation", + "Expected client key file", + "Expected client cert file", + "Expected CA Cert file", + "Expected OAuth Token File"), + (credentialsMounterWithoutAnyDriverCredentials, + None, + None, + None, + None), + (credentialsMounterWithoutPreMountedFiles, + Some(DRIVER_CREDENTIALS_CLIENT_KEY_PATH), + Some(DRIVER_CREDENTIALS_CLIENT_CERT_PATH), + Some(DRIVER_CREDENTIALS_CA_CERT_PATH), + Some(DRIVER_CREDENTIALS_OAUTH_TOKEN_PATH)), + 
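// User-specified pre-mounted files take precedence over the secret-backed canonical paths. +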
(credentialsMounterWithPreMountedFiles, + USER_SPECIFIED_CLIENT_KEY_FILE, + USER_SPECIFIED_CLIENT_CERT_FILE, + USER_SPECIFIED_CA_CERT_FILE, + USER_SPECIFIED_OAUTH_TOKEN_FILE)) + + private val TEST_MATRIX_EXPECTED_CREDENTIALS_SECRET = Table( + ("Credentials Mounter Implementation", "Expected Credentials Secret Data"), + (credentialsMounterWithoutAnyDriverCredentials, None), + (credentialsMounterWithoutPreMountedFiles, + Some(KubernetesSecretNameAndData( + data = Map[String, String]( + DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME -> CLIENT_KEY_DATA, + DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME -> CLIENT_CERT_DATA, + DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME -> CA_CERT_DATA, + DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME -> OAUTH_TOKEN_DATA + ), + name = s"$APP_ID-kubernetes-credentials"))), + (credentialsMounterWithPreMountedFiles, None)) + + test("Credentials mounter should set the driver's Kubernetes credentials locations") { + forAll(TEST_MATRIX_EXPECTED_SPARK_CONFS) { + case (credentialsMounter, + expectedClientKeyFile, + expectedClientCertFile, + expectedCaCertFile, + expectedOAuthTokenFile) => + val baseSparkConf = new SparkConf() + val resolvedSparkConf = + credentialsMounter.setDriverPodKubernetesCredentialLocations(baseSparkConf) + assert(resolvedSparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_KEY_FILE) === + expectedClientKeyFile) + assert(resolvedSparkConf.get(KUBERNETES_DRIVER_MOUNTED_CLIENT_CERT_FILE) === + expectedClientCertFile) + assert(resolvedSparkConf.get(KUBERNETES_DRIVER_MOUNTED_CA_CERT_FILE) === + expectedCaCertFile) + assert(resolvedSparkConf.get(KUBERNETES_DRIVER_MOUNTED_OAUTH_TOKEN) === + expectedOAuthTokenFile) + } + } + + test("Credentials mounter should create the correct credentials secret.") { + forAll(TEST_MATRIX_EXPECTED_CREDENTIALS_SECRET) { + case (credentialsMounter, expectedSecretNameAndData) => + val builtSecret = credentialsMounter.createCredentialsSecret() + val secretNameAndData = builtSecret.map { secret => + KubernetesSecretNameAndData(secret.getMetadata.getName, secret.getData.asScala.toMap) + } + assert(secretNameAndData === expectedSecretNameAndData) + } + } + + test("When credentials secret is provided, driver pod should mount the secret volume.") { + val credentialsSecret = new SecretBuilder() + .withNewMetadata().withName("secret").endMetadata() + .addToData("secretKey", "secretValue") + .build() + val originalPodSpec = new PodBuilder() + .withNewMetadata().withName("pod").endMetadata() + .withNewSpec() + .addNewContainer() + .withName("container") + .endContainer() + .endSpec() + val podSpecWithMountedDriverKubernetesCredentials = + credentialsMounterWithoutPreMountedFiles.mountDriverKubernetesCredentials( + originalPodSpec, "container", Some(credentialsSecret)).build() + val volumes = podSpecWithMountedDriverKubernetesCredentials.getSpec.getVolumes.asScala + assert(volumes.exists(_.getName == DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)) + volumes.find(_.getName == DRIVER_CREDENTIALS_SECRET_VOLUME_NAME).foreach { secretVolume => + assert(secretVolume.getSecret != null && secretVolume.getSecret.getSecretName == "secret") + } + } + + test("When credentials secret is absent, driver pod should not be changed.") { + val originalPodSpec = new PodBuilder() + val nonAdjustedPodSpec = + credentialsMounterWithoutAnyDriverCredentials.mountDriverKubernetesCredentials( + originalPodSpec, "driver", None) + assert(nonAdjustedPodSpec === originalPodSpec) + } +} + +private case class KubernetesSecretNameAndData(name: String, data: Map[String, String]) diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala index 08be8af30b3bc..4ef12e8686bb0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServerSuite.scala @@ -27,8 +27,7 @@ import org.scalatest.BeforeAndAfter import retrofit2.Call import org.apache.spark.{SparkFunSuite, SSLOptions} -import org.apache.spark.deploy.kubernetes.SSLUtils -import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials +import org.apache.spark.deploy.kubernetes.{KubernetesCredentials, SSLUtils} import org.apache.spark.util.Utils /** diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala index 60850bb877540..9677d12681a16 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/kubernetes/v2/ResourceStagingServiceImplSuite.scala @@ -22,7 +22,7 @@ import java.nio.file.Paths import com.google.common.io.Files import org.apache.spark.SparkFunSuite -import org.apache.spark.deploy.rest.kubernetes.v1.KubernetesCredentials +import org.apache.spark.deploy.kubernetes.KubernetesCredentials import org.apache.spark.util.Utils /** diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala index 3be4507ac105a..ba9d088bfcfcc 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesV2Suite.scala @@ -135,6 +135,19 @@ private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend) runSparkPiAndVerifyCompletion(SparkLauncher.NO_RESOURCE) } + test("Use client key and client cert file when requesting executors") { + sparkConf.setJars(Seq( + KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE, + KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH)) + sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE, + kubernetesTestComponents.clientConfig.getClientKeyFile) + sparkConf.set(KUBERNETES_DRIVER_CLIENT_CERT_FILE, + kubernetesTestComponents.clientConfig.getClientCertFile) + sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE, + kubernetesTestComponents.clientConfig.getCaCertFile) + runSparkPiAndVerifyCompletion(SparkLauncher.NO_RESOURCE) + } + private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = { assume(testBackend.name == MINIKUBE_TEST_BACKEND)