A number of small tweaks to the MVP. (apache#23)
* A number of small tweaks to the MVP.

- Master protocol defaults to https if not specified
- Removed upload driver extra classpath functionality
- Added ability to specify main app resource with container:// URI
- Updated docs to reflect all of the above
- Added examples to the Docker images, mostly for integration testing, but they
  could also be useful for getting started easily without shipping anything

* Add example to documentation.
mccheah authored and foxish committed Jul 24, 2017
1 parent f71abc1 commit 95747bc
Showing 11 changed files with 287 additions and 142 deletions.
49 changes: 24 additions & 25 deletions docs/running-on-kubernetes.md
@@ -42,11 +42,12 @@ are set up as described above:
--conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
examples/jars/spark_examples_2.11-2.2.0.jar

<!-- TODO master should default to https if no scheme is specified -->
The Spark master, specified either via passing the `--master` command line argument to `spark-submit` or by setting
`spark.master` in the application's configuration, must be a URL with the format `k8s://<api_server_url>`. Prefixing the
master string with `k8s://` will cause the Spark application to launch on the Kubernetes cluster, with the API server
being contacted at `api_server_url`. The HTTP protocol must also be specified.
being contacted at `api_server_url`. If no HTTP protocol is specified in the URL, it defaults to `https`. For example,
setting the master to `k8s://example.com:443` is equivalent to setting it to `k8s://https://example.com:443`, but to
connect without SSL on a different port, the master would be set to `k8s://http://example.com:8443`.

Note that applications can currently only be executed in cluster mode, where the driver and its executors are running on
the cluster.
@@ -58,17 +59,18 @@ disk of the submitter's machine. These two types of dependencies are specified v
`spark-submit`:

* Local jars provided by specifying the `--jars` command line argument to `spark-submit`, or by setting `spark.jars` in
the application's configuration, will be treated as jars that are located on the *disk of the driver Docker
container*. This only applies to jar paths that do not specify a scheme or that have the scheme `file://`. Paths with
other schemes are fetched from their appropriate locations.
the application's configuration, will be treated as jars that are located on the *disk of the driver container*. This
only applies to jar paths that do not specify a scheme or that have the scheme `file://`. Paths with other schemes are
fetched from their appropriate locations.
* Local jars provided by specifying the `--upload-jars` command line argument to `spark-submit`, or by setting
`spark.kubernetes.driver.uploads.jars` in the application's configuration, will be treated as jars that are located on
the *disk of the submitting machine*. These jars are uploaded to the driver docker container before executing the
application.
<!-- TODO support main resource bundled in the Docker image -->
* A main application resource path that does not have a scheme or that has the scheme `file://` is assumed to be on the
*disk of the submitting machine*. This resource is uploaded to the driver docker container before executing the
application. A remote path can still be specified and the resource will be fetched from the appropriate location.
* A main application resource path that has the scheme `container://` is assumed to be on the *disk of the driver
container*.

In all of these cases, the jars are placed on the driver's classpath, and are also sent to the executors. Below are some
examples of providing application dependencies.
@@ -78,8 +80,7 @@ To submit an application with both the main resource and two other jars living o
bin/spark-submit \
--deploy-mode cluster \
--class com.example.applications.SampleApplication \
--master k8s://https://192.168.99.100 \
--kubernetes-namespace default \
--master k8s://192.168.99.100 \
--upload-jars /home/exampleuser/exampleapplication/dep1.jar,/home/exampleuser/exampleapplication/dep2.jar \
--conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \
--conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
@@ -91,8 +92,7 @@ Note that since passing the jars through the `--upload-jars` command line argume
bin/spark-submit \
--deploy-mode cluster \
--class com.example.applications.SampleApplication \
--master k8s://https://192.168.99.100 \
--kubernetes-namespace default \
--master k8s://192.168.99.100 \
--conf spark.kubernetes.driver.uploads.jars=/home/exampleuser/exampleapplication/dep1.jar,/home/exampleuser/exampleapplication/dep2.jar \
--conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver:latest \
--conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
@@ -104,8 +104,7 @@ is located in the jar `/opt/spark-plugins/app-plugin.jar` on the docker image's
bin/spark-submit \
--deploy-mode cluster \
--class com.example.applications.PluggableApplication \
--master k8s://https://192.168.99.100 \
--kubernetes-namespace default \
--master k8s://192.168.99.100 \
--jars /opt/spark-plugins/app-plugin.jar \
--conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \
--conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
@@ -117,13 +116,22 @@ Spark property, the above will behave identically to this command:
bin/spark-submit \
--deploy-mode cluster \
--class com.example.applications.PluggableApplication \
--master k8s://https://192.168.99.100 \
--kubernetes-namespace default \
--master k8s://192.168.99.100 \
--conf spark.jars=file:///opt/spark-plugins/app-plugin.jar \
--conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \
--conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
http://example.com:8080/applications/sparkpluggable/app.jar

To specify a main application resource that is in the Docker image and that has no other dependencies:

bin/spark-submit \
--deploy-mode cluster \
--class com.example.applications.PluggableApplication \
--master k8s://192.168.99.100:8443 \
--conf spark.kubernetes.driver.docker.image=registry-host:5000/spark-driver-custom:latest \
--conf spark.kubernetes.executor.docker.image=registry-host:5000/spark-executor:latest \
container:///home/applications/examples/example.jar

### Spark Properties

Below are some other common properties that are specific to Kubernetes. Most of the other configurations are the same
@@ -133,10 +141,9 @@ from the other deployment modes. See the [configuration page](configuration.html
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
<td><code>spark.kubernetes.namespace</code></td>
<!-- TODO set default to "default" -->
<td>(none)</td>
<td><code>default</code></td>
<td>
The namespace that will be used for running the driver and executor pods. Must be specified. When using
The namespace that will be used for running the driver and executor pods. When using
<code>spark-submit</code> in cluster mode, this can also be passed to <code>spark-submit</code> via the
<code>--kubernetes-namespace</code> command line argument.
</td>
@@ -196,14 +203,6 @@ from the other deployment modes. See the [configuration page](configuration.html
mode. Refer to <a href="running-on-kubernetes.html#adding-other-jars">adding other jars</a> for more information.
</td>
</tr>
<tr>
<!-- TODO remove this functionality -->
<td><code>spark.kubernetes.driver.uploads.driverExtraClasspath</code></td>
<td>(none)</td>
<td>
Comma-separated list of jars to be sent to the driver only when submitting the application in cluster mode.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
@@ -35,7 +35,7 @@ import scala.concurrent.duration.DurationInt
import scala.util.Success

import org.apache.spark.{SPARK_VERSION, SparkConf, SparkException}
import org.apache.spark.deploy.rest.{AppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
import org.apache.spark.deploy.rest.{AppResource, ContainerAppResource, KubernetesCreateSubmissionRequest, RemoteAppResource, TarGzippedData, UploadedAppResource}
import org.apache.spark.deploy.rest.kubernetes._
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils
@@ -47,13 +47,8 @@ private[spark] class Client(
appArgs: Array[String]) extends Logging {
import Client._

private val namespace = sparkConf.getOption("spark.kubernetes.namespace").getOrElse(
throw new IllegalArgumentException("Namespace must be provided in spark.kubernetes.namespace"))
private val rawMaster = sparkConf.get("spark.master")
if (!rawMaster.startsWith("k8s://")) {
throw new IllegalArgumentException("Master should be a URL with scheme k8s://")
}
private val master = rawMaster.replaceFirst("k8s://", "")
private val namespace = sparkConf.get("spark.kubernetes.namespace", "default")
private val master = resolveK8sMaster(sparkConf.get("spark.master"))

private val launchTime = System.currentTimeMillis
private val appName = sparkConf.getOption("spark.app.name")
@@ -64,8 +59,6 @@ private[spark] class Client(
private val driverLauncherSelectorValue = s"driver-launcher-$launchTime"
private val driverDockerImage = sparkConf.get(
"spark.kubernetes.driver.docker.image", s"spark-driver:$SPARK_VERSION")
private val uploadedDriverExtraClasspath = sparkConf
.getOption("spark.kubernetes.driver.uploads.driverExtraClasspath")
private val uploadedJars = sparkConf.getOption("spark.kubernetes.driver.uploads.jars")

private val secretBase64String = {
@@ -112,12 +105,15 @@
.withType("Opaque")
.done()
try {
val resolvedSelectors = (Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue)
val resolvedSelectors = (Map(
DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue,
SPARK_APP_NAME_LABEL -> appName)
++ parsedCustomLabels).asJava
val (servicePorts, containerPorts) = configurePorts()
val service = kubernetesClient.services().createNew()
.withNewMetadata()
.withName(kubernetesAppId)
.withLabels(Map(SPARK_APP_NAME_LABEL -> appName).asJava)
.endMetadata()
.withNewSpec()
.withSelector(resolvedSelectors)
@@ -355,18 +351,17 @@ private[spark] class Client(
val fileBytes = Files.toByteArray(appFile)
val fileBase64 = Base64.encodeBase64String(fileBytes)
UploadedAppResource(resourceBase64Contents = fileBase64, name = appFile.getName)
case "container" => ContainerAppResource(appResourceUri.getPath)
case other => RemoteAppResource(other)
}

val uploadDriverExtraClasspathBase64Contents = compressJars(uploadedDriverExtraClasspath)
val uploadJarsBase64Contents = compressJars(uploadedJars)
KubernetesCreateSubmissionRequest(
appResource = resolvedAppResource,
mainClass = mainClass,
appArgs = appArgs,
secret = secretBase64String,
sparkProperties = sparkConf.getAll.toMap,
uploadedDriverExtraClasspathBase64Contents = uploadDriverExtraClasspathBase64Contents,
uploadedJarsBase64Contents = uploadJarsBase64Contents)
}

@@ -414,7 +409,7 @@ private[spark] class Client(
}
}

private object Client {
private[spark] object Client extends Logging {

private val SUBMISSION_SERVER_SECRET_NAME = "spark-submission-server-secret"
private val DRIVER_LAUNCHER_SELECTOR_LABEL = "driver-launcher-selector"
@@ -430,6 +425,7 @@ private object Client {
private val SECURE_RANDOM = new SecureRandom()
private val SPARK_SUBMISSION_SECRET_BASE_DIR = "/var/run/secrets/spark-submission"
private val LAUNCH_TIMEOUT_SECONDS = 30
private val SPARK_APP_NAME_LABEL = "spark-app-name"

def main(args: Array[String]): Unit = {
require(args.length >= 2, s"Too few arguments. Usage: ${getClass.getName} <mainAppResource>" +
@@ -444,4 +440,20 @@ private object Client {
sparkConf = sparkConf,
appArgs = appArgs).run()
}

def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
}
val masterWithoutK8sPrefix = rawMasterString.replaceFirst("k8s://", "")
if (masterWithoutK8sPrefix.startsWith("http://")
|| masterWithoutK8sPrefix.startsWith("https://")) {
masterWithoutK8sPrefix
} else {
val resolvedURL = s"https://$masterWithoutK8sPrefix"
logDebug(s"No scheme specified for kubernetes master URL, so defaulting to https. Resolved" +
s" URL is $resolvedURL")
resolvedURL
}
}
}
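
For reference, a quick sketch of how `resolveK8sMaster` behaves under the defaulting rule above (hypothetical host and port values, not from this commit):

```scala
// Illustrative calls only; results follow the resolveK8sMaster logic shown above.
Client.resolveK8sMaster("k8s://example.com:443")         // "https://example.com:443" (scheme defaults to https)
Client.resolveK8sMaster("k8s://http://example.com:8443") // "http://example.com:8443" (explicit scheme kept)
Client.resolveK8sMaster("https://example.com:443")       // IllegalArgumentException: missing the k8s:// prefix
```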
@@ -27,7 +27,6 @@ case class KubernetesCreateSubmissionRequest(
val appArgs: Array[String],
val sparkProperties: Map[String, String],
val secret: String,
val uploadedDriverExtraClasspathBase64Contents: Option[TarGzippedData],
val uploadedJarsBase64Contents: Option[TarGzippedData]) extends SubmitRestProtocolRequest {
message = "create"
clientSparkVersion = SPARK_VERSION
@@ -46,13 +45,16 @@ case class TarGzippedData(
property = "type")
@JsonSubTypes(value = Array(
new JsonSubTypes.Type(value = classOf[UploadedAppResource], name = "UploadedAppResource"),
new JsonSubTypes.Type(value = classOf[ContainerAppResource], name = "ContainerLocalAppResource"),
new JsonSubTypes.Type(value = classOf[RemoteAppResource], name = "RemoteAppResource")))
abstract class AppResource

case class UploadedAppResource(
resourceBase64Contents: String,
name: String = "spark-app-resource") extends AppResource

case class ContainerAppResource(resourcePath: String) extends AppResource

case class RemoteAppResource(resource: String) extends AppResource
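
As a rough sketch of the wire format these annotations produce (assuming a Jackson `ObjectMapper` with `jackson-module-scala` registered, as Spark's REST protocol uses elsewhere; the jar path is a made-up example):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// The @JsonTypeInfo/@JsonSubTypes annotations embed a "type" discriminator in the
// JSON, so each AppResource subtype can be round-tripped unambiguously.
val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
val json = mapper.writeValueAsString(ContainerAppResource("/opt/app.jar"))
// json == {"type":"ContainerLocalAppResource","resourcePath":"/opt/app.jar"}
val parsed = mapper.readValue(json, classOf[AppResource]) // ContainerAppResource("/opt/app.jar")
```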

class PingResponse extends SubmitRestProtocolResponse {