Replace submission v1 with submission v2. (apache#286)
* Replace submission v1 with submission v2.

* Address documentation changes.

* Fix documentation
mccheah authored and ash211 committed May 23, 2017
1 parent 408c65f commit 8f3d965
Showing 67 changed files with 668 additions and 3,324 deletions.
63 changes: 63 additions & 0 deletions conf/kubernetes-resource-staging-server.yaml
@@ -0,0 +1,63 @@
---
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: spark-resource-staging-server
spec:
  replicas: 1
  template:
    metadata:
      labels:
        resource-staging-server-instance: default
    spec:
      volumes:
      - name: resource-staging-server-properties
        configMap:
          name: spark-resource-staging-server-config
      containers:
      - name: spark-resource-staging-server
        image: kubespark/spark-resource-staging-server:v2.1.0-kubernetes-0.1.0-alpha.3
        resources:
          requests:
            cpu: 100m
            memory: 256Mi
          limits:
            cpu: 100m
            memory: 256Mi
        volumeMounts:
        - name: resource-staging-server-properties
          mountPath: '/etc/spark-resource-staging-server'
        args:
        - '/etc/spark-resource-staging-server/resource-staging-server.properties'
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-resource-staging-server-config
data:
  resource-staging-server.properties: |
    spark.kubernetes.resourceStagingServer.port=10000
    spark.ssl.kubernetes.resourceStagingServer.enabled=false
    # Other possible properties are listed below, primarily for setting up TLS. The paths given by the
    # KeyStore, password, and PEM files here should correspond to files that are securely mounted into
    # the resource staging server container, via e.g. secret volumes.
    # spark.ssl.kubernetes.resourceStagingServer.keyStore=/mnt/secrets/resource-staging-server/keyStore.jks
    # spark.ssl.kubernetes.resourceStagingServer.keyStorePassword=changeit
    # spark.ssl.kubernetes.resourceStagingServer.keyPassword=changeit
    # spark.ssl.kubernetes.resourceStagingServer.keyStorePasswordFile=/mnt/secrets/resource-staging-server/keystore-password.txt
    # spark.ssl.kubernetes.resourceStagingServer.keyPasswordFile=/mnt/secrets/resource-staging-server/keystore-key-password.txt
    # spark.ssl.kubernetes.resourceStagingServer.keyPem=/mnt/secrets/resource-staging-server/key.pem
    # spark.ssl.kubernetes.resourceStagingServer.serverCertPem=/mnt/secrets/resource-staging-server/cert.pem
---
apiVersion: v1
kind: Service
metadata:
  name: spark-resource-staging-service
spec:
  type: NodePort
  selector:
    resource-staging-server-instance: default
  ports:
  - protocol: TCP
    port: 10000
    targetPort: 10000
    nodePort: 31000
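The Service above exposes the staging server on NodePort 31000, and submissions reach it through the spark.kubernetes.resourceStagingServer.uri property defined later in this diff. A minimal, illustrative sketch of the submission-side setting (the node address is a placeholder, not something defined by this commit):

    import org.apache.spark.SparkConf

    // Placeholder node address; 31000 matches the nodePort of spark-resource-staging-service above.
    val conf = new SparkConf()
      .set("spark.kubernetes.resourceStagingServer.uri", "http://192.0.2.10:31000")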
@@ -619,7 +619,7 @@ object SparkSubmit {
}

if (isKubernetesCluster) {
childMainClass = "org.apache.spark.deploy.kubernetes.submit.v1.Client"
childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
childArgs += args.primaryResource
childArgs += args.mainClass
childArgs ++= args.childArgs
Expand Down
1 change: 0 additions & 1 deletion dev/.rat-excludes
@@ -103,4 +103,3 @@ org.apache.spark.scheduler.ExternalClusterManager
org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
spark-warehouse
structured-streaming/*
org.apache.spark.deploy.kubernetes.submit.v1.DriverServiceManager
416 changes: 266 additions & 150 deletions docs/running-on-kubernetes.md

Large diffs are not rendered by default.

This file was deleted.

@@ -16,50 +16,24 @@
*/
package org.apache.spark.deploy.kubernetes

import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream, InputStream, OutputStream}
import java.io.{File, FileInputStream, FileOutputStream, InputStream, OutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import com.google.common.io.Files
import org.apache.commons.codec.binary.Base64
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream, TarArchiveOutputStream}
import org.apache.commons.compress.utils.CharsetNames
import org.apache.commons.io.IOUtils
import scala.collection.mutable

import org.apache.spark.deploy.rest.kubernetes.v1.TarGzippedData
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ByteBufferOutputStream, Utils}
import org.apache.spark.util.Utils

private[spark] object CompressionUtils extends Logging {
// Defaults from TarArchiveOutputStream
private val BLOCK_SIZE = 10240
private val RECORD_SIZE = 512
private val ENCODING = CharsetNames.UTF_8

/**
* Compresses all of the given paths into a gzipped-tar archive, returning the compressed data in
* memory as an instance of {@link TarGzippedData}. The files are taken without consideration to
* their original folder structure, and are added to the tar archive in a flat hierarchy.
* Directories are not allowed, and duplicate file names are de-duplicated by appending a numeric
* suffix to the file name, before the file extension. For example, if paths a/b.txt and b/b.txt
* were provided, then the files added to the tar archive would be b.txt and b-1.txt.
* @param paths A list of file paths to be archived
* @return An in-memory representation of the compressed data.
*/
def createTarGzip(paths: Iterable[String]): TarGzippedData = {
val compressedBytesStream = Utils.tryWithResource(new ByteBufferOutputStream()) { raw =>
writeTarGzipToStream(raw, paths)
raw
}
val compressedAsBase64 = Base64.encodeBase64String(compressedBytesStream.toByteBuffer.array)
TarGzippedData(
dataBase64 = compressedAsBase64,
blockSize = BLOCK_SIZE,
recordSize = RECORD_SIZE,
encoding = ENCODING
)
}

def writeTarGzipToStream(outputStream: OutputStream, paths: Iterable[String]): Unit = {
Utils.tryWithResource(new GZIPOutputStream(outputStream)) { gzipping =>
Utils.tryWithResource(new TarArchiveOutputStream(
@@ -98,50 +72,14 @@ private[spark] object CompressionUtils extends Logging {
}
}

/**
* Decompresses the provided tar archive to a directory.
* @param compressedData In-memory representation of the compressed data, ideally created via
* {@link createTarGzip}.
* @param rootOutputDir Directory to write the output files to. All files from the tarball
* are written here in a flat hierarchy.
* @return List of file paths for each file that was unpacked from the archive.
*/
def unpackAndWriteCompressedFiles(
compressedData: TarGzippedData,
rootOutputDir: File): Seq[String] = {
val compressedBytes = Base64.decodeBase64(compressedData.dataBase64)
if (!rootOutputDir.exists) {
if (!rootOutputDir.mkdirs) {
throw new IllegalStateException(s"Failed to create output directory for unpacking" +
s" files at ${rootOutputDir.getAbsolutePath}")
}
} else if (rootOutputDir.isFile) {
throw new IllegalArgumentException(s"Root dir for writing decompressed files: " +
s"${rootOutputDir.getAbsolutePath} exists and is not a directory.")
}
Utils.tryWithResource(new ByteArrayInputStream(compressedBytes)) { compressedBytesStream =>
unpackTarStreamToDirectory(
compressedBytesStream,
rootOutputDir,
compressedData.blockSize,
compressedData.recordSize,
compressedData.encoding)
}
}

def unpackTarStreamToDirectory(
inputStream: InputStream,
outputDir: File,
blockSize: Int = BLOCK_SIZE,
recordSize: Int = RECORD_SIZE,
encoding: String = ENCODING): Seq[String] = {
def unpackTarStreamToDirectory(inputStream: InputStream, outputDir: File): Seq[String] = {
val paths = mutable.Buffer.empty[String]
Utils.tryWithResource(new GZIPInputStream(inputStream)) { gzipped =>
Utils.tryWithResource(new TarArchiveInputStream(
gzipped,
blockSize,
recordSize,
encoding)) { tarInputStream =>
BLOCK_SIZE,
RECORD_SIZE,
ENCODING)) { tarInputStream =>
var nextTarEntry = tarInputStream.getNextTarEntry
while (nextTarEntry != null) {
val outputFile = new File(outputDir, nextTarEntry.getName)
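With the in-memory TarGzippedData representation removed along with submission v1, CompressionUtils keeps only the streaming helpers shown above. A rough usage sketch of the surviving API, which is Spark-internal since the object is private[spark]; the file paths are placeholders and the output directory is assumed to exist:

    import java.io.{File, FileInputStream, FileOutputStream}

    import org.apache.spark.deploy.kubernetes.CompressionUtils
    import org.apache.spark.util.Utils

    // Pack two local files into a flat gzipped tarball on disk...
    Utils.tryWithResource(new FileOutputStream("/tmp/deps.tgz")) { out =>
      CompressionUtils.writeTarGzipToStream(out, Seq("/tmp/a.jar", "/tmp/b.jar"))
    }
    // ...then unpack the tarball into a directory, collecting the extracted file paths.
    val unpacked: Seq[String] =
      Utils.tryWithResource(new FileInputStream("/tmp/deps.tgz")) { in =>
        CompressionUtils.unpackTarStreamToDirectory(in, new File("/tmp/unpacked"))
      }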
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.kubernetes
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.v2.{ContainerNameEqualityPredicate, InitContainerUtil}
import org.apache.spark.deploy.kubernetes.submit.{ContainerNameEqualityPredicate, InitContainerUtil}

private[spark] trait SparkPodInitContainerBootstrap {
/**
@@ -20,7 +20,6 @@ import java.util.concurrent.TimeUnit

import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.v1.NodePortUrisDriverServiceManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
@@ -212,77 +211,6 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_SUBMIT_TIMEOUT =
ConfigBuilder("spark.kubernetes.driverSubmissionTimeout")
.doc("Time to wait for the driver process to start running before aborting its execution.")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(60L)

private[spark] val KUBERNETES_DRIVER_SUBMIT_SSL_KEYSTORE =
ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.keyStore")
.doc("KeyStore file for the driver submission server listening on SSL. Can be pre-mounted" +
" on the driver container or uploaded from the submitting client.")
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_SUBMIT_SSL_TRUSTSTORE =
ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.trustStore")
.doc("TrustStore containing certificates for communicating to the driver submission server" +
" over SSL.")
.stringConf
.createOptional

private[spark] val DRIVER_SUBMIT_SSL_ENABLED =
ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.enabled")
.doc("Whether or not to use SSL when sending the application dependencies to the driver pod.")
.booleanConf
.createWithDefault(false)

private[spark] val DRIVER_SUBMIT_SSL_KEY_PEM =
ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.keyPem")
.doc("Key PEM file that the driver submission server will use when setting up TLS" +
" connections. Can be pre-mounted on the driver pod's disk or uploaded from the" +
" submitting client's machine.")
.stringConf
.createOptional

private[spark] val DRIVER_SUBMIT_SSL_SERVER_CERT_PEM =
ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.serverCertPem")
.doc("Certificate PEM file that is associated with the key PEM file" +
" the submission server uses to set up TLS connections. Can be pre-mounted" +
" on the driver pod's disk or uploaded from the submitting client's machine.")
.stringConf
.createOptional

private[spark] val DRIVER_SUBMIT_SSL_CLIENT_CERT_PEM =
ConfigBuilder("spark.ssl.kubernetes.driversubmitserver.clientCertPem")
.doc("Certificate pem file that the submission client uses to connect to the submission" +
" server over TLS. This should often be the same as the server certificate, but can be" +
" different if the submission client will contact the driver through a proxy instead of" +
" the driver service directly.")
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_SERVICE_NAME =
ConfigBuilder("spark.kubernetes.driver.service.name")
.doc("Kubernetes service that exposes the driver pod for external access.")
.internal()
.stringConf
.createOptional

private[spark] val KUBERNETES_DRIVER_SUBMIT_SERVER_MEMORY =
ConfigBuilder("spark.kubernetes.driver.submissionServerMemory")
.doc("The amount of memory to allocate for the driver submission server.")
.bytesConf(ByteUnit.MiB)
.createWithDefaultString("256m")

private[spark] val EXPOSE_KUBERNETES_DRIVER_SERVICE_UI_PORT =
ConfigBuilder("spark.kubernetes.driver.service.exposeUiPort")
.doc("Whether to expose the driver Web UI port as a service NodePort. Turned off by default" +
" because NodePort is a limited resource. Use alternatives if possible.")
.booleanConf
.createWithDefault(false)

private[spark] val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
.doc("Name of the driver pod.")
@@ -327,13 +255,6 @@ package object config extends Logging {
.longConf
.createWithDefault(1)

private[spark] val DRIVER_SERVICE_MANAGER_TYPE =
ConfigBuilder("spark.kubernetes.driver.serviceManagerType")
.doc("A tag indicating which class to use for creating the Kubernetes service and" +
" determining its URI for the submission client.")
.stringConf
.createWithDefault(NodePortUrisDriverServiceManager.TYPE)

private[spark] val WAIT_FOR_APP_COMPLETION =
ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
.doc("In cluster mode, whether to wait for the application to finish before exiting the" +
@@ -347,8 +268,7 @@
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("1s")

// Spark dependency server for submission v2

// Spark resource staging server.
private[spark] val RESOURCE_STAGING_SERVER_PORT =
ConfigBuilder("spark.kubernetes.resourceStagingServer.port")
.doc("Port for the Kubernetes resource staging server to listen on.")
@@ -451,7 +371,7 @@ package object config extends Logging {
.stringConf
.createOptional

// Driver and Init-Container parameters for submission v2
// Driver and Init-Container parameters
private[spark] val RESOURCE_STAGING_SERVER_URI =
ConfigBuilder("spark.kubernetes.resourceStagingServer.uri")
.doc("Base URI for the Spark resource staging server.")
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import java.io.File
import java.util.Collections
@@ -25,8 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.{LoggingPodStatusWatcher, LoggingPodStatusWatcherImpl}
import org.apache.spark.deploy.rest.kubernetes.v2.ResourceStagingServerSslOptionsProviderImpl
import org.apache.spark.deploy.rest.kubernetes.ResourceStagingServerSslOptionsProviderImpl
import org.apache.spark.internal.Logging
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.Utils
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import java.io.File

@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import java.lang.Boolean

@@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import org.apache.spark.{SparkConf, SSLOptions}
import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.rest.kubernetes.v2.RetrofitClientFactoryImpl
import org.apache.spark.deploy.rest.kubernetes.RetrofitClientFactoryImpl
import org.apache.spark.util.Utils

/**
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import io.fabric8.kubernetes.api.model.{PodBuilder, Secret, SecretBuilder}
import scala.collection.JavaConverters._
@@ -14,11 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v2
package org.apache.spark.deploy.kubernetes.submit

import org.apache.spark.SparkConf
import org.apache.spark.deploy.kubernetes.config._
import org.apache.spark.deploy.kubernetes.submit.DriverPodKubernetesCredentialsProvider

private[spark] trait DriverPodKubernetesCredentialsMounterProvider {
