Enable testing against GCE clusters (apache#243)
* Part 1: making test code cluster-agnostic

* Final checks

* Move all test code into KubernetesTestComponents

* Addressed comments

* Fixed doc

* Restructure the test backends (apache#248)

* Restructured the test backends

* Address comments

* var -> val

* Comments

* Removed dead code
foxish committed Jul 24, 2017
1 parent 7c29732 commit 2c753de
Showing 12 changed files with 299 additions and 92 deletions.
8 changes: 8 additions & 0 deletions resource-managers/kubernetes/README.md
@@ -61,6 +61,14 @@ build/mvn integration-test \
-pl resource-managers/kubernetes/integration-tests -am
```

# Running against an arbitrary cluster

To run the tests against an arbitrary cluster, use the following:

```
build/mvn integration-test \
  -Pkubernetes -Pkubernetes-integration-tests \
  -pl resource-managers/kubernetes/integration-tests -am \
  -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https://<master> -Dspark.docker.test.driverImage=<driver-image> -Dspark.docker.test.executorImage=<executor-image>"
```
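Setting `spark.kubernetes.test.master` points the tests at the given cluster instead of the default Minikube backend; the driver and executor images named here should already be built and pushed to a registry that the cluster's nodes can pull from.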

# Preserve the Minikube VM

The integration tests make use of [Minikube](https://github.com/kubernetes/minikube), which fires up a virtual machine and sets up a single-node Kubernetes cluster within it.
ExternalUriProviderWatch.scala
@@ -24,7 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
import scala.collection.JavaConverters._

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.internal.Logging

/**
KubernetesSuite.scala
@@ -25,26 +25,23 @@ import org.scalatest.concurrent.PatienceConfiguration
import org.scalatest.time.{Minutes, Seconds, Span}

import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}

private[spark] class KubernetesSuite extends SparkFunSuite {
private val testBackend: IntegrationTestBackend = IntegrationTestBackendFactory.getTestBackend()

override def beforeAll(): Unit = {
Minikube.startMinikube()
new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
testBackend.initialize()
}

override def afterAll(): Unit = {
if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
Minikube.deleteMinikube()
}
testBackend.cleanUp()
}

override def nestedSuites: scala.collection.immutable.IndexedSeq[Suite] = {
Vector(
new KubernetesV1Suite,
new KubernetesV2Suite)
new KubernetesV1Suite(testBackend),
new KubernetesV2Suite(testBackend))
}
}

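The `IntegrationTestBackend` trait and its factory live in new files that this page leaves collapsed. Below is a minimal sketch consistent with how `KubernetesSuite` uses them; the trait members and the property-based backend selection are inferred from the diffs on this page, not copied verbatim from the commit:

```scala
package org.apache.spark.deploy.kubernetes.integrationtest.backend

import io.fabric8.kubernetes.client.DefaultKubernetesClient

import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.kubernetes.integrationtest.docker.SparkDockerImageBuilder

private[spark] trait IntegrationTestBackend {
  def name(): String
  def initialize(): Unit
  def getKubernetesClient: DefaultKubernetesClient
  def cleanUp(): Unit = {}
}

private[spark] object IntegrationTestBackendFactory {
  // Choose the backend from the system property documented in the README:
  // when spark.kubernetes.test.master is set, run against that cluster;
  // otherwise fall back to a local Minikube VM.
  def getTestBackend(): IntegrationTestBackend =
    Option(System.getProperty("spark.kubernetes.test.master"))
      .map(new GCETestBackend(_)) // sketched further below
      .getOrElse(new MinikubeTestBackend())
}

// The Minikube backend absorbs the setup and teardown logic that the diff
// above removes from KubernetesSuite.beforeAll() and afterAll().
private[spark] class MinikubeTestBackend extends IntegrationTestBackend {
  override def initialize(): Unit = {
    Minikube.startMinikube()
    new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
  }

  override def getKubernetesClient: DefaultKubernetesClient =
    Minikube.getKubernetesClient

  override def cleanUp(): Unit = {
    if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) {
      Minikube.deleteMinikube()
    }
  }

  override def name(): String = MINIKUBE_TEST_BACKEND
}
```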
KubernetesTestComponents.scala
@@ -17,32 +17,37 @@
package org.apache.spark.deploy.kubernetes.integrationtest

import java.util.UUID
import javax.net.ssl.X509TrustManager

import org.scalatest.concurrent.Eventually
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.internal.SSLUtils
import org.scalatest.concurrent.Eventually

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.rest.kubernetes.v1.HttpClientUtil

private[spark] class KubernetesTestComponents {
private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {

val namespace = UUID.randomUUID().toString.replaceAll("-", "")
val kubernetesClient = Minikube.getKubernetesClient.inNamespace(namespace)
val kubernetesClient = defaultClient.inNamespace(namespace)
val clientConfig = kubernetesClient.getConfiguration

def createNamespace(): Unit = {
Minikube.getKubernetesClient.namespaces.createNew()
defaultClient.namespaces.createNew()
.withNewMetadata()
.withName(namespace)
.endMetadata()
.done()
}

def deleteNamespace(): Unit = {
Minikube.getKubernetesClient.namespaces.withName(namespace).delete()
defaultClient.namespaces.withName(namespace).delete()
Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
val namespaceList = Minikube.getKubernetesClient
val namespaceList = defaultClient
.namespaces()
.list()
.getItems()
@@ -53,13 +58,12 @@ private[spark] class KubernetesTestComponents {

def newSparkConf(): SparkConf = {
new SparkConf(true)
.setMaster(s"k8s://https://${Minikube.getMinikubeIp}:8443")
.set(KUBERNETES_SUBMIT_CA_CERT_FILE, clientConfig.getCaCertFile)
.set(KUBERNETES_SUBMIT_CLIENT_KEY_FILE, clientConfig.getClientKeyFile)
.set(KUBERNETES_SUBMIT_CLIENT_CERT_FILE, clientConfig.getClientCertFile)
.setMaster(s"k8s://${kubernetesClient.getMasterUrl}")
.set(KUBERNETES_NAMESPACE, namespace)
.set(DRIVER_DOCKER_IMAGE, "spark-driver:latest")
.set(EXECUTOR_DOCKER_IMAGE, "spark-executor:latest")
.set(DRIVER_DOCKER_IMAGE,
System.getProperty("spark.docker.test.driverImage", "spark-driver:latest"))
.set(EXECUTOR_DOCKER_IMAGE,
System.getProperty("spark.docker.test.executorImage", "spark-executor:latest"))
.setJars(Seq(KubernetesSuite.HELPER_JAR_FILE.getAbsolutePath))
.set("spark.executor.memory", "500m")
.set("spark.executor.cores", "1")
Expand All @@ -69,4 +73,26 @@ private[spark] class KubernetesTestComponents {
.set("spark.testing", "false")
.set(WAIT_FOR_APP_COMPLETION, false)
}
}

def getService[T: ClassTag](
serviceName: String,
namespace: String,
servicePortName: String,
servicePath: String = ""): T = synchronized {
// Build the apiserver proxy URL for the service.
val url = Array(
  s"${kubernetesClient.getMasterUrl}",
  "api", "v1", "proxy",
  "namespaces", namespace,
  "services", serviceName).mkString("/") + s":$servicePortName$servicePath"
val kubernetesConf = kubernetesClient.getConfiguration
val sslContext = SSLUtils.sslContext(kubernetesConf)
val trustManager = SSLUtils.trustManagers(kubernetesConf)(0).asInstanceOf[X509TrustManager]
HttpClientUtil.createClient[T](Set(url), 5, sslContext.getSocketFactory, trustManager)
}
}
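`getService` replaces the old `Minikube.getService` helper: it builds a typed REST client for a service exposed through the API server proxy endpoint (`<master>/api/v1/proxy/namespaces/<namespace>/services/<service>:<port>`), which works against any cluster rather than only a local Minikube VM. A hypothetical call mirroring the V1 suite's usage below, with an illustrative service name:

```scala
// Fetch a Spark REST API client for a driver's UI service; the service
// name would normally be read from the driver service's metadata.
val sparkUi = kubernetesTestComponents.getService[SparkRestApiV1](
  serviceName = "spark-pi-driver-svc",
  namespace = kubernetesTestComponents.namespace,
  servicePortName = "spark-ui-port")
```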
KubernetesV1Suite.scala
@@ -18,33 +18,37 @@ package org.apache.spark.deploy.kubernetes.integrationtest

import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import com.google.common.collect.ImmutableList
import com.google.common.util.concurrent.SettableFuture
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action
import org.scalatest.{BeforeAndAfter, DoNotDiscover}
import org.scalatest.concurrent.Eventually
import scala.collection.JavaConverters._

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.SSLUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.constants.{GCE_TEST_BACKEND, MINIKUBE_TEST_BACKEND}
import org.apache.spark.deploy.kubernetes.integrationtest.restapis.SparkRestApiV1
import org.apache.spark.deploy.kubernetes.submit.v1.{Client, ExternalSuppliedUrisDriverServiceManager}
import org.apache.spark.status.api.v1.{ApplicationStatus, StageStatus}
import org.apache.spark.util.Utils

@DoNotDiscover
private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter {
private[spark] class KubernetesV1Suite(testBackend: IntegrationTestBackend)
extends SparkFunSuite with BeforeAndAfter {

private var kubernetesTestComponents: KubernetesTestComponents = _
private var sparkConf: SparkConf = _

override def beforeAll(): Unit = {
kubernetesTestComponents = new KubernetesTestComponents()
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
kubernetesTestComponents.createNamespace()
}

@@ -85,7 +89,7 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
.get(0)
.getMetadata
.getName
Minikube.getService[SparkRestApiV1](serviceName,
kubernetesTestComponents.getService[SparkRestApiV1](serviceName,
kubernetesTestComponents.namespace, "spark-ui-port")
}

@@ -168,6 +172,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
}

test("Enable SSL on the driver submit server") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyStoreFile, trustStoreFile) = SSLUtils.generateKeyStoreTrustStorePair(
Minikube.getMinikubeIp,
"changeit",
Expand All @@ -188,6 +194,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
}

test("Enable SSL on the driver submit server using PEM files") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyPem, certPem) = SSLUtils.generateKeyCertPemPair(Minikube.getMinikubeIp)
sparkConf.set(DRIVER_SUBMIT_SSL_KEY_PEM, s"file://${keyPem.getAbsolutePath}")
sparkConf.set(DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM, s"file://${certPem.getAbsolutePath}")
Expand All @@ -201,6 +209,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
}

test("Added files should exist on the driver.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.set("spark.files", KubernetesSuite.TEST_EXISTENCE_FILE.getAbsolutePath)
sparkConf.setAppName("spark-file-existence-test")
val podCompletedFuture = SettableFuture.create[Boolean]
@@ -257,6 +267,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
}

test("Use external URI provider") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val externalUriProviderWatch =
new ExternalUriProviderWatch(kubernetesTestComponents.kubernetesClient)
Utils.tryWithResource(kubernetesTestComponents.kubernetesClient.services()
@@ -288,6 +300,8 @@ private[spark] class KubernetesV1Suite extends SparkFunSuite with BeforeAndAfter
}

test("Mount the Kubernetes credentials onto the driver pod") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.set(KUBERNETES_DRIVER_CA_CERT_FILE,
kubernetesTestComponents.clientConfig.getCaCertFile)
sparkConf.set(KUBERNETES_DRIVER_CLIENT_KEY_FILE,
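The `assume(testBackend.name == MINIKUBE_TEST_BACKEND)` guards added above skip Minikube-specific tests (SSL against the Minikube IP, the external URI provider, mounted credentials, file staging) when the suite runs against a GCE cluster. The backend-name constants come from a small `constants` object added elsewhere in this commit; a plausible sketch, assuming plain string identifiers:

```scala
package org.apache.spark.deploy.kubernetes.integrationtest

object constants {
  val MINIKUBE_TEST_BACKEND = "minikube"
  val GCE_TEST_BACKEND = "gce"
}
```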
KubernetesV2Suite.scala
@@ -21,22 +21,25 @@ import java.util.UUID
import org.scalatest.{BeforeAndAfter, DoNotDiscover}
import org.scalatest.concurrent.Eventually

import org.apache.spark.{SparkConf, SparkFunSuite, SSLOptions}
import org.apache.spark._
import org.apache.spark.deploy.kubernetes.SSLUtils
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.integrationtest.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.backend.IntegrationTestBackend
import org.apache.spark.deploy.kubernetes.integrationtest.backend.minikube.Minikube
import org.apache.spark.deploy.kubernetes.integrationtest.constants.MINIKUBE_TEST_BACKEND
import org.apache.spark.deploy.kubernetes.submit.v2.{MountedDependencyManagerProviderImpl, SubmissionKubernetesClientProviderImpl}

@DoNotDiscover
private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter {
private[spark] class KubernetesV2Suite(testBackend: IntegrationTestBackend)
extends SparkFunSuite with BeforeAndAfter {

private val APP_LOCATOR_LABEL = UUID.randomUUID().toString.replaceAll("-", "")
private var kubernetesTestComponents: KubernetesTestComponents = _
private var sparkConf: SparkConf = _
private var resourceStagingServerLauncher: ResourceStagingServerLauncher = _

override def beforeAll(): Unit = {
kubernetesTestComponents = new KubernetesTestComponents
kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
resourceStagingServerLauncher = new ResourceStagingServerLauncher(
kubernetesTestComponents.kubernetesClient.inNamespace(kubernetesTestComponents.namespace))
}
@@ -54,11 +57,15 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
}

test("Use submission v2.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

launchStagingServer(SSLOptions())
runSparkAppAndVerifyCompletion(KubernetesSuite.SUBMITTER_LOCAL_MAIN_APP_RESOURCE)
}

test("Enable SSL on the submission server") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val (keyStore, trustStore) = SSLUtils.generateKeyStoreTrustStorePair(
ipAddress = Minikube.getMinikubeIp,
keyStorePassword = "keyStore",
@@ -81,13 +88,17 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
}

test("Use container-local resources without the resource staging server") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.setJars(Seq(
KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE,
KubernetesSuite.CONTAINER_LOCAL_HELPER_JAR_PATH))
runSparkAppAndVerifyCompletion(KubernetesSuite.CONTAINER_LOCAL_MAIN_APP_RESOURCE)
}

private def launchStagingServer(resourceStagingServerSslOptions: SSLOptions): Unit = {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

val resourceStagingServerPort = resourceStagingServerLauncher.launchStagingServer(
resourceStagingServerSslOptions)
val resourceStagingServerUriScheme = if (resourceStagingServerSslOptions.enabled) {
@@ -96,7 +107,8 @@ private[spark] class KubernetesV2Suite extends SparkFunSuite with BeforeAndAfter
"http"
}
sparkConf.set(RESOURCE_STAGING_SERVER_URI,
s"$resourceStagingServerUriScheme://${Minikube.getMinikubeIp}:$resourceStagingServerPort")
s"$resourceStagingServerUriScheme://" +
s"${Minikube.getMinikubeIp}:$resourceStagingServerPort")
}

private def runSparkAppAndVerifyCompletion(appResource: String): Unit = {
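The concrete backends are among the files this page leaves collapsed. Given that the GCE-style backend must turn the `spark.kubernetes.test.master` URL into a fabric8 `DefaultKubernetesClient`, a sketch of its likely shape (class and package names inferred, not verbatim from the commit):

```scala
package org.apache.spark.deploy.kubernetes.integrationtest.backend

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}

import org.apache.spark.deploy.kubernetes.integrationtest.constants.GCE_TEST_BACKEND

private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend {
  private var defaultClient: DefaultKubernetesClient = _

  override def initialize(): Unit = {
    // Strip Spark's "k8s://" scheme prefix to recover the plain https URL
    // passed via -Dspark.kubernetes.test.master.
    val config = new ConfigBuilder()
      .withApiVersion("v1")
      .withMasterUrl(master.replaceFirst("k8s://", ""))
      .build()
    defaultClient = new DefaultKubernetesClient(config)
  }

  override def getKubernetesClient: DefaultKubernetesClient = defaultClient

  override def name(): String = GCE_TEST_BACKEND
}
```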
ProcessUtils.scala (new file)
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.integrationtest

import java.io.{BufferedReader, InputStreamReader}
import java.util.concurrent.TimeUnit

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

object ProcessUtils extends Logging {
/**
* executeProcess is used to run a command and return the output if it
* completes within timeout seconds.
*/
def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = {
val pb = new ProcessBuilder().command(fullCommand: _*)
pb.redirectErrorStream(true)
val proc = pb.start()
val outputLines = new ArrayBuffer[String]

Utils.tryWithResource(new InputStreamReader(proc.getInputStream)) { procOutput =>
Utils.tryWithResource(new BufferedReader(procOutput)) { (bufferedOutput: BufferedReader) =>
var line: String = null
do {
line = bufferedOutput.readLine()
if (line != null) {
logInfo(line)
outputLines += line
}
} while (line != null)
}
}
assert(proc.waitFor(timeout, TimeUnit.SECONDS),
s"Timed out while executing ${fullCommand.mkString(" ")}")
assert(proc.exitValue == 0, s"Failed to execute ${fullCommand.mkString(" ")}")
outputLines.toSeq
}
}
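A Minikube-style backend can use this helper to shell out to the `minikube` binary. A hypothetical call, assuming `minikube` is on the `PATH`:

```scala
// Run "minikube status" and fail fast if it does not complete
// successfully within 30 seconds; returns the captured output lines.
val statusLines = ProcessUtils.executeProcess(Array("minikube", "status"), timeout = 30)
statusLines.foreach(line => println(line))
```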