SKIPME merged Apache branch-1.5 #134

Merged
merged 8 commits on Dec 18, 2015
6 changes: 6 additions & 0 deletions R/install-dev.bat
@@ -25,3 +25,9 @@ set SPARK_HOME=%~dp0..
MKDIR %SPARK_HOME%\R\lib

R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\

rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
pushd %SPARK_HOME%\R\lib
%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
popd

4 changes: 4 additions & 0 deletions R/install-dev.sh
@@ -42,4 +42,8 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
# Install SparkR to $LIB_DIR
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/

# Zip the SparkR package so that it can be distributed to worker nodes on YARN
cd $LIB_DIR
jar cfM "$LIB_DIR/sparkr.zip" SparkR

popd > /dev/null
14 changes: 13 additions & 1 deletion R/pkg/R/sparkR.R
@@ -49,6 +49,12 @@ sparkR.stop <- function() {
}
}

# Remove the R package lib path from .libPaths()
if (exists(".libPath", envir = env)) {
libPath <- get(".libPath", envir = env)
.libPaths(.libPaths()[.libPaths() != libPath])
}

if (exists(".backendLaunched", envir = env)) {
callJStatic("SparkRHandler", "stopBackend")
}
@@ -149,14 +155,20 @@ sparkR.init <- function(
f <- file(path, open="rb")
backendPort <- readInt(f)
monitorPort <- readInt(f)
rLibPath <- readString(f)
close(f)
file.remove(path)
if (length(backendPort) == 0 || backendPort == 0 ||
length(monitorPort) == 0 || monitorPort == 0) {
length(monitorPort) == 0 || monitorPort == 0 ||
length(rLibPath) != 1) {
stop("JVM failed to launch")
}
assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
assign(".backendLaunched", 1, envir = .sparkREnv)
if (rLibPath != "") {
assign(".libPath", rLibPath, envir = .sparkREnv)
.libPaths(c(rLibPath, .libPaths()))
}
}

.sparkREnv$backendPort <- backendPort
3 changes: 2 additions & 1 deletion R/pkg/inst/profile/general.R
@@ -17,6 +17,7 @@

.First <- function() {
packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
.libPaths(c(packageDir, .libPaths()))
dirs <- strsplit(packageDir, ",")[[1]]
.libPaths(c(dirs, .libPaths()))
Sys.setenv(NOAWT=1)
}
5 changes: 3 additions & 2 deletions R/pkg/inst/worker/daemon.R
@@ -18,10 +18,11 @@
# Worker daemon

rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
script <- paste(rLibDir, "SparkR/worker/worker.R", sep = "/")
dirs <- strsplit(rLibDir, ",")[[1]]
script <- file.path(dirs[[1]], "SparkR", "worker", "worker.R")

# preload SparkR package, speedup worker
.libPaths(c(rLibDir, .libPaths()))
.libPaths(c(dirs, .libPaths()))
suppressPackageStartupMessages(library(SparkR))

port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
3 changes: 2 additions & 1 deletion R/pkg/inst/worker/worker.R
@@ -35,10 +35,11 @@ bootTime <- currentTimeSecs()
bootElap <- elapsedSecs()

rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
dirs <- strsplit(rLibDir, ",")[[1]]
# Set libPaths to include SparkR package as loadNamespace needs this
# TODO: Figure out if we can avoid this by not loading any objects that require
# SparkR namespace
.libPaths(c(rLibDir, .libPaths()))
.libPaths(c(dirs, .libPaths()))
suppressPackageStartupMessages(library(SparkR))

port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/api/r/RBackend.scala
@@ -113,6 +113,7 @@ private[spark] object RBackend extends Logging {
val dos = new DataOutputStream(new FileOutputStream(f))
dos.writeInt(boundPort)
dos.writeInt(listenPort)
SerDe.writeString(dos, RUtils.rPackages.getOrElse(""))
dos.close()
f.renameTo(new File(path))

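The new SerDe.writeString call extends the backend handshake file from two ints to two ints plus a string: sparkR.R (above) now reads that extra field with readString and, when non-empty, prepends it to .libPaths(). Below is a minimal standalone sketch of the write side; the file name, port values, and the exact string encoding (length-prefixed UTF-8 with a trailing NUL, approximating Spark's SerDe) are assumptions for illustration, not the PR's code.

    import java.io.{DataOutputStream, File, FileOutputStream}

    object HandshakeFileSketch {
      // Approximation of SerDe.writeString: 4-byte length (including the trailing NUL),
      // then UTF-8 bytes, then a NUL byte. Treat the exact wire format as an assumption.
      def writeString(dos: DataOutputStream, value: String): Unit = {
        val utf8 = value.getBytes("UTF-8")
        dos.writeInt(utf8.length + 1)
        dos.write(utf8)
        dos.writeByte(0)
      }

      def main(args: Array[String]): Unit = {
        val f = new File("backend_handshake.tmp")       // assumed temp file name
        val dos = new DataOutputStream(new FileOutputStream(f))
        dos.writeInt(12345)                             // bound (backend) port
        dos.writeInt(12346)                             // monitor port
        writeString(dos, "/tmp/spark-abc/rpkg-lib")     // R package lib path, "" when none
        dos.close()
      }
    }
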
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -394,14 +394,14 @@ private[r] object RRDD {
val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript")
val rOptions = "--vanilla"
val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
val rExecScript = rLibDir + "/SparkR/worker/" + script
val rExecScript = rLibDir(0) + "/SparkR/worker/" + script
val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript))
// Unset the R_TESTS environment variable for workers.
// This is set by R CMD check as startup.Rs
// (http://svn.r-project.org/R/trunk/src/library/tools/R/testing.R)
// and confuses worker script which tries to load a non-existent file
pb.environment().put("R_TESTS", "")
pb.environment().put("SPARKR_RLIBDIR", rLibDir)
pb.environment().put("SPARKR_RLIBDIR", rLibDir.mkString(","))
pb.environment().put("SPARKR_WORKER_PORT", port.toString)
pb.redirectErrorStream(true) // redirect stderr into stdout
val proc = pb.start()
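Since sparkRPackagePath now returns a Seq, the worker launcher uses only the first entry (the SparkR install) to locate worker.R, and ships every entry to the R side as one comma-separated SPARKR_RLIBDIR value, which daemon.R and worker.R split back apart with strsplit. A small sketch of that join, with made-up container paths:

    object RLibDirEnvSketch {
      def main(args: Array[String]): Unit = {
        // Assumed YARN container paths: first the SparkR archive, then the optional rpkg archive.
        val rLibDir = Seq("/yarn/container_01/sparkr", "/yarn/container_01/rpkg")
        val rExecScript = rLibDir(0) + "/SparkR/worker/worker.R"  // only the SparkR dir holds worker.R
        val sparkrRLibDir = rLibDir.mkString(",")                 // value passed as SPARKR_RLIBDIR
        println(s"script: $rExecScript")
        println(s"SPARKR_RLIBDIR=$sparkrRLibDir")
      }
    }
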
37 changes: 30 additions & 7 deletions core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -24,6 +24,10 @@ import scala.collection.JavaConversions._
import org.apache.spark.{SparkEnv, SparkException}

private[spark] object RUtils {
// Local path where R binary packages built from R source code contained in the spark
// packages specified with "--packages" or "--jars" command line option reside.
var rPackages: Option[String] = None

/**
* Get the SparkR package path in the local spark distribution.
*/
@@ -35,11 +39,15 @@ private[spark] object RUtils {
}

/**
* Get the SparkR package path in various deployment modes.
* Get the list of paths for R packages in various deployment modes, of which the first
* path is for the SparkR package itself. The second path is for R packages built as
* part of Spark Packages, if any exist. Spark Packages can be provided through the
* "--packages" or "--jars" command line options.
*
* This assumes that Spark properties `spark.master` and `spark.submit.deployMode`
* and environment variable `SPARK_HOME` are set.
*/
def sparkRPackagePath(isDriver: Boolean): String = {
def sparkRPackagePath(isDriver: Boolean): Seq[String] = {
val (master, deployMode) =
if (isDriver) {
(sys.props("spark.master"), sys.props("spark.submit.deployMode"))
@@ -52,15 +60,30 @@
val isYarnClient = master != null && master.contains("yarn") && deployMode == "client"

// In YARN mode, the SparkR package is distributed as an archive symbolically
// linked to the "sparkr" file in the current directory. Note that this does not apply
// to the driver in client mode because it is run outside of the cluster.
// linked to the "sparkr" file in the current directory and additional R packages
// are distributed as an archive symbolically linked to the "rpkg" file in the
// current directory.
//
// Note that this does not apply to the driver in client mode because it is run
// outside of the cluster.
if (isYarnCluster || (isYarnClient && !isDriver)) {
new File("sparkr").getAbsolutePath
val sparkRPkgPath = new File("sparkr").getAbsolutePath
val rPkgPath = new File("rpkg")
if (rPkgPath.exists()) {
Seq(sparkRPkgPath, rPkgPath.getAbsolutePath)
} else {
Seq(sparkRPkgPath)
}
} else {
// Otherwise, assume the package is local
// TODO: support this for Mesos
localSparkRPackagePath.getOrElse {
throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
val sparkRPkgPath = localSparkRPackagePath.getOrElse {
throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
}
if (!rPackages.isEmpty) {
Seq(sparkRPkgPath, rPackages.get)
} else {
Seq(sparkRPkgPath)
}
}
}
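Callers that previously treated the return value as a single path now receive a Seq whose head is always the SparkR directory; a second element is present only when R packages were built from --packages or --jars. A hedged usage sketch with placeholder paths (the real method inspects spark.master, spark.submit.deployMode, and SPARK_HOME):

    object SparkRPackagePathSketch {
      // Stand-in for RUtils.sparkRPackagePath; the paths below are illustrative only.
      def sparkRPackagePath(isDriver: Boolean): Seq[String] =
        if (isDriver) Seq("/opt/spark/R/lib", "/tmp/spark-xyz/rpkg-lib")
        else Seq(new java.io.File("sparkr").getAbsolutePath, new java.io.File("rpkg").getAbsolutePath)

      def main(args: Array[String]): Unit = {
        val paths = sparkRPackagePath(isDriver = true)
        val sparkRHome = paths.head        // always present
        val extraPkgDirs = paths.drop(1)   // possibly empty
        println(s"SparkR: $sparkRHome, extra R package dirs: ${extraPkgDirs.mkString(",")}")
      }
    }
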
26 changes: 19 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -100,20 +100,29 @@ private[deploy] object RPackageUtils extends Logging {
* Runs the standard R package installation code to build the R package from source.
* Multiple runs don't cause problems.
*/
private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = {
private def rPackageBuilder(
dir: File,
printStream: PrintStream,
verbose: Boolean,
libDir: String): Boolean = {
// this code should be always running on the driver.
val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse(
throw new SparkException("SPARK_HOME not set. Can't locate SparkR package."))
val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator)
val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg)
val installCmd = baseInstallCmd ++ Seq(libDir, pathToPkg)
if (verbose) {
print(s"Building R package with the command: $installCmd", printStream)
}
try {
val builder = new ProcessBuilder(installCmd)
builder.redirectErrorStream(true)

// Put the SparkR package directory into R library search paths in case this R package
// may depend on SparkR.
val env = builder.environment()
env.clear()
val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(","))
env.put("R_PROFILE_USER",
Seq(rPackageDir(0), "SparkR", "profile", "general.R").mkString(File.separator))

val process = builder.start()
new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start()
process.waitFor() == 0
@@ -170,8 +179,11 @@
if (checkManifestForR(jar)) {
print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
val rSource = extractRFolder(jar, printStream, verbose)
if (RUtils.rPackages.isEmpty) {
RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath)
}
try {
if (!rPackageBuilder(rSource, printStream, verbose)) {
if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) {
print(s"ERROR: Failed to build R package in $file.", printStream)
print(RJarDoc, printStream)
}
@@ -206,7 +218,7 @@
}
}

/** Zips all the libraries found with SparkR in the R/lib directory for distribution with Yarn. */
/** Zips all the R libraries built for distribution to the cluster. */
private[deploy] def zipRLibraries(dir: File, name: String): File = {
val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
// create a zip file from scratch, do not append to existing file.
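With the extra libDir parameter, packages bundled via --packages are installed into a shared temporary library (RUtils.rPackages) instead of $SPARK_HOME/R/lib, while SPARKR_PACKAGE_DIR and R_PROFILE_USER make SparkR visible during their build. A sketch of the resulting install command; baseInstallCmd is assumed to be "R CMD INSTALL -l" and all paths are placeholders:

    import java.io.File

    object RInstallCmdSketch {
      def main(args: Array[String]): Unit = {
        val baseInstallCmd = Seq("R", "CMD", "INSTALL", "-l")       // assumption
        val libDir = "/tmp/spark-1f2e/rpkg-lib"                     // RUtils.rPackages temp dir (assumed)
        val pathToPkg = Seq("/tmp/extracted-jar", "R", "pkg").mkString(File.separator)
        val installCmd = baseInstallCmd ++ Seq(libDir, pathToPkg)
        // e.g. R CMD INSTALL -l /tmp/spark-1f2e/rpkg-lib /tmp/extracted-jar/R/pkg
        println(installCmd.mkString(" "))
      }
    }
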
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -81,9 +81,10 @@ object RRunner {
val env = builder.environment()
env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
env.put("SPARKR_PACKAGE_DIR", rPackageDir)
// Put the R package directories into an env variable of comma-separated paths
env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(","))
env.put("R_PROFILE_USER",
Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator))
Seq(rPackageDir(0), "SparkR", "profile", "general.R").mkString(File.separator))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()

43 changes: 34 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -83,6 +83,7 @@ object SparkSubmit {
private val PYSPARK_SHELL = "pyspark-shell"
private val SPARKR_SHELL = "sparkr-shell"
private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
private val R_PACKAGE_ARCHIVE = "rpkg.zip"

private val CLASS_NOT_FOUND_EXIT_STATUS = 101

@@ -362,22 +363,46 @@
}
}

// In YARN mode for an R app, add the SparkR package archive to archives
// that can be distributed with the job
// In YARN mode for an R app, add the SparkR package archive and the R package
// archive containing all of the built R libraries to archives so that they can
// be distributed with the job
if (args.isR && clusterManager == YARN) {
val rPackagePath = RUtils.localSparkRPackagePath
if (rPackagePath.isEmpty) {
val sparkRPackagePath = RUtils.localSparkRPackagePath
if (sparkRPackagePath.isEmpty) {
printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.")
}
val rPackageFile =
RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
if (!sparkRPackageFile.exists()) {
printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
}
val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath)
val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString

// Distribute the SparkR package.
// Assigns a symbol link name "sparkr" to the shipped package.
args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr")
args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")

// Distribute the R package archive containing all the built R packages.
if (!RUtils.rPackages.isEmpty) {
val rPackageFile =
RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
printErrorAndExit("Failed to zip all the built R packages.")
}

val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
// Assigns a symbol link name "rpkg" to the shipped package.
args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
}
}

// TODO: Support distributing R packages with standalone cluster
if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
printErrorAndExit("Distributing R packages with standalone cluster is not supported.")
}

// TODO: Support SparkR with mesos cluster
if (args.isR && clusterManager == MESOS) {
printErrorAndExit("SparkR is not supported for Mesos cluster.")
}

// If we're running a R app, set the main class to our specific R runner
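Both archives reach YARN through --archives entries whose URI fragment names the symlink created in each container's working directory, which is why RUtils looks for ./sparkr and ./rpkg at runtime. A standalone sketch of how those entries are formed; the zip locations are assumed:

    import java.io.File

    object ArchiveNamingSketch {
      def main(args: Array[String]): Unit = {
        val sparkrZip = new File("/opt/spark/R/lib/sparkr.zip")   // built by install-dev.sh (path assumed)
        val rpkgZip   = new File("/tmp/spark-1f2e/rpkg.zip")      // zipRLibraries output (path assumed)
        val archives = Seq(
          sparkrZip.toURI.toString + "#sparkr",   // unpacked as ./sparkr in each container
          rpkgZip.toURI.toString + "#rpkg"        // unpacked as ./rpkg in each container
        ).mkString(",")
        println(archives)
      }
    }
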
@@ -28,6 +28,7 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._

import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -366,10 +367,9 @@
}
}

test("correctly builds R packages included in a jar with --packages") {
// TODO(SPARK-9603): Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins.
// It's hard to write the test in SparkR (because we can't create the repository dynamically)
/*
// TODO(SPARK-9603): Building a package is flaky on Jenkins Maven builds.
// See https://gist.github.com/shivaram/3a2fecce60768a603dac for an error log
ignore("correctly builds R packages included in a jar with --packages") {
assume(RUtils.isRInstalled, "R isn't installed on this machine.")
val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
@@ -387,7 +387,6 @@ class SparkSubmitSuite
rScriptDir)
runSparkSubmit(args)
}
*/
}

test("resolves command line argument paths correctly") {
2 changes: 1 addition & 1 deletion docs/streaming-kafka-integration.md
@@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming application.
[Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`.

## Approach 2: Direct Approach (No Receivers)
This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it is not yet at full feature parity.
This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API.

This approach has the following advantages over the receiver-based approach (i.e. Approach 1).

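For context on the direct approach described in the paragraph above, a typical Scala usage in this Spark line looks roughly like the following; the broker list and topic name are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    object DirectKafkaSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("direct-kafka").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
        // No receivers: offsets are queried per batch and read with the simple consumer API.
        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("myTopic"))
        stream.map(_._2).count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
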
@@ -50,7 +50,7 @@ object MovieLensALS {
def parseMovie(str: String): Movie = {
val fields = str.split("::")
assert(fields.size == 3)
Movie(fields(0).toInt, fields(1), fields(2).split("|"))
Movie(fields(0).toInt, fields(1), fields(2).split("\\|"))
}
}

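Why the MovieLensALS genre-parsing fix above matters: String.split takes a regex, and "|" alone is an empty alternation that matches between every character, while "\\|" escapes the pipe so the genres field splits as intended. A self-contained sketch (the genres string is a made-up example):

    object SplitPipeSketch {
      def main(args: Array[String]): Unit = {
        val genres = "Animation|Children's|Comedy"
        println(genres.split("|").toList)    // List(A, n, i, m, ...): one element per character
        println(genres.split("\\|").toList)  // List(Animation, Children's, Comedy)
      }
    }
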
1 change: 1 addition & 0 deletions make-distribution.sh
@@ -220,6 +220,7 @@ cp -r "$SPARK_HOME/ec2" "$DISTDIR"
if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then
mkdir -p "$DISTDIR"/R/lib
cp -r "$SPARK_HOME/R/lib/SparkR" "$DISTDIR"/R/lib
cp "$SPARK_HOME/R/lib/sparkr.zip" "$DISTDIR"/R/lib
fi

# Download and copy in tachyon, if requested
@@ -1101,7 +1101,7 @@ private[python] class PythonMLLibAPI extends Serializable {
* Wrapper around RowMatrix constructor.
*/
def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = {
new RowMatrix(rows.rdd, numRows, numCols)
new RowMatrix(rows.rdd.retag(classOf[Vector]), numRows, numCols)
}

/**