LIVY-246. Support multiple Spark environments in Livy #232

Open · wants to merge 5 commits into base: master
3 changes: 0 additions & 3 deletions bin/livy-server
@@ -83,9 +83,6 @@ start_livy_server() {

LIVY_CLASSPATH="$LIBDIR/*:$LIVY_CONF_DIR"

if [ -n "$SPARK_CONF_DIR" ]; then
LIVY_CLASSPATH="$LIVY_CLASSPATH:$SPARK_CONF_DIR"
fi
if [ -n "$HADOOP_CONF_DIR" ]; then
LIVY_CLASSPATH="$LIVY_CLASSPATH:$HADOOP_CONF_DIR"
fi
39 changes: 39 additions & 0 deletions conf/livy.conf.template
@@ -98,9 +98,48 @@

# How often Livy polls YARN to refresh YARN app state.
# livy.server.yarn.poll-interval = 1s
#
# Days to keep Livy server request logs.
# livy.server.request-log-retain.days = 5

# If the Livy Web UI should be included in the Livy Server. Enabled by default.
# livy.ui.enabled = true

# Define Spark environments in the Livy Server. Users can pre-define multiple Spark environments
# and pick one at run time via the session creation request.
#
# A Spark environment is composed of several configurations:
# livy.server.spark-env.${sparkEnv}.spark-home = <SPARK_HOME>
# livy.server.spark-env.${sparkEnv}.spark-conf-dir = <SPARK_CONF_DIR>
# livy.server.spark-env.${sparkEnv}.scala-version = <SPARK_SCALA_VERSION>
# Whether to enable HiveContext in interactive sessions; false by default.
# livy.server.spark-env.${sparkEnv}.enable-hive-context = false
# livy.server.spark-env.${sparkEnv}.sparkr.package = <PATH_TO_SPARKR_PACKAGE>
# livy.server.spark-env.${sparkEnv}.pyspark.archives = <PATH_TO_PYSPARK_ARCHIVES>
#
# Only livy.server.spark-env.${sparkEnv}.spark-home is required; the others can be inferred from
# the provided spark-home.
#
# Environment variables such as SPARK_HOME and SPARK_CONF_DIR can still be used; their values are
# merged into the "default" environment.
#
# Users can also define "${SPARK_ENV}_SPARK_HOME" and "${SPARK_ENV}_SPARK_CONF_DIR"; these values
# will be merged into the ${sparkEnv} environment.
#
# ${sparkEnv} can be any name. When creating a session, the user can specify the name of a Spark
# environment and the Livy server will pick the matching environment; if none is specified, the
# "default" environment is used.
#
# For backward compatibility, all of the previous configurations:
#
# livy.server.spark-home
# livy.server.spark-conf-dir
# livy.spark.scalaVersion
# livy.repl.enableHiveContext
# livy.sparkr.package
# livy.pyspark.archives
#
# can still be used and will automatically be merged into the "default" Spark environment.
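A minimal sketch of what such a configuration might look like, assuming a second Spark installation under /opt/spark2 (the environment name "spark2" and every path below are illustrative assumptions, not part of this patch):

    livy.server.spark-env.default.spark-home = /usr/lib/spark
    livy.server.spark-env.spark2.spark-home = /opt/spark2
    livy.server.spark-env.spark2.spark-conf-dir = /opt/spark2/conf
    livy.server.spark-env.spark2.scala-version = 2.11

A session created with sparkEnv = "spark2" would then launch against /opt/spark2, while sessions that omit the field keep using the "default" environment.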
@@ -156,12 +156,13 @@ object MiniLivyMain extends MiniClusterBase {
var livyConf = Map(
LivyConf.LIVY_SPARK_MASTER.key -> "yarn",
LivyConf.LIVY_SPARK_DEPLOY_MODE.key -> "cluster",
LivyConf.LIVY_SPARK_SCALA_VERSION.key -> getSparkScalaVersion(),
LivyConf.HEARTBEAT_WATCHDOG_INTERVAL.key -> "1s",
"livy.server.spark-env.default.scala-version" -> getSparkScalaVersion(),
LivyConf.YARN_POLL_INTERVAL.key -> "500ms",
LivyConf.RECOVERY_MODE.key -> "recovery",
LivyConf.RECOVERY_STATE_STORE.key -> "filesystem",
LivyConf.RECOVERY_STATE_STORE_URL.key -> s"file://$configPath/state-store")
LivyConf.RECOVERY_STATE_STORE_URL.key -> s"file://$configPath/state-store",
"livy.server.spark-env.default.enable-hive-context" -> "true")

if (Cluster.isRunningOnTravis) {
livyConf ++= Map("livy.server.yarn.app-lookup-timeout" -> "2m")
@@ -171,7 +172,6 @@ object MiniLivyMain extends MiniClusterBase {

val server = new LivyServer()
server.start()
server.livyConf.set(LivyConf.ENABLE_HIVE_CONTEXT, true)
// Write a serverUrl.conf file to the conf directory with the location of the Livy
// server. Do it atomically since it's used by MiniCluster to detect when the Livy server
// is up and ready.
@@ -47,6 +47,7 @@ class ReplDriverSuite extends FunSuite with LivyBaseUnitTestSuite {
.setURI(new URI("rsc:/"))
.setConf(RSCConf.Entry.DRIVER_CLASS.key(), classOf[ReplDriver].getName())
.setConf(RSCConf.Entry.SESSION_KIND.key(), Spark().toString)
.setConf(RSCConf.Entry.SPARK_HOME.key(), System.getenv("SPARK_HOME"))
.build()
.asInstanceOf[RSCClient]

28 changes: 8 additions & 20 deletions rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
@@ -28,13 +28,7 @@
import java.io.Reader;
import java.io.Writer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -66,8 +60,6 @@ class ContextLauncher {

private static final String SPARK_DEPLOY_MODE = "spark.submit.deployMode";
private static final String SPARK_JARS_KEY = "spark.jars";
private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
private static final String SPARK_HOME_ENV = "SPARK_HOME";

static DriverProcessInfo create(RSCClientFactory factory, RSCConf conf)
throws IOException {
@@ -124,6 +116,7 @@ public void run() {
this.timeout = factory.getServer().getEventLoopGroup().schedule(timeoutTask,
conf.getTimeAsMs(RPC_CLIENT_HANDSHAKE_TIMEOUT), TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.error("exception: ", e);
dispose(true);
throw Utils.propagate(e);
}
@@ -172,11 +165,10 @@ private static ChildProcess startDriver(final RSCConf conf, Promise<?> promise)
}
merge(conf, SPARK_JARS_KEY, livyJars, ",");

HashMap<String, String> childEnv = new HashMap<>();
String kind = conf.get(SESSION_KIND);
if ("sparkr".equals(kind)) {
merge(conf, SPARK_ARCHIVES_KEY, conf.get(RSCConf.Entry.SPARKR_PACKAGE), ",");
} else if ("pyspark".equals(kind)) {
merge(conf, "spark.submit.pyFiles", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES), ",");
if ("pyspark".equals(kind) && conf.get(RSCConf.Entry.PYSPARK_ARCHIVES) != null) {
childEnv.put("PYSPARK_ARCHIVES_PATH", conf.get(RSCConf.Entry.PYSPARK_ARCHIVES));
}

// Disable multiple attempts since the RPC server doesn't yet support multiple
@@ -220,15 +212,15 @@ public void run() {
};
return new ChildProcess(conf, promise, child, confFile);
} else {
final SparkLauncher launcher = new SparkLauncher();
final SparkLauncher launcher = new SparkLauncher(childEnv);

// Spark 1.x does not support specifying deploy mode in conf and needs special handling.
String deployMode = conf.get(SPARK_DEPLOY_MODE);
if (deployMode != null) {
launcher.setDeployMode(deployMode);
}

launcher.setSparkHome(System.getenv(SPARK_HOME_ENV));
launcher.setSparkHome(conf.get(SPARK_HOME));
launcher.setAppResource("spark-internal");
launcher.setPropertiesFile(confFile.getAbsolutePath());
launcher.setMainClass(RSCDriverBootstrapper.class.getName());
@@ -266,11 +258,7 @@ private static File writeConfToFile(RSCConf conf) throws IOException {
}

// Load the default Spark configuration.
String confDir = System.getenv("SPARK_CONF_DIR");
if (confDir == null && System.getenv(SPARK_HOME_ENV) != null) {
confDir = System.getenv(SPARK_HOME_ENV) + File.separator + "conf";
}

String confDir = conf.get(SPARK_CONF_DIR);
if (confDir != null) {
File sparkDefaults = new File(confDir + File.separator + "spark-defaults.conf");
if (sparkDefaults.isFile()) {
4 changes: 3 additions & 1 deletion rsc/src/main/java/com/cloudera/livy/rsc/RSCConf.java
@@ -46,8 +46,10 @@ public static enum Entry implements ConfEntry {
SESSION_KIND("session.kind", null),

LIVY_JARS("jars", null),
SPARKR_PACKAGE("sparkr.package", null),

PYSPARK_ARCHIVES("pyspark.archives", null),
SPARK_HOME("spark_home", null),
SPARK_CONF_DIR("spark_conf_dir", null),

// Address for the RSC driver to connect back with it's connection info.
LAUNCHER_ADDRESS("launcher.address", null),
@@ -76,6 +76,7 @@ private Properties createConf(boolean local) {
}

conf.put(LIVY_JARS.key(), "");
conf.put(SPARK_HOME.key(), System.getenv("SPARK_HOME"));
return conf;
}

@@ -185,6 +185,7 @@ object ScalaClientTest {
conf.put(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH, classpath)
}
conf.put(LIVY_JARS.key, "")
conf.put(SPARK_HOME.key(), System.getenv("SPARK_HOME"))
conf
}

48 changes: 6 additions & 42 deletions server/src/main/scala/com/cloudera/livy/LivyConf.scala
@@ -27,8 +27,8 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration

import com.cloudera.livy.client.common.ClientConf
import com.cloudera.livy.client.common.ClientConf.ConfEntry
import com.cloudera.livy.client.common.ClientConf.DeprecatedConf
import com.cloudera.livy.client.common.ClientConf._
import com.cloudera.livy.utils.SparkEnvironment

object LivyConf {

@@ -42,21 +42,12 @@

val TEST_MODE = ClientConf.TEST_MODE

val SPARK_HOME = Entry("livy.server.spark-home", null)
val LIVY_SPARK_MASTER = Entry("livy.spark.master", "local")
val LIVY_SPARK_DEPLOY_MODE = Entry("livy.spark.deploy-mode", null)

// Two configurations to specify Spark and related Scala version. These are internal
// configurations will be set by LivyServer and used in session creation. It is not required to
// set usually unless running with unofficial Spark + Scala versions
// (like Spark 2.0 + Scala 2.10, Spark 1.6 + Scala 2.11)
val LIVY_SPARK_SCALA_VERSION = Entry("livy.spark.scala-version", null)
val LIVY_SPARK_VERSION = Entry("livy.spark.version", null)

val SESSION_STAGING_DIR = Entry("livy.session.staging-dir", null)
val FILE_UPLOAD_MAX_SIZE = Entry("livy.file.upload.max.size", 100L * 1024 * 1024)
val LOCAL_FS_WHITELIST = Entry("livy.file.local-dir-whitelist", null)
val ENABLE_HIVE_CONTEXT = Entry("livy.repl.enable-hive-context", false)

val ENVIRONMENT = Entry("livy.environment", "production")

@@ -142,13 +133,6 @@ object LivyConf {
// How long a finished session state will be kept in memory
val SESSION_STATE_RETAIN_TIME = Entry("livy.server.session.state-retain.sec", "600s")

val SPARK_MASTER = "spark.master"
val SPARK_DEPLOY_MODE = "spark.submit.deployMode"
val SPARK_JARS = "spark.jars"
val SPARK_FILES = "spark.files"
val SPARK_ARCHIVES = "spark.yarn.dist.archives"
val SPARK_PY_FILES = "spark.submit.pyFiles"

/**
* These are Spark configurations that contain lists of files that the user can add to
* their jobs in one way or another. Livy needs to pre-process these to make sure the
@@ -160,18 +144,6 @@
*/
val SPARK_FILE_LISTS = Entry("livy.spark.file-list-configs", null)

private val HARDCODED_SPARK_FILE_LISTS = Seq(
SPARK_JARS,
SPARK_FILES,
SPARK_ARCHIVES,
SPARK_PY_FILES,
"spark.yarn.archive",
"spark.yarn.dist.files",
"spark.yarn.dist.jars",
"spark.yarn.jar",
"spark.yarn.jars"
)

case class DepConf(
override val key: String,
override val version: String,
@@ -180,8 +152,8 @@

private val configsWithAlternatives: Map[String, DeprecatedConf] = Map[String, DepConf](
LIVY_SPARK_DEPLOY_MODE.key -> DepConf("livy.spark.deployMode", "0.4"),
LIVY_SPARK_SCALA_VERSION.key -> DepConf("livy.spark.scalaVersion", "0.4"),
ENABLE_HIVE_CONTEXT.key -> DepConf("livy.repl.enableHiveContext", "0.4"),
"livy.spark.scala-version" -> DepConf("livy.spark.scalaVersion", "0.4"),
"livy-repl.enable-hive-context" -> DepConf("livy.repl.enableHiveContext", "0.4"),
CSRF_PROTECTION.key -> DepConf("livy.server.csrf_protection.enabled", "0.4"),
ACCESS_CONTROL_ENABLED.key -> DepConf("livy.server.access_control.enabled", "0.4"),
ACCESS_CONTROL_USERS.key -> DepConf("livy.server.access_control.users", "0.4"),
@@ -202,7 +174,6 @@

Map(configs.map { cfg => (cfg.key -> cfg) }: _*)
}

}

/**
@@ -222,7 +193,8 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
path.stripSuffix("/") + "/"
}

lazy val sparkFileLists = HARDCODED_SPARK_FILE_LISTS ++ configToSeq(SPARK_FILE_LISTS)
lazy val sparkFileLists = SparkEnvironment.HARDCODED_SPARK_FILE_LISTS ++
configToSeq(SPARK_FILE_LISTS)

/**
* Create a LivyConf that loads defaults from the system properties and the classpath.
@@ -247,17 +219,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
/** Return the spark deploy mode Livy sessions should use. */
def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)

/** Return the location of the spark home directory */
def sparkHome(): Option[String] = Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME"))

/** Return the spark master Livy sessions should use. */
def sparkMaster(): String = get(LIVY_SPARK_MASTER)

/** Return the path to the spark-submit executable. */
def sparkSubmit(): String = {
sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get
}

/** Return the list of superusers. */
def superusers(): Seq[String] = _superusers

24 changes: 0 additions & 24 deletions server/src/main/scala/com/cloudera/livy/server/LivyServer.scala
@@ -64,30 +64,6 @@ class LivyServer extends Logging {
maxFileSize = Some(livyConf.getLong(LivyConf.FILE_UPLOAD_MAX_SIZE))
).toMultipartConfigElement

// Make sure the `spark-submit` program exists, otherwise much of livy won't work.
testSparkHome(livyConf)

// Test spark-submit and get Spark Scala version accordingly.
val (sparkVersion, scalaVersionFromSparkSubmit) = sparkSubmitVersion(livyConf)
testSparkVersion(sparkVersion)

// If Spark and Scala version is set manually, should verify if they're consistent with
// ones parsed from "spark-submit --version"
val formattedSparkVersion = formatSparkVersion(sparkVersion)
Option(livyConf.get(LIVY_SPARK_VERSION)).map(formatSparkVersion).foreach { version =>
require(formattedSparkVersion == version,
s"Configured Spark version $version is not equal to Spark version $formattedSparkVersion " +
"got from spark-submit -version")
}

// Set formatted Spark and Scala version into livy configuration, this will be used by
// session creation.
// TODO Create a new class to pass variables from LivyServer to sessions and remove these
// internal LivyConfs.
livyConf.set(LIVY_SPARK_VERSION.key, formattedSparkVersion.productIterator.mkString("."))
livyConf.set(LIVY_SPARK_SCALA_VERSION.key,
sparkScalaVersion(formattedSparkVersion, scalaVersionFromSparkSubmit, livyConf))

if (UserGroupInformation.isSecurityEnabled) {
// If Hadoop security is enabled, run kinit periodically. runKinit() should be called
// before any Hadoop operation, otherwise Kerberos exception will be thrown.
@@ -29,7 +29,7 @@ import com.cloudera.livy.{LivyConf, Logging}
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions.{Session, SessionState}
import com.cloudera.livy.sessions.Session._
import com.cloudera.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}
import com.cloudera.livy.utils._

@JsonIgnoreProperties(ignoreUnknown = true)
case class BatchRecoveryMetadata(
@@ -53,6 +53,7 @@ object BatchSession extends Logging {
sessionStore: SessionStore,
mockApp: Option[SparkApp] = None): BatchSession = {
val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}"
val sparkEnv = SparkEnvironment.getSparkEnv(livyConf, request.sparkEnv)

def createSparkApp(s: BatchSession): SparkApp = {
val conf = SparkApp.prepareSparkConf(
@@ -63,7 +64,7 @@
require(request.file != null, "File is required.")

val builder = new SparkProcessBuilder(livyConf)
builder.conf(conf)
builder.conf(conf).executable(sparkEnv.sparkSubmit())

proxyUser.foreach(builder.proxyUser)
request.className.foreach(builder.className)
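The batch builder now takes its spark-submit executable from the selected Spark environment instead of from LivyConf. SparkEnvironment itself is not part of this diff; a minimal sketch of what its sparkSubmit() helper presumably looks like, mirroring the LivyConf.sparkSubmit() removed above (the body below is an assumption, not the patch's actual code):

    // Resolve <spark-home>/bin/spark-submit for this environment; fails if spark-home is unset.
    def sparkSubmit(): String = {
      val home = sparkHome().getOrElse(
        throw new IllegalStateException("spark-home is not set for this Spark environment"))
      home + File.separator + "bin" + File.separator + "spark-submit"
    }

Here sparkHome(): Option[String] and java.io.File are assumed to be available inside SparkEnvironment.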
@@ -20,6 +20,7 @@ package com.cloudera.livy.server.batch

class CreateBatchRequest {

var sparkEnv: String = "default"
var file: String = _
var proxyUser: Option[String] = None
var args: List[String] = List()
@@ -22,6 +22,7 @@ import com.cloudera.livy.sessions.{Kind, Spark}

class CreateInteractiveRequest {
var kind: Kind = Spark()
var sparkEnv: String = "default"
var proxyUser: Option[String] = None
var jars: List[String] = List()
var pyFiles: List[String] = List()
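With the new sparkEnv field on CreateBatchRequest and CreateInteractiveRequest, a client can select a pre-defined Spark environment at session creation time. A minimal sketch, assuming a Livy server on localhost:8998 and a configured environment named "spark2" (both assumptions, not part of this patch):

    curl -X POST -H "Content-Type: application/json" \
         -d '{"kind": "pyspark", "sparkEnv": "spark2"}' \
         http://localhost:8998/sessions

Requests that omit sparkEnv fall back to the "default" environment, matching the field's default value in both request classes.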