Driver submission with mounting dependencies from the staging server (a…
mccheah authored and foxish committed Apr 25, 2017
1 parent 4940eae commit 04afcf8
Showing 38 changed files with 2,932 additions and 616 deletions.
8 changes: 7 additions & 1 deletion resource-managers/kubernetes/core/pom.xml
@@ -108,6 +108,8 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- End of shaded deps. -->

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
@@ -116,7 +118,11 @@
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
</dependency>
<!-- End of shaded deps. -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/CompressionUtils.scala
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes.submit.v1
package org.apache.spark.deploy.kubernetes

import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream, InputStream, OutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

import com.google.common.io.Files
@@ -48,40 +48,7 @@ private[spark] object CompressionUtils extends Logging {
*/
def createTarGzip(paths: Iterable[String]): TarGzippedData = {
val compressedBytesStream = Utils.tryWithResource(new ByteBufferOutputStream()) { raw =>
Utils.tryWithResource(new GZIPOutputStream(raw)) { gzipping =>
Utils.tryWithResource(new TarArchiveOutputStream(
gzipping,
BLOCK_SIZE,
RECORD_SIZE,
ENCODING)) { tarStream =>
val usedFileNames = mutable.HashSet.empty[String]
for (path <- paths) {
val file = new File(path)
if (!file.isFile) {
throw new IllegalArgumentException(s"Cannot add $path to tarball; either does" +
s" not exist or is a directory.")
}
var resolvedFileName = file.getName
val extension = Files.getFileExtension(file.getName)
val nameWithoutExtension = Files.getNameWithoutExtension(file.getName)
var deduplicationCounter = 1
while (usedFileNames.contains(resolvedFileName)) {
val oldResolvedFileName = resolvedFileName
resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter.$extension"
logWarning(s"File with name $oldResolvedFileName already exists. Trying to add" +
s" with file name $resolvedFileName instead.")
deduplicationCounter += 1
}
usedFileNames += resolvedFileName
val tarEntry = new TarArchiveEntry(file, resolvedFileName)
tarStream.putArchiveEntry(tarEntry)
Utils.tryWithResource(new FileInputStream(file)) { fileInput =>
IOUtils.copy(fileInput, tarStream)
}
tarStream.closeArchiveEntry()
}
}
}
writeTarGzipToStream(raw, paths)
raw
}
val compressedAsBase64 = Base64.encodeBase64String(compressedBytesStream.toByteBuffer.array)
@@ -93,6 +60,44 @@
)
}

def writeTarGzipToStream(outputStream: OutputStream, paths: Iterable[String]): Unit = {
Utils.tryWithResource(new GZIPOutputStream(outputStream)) { gzipping =>
Utils.tryWithResource(new TarArchiveOutputStream(
gzipping,
BLOCK_SIZE,
RECORD_SIZE,
ENCODING)) { tarStream =>
val usedFileNames = mutable.HashSet.empty[String]
for (path <- paths) {
val file = new File(path)
if (!file.isFile) {
throw new IllegalArgumentException(s"Cannot add $path to tarball; either does" +
s" not exist or is a directory.")
}
var resolvedFileName = file.getName
val extension = Files.getFileExtension(file.getName)
val nameWithoutExtension = Files.getNameWithoutExtension(file.getName)
var deduplicationCounter = 1
while (usedFileNames.contains(resolvedFileName)) {
val oldResolvedFileName = resolvedFileName
resolvedFileName = s"$nameWithoutExtension-$deduplicationCounter.$extension"
logWarning(s"File with name $oldResolvedFileName already exists. Trying to add" +
s" with file name $resolvedFileName instead.")
deduplicationCounter += 1
}
usedFileNames += resolvedFileName
val tarEntry = new TarArchiveEntry(resolvedFileName)
tarEntry.setSize(file.length())
tarStream.putArchiveEntry(tarEntry)
Utils.tryWithResource(new FileInputStream(file)) { fileInput =>
IOUtils.copy(fileInput, tarStream)
}
tarStream.closeArchiveEntry()
}
}
}
}
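
As a quick illustration of the new streaming entry point (a sketch only; the file paths are hypothetical, and CompressionUtils is private[spark], so real callers live inside Spark):

import java.io.ByteArrayOutputStream
import org.apache.spark.deploy.kubernetes.CompressionUtils

// Stream two local files into an in-memory gzipped tarball. If both files
// shared the name app.jar, the second would be written as app-1.jar by the
// deduplication loop above.
val out = new ByteArrayOutputStream()
CompressionUtils.writeTarGzipToStream(out, Seq("/tmp/deps/app.jar", "/tmp/deps/app.conf"))
val tarGzBytes: Array[Byte] = out.toByteArray // raw .tar.gz bytes, ready to upload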

/**
* Decompresses the provided tar archive to a directory.
* @param compressedData In-memory representation of the compressed data, ideally created via
@@ -104,7 +109,6 @@
def unpackAndWriteCompressedFiles(
compressedData: TarGzippedData,
rootOutputDir: File): Seq[String] = {
val paths = mutable.Buffer.empty[String]
val compressedBytes = Base64.decodeBase64(compressedData.dataBase64)
if (!rootOutputDir.exists) {
if (!rootOutputDir.mkdirs) {
@@ -116,24 +120,39 @@
s"${rootOutputDir.getAbsolutePath} exists and is not a directory.")
}
Utils.tryWithResource(new ByteArrayInputStream(compressedBytes)) { compressedBytesStream =>
Utils.tryWithResource(new GZIPInputStream(compressedBytesStream)) { gzipped =>
Utils.tryWithResource(new TarArchiveInputStream(
gzipped,
compressedData.blockSize,
compressedData.recordSize,
compressedData.encoding)) { tarInputStream =>
var nextTarEntry = tarInputStream.getNextTarEntry
while (nextTarEntry != null) {
val outputFile = new File(rootOutputDir, nextTarEntry.getName)
Utils.tryWithResource(new FileOutputStream(outputFile)) { fileOutputStream =>
IOUtils.copy(tarInputStream, fileOutputStream)
}
paths += outputFile.getAbsolutePath
nextTarEntry = tarInputStream.getNextTarEntry
unpackTarStreamToDirectory(
compressedBytesStream,
rootOutputDir,
compressedData.blockSize,
compressedData.recordSize,
compressedData.encoding)
}
}

def unpackTarStreamToDirectory(
inputStream: InputStream,
outputDir: File,
blockSize: Int = BLOCK_SIZE,
recordSize: Int = RECORD_SIZE,
encoding: String = ENCODING): Seq[String] = {
val paths = mutable.Buffer.empty[String]
Utils.tryWithResource(new GZIPInputStream(inputStream)) { gzipped =>
Utils.tryWithResource(new TarArchiveInputStream(
gzipped,
blockSize,
recordSize,
encoding)) { tarInputStream =>
var nextTarEntry = tarInputStream.getNextTarEntry
while (nextTarEntry != null) {
val outputFile = new File(outputDir, nextTarEntry.getName)
Utils.tryWithResource(new FileOutputStream(outputFile)) { fileOutputStream =>
IOUtils.copy(tarInputStream, fileOutputStream)
}
paths += outputFile.getAbsolutePath
nextTarEntry = tarInputStream.getNextTarEntry
}
}
}
paths.toSeq
paths
}
}
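
The matching unpack direction, again as a hedged sketch (the target directory is made up; the default BLOCK_SIZE, RECORD_SIZE, and ENCODING apply when the extra parameters are omitted):

import java.io.{ByteArrayInputStream, File}
import org.apache.spark.deploy.kubernetes.CompressionUtils

// Extract the bytes produced above; the returned Seq lists the absolute
// paths of the extracted files. The directory must exist beforehand, since
// unpackTarStreamToDirectory does not create it.
val targetDir = new File("/tmp/deps-unpacked")
targetDir.mkdirs()
val unpacked: Seq[String] = CompressionUtils.unpackTarStreamToDirectory(
  new ByteArrayInputStream(tarGzBytes), targetDir)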
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -19,11 +19,13 @@ package org.apache.spark.deploy.kubernetes
import java.util.concurrent.TimeUnit

import org.apache.spark.{SPARK_VERSION => sparkVersion}
import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.deploy.kubernetes.submit.v1.NodePortUrisDriverServiceManager
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

package object config {
package object config extends Logging {

private[spark] val KUBERNETES_NAMESPACE =
ConfigBuilder("spark.kubernetes.namespace")
@@ -321,4 +323,107 @@ package object config {
.doc("File containing the key password for the Kubernetes dependency server.")
.stringConf
.createOptional

private[spark] val RESOURCE_STAGING_SERVER_SSL_ENABLED =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.enabled")
.doc("Whether or not to use SSL when communicating with the dependency server.")
.booleanConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_FILE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStore")
.doc("File containing the trustStore to communicate with the Kubernetes dependency server.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_PASSWORD =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStorePassword")
.doc("Password for the trustStore for talking to the dependency server.")
.stringConf
.createOptional
private[spark] val RESOURCE_STAGING_SERVER_TRUSTSTORE_TYPE =
ConfigBuilder("spark.ssl.kubernetes.resourceStagingServer.trustStoreType")
.doc("Type of trustStore for communicating with the dependency server.")
.stringConf
.createOptional

// Driver and Init-Container parameters for submission v2
private[spark] val RESOURCE_STAGING_SERVER_URI =
ConfigBuilder("spark.kubernetes.resourceStagingServer.uri")
.doc("Base URI for the Spark resource staging server")
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_RESOURCE_IDENTIFIER =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsResourceIdentifier")
.doc("Identifier for the jars tarball that was uploaded to the staging service.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_LOCATION =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadJarsSecretLocation")
.doc("Location of the application secret to use when the init-container contacts the" +
" resource staging server to download jars.")
.internal()
.stringConf
.createWithDefault(INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH)

private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_RESOURCE_IDENTIFIER =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesResourceIdentifier")
.doc("Identifier for the files tarball that was uploaded to the staging service.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_LOCATION =
ConfigBuilder("spark.kubernetes.driver.initcontainer.downloadFilesSecretLocation")
.doc("Location of the application secret to use when the init-container contacts the" +
" resource staging server to download files.")
.internal()
.stringConf
.createWithDefault(INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH)

private[spark] val INIT_CONTAINER_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.driver.initcontainer.docker.image")
.doc("Image for the driver's init-container that downloads mounted dependencies.")
.stringConf
.createWithDefault(s"spark-driver-init:$sparkVersion")

private[spark] val DRIVER_LOCAL_JARS_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.jarsDownloadDir")
.doc("Location to download local jars to in the driver. When using spark-submit, this" +
" directory must be empty and will be mounted as an empty directory volume on the" +
" driver pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-local-jars")

private[spark] val DRIVER_LOCAL_FILES_DOWNLOAD_LOCATION =
ConfigBuilder("spark.kubernetes.driver.mountdependencies.filesDownloadDir")
.doc("Location to download local files to in the driver. When using spark-submit, this" +
" directory must be empty and will be mounted as an empty directory volume on the" +
" driver pod.")
.stringConf
.createWithDefault("/var/spark-data/spark-local-files")

private[spark] val DRIVER_MOUNT_DEPENDENCIES_INIT_TIMEOUT =
ConfigBuilder("spark.kubernetes.mountdependencies.mountTimeout")
.doc("Timeout before aborting the attempt to download and unpack local dependencies from" +
" the dependency staging server when initializing the driver pod.")
.timeConf(TimeUnit.MINUTES)
.createWithDefault(5)
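
To make these knobs concrete, a hedged SparkConf sketch follows (only the keys come from this commit; the URI, trustStore path, and timeout value are illustrative):

import org.apache.spark.SparkConf

// Point a submission at a staging server over SSL.
val conf = new SparkConf()
  .set("spark.kubernetes.resourceStagingServer.uri", "https://staging.example.com:10000")
  .set("spark.ssl.kubernetes.resourceStagingServer.enabled", "true")
  .set("spark.ssl.kubernetes.resourceStagingServer.trustStore", "/etc/ssl/staging-truststore.jks")
  .set("spark.kubernetes.mountdependencies.mountTimeout", "5m")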

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
}
val masterWithoutK8sPrefix = rawMasterString.replaceFirst("k8s://", "")
if (masterWithoutK8sPrefix.startsWith("http://")
|| masterWithoutK8sPrefix.startsWith("https://")) {
masterWithoutK8sPrefix
} else {
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logDebug(s"No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
s" URL is $resolvedURL")
resolvedURL
}
}
}
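
The behavior of resolveK8sMaster, restated as examples (URLs illustrative; the helper is private[spark], so these calls only compile from Spark-internal code):

resolveK8sMaster("k8s://https://10.0.0.1:6443") // "https://10.0.0.1:6443"
resolveK8sMaster("k8s://10.0.0.1:6443")         // "https://10.0.0.1:6443" (https assumed)
resolveK8sMaster("10.0.0.1:6443")               // throws IllegalArgumentException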
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -30,9 +30,9 @@ package object constants {
private[spark] val SUBMISSION_APP_SECRET_PREFIX = "spark-submission-server-secret"
private[spark] val SUBMISSION_APP_SECRET_VOLUME_NAME = "spark-submission-secret-volume"
private[spark] val SUBMISSION_SSL_KEY_PASSWORD_SECRET_NAME =
"spark-submission-server-key-password"
"spark-submission-server-key-password"
private[spark] val SUBMISSION_SSL_KEYSTORE_PASSWORD_SECRET_NAME =
"spark-submission-server-keystore-password"
"spark-submission-server-keystore-password"
private[spark] val SUBMISSION_SSL_KEYSTORE_SECRET_NAME = "spark-submission-server-keystore"
private[spark] val SUBMISSION_SSL_SECRETS_PREFIX = "spark-submission-server-ssl"
private[spark] val SUBMISSION_SSL_SECRETS_VOLUME_NAME = "spark-submission-server-ssl-secrets"
@@ -55,9 +55,9 @@
private[spark] val ENV_SUBMISSION_SERVER_PORT = "SPARK_SUBMISSION_SERVER_PORT"
private[spark] val ENV_SUBMISSION_KEYSTORE_FILE = "SPARK_SUBMISSION_KEYSTORE_FILE"
private[spark] val ENV_SUBMISSION_KEYSTORE_PASSWORD_FILE =
"SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"
"SPARK_SUBMISSION_KEYSTORE_PASSWORD_FILE"
private[spark] val ENV_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE =
"SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"
"SPARK_SUBMISSION_KEYSTORE_KEY_PASSWORD_FILE"
private[spark] val ENV_SUBMISSION_KEYSTORE_TYPE = "SPARK_SUBMISSION_KEYSTORE_TYPE"
private[spark] val ENV_SUBMISSION_KEY_PEM_FILE = "SPARK_SUBMISSION_KEY_PEM_FILE"
private[spark] val ENV_SUBMISSION_CERT_PEM_FILE = "SPARK_SUBMISSION_CERT_PEM_FILE"
@@ -70,17 +70,47 @@
private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
private[spark] val ENV_DRIVER_MEMORY = "SPARK_DRIVER_MEMORY"
private[spark] val ENV_UPLOADED_JARS_DIR = "SPARK_UPLOADED_JARS_DIR"
private[spark] val ENV_SUBMIT_EXTRA_CLASSPATH = "SPARK_SUBMIT_EXTRA_CLASSPATH"
private[spark] val ENV_MOUNTED_CLASSPATH = "SPARK_MOUNTED_CLASSPATH"
private[spark] val ENV_DRIVER_MAIN_CLASS = "SPARK_DRIVER_CLASS"
private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"

// Annotation keys
private[spark] val ANNOTATION_PROVIDE_EXTERNAL_URI =
"spark-job.alpha.apache.org/provideExternalUri"
"spark-job.alpha.apache.org/provideExternalUri"
private[spark] val ANNOTATION_RESOLVED_EXTERNAL_URI =
"spark-job.alpha.apache.org/resolvedExternalUri"
"spark-job.alpha.apache.org/resolvedExternalUri"

// Miscellaneous
private[spark] val DRIVER_CONTAINER_NAME = "spark-kubernetes-driver"
private[spark] val DRIVER_SUBMIT_SSL_NAMESPACE = "kubernetes.driversubmitserver"
private[spark] val KUBERNETES_MASTER_INTERNAL_URL = "https://kubernetes.default.svc"
private[spark] val MEMORY_OVERHEAD_FACTOR = 0.10
private[spark] val MEMORY_OVERHEAD_MIN = 384L

// V2 submission init container
private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
private[spark] val INIT_CONTAINER_SECRETS_VOLUME_NAME = "dependency-secret"
private[spark] val INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH = "/mnt/secrets/spark-init"
private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_KEY = "downloadJarsSecret"
private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_KEY = "downloadFilesSecret"
private[spark] val INIT_CONTAINER_TRUSTSTORE_SECRET_KEY = "trustStore"
private[spark] val INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH =
s"$INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH/$INIT_CONTAINER_DOWNLOAD_JARS_SECRET_KEY"
private[spark] val INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH =
s"$INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH/$INIT_CONTAINER_DOWNLOAD_FILES_SECRET_KEY"
private[spark] val INIT_CONTAINER_TRUSTSTORE_PATH =
s"$INIT_CONTAINER_SECRETS_VOLUME_MOUNT_PATH/$INIT_CONTAINER_TRUSTSTORE_SECRET_KEY"
private[spark] val INIT_CONTAINER_DOWNLOAD_CREDENTIALS_PATH =
"/mnt/secrets/kubernetes-credentials"
private[spark] val INIT_CONTAINER_CONFIG_MAP_KEY = "init-driver"
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_VOLUME = "init-container-properties"
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH = "/etc/spark-init/"
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_NAME = "init-driver.properties"
private[spark] val INIT_CONTAINER_PROPERTIES_FILE_PATH =
s"$INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH/$INIT_CONTAINER_PROPERTIES_FILE_NAME"
private[spark] val DOWNLOAD_JARS_VOLUME_NAME = "download-jars"
private[spark] val DOWNLOAD_FILES_VOLUME_NAME = "download-files"
}
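
For reference, the composed constants above resolve to the following values (derived directly from this diff; note the double slash that the trailing slash on INIT_CONTAINER_PROPERTIES_FILE_MOUNT_PATH produces):

// INIT_CONTAINER_DOWNLOAD_JARS_SECRET_PATH  == "/mnt/secrets/spark-init/downloadJarsSecret"
// INIT_CONTAINER_DOWNLOAD_FILES_SECRET_PATH == "/mnt/secrets/spark-init/downloadFilesSecret"
// INIT_CONTAINER_TRUSTSTORE_PATH            == "/mnt/secrets/spark-init/trustStore"
// INIT_CONTAINER_PROPERTIES_FILE_PATH       == "/etc/spark-init//init-driver.properties"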
(The remaining changed files in this commit are not shown here.)
