Merge pull request apache-spark-on-k8s#137 from palantir/resync-kube-upstream

Resync kube upstream
ash211 authored Mar 20, 2017
2 parents 9876abe + 3c76993 commit c500658
Showing 21 changed files with 1,250 additions and 617 deletions.
1 change: 1 addition & 0 deletions dev/.rat-excludes
@@ -109,3 +109,4 @@ circle.yml
publish.sh
structured-streaming/*
kafka-source-initial-offset-version-2.1.0.bin
org.apache.spark.deploy.rest.kubernetes.DriverServiceManager
24 changes: 24 additions & 0 deletions docs/running-on-kubernetes-cloud.md
@@ -0,0 +1,24 @@
---
layout: global
title: Running Spark in the cloud with Kubernetes
---

For general information about running Spark on Kubernetes, refer to [running Spark on Kubernetes](running-on-kubernetes.md).

A Kubernetes cluster may be brought up on different cloud providers or on premises. It is commonly provisioned through [Google Container Engine](https://cloud.google.com/container-engine/), with [kops](https://github.com/kubernetes/kops) on AWS, or on premises with [kubeadm](https://kubernetes.io/docs/getting-started-guides/kubeadm/).

## Running on Google Container Engine (GKE)

* Create a GKE [container cluster](https://cloud.google.com/container-engine/docs/clusters/operations).
* Obtain kubectl and [configure](https://cloud.google.com/container-engine/docs/clusters/operations#configuring_kubectl) it appropriately.
* Find the identity of the Kubernetes master associated with this project:

```
> kubectl cluster-info
Kubernetes master is running at https://<master-ip>:443
```
* Run spark-submit with the master option set to `k8s://https://<master-ip>:443`. Instructions for running spark-submit are provided in the [running on Kubernetes](running-on-kubernetes.md) tutorial.
* Check with `kubectl get pods` that your driver pod, and subsequently your executor pods, have launched.
* Read the stdout and stderr of the driver pod using `kubectl logs <name-of-driver-pod>`, or stream the logs using `kubectl logs -f <name-of-driver-pod>`.

Known issues:
* If you face OAuth token expiry errors when you run spark-submit, it is likely because the token needs to be refreshed. The easiest way to fix this is to run any `kubectl` command, say `kubectl version`, and then retry your submission.
166 changes: 122 additions & 44 deletions docs/running-on-kubernetes.md

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions resource-managers/kubernetes/README.md
@@ -53,6 +53,14 @@
Afterwards, the integration tests can be executed with Maven or your IDE. Note that the
`pre-integration-test` phase must be run every time the Spark main code changes. When running tests from the
command line, the `pre-integration-test` phase should automatically be invoked if the `integration-test` phase is run.

After the above step, the integration tests can be run using the following command:

```sh
build/mvn integration-test \
-Pkubernetes -Pkubernetes-integration-tests \
-pl resource-managers/kubernetes/integration-tests -am
```

# Preserve the Minikube VM

The integration tests make use of [Minikube](https://github.com/kubernetes/minikube), which fires up a virtual machine
4 changes: 4 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
@@ -87,6 +87,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
</dependency>
<!-- End of shaded deps. -->

</dependencies>
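The diff alone does not show how the new BouncyCastle artifact is exercised. A plausible use for `bcpkix-jdk15on`, given the driver-submission SSL work elsewhere in this commit, is generating a self-signed certificate for the submission server's TLS endpoint. The sketch below is a minimal illustration under that assumption; the object name, key size, and validity window are illustrative and not taken from the commit.

```scala
import java.math.BigInteger
import java.security.KeyPairGenerator
import java.security.cert.X509Certificate
import java.util.Date

import org.bouncycastle.asn1.x500.X500Name
import org.bouncycastle.cert.jcajce.{JcaX509CertificateConverter, JcaX509v3CertificateBuilder}
import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder

// Illustrative only: generate a throwaway self-signed certificate with bcpkix.
object SelfSignedCertSketch {
  def generate(commonName: String): X509Certificate = {
    val keyGen = KeyPairGenerator.getInstance("RSA")
    keyGen.initialize(2048)
    val keyPair = keyGen.generateKeyPair()

    val subject = new X500Name(s"CN=$commonName")
    val now = System.currentTimeMillis()
    val builder = new JcaX509v3CertificateBuilder(
      subject,                              // self-signed: issuer == subject
      BigInteger.valueOf(now),              // serial number, unique enough here
      new Date(now),                        // notBefore
      new Date(now + 24L * 60 * 60 * 1000), // notAfter: one day
      subject,
      keyPair.getPublic)

    val signer = new JcaContentSignerBuilder("SHA256withRSA").build(keyPair.getPrivate)
    new JcaX509CertificateConverter().getCertificate(builder.build(signer))
  }
}
```

Shading the dependency (per the surrounding `<!-- End of shaded deps. -->` comment) keeps it from clashing with any BouncyCastle version already on the user's classpath.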
org/apache/spark/deploy/kubernetes/Client.scala
@@ -32,7 +32,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, UploadedAppResource}
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, KubernetesCredentials, RemoteAppResource, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ShutdownHookManager, Utils}
@@ -52,7 +52,7 @@ private[spark] class Client(
.getOrElse("spark")
private val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
private val secretName = s"$SUBMISSION_APP_SECRET_PREFIX-$kubernetesAppId"
private val secretDirectory = s"$DRIVER_CONTAINER_SECRETS_BASE_DIR/$kubernetesAppId"
private val secretDirectory = s"$DRIVER_CONTAINER_SUBMISSION_SECRETS_BASE_DIR/$kubernetesAppId"
private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)
@@ -118,19 +118,23 @@
customAnnotations,
KUBERNETES_DRIVER_ANNOTATIONS.key,
"annotations")
val driverPodKubernetesCredentials = new DriverPodKubernetesCredentialsProvider(sparkConf).get()
var k8ConfBuilder = new K8SConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(master)
.withNamespace(namespace)
sparkConf.get(KUBERNETES_CA_CERT_FILE).foreach {
sparkConf.get(KUBERNETES_SUBMIT_CA_CERT_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withCaCertFile(f)
}
sparkConf.get(KUBERNETES_CLIENT_KEY_FILE).foreach {
sparkConf.get(KUBERNETES_SUBMIT_CLIENT_KEY_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withClientKeyFile(f)
}
sparkConf.get(KUBERNETES_CLIENT_CERT_FILE).foreach {
sparkConf.get(KUBERNETES_SUBMIT_CLIENT_CERT_FILE).foreach {
f => k8ConfBuilder = k8ConfBuilder.withClientCertFile(f)
}
sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { token =>
k8ConfBuilder = k8ConfBuilder.withOauthToken(token)
}

val k8ClientConfig = k8ConfBuilder.build
Utils.tryWithResource(new DefaultKubernetesClient(k8ClientConfig)) { kubernetesClient =>
@@ -157,7 +161,7 @@
driverServiceManager.handleSubmissionError(
new SparkException("Submission shutting down early...")))
try {
val sslConfigurationProvider = new SslConfigurationProvider(
val sslConfigurationProvider = new DriverSubmitSslConfigurationProvider(
sparkConf, kubernetesAppId, kubernetesClient, kubernetesResourceCleaner)
val submitServerSecret = kubernetesClient.secrets().createNew()
.withNewMetadata()
@@ -168,11 +172,6 @@
.done()
kubernetesResourceCleaner.registerOrUpdateResource(submitServerSecret)
val sslConfiguration = sslConfigurationProvider.getSslConfiguration()
val driverKubernetesSelectors = (Map(
SPARK_DRIVER_LABEL -> kubernetesAppId,
SPARK_APP_ID_LABEL -> kubernetesAppId,
SPARK_APP_NAME_LABEL -> appName)
++ parsedCustomLabels)
val (driverPod, driverService) = launchDriverKubernetesComponents(
kubernetesClient,
driverServiceManager,
@@ -183,7 +182,7 @@
configureOwnerReferences(
kubernetesClient,
submitServerSecret,
sslConfiguration.sslSecrets,
sslConfiguration.sslSecret,
driverPod,
driverService)
submitApplicationToDriverServer(
@@ -192,7 +191,8 @@
sslConfiguration,
driverService,
submitterLocalFiles,
submitterLocalJars)
submitterLocalJars,
driverPodKubernetesCredentials)
// Now that the application has started, persist the components that were created beyond
// the shutdown hook. We still want to purge the one-time secrets, so do not unregister
// those.
@@ -209,7 +209,6 @@
Utils.tryLogNonFatalError {
driverServiceManager.stop()
}

// Remove the shutdown hooks that would be redundant
Utils.tryLogNonFatalError {
ShutdownHookManager.removeShutdownHook(resourceCleanShutdownHook)
@@ -236,10 +235,11 @@
private def submitApplicationToDriverServer(
kubernetesClient: KubernetesClient,
driverServiceManager: DriverServiceManager,
sslConfiguration: SslConfiguration,
sslConfiguration: DriverSubmitSslConfiguration,
driverService: Service,
submitterLocalFiles: Iterable[String],
submitterLocalJars: Iterable[String]): Unit = {
submitterLocalJars: Iterable[String],
driverPodKubernetesCredentials: KubernetesCredentials): Unit = {
sparkConf.getOption("spark.app.id").foreach { id =>
logWarning(s"Warning: Provided app id in spark.app.id as $id will be" +
s" overridden as $kubernetesAppId")
@@ -251,6 +251,12 @@
sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
sparkConf.setIfMissing("spark.blockmanager.port",
DEFAULT_BLOCKMANAGER_PORT.toString)
sparkConf.get(KUBERNETES_SUBMIT_OAUTH_TOKEN).foreach { _ =>
sparkConf.set(KUBERNETES_SUBMIT_OAUTH_TOKEN, "<present_but_redacted>")
}
sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { _ =>
sparkConf.set(KUBERNETES_DRIVER_OAUTH_TOKEN, "<present_but_redacted>")
}
val driverSubmitter = buildDriverSubmissionClient(
kubernetesClient,
driverServiceManager,
@@ -260,7 +266,10 @@
driverSubmitter.ping()
logInfo(s"Submitting local resources to driver pod for application " +
s"$kubernetesAppId ...")
val submitRequest = buildSubmissionRequest(submitterLocalFiles, submitterLocalJars)
val submitRequest = buildSubmissionRequest(
submitterLocalFiles,
submitterLocalJars,
driverPodKubernetesCredentials)
driverSubmitter.submitApplication(submitRequest)
logInfo("Successfully submitted local resources and driver configuration to" +
" driver pod.")
@@ -288,7 +297,7 @@
customLabels: Map[String, String],
customAnnotations: Map[String, String],
submitServerSecret: Secret,
sslConfiguration: SslConfiguration): (Pod, Service) = {
sslConfiguration: DriverSubmitSslConfiguration): (Pod, Service) = {
val driverKubernetesSelectors = (Map(
SPARK_DRIVER_LABEL -> kubernetesAppId,
SPARK_APP_ID_LABEL -> kubernetesAppId,
@@ -339,7 +348,7 @@
private def configureOwnerReferences(
kubernetesClient: KubernetesClient,
submitServerSecret: Secret,
sslSecrets: Array[Secret],
sslSecret: Option[Secret],
driverPod: Pod,
driverService: Service): Service = {
val driverPodOwnerRef = new OwnerReferenceBuilder()
Expand All @@ -349,7 +358,7 @@ private[spark] class Client(
.withKind(driverPod.getKind)
.withController(true)
.build()
sslSecrets.foreach(secret => {
sslSecret.foreach(secret => {
val updatedSecret = kubernetesClient.secrets().withName(secret.getMetadata.getName).edit()
.editMetadata()
.addToOwnerReferences(driverPodOwnerRef)
@@ -415,10 +424,10 @@
driverKubernetesSelectors: Map[String, String],
customAnnotations: Map[String, String],
submitServerSecret: Secret,
sslConfiguration: SslConfiguration): Pod = {
sslConfiguration: DriverSubmitSslConfiguration): Pod = {
val containerPorts = buildContainerPorts()
val probePingHttpGet = new HTTPGetActionBuilder()
.withScheme(if (sslConfiguration.sslOptions.enabled) "HTTPS" else "HTTP")
.withScheme(if (sslConfiguration.enabled) "HTTPS" else "HTTP")
.withPath("/v1/submissions/ping")
.withNewPort(SUBMISSION_SERVER_PORT_NAME)
.build()
@@ -442,8 +451,8 @@
.withSecretName(submitServerSecret.getMetadata.getName)
.endSecret()
.endVolume()
.addToVolumes(sslConfiguration.sslPodVolumes: _*)
.withServiceAccount(serviceAccount)
.addToVolumes(sslConfiguration.sslPodVolume.toSeq: _*)
.withServiceAccount(serviceAccount.getOrElse("default"))
.addNewContainer()
.withName(DRIVER_CONTAINER_NAME)
.withImage(driverDockerImage)
@@ -453,7 +462,7 @@
.withMountPath(secretDirectory)
.withReadOnly(true)
.endVolumeMount()
.addToVolumeMounts(sslConfiguration.sslPodVolumeMounts: _*)
.addToVolumeMounts(sslConfiguration.sslPodVolumeMount.toSeq: _*)
.addNewEnv()
.withName(ENV_SUBMISSION_SECRET_LOCATION)
.withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
@@ -619,7 +628,8 @@

private def buildSubmissionRequest(
submitterLocalFiles: Iterable[String],
submitterLocalJars: Iterable[String]): KubernetesCreateSubmissionRequest = {
submitterLocalJars: Iterable[String],
driverPodKubernetesCredentials: KubernetesCredentials): KubernetesCreateSubmissionRequest = {
val mainResourceUri = Utils.resolveURI(mainAppResource)
val resolvedAppResource: AppResource = Option(mainResourceUri.getScheme)
.getOrElse("file") match {
@@ -642,14 +652,15 @@
secret = secretBase64String,
sparkProperties = sparkConf.getAll.toMap,
uploadedJarsBase64Contents = uploadJarsBase64Contents,
uploadedFilesBase64Contents = uploadFilesBase64Contents)
uploadedFilesBase64Contents = uploadFilesBase64Contents,
driverPodKubernetesCredentials = driverPodKubernetesCredentials)
}

private def buildDriverSubmissionClient(
kubernetesClient: KubernetesClient,
driverServiceManager: DriverServiceManager,
service: Service,
sslConfiguration: SslConfiguration): KubernetesSparkRestApi = {
sslConfiguration: DriverSubmitSslConfiguration): KubernetesSparkRestApi = {
val serviceUris = driverServiceManager.getDriverServiceSubmissionServerUris(service)
require(serviceUris.nonEmpty, "No uris found to contact the driver!")
HttpClientUtil.createClient[KubernetesSparkRestApi](
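Taken together, the Client changes follow one pattern: read the driver pod's Kubernetes credentials up front, redact OAuth tokens from the SparkConf that is shipped to the driver, and carry the real credentials in the submission request instead. Below is a condensed sketch of that flow, assuming Client's imports and fields (`config._`, `buildSubmissionRequest`, and the driver submitter from the diff above); the wrapper method name is hypothetical, and in the commit this logic is spread across Client's submission path rather than one method.

```scala
// Condensed from the diff above, not the complete class. Error handling,
// resource cleanup, and most parameters are omitted.
private def submitWithCredentials(sparkConf: SparkConf): Unit = {
  // 1. Read the driver pod's Kubernetes credentials once, up front.
  val driverPodKubernetesCredentials =
    new DriverPodKubernetesCredentialsProvider(sparkConf).get()

  // 2. Redact OAuth tokens from the SparkConf that ships to the driver; the
  //    real values travel only inside the submission request body.
  Seq(KUBERNETES_SUBMIT_OAUTH_TOKEN, KUBERNETES_DRIVER_OAUTH_TOKEN).foreach { entry =>
    sparkConf.get(entry).foreach(_ => sparkConf.set(entry, "<present_but_redacted>"))
  }

  // 3. Attach the credentials to the submission request sent to the driver pod.
  val submitRequest = buildSubmissionRequest(
    submitterLocalFiles, submitterLocalJars, driverPodKubernetesCredentials)
  driverSubmitter.submitApplication(submitRequest)
}
```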
New file: org/apache/spark/deploy/kubernetes/DriverPodKubernetesCredentialsProvider.scala
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import java.io.File

import com.google.common.io.{BaseEncoding, Files}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.rest.KubernetesCredentials
import org.apache.spark.internal.config.OptionalConfigEntry

private[spark] class DriverPodKubernetesCredentialsProvider(sparkConf: SparkConf) {

def get(): KubernetesCredentials = {
sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME).foreach { _ =>
require(sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN).isEmpty,
"Cannot specify both a service account and a driver pod OAuth token.")
require(sparkConf.get(KUBERNETES_DRIVER_CA_CERT_FILE).isEmpty,
"Cannot specify both a service account and a driver pod CA cert file.")
require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_KEY_FILE).isEmpty,
"Cannot specify both a service account and a driver pod client key file.")
require(sparkConf.get(KUBERNETES_DRIVER_CLIENT_CERT_FILE).isEmpty,
"Cannot specify both a service account and a driver pod client cert file.")
}
val oauthToken = sparkConf.get(KUBERNETES_DRIVER_OAUTH_TOKEN)
val caCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CA_CERT_FILE,
s"Driver CA cert file provided at %s does not exist or is not a file.")
val clientKeyDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_KEY_FILE,
s"Driver client key file provided at %s does not exist or is not a file.")
val clientCertDataBase64 = safeFileConfToBase64(KUBERNETES_DRIVER_CLIENT_CERT_FILE,
s"Driver client cert file provided at %s does not exist or is not a file.")
val serviceAccountName = sparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
KubernetesCredentials(
oauthToken = oauthToken,
caCertDataBase64 = caCertDataBase64,
clientKeyDataBase64 = clientKeyDataBase64,
clientCertDataBase64 = clientCertDataBase64)
}

private def safeFileConfToBase64(
conf: OptionalConfigEntry[String],
fileNotFoundFormatString: String): Option[String] = {
sparkConf.get(conf)
.map(new File(_))
.map { file =>
require(file.isFile, String.format(fileNotFoundFormatString, file.getAbsolutePath))
BaseEncoding.base64().encode(Files.toByteArray(file))
}
}
}
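
As Client.scala above shows, the provider is constructed once from the submission-side SparkConf and queried for an immutable credentials bundle; `get()` fails fast if both a service account and explicit driver credentials are configured. A minimal usage sketch follows (the caller is hypothetical, and must live under the `org.apache.spark` package since the class is `private[spark]`):

```scala
package org.apache.spark.deploy.kubernetes // hypothetical caller location

import org.apache.spark.SparkConf

object CredentialsSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    // get() enforces, via the require() checks above, that a service account
    // and explicit driver credentials are mutually exclusive.
    val credentials = new DriverPodKubernetesCredentialsProvider(sparkConf).get()

    // Every field is an Option[String]; certificate and key material is
    // base64-encoded so it can be embedded in the JSON submission request.
    credentials.oauthToken.foreach(_ => println("driver OAuth token supplied"))
    credentials.caCertDataBase64.foreach(_ => println("driver CA cert supplied"))
    credentials.clientKeyDataBase64.foreach(_ => println("driver client key supplied"))
    credentials.clientCertDataBase64.foreach(_ => println("driver client cert supplied"))
  }
}
```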
