Introduce blocking submit to kubernetes by default (apache#53)
* Introduce blocking submit to kubernetes by default

Two new configuration settings:
- spark.kubernetes.submit.waitAppCompletion
- spark.kubernetes.report.interval

* Minor touchups

* More succinct logging for pod state

* Fix import order

* Switch to watch-based logging

* Spaces in comma-joined volumes, labels, and containers

* Use CountDownLatch instead of SettableFuture

* Match parallel ConfigBuilder style

* Disable logging in fire-and-forget mode

Fire-and-forget mode is enabled with spark.kubernetes.submit.waitAppCompletion=false
(the setting defaults to true).

* Additional log line for when application is launched

* Minor wording changes

* More logging

* Drop log to DEBUG
ash211 authored and mccheah committed Feb 3, 2017
1 parent 43c918c commit 5ffbac2
Showing 3 changed files with 225 additions and 62 deletions.
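For context, the commit's two new properties are ordinary Spark configuration settings. A minimal sketch of how a launcher might set them (only the property names and defaults come from this commit; the values below are illustrative):

import org.apache.spark.SparkConf

// Illustrative launcher-side configuration.
val conf = new SparkConf()
  // Block the launcher until the driver pod completes (true is the new default).
  .set("spark.kubernetes.submit.waitAppCompletion", "true")
  // While waiting, log the driver pod's status every 5 seconds (default: 1s).
  .set("spark.kubernetes.report.interval", "5s")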
Client.scala
@@ -18,15 +18,15 @@ package org.apache.spark.deploy.kubernetes

 import java.io.{File, FileInputStream}
 import java.security.{KeyStore, SecureRandom}
-import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import javax.net.ssl.{SSLContext, TrustManagerFactory, X509TrustManager}

 import com.google.common.base.Charsets
 import com.google.common.io.Files
 import com.google.common.util.concurrent.SettableFuture
 import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.{ConfigBuilder => K8SConfigBuilder, DefaultKubernetesClient, KubernetesClient, KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 import org.apache.commons.codec.binary.Base64
 import scala.collection.JavaConverters._
@@ -67,6 +67,8 @@ private[spark] class Client(
   private val uiPort = sparkConf.getInt("spark.ui.port", DEFAULT_UI_PORT)
   private val driverSubmitTimeoutSecs = sparkConf.get(KUBERNETES_DRIVER_SUBMIT_TIMEOUT)

+  private val waitForAppCompletion: Boolean = sparkConf.get(WAIT_FOR_APP_COMPLETION)
+
   private val secretBase64String = {
     val secretBytes = new Array[Byte](128)
     SECURE_RANDOM.nextBytes(secretBytes)
@@ -81,9 +83,11 @@ private[spark] class Client(
     ThreadUtils.newDaemonSingleThreadExecutor("kubernetes-client-retryable-futures"))

   def run(): Unit = {
+    logInfo(s"Starting application $kubernetesAppId in Kubernetes...")
     val (driverSubmitSslOptions, isKeyStoreLocalFile) = parseDriverSubmitSslOptions()
+
     val parsedCustomLabels = parseCustomLabels(customLabels)
-    var k8ConfBuilder = new ConfigBuilder()
+    var k8ConfBuilder = new K8SConfigBuilder()
       .withApiVersion("v1")
       .withMasterUrl(master)
       .withNamespace(namespace)
@@ -116,73 +120,97 @@ private[spark] class Client(
             SPARK_APP_NAME_LABEL -> appName)
           ++ parsedCustomLabels).asJava
         val containerPorts = buildContainerPorts()
-        val submitCompletedFuture = SettableFuture.create[Boolean]
-        val submitPending = new AtomicBoolean(false)
-        val podWatcher = new DriverPodWatcher(
-          submitCompletedFuture,
-          submitPending,
-          kubernetesClient,
-          driverSubmitSslOptions,
-          Array(submitServerSecret) ++ sslSecrets,
-          driverKubernetesSelectors)
-        Utils.tryWithResource(kubernetesClient
-            .pods()
-            .withLabels(driverKubernetesSelectors)
-            .watch(podWatcher)) { _ =>
-          kubernetesClient.pods().createNew()
-            .withNewMetadata()
-              .withName(kubernetesAppId)
-              .withLabels(driverKubernetesSelectors)
-              .endMetadata()
-            .withNewSpec()
-              .withRestartPolicy("OnFailure")
-              .addNewVolume()
-                .withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
-                .withNewSecret()
-                  .withSecretName(submitServerSecret.getMetadata.getName)
-                  .endSecret()
-                .endVolume
-              .addToVolumes(sslVolumes: _*)
-              .withServiceAccount(serviceAccount)
-              .addNewContainer()
-                .withName(DRIVER_CONTAINER_NAME)
-                .withImage(driverDockerImage)
-                .withImagePullPolicy("IfNotPresent")
-                .addNewVolumeMount()
-                  .withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
-                  .withMountPath(secretDirectory)
-                  .withReadOnly(true)
-                  .endVolumeMount()
-                .addToVolumeMounts(sslVolumeMounts: _*)
-                .addNewEnv()
-                  .withName(ENV_SUBMISSION_SECRET_LOCATION)
-                  .withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
-                  .endEnv()
-                .addNewEnv()
-                  .withName(ENV_SUBMISSION_SERVER_PORT)
-                  .withValue(SUBMISSION_SERVER_PORT.toString)
-                  .endEnv()
-                .addToEnv(sslEnvs: _*)
-                .withPorts(containerPorts.asJava)
-                .endContainer()
-              .endSpec()
-            .done()
-          var submitSucceeded = false
-          try {
-            submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS)
-            submitSucceeded = true
-          } catch {
-            case e: TimeoutException =>
-              val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e)
-              logError(finalErrorMessage, e)
-              throw new SparkException(finalErrorMessage, e)
-          } finally {
-            if (!submitSucceeded) {
-              Utils.tryLogNonFatalError {
-                kubernetesClient.pods.withName(kubernetesAppId).delete()
-              }
-            }
-          }
-        }
+
+        // start outer watch for status logging of driver pod
+        val driverPodCompletedLatch = new CountDownLatch(1)
+        // only enable interval logging if in waitForAppCompletion mode
+        val loggingInterval = if (waitForAppCompletion) sparkConf.get(REPORT_INTERVAL) else 0
+        val loggingWatch = new LoggingPodStatusWatcher(driverPodCompletedLatch, kubernetesAppId,
+          loggingInterval)
+        Utils.tryWithResource(kubernetesClient
+            .pods()
+            .withLabels(driverKubernetesSelectors)
+            .watch(loggingWatch)) { _ =>
+
+          // launch driver pod with inner watch to upload jars when it's ready
+          val submitCompletedFuture = SettableFuture.create[Boolean]
+          val submitPending = new AtomicBoolean(false)
+          val podWatcher = new DriverPodWatcher(
+            submitCompletedFuture,
+            submitPending,
+            kubernetesClient,
+            driverSubmitSslOptions,
+            Array(submitServerSecret) ++ sslSecrets,
+            driverKubernetesSelectors)
+          Utils.tryWithResource(kubernetesClient
+              .pods()
+              .withLabels(driverKubernetesSelectors)
+              .watch(podWatcher)) { _ =>
+            kubernetesClient.pods().createNew()
+              .withNewMetadata()
+                .withName(kubernetesAppId)
+                .withLabels(driverKubernetesSelectors)
+                .endMetadata()
+              .withNewSpec()
+                .withRestartPolicy("OnFailure")
+                .addNewVolume()
+                  .withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
+                  .withNewSecret()
+                    .withSecretName(submitServerSecret.getMetadata.getName)
+                    .endSecret()
+                  .endVolume
+                .addToVolumes(sslVolumes: _*)
+                .withServiceAccount(serviceAccount)
+                .addNewContainer()
+                  .withName(DRIVER_CONTAINER_NAME)
+                  .withImage(driverDockerImage)
+                  .withImagePullPolicy("IfNotPresent")
+                  .addNewVolumeMount()
+                    .withName(SUBMISSION_APP_SECRET_VOLUME_NAME)
+                    .withMountPath(secretDirectory)
+                    .withReadOnly(true)
+                    .endVolumeMount()
+                  .addToVolumeMounts(sslVolumeMounts: _*)
+                  .addNewEnv()
+                    .withName(ENV_SUBMISSION_SECRET_LOCATION)
+                    .withValue(s"$secretDirectory/$SUBMISSION_APP_SECRET_NAME")
+                    .endEnv()
+                  .addNewEnv()
+                    .withName(ENV_SUBMISSION_SERVER_PORT)
+                    .withValue(SUBMISSION_SERVER_PORT.toString)
+                    .endEnv()
+                  .addToEnv(sslEnvs: _*)
+                  .withPorts(containerPorts.asJava)
+                  .endContainer()
+                .endSpec()
+              .done()
+            var submitSucceeded = false
+            try {
+              submitCompletedFuture.get(driverSubmitTimeoutSecs, TimeUnit.SECONDS)
+              submitSucceeded = true
+              logInfo(s"Finished launching local resources to application $kubernetesAppId")
+            } catch {
+              case e: TimeoutException =>
+                val finalErrorMessage: String = buildSubmitFailedErrorMessage(kubernetesClient, e)
+                logError(finalErrorMessage, e)
+                throw new SparkException(finalErrorMessage, e)
+            } finally {
+              if (!submitSucceeded) {
+                Utils.tryLogNonFatalError {
+                  kubernetesClient.pods.withName(kubernetesAppId).delete()
+                }
+              }
+            }
+          }
+
+          // wait if configured to do so
+          if (waitForAppCompletion) {
+            logInfo(s"Waiting for application $kubernetesAppId to finish...")
+            driverPodCompletedLatch.await()
+            logInfo(s"Application $kubernetesAppId finished.")
+          } else {
+            logInfo(s"Application $kubernetesAppId successfully launched.")
+          }
+        }
       } finally {
         Utils.tryLogNonFatalError {
@@ -377,6 +405,8 @@ private[spark] class Client(
           Future {
             sparkConf.set("spark.driver.host", pod.getStatus.getPodIP)
             val submitRequest = buildSubmissionRequest()
+            logInfo(s"Submitting local resources to driver pod for application " +
+              s"$kubernetesAppId ...")
             driverSubmitter.submitApplication(submitRequest)
           }
         }
LoggingPodStatusWatcher.scala (new file)
@@ -0,0 +1,114 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes

import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

import org.apache.spark.internal.Logging

/**
 * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
 * every state change and also at an interval for liveness.
 *
 * @param podCompletedFuture a CountDownLatch that is counted down when the watched pod
 *                           reaches a terminal phase (Succeeded or Failed)
 * @param appId the ID of the Spark application being watched
 * @param interval ms between each state request. If set to 0 or a negative number, the periodic
 *                 logging will be disabled.
 */
private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownLatch,
                                                  appId: String,
                                                  interval: Long)
  extends Watcher[Pod] with Logging {

  // start timer for periodic logging
  private val scheduler = Executors.newScheduledThreadPool(1)
  private val logRunnable: Runnable = new Runnable {
    override def run() = logShortStatus()
  }
  if (interval > 0) {
    scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
  }

  private var pod: Option[Pod] = Option.empty
  private var prevPhase: String = null
  private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown")

  override def eventReceived(action: Action, pod: Pod): Unit = {
    this.pod = Option(pod)

    logShortStatus()
    if (prevPhase != phase) {
      logLongStatus()
    }
    prevPhase = phase

    if (phase == "Succeeded" || phase == "Failed") {
      podCompletedFuture.countDown()
    }
  }

  override def onClose(e: KubernetesClientException): Unit = {
    scheduler.shutdown()
    logDebug(s"Stopped watching application $appId with last-observed phase $phase")
  }

  private def logShortStatus() = {
    logInfo(s"Application status for $appId (phase: $phase)")
  }

  private def logLongStatus() = {
    logInfo("Phase changed, new state: " + pod.map(formatPodState(_)).getOrElse("unknown"))
  }

  private def formatPodState(pod: Pod): String = {

    val details = Seq[(String, String)](
      // pod metadata
      ("pod name", pod.getMetadata.getName()),
      ("namespace", pod.getMetadata.getNamespace()),
      ("labels", pod.getMetadata.getLabels().asScala.mkString(", ")),
      ("pod uid", pod.getMetadata.getUid),
      ("creation time", pod.getMetadata.getCreationTimestamp()),

      // spec details
      ("service account name", pod.getSpec.getServiceAccountName()),
      ("volumes", pod.getSpec.getVolumes().asScala.map(_.getName).mkString(", ")),
      ("node name", pod.getSpec.getNodeName()),

      // status
      ("start time", pod.getStatus.getStartTime),
      ("container images",
        pod.getStatus.getContainerStatuses()
          .asScala
          .map(_.getImage)
          .mkString(", ")),
      ("phase", pod.getStatus.getPhase())
    )

    // Use more loggable format if value is null or empty
    details.map { case (k, v) =>
      val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
      s"\n\t $k: $newValue"
    }.mkString("")
  }
}
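For illustration, a sketch of how this watcher gets wired up, mirroring the Client.run() change above. The client construction, app ID, and interval below are assumed for the example, and the private[kubernetes] constructor means code like this only compiles inside the org.apache.spark.deploy.kubernetes package:

import java.util.concurrent.CountDownLatch
import io.fabric8.kubernetes.client.DefaultKubernetesClient

// Hypothetical wiring; follows the pattern used in Client.run().
val client = new DefaultKubernetesClient()  // assumes a resolvable kube config
val latch = new CountDownLatch(1)
val watcher = new LoggingPodStatusWatcher(latch, "spark-app-example", interval = 1000L)
// Registering the watch starts event-driven logging; the scheduled thread
// inside the watcher produces the periodic "liveness" log lines.
val watch = client.pods().withName("spark-app-example").watch(watcher)
try {
  latch.await()  // returns once eventReceived sees phase Succeeded or Failed
} finally {
  watch.close()  // triggers onClose, which stops the logging scheduler
}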
config.scala
@@ -188,4 +188,23 @@ package object config {
     .internal()
     .stringConf
     .createOptional
+
+  private[spark] val WAIT_FOR_APP_COMPLETION =
+    ConfigBuilder("spark.kubernetes.submit.waitAppCompletion")
+      .doc(
+        """
+          | In cluster mode, whether to wait for the application to finish before exiting the
+          | launcher process.
+        """.stripMargin)
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val REPORT_INTERVAL =
+    ConfigBuilder("spark.kubernetes.report.interval")
+      .doc(
+        """
+          | Interval between reports of the current app status in cluster mode.
+        """.stripMargin)
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1s")
 }
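Since REPORT_INTERVAL is declared with timeConf(TimeUnit.MILLISECONDS), values accept Spark's usual duration suffixes and resolve to milliseconds. A small sketch of the intended behavior (the "5s" value is illustrative, and reading through a ConfigEntry this way is package-private to Spark):

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.kubernetes.report.interval", "5s")
// Resolves to 5000L because the entry above is a timeConf in milliseconds;
// with nothing set it would fall back to the "1s" default, i.e. 1000L.
val intervalMs: Long = conf.get(REPORT_INTERVAL)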
