Commit 51924f7

Merge pull request apache#90 from oscaroboto/master
Added ExecutorEnv and ability to pass additional JARs to sparkContext (to be used with YARN)
shivaram committed Oct 31, 2014
2 parents eb97d85 + 187526a commit 51924f7
Showing 2 changed files with 35 additions and 9 deletions.
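
The new capability is exposed through two additional sparkR.init arguments, sparkExecutorEnv and sparkJars (see the R diff below). A minimal usage sketch with named arguments; the library path and jar names are hypothetical placeholders, not values from this commit:

library(SparkR)

# Hypothetical yarn-client setup; adjust the paths and jars for your cluster.
sc <- sparkR.init(
  master           = "yarn-client",
  appName          = "SparkR",
  sparkHome        = "/home/spark",
  sparkEnvir       = list(spark.executor.memory = "1g"),
  # Directory containing libjvm.so on the worker nodes (placeholder path):
  sparkExecutorEnv = list(LD_LIBRARY_PATH = "/usr/lib/jvm/jre/lib/amd64/server"),
  sparkJars        = c("jarfile1.jar", "jarfile2.jar"))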
36 changes: 29 additions & 7 deletions pkg/R/sparkR.R
@@ -7,9 +7,12 @@ sparkR.onLoad <- function(libname, pkgname) {
   packageStartupMessage("[SparkR] Initializing with classpath ", assemblyJarPath, "\n")
 
   sparkMem <- Sys.getenv("SPARK_MEM", "512m")
+  yarn_conf_dir <- Sys.getenv("YARN_CONF_DIR", "")
+
   .sparkREnv$libname <- libname
   .sparkREnv$assemblyJarPath <- assemblyJarPath
   .jinit(classpath=assemblyJarPath, parameters=paste("-Xmx", sparkMem, sep=""))
+  .jaddClassPath(yarn_conf_dir)
 }
 
 #' Initialize a new Spark Context.
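
A note on the YARN_CONF_DIR handling above: appending the directory to the JVM classpath is presumably what lets the Hadoop/YARN client configuration (yarn-site.xml and friends) be found when running against YARN. A sketch of the expected workflow, with a hypothetical configuration path:

# Export YARN_CONF_DIR before SparkR loads; sparkR.onLoad reads it at
# package load time and appends it to the embedded JVM's classpath.
Sys.setenv(YARN_CONF_DIR = "/etc/hadoop/conf")  # hypothetical path
library(SparkR)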
@@ -20,19 +23,27 @@ sparkR.onLoad <- function(libname, pkgname) {
 #' @param appName Application name to register with cluster manager
 #' @param sparkHome Spark Home directory
 #' @param sparkEnvir Named list of environment variables to set on worker nodes.
+#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors.
+#' @param sparkJars Character string vector of jar files to pass to the worker nodes.
 #' @export
 #' @examples
 #'\dontrun{
 #' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
 #' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
 #'                   list(spark.executor.memory="1g"))
+#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
+#'                   list(spark.executor.memory="1g"),
+#'                   list(LD_LIBRARY_PATH="/directory of Java VM Library Files (libjvm.so) on worker nodes/"),
+#'                   c("jarfile1.jar","jarfile2.jar"))
 #'}
 
 sparkR.init <- function(
   master = "local",
   appName = "SparkR",
   sparkHome = Sys.getenv("SPARK_HOME"),
-  sparkEnvir = list() ) {
+  sparkEnvir = list(),
+  sparkExecutorEnv = list(),
+  sparkJars = "") {
 
   if (exists(".sparkRjsc", envir=.sparkREnv)) {
     return(get(".sparkRjsc", envir=.sparkREnv))
@@ -42,21 +53,32 @@ sparkR.init <- function(
     sparkHome <- normalizePath(sparkHome)
   }
 
-  hm <- .jnew("java/util/HashMap")
+  sparkEnvirMap <- .jnew("java/util/HashMap")
   for (varname in names(sparkEnvir)) {
-    hm$put(varname, sparkEnvir[[varname]])
+    sparkEnvirMap$put(varname, sparkEnvir[[varname]])
   }
 
+
+  sparkExecutorEnvMap <- .jnew("java/util/HashMap")
+  if (!any(names(sparkExecutorEnv) == "LD_LIBRARY_PATH")) {
+    sparkExecutorEnvMap$put("LD_LIBRARY_PATH", paste0("$LD_LIBRARY_PATH:", Sys.getenv("LD_LIBRARY_PATH")))
+  }
+  for (varname in names(sparkExecutorEnv)) {
+    sparkExecutorEnvMap$put(varname, sparkExecutorEnv[[varname]])
+  }
+
+  .jaddClassPath(sparkJars)
+  jars <- c(as.character(.sparkREnv$assemblyJarPath), as.character(sparkJars))
+
   assign(
     ".sparkRjsc",
     J("edu.berkeley.cs.amplab.sparkr.RRDD",
       "createSparkContext",
       master,
       appName,
       as.character(sparkHome),
-      .jarray(as.character(.sparkREnv$assemblyJarPath),
-              "java/lang/String"),
-      hm),
+      .jarray(jars, "java/lang/String"),
+      sparkEnvirMap,
+      sparkExecutorEnvMap),
     envir=.sparkREnv
   )
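
One subtlety in the hunk above: when the caller does not set LD_LIBRARY_PATH in sparkExecutorEnv, the driver's value is forwarded with a literal $LD_LIBRARY_PATH prefix, so the executor's own search path presumably stays in front once the string is expanded on the worker side. A small illustration (the driver-side value is hypothetical):

Sys.setenv(LD_LIBRARY_PATH = "/opt/jvm/lib/amd64/server")  # hypothetical driver value
paste0("$LD_LIBRARY_PATH:", Sys.getenv("LD_LIBRARY_PATH"))
# [1] "$LD_LIBRARY_PATH:/opt/jvm/lib/amd64/server"
# The leading "$LD_LIBRARY_PATH" stays unexpanded here; it would be resolved
# on the executor, so the worker's existing entries take precedence.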

8 changes: 6 additions & 2 deletions (Scala source defining object RRDD)
@@ -181,15 +181,19 @@ object RRDD {
       appName: String,
       sparkHome: String,
       jars: Array[String],
-      vars: JMap[Object, Object]): JavaSparkContext = {
+      sparkEnvirMap: JMap[Object, Object],
+      sparkExecutorEnvMap: JMap[Object, Object]): JavaSparkContext = {
 
     val sparkConf = new SparkConf().setMaster(master)
                                    .setAppName(appName)
                                    .setSparkHome(sparkHome)
                                    .setJars(jars)
-    for ( (name, value) <- vars) {
+    for ((name, value) <- sparkEnvirMap) {
       sparkConf.set(name.asInstanceOf[String], value.asInstanceOf[String])
     }
+    for ((name, value) <- sparkExecutorEnvMap) {
+      sparkConf.setExecutorEnv(name.asInstanceOf[String], value.asInstanceOf[String])
+    }
     new JavaSparkContext(sparkConf)
   }
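
For context on the Scala side: SparkConf.setExecutorEnv records each variable under a spark.executorEnv.<name> configuration key, which cluster managers such as YARN use to populate each executor's environment. A hedged sketch, using a hypothetical variable name:

# MY_APP_HOME is a hypothetical variable, used only for illustration.
sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
                  sparkExecutorEnv = list(MY_APP_HOME = "/opt/myapp"))
# Through createSparkContext this becomes the SparkConf entry
#   spark.executorEnv.MY_APP_HOME = /opt/myapp
# and MY_APP_HOME is then set in each executor's environment.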

