diff --git a/R/install-dev.bat b/R/install-dev.bat index 008a5c668bc45..ed1c91ae3a0ff 100644 --- a/R/install-dev.bat +++ b/R/install-dev.bat @@ -25,3 +25,9 @@ set SPARK_HOME=%~dp0.. MKDIR %SPARK_HOME%\R\lib R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\ + +rem Zip the SparkR package so that it can be distributed to worker nodes on YARN +pushd %SPARK_HOME%\R\lib +%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR +popd + diff --git a/R/install-dev.sh b/R/install-dev.sh index 59d98c9c7a646..4972bb9217072 100755 --- a/R/install-dev.sh +++ b/R/install-dev.sh @@ -42,4 +42,8 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo # Install SparkR to $LIB_DIR R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/ +# Zip the SparkR package so that it can be distributed to worker nodes on YARN +cd $LIB_DIR +jar cfM "$LIB_DIR/sparkr.zip" SparkR + popd > /dev/null diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index 26f941e912cc2..154fbb42b9bdf 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -49,6 +49,12 @@ sparkR.stop <- function() { } } + # Remove the R package lib path from .libPaths() + if (exists(".libPath", envir = env)) { + libPath <- get(".libPath", envir = env) + .libPaths(.libPaths()[.libPaths() != libPath]) + } + if (exists(".backendLaunched", envir = env)) { callJStatic("SparkRHandler", "stopBackend") } @@ -149,14 +155,20 @@ sparkR.init <- function( f <- file(path, open="rb") backendPort <- readInt(f) monitorPort <- readInt(f) + rLibPath <- readString(f) close(f) file.remove(path) if (length(backendPort) == 0 || backendPort == 0 || - length(monitorPort) == 0 || monitorPort == 0) { + length(monitorPort) == 0 || monitorPort == 0 || + length(rLibPath) != 1) { stop("JVM failed to launch") } assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv) assign(".backendLaunched", 1, envir = .sparkREnv) + if (rLibPath != "") { + assign(".libPath", rLibPath, envir = .sparkREnv) + .libPaths(c(rLibPath, .libPaths())) + } } .sparkREnv$backendPort <- backendPort diff --git a/R/pkg/inst/profile/general.R b/R/pkg/inst/profile/general.R index 2a8a8213d0849..c55fe9ba7af7a 100644 --- a/R/pkg/inst/profile/general.R +++ b/R/pkg/inst/profile/general.R @@ -17,6 +17,7 @@ .First <- function() { packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR") - .libPaths(c(packageDir, .libPaths())) + dirs <- strsplit(packageDir, ",")[[1]] + .libPaths(c(dirs, .libPaths())) Sys.setenv(NOAWT=1) } diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R index 3584b418a71a9..f55beac6c8c07 100644 --- a/R/pkg/inst/worker/daemon.R +++ b/R/pkg/inst/worker/daemon.R @@ -18,10 +18,11 @@ # Worker daemon rLibDir <- Sys.getenv("SPARKR_RLIBDIR") -script <- paste(rLibDir, "SparkR/worker/worker.R", sep = "/") +dirs <- strsplit(rLibDir, ",")[[1]] +script <- file.path(dirs[[1]], "SparkR", "worker", "worker.R") # preload SparkR package, speedup worker -.libPaths(c(rLibDir, .libPaths())) +.libPaths(c(dirs, .libPaths())) suppressPackageStartupMessages(library(SparkR)) port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index 7e3b5fc403b25..aac8adfe69904 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -35,10 +35,11 @@ bootTime <- currentTimeSecs() bootElap <- elapsedSecs() rLibDir <- Sys.getenv("SPARKR_RLIBDIR") +dirs <- strsplit(rLibDir, ",")[[1]] # Set libPaths to include SparkR package as loadNamespace needs this # TODO: Figure out if we can avoid this by not loading any objects that require # SparkR namespace -.libPaths(c(rLibDir, .libPaths())) +.libPaths(c(dirs, .libPaths())) suppressPackageStartupMessages(library(SparkR)) port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala index b7e72d4d0ed0b..8b3be0da2c8c4 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackend.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackend.scala @@ -113,6 +113,7 @@ private[spark] object RBackend extends Logging { val dos = new DataOutputStream(new FileOutputStream(f)) dos.writeInt(boundPort) dos.writeInt(listenPort) + SerDe.writeString(dos, RUtils.rPackages.getOrElse("")) dos.close() f.renameTo(new File(path)) diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala index 1cf2824f862ee..ca47aa68bc075 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala @@ -394,14 +394,14 @@ private[r] object RRDD { val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript") val rOptions = "--vanilla" val rLibDir = RUtils.sparkRPackagePath(isDriver = false) - val rExecScript = rLibDir + "/SparkR/worker/" + script + val rExecScript = rLibDir(0) + "/SparkR/worker/" + script val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript)) // Unset the R_TESTS environment variable for workers. // This is set by R CMD check as startup.Rs // (http://svn.r-project.org/R/trunk/src/library/tools/R/testing.R) // and confuses worker script which tries to load a non-existent file pb.environment().put("R_TESTS", "") - pb.environment().put("SPARKR_RLIBDIR", rLibDir) + pb.environment().put("SPARKR_RLIBDIR", rLibDir.mkString(",")) pb.environment().put("SPARKR_WORKER_PORT", port.toString) pb.redirectErrorStream(true) // redirect stderr into stdout val proc = pb.start() diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index 646fd0b603b02..c3009ae04a59e 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -24,6 +24,10 @@ import scala.collection.JavaConversions._ import org.apache.spark.{SparkEnv, SparkException} private[spark] object RUtils { + // Local path where R binary packages built from R source code contained in the spark + // packages specified with "--packages" or "--jars" command line option reside. + var rPackages: Option[String] = None + /** * Get the SparkR package path in the local spark distribution. */ @@ -35,11 +39,15 @@ private[spark] object RUtils { } /** - * Get the SparkR package path in various deployment modes. + * Get the list of paths for R packages in various deployment modes, of which the first + * path is for the SparkR package itself. The second path is for R packages built as + * part of Spark Packages, if any exist. Spark Packages can be provided through the + * "--packages" or "--jars" command line options. + * * This assumes that Spark properties `spark.master` and `spark.submit.deployMode` * and environment variable `SPARK_HOME` are set. */ - def sparkRPackagePath(isDriver: Boolean): String = { + def sparkRPackagePath(isDriver: Boolean): Seq[String] = { val (master, deployMode) = if (isDriver) { (sys.props("spark.master"), sys.props("spark.submit.deployMode")) @@ -52,15 +60,30 @@ private[spark] object RUtils { val isYarnClient = master != null && master.contains("yarn") && deployMode == "client" // In YARN mode, the SparkR package is distributed as an archive symbolically - // linked to the "sparkr" file in the current directory. Note that this does not apply - // to the driver in client mode because it is run outside of the cluster. + // linked to the "sparkr" file in the current directory and additional R packages + // are distributed as an archive symbolically linked to the "rpkg" file in the + // current directory. + // + // Note that this does not apply to the driver in client mode because it is run + // outside of the cluster. if (isYarnCluster || (isYarnClient && !isDriver)) { - new File("sparkr").getAbsolutePath + val sparkRPkgPath = new File("sparkr").getAbsolutePath + val rPkgPath = new File("rpkg") + if (rPkgPath.exists()) { + Seq(sparkRPkgPath, rPkgPath.getAbsolutePath) + } else { + Seq(sparkRPkgPath) + } } else { // Otherwise, assume the package is local // TODO: support this for Mesos - localSparkRPackagePath.getOrElse { - throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.") + val sparkRPkgPath = localSparkRPackagePath.getOrElse { + throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.") + } + if (!rPackages.isEmpty) { + Seq(sparkRPkgPath, rPackages.get) + } else { + Seq(sparkRPkgPath) } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala index ed1e972955679..540580b0e7c15 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala @@ -100,20 +100,29 @@ private[deploy] object RPackageUtils extends Logging { * Runs the standard R package installation code to build the R package from source. * Multiple runs don't cause problems. */ - private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = { + private def rPackageBuilder( + dir: File, + printStream: PrintStream, + verbose: Boolean, + libDir: String): Boolean = { // this code should be always running on the driver. - val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse( - throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")) val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator) - val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg) + val installCmd = baseInstallCmd ++ Seq(libDir, pathToPkg) if (verbose) { print(s"Building R package with the command: $installCmd", printStream) } try { val builder = new ProcessBuilder(installCmd) builder.redirectErrorStream(true) + + // Put the SparkR package directory into R library search paths in case this R package + // may depend on SparkR. val env = builder.environment() - env.clear() + val rPackageDir = RUtils.sparkRPackagePath(isDriver = true) + env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(",")) + env.put("R_PROFILE_USER", + Seq(rPackageDir(0), "SparkR", "profile", "general.R").mkString(File.separator)) + val process = builder.start() new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start() process.waitFor() == 0 @@ -170,8 +179,11 @@ private[deploy] object RPackageUtils extends Logging { if (checkManifestForR(jar)) { print(s"$file contains R source code. Now installing package.", printStream, Level.INFO) val rSource = extractRFolder(jar, printStream, verbose) + if (RUtils.rPackages.isEmpty) { + RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath) + } try { - if (!rPackageBuilder(rSource, printStream, verbose)) { + if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) { print(s"ERROR: Failed to build R package in $file.", printStream) print(RJarDoc, printStream) } @@ -206,7 +218,7 @@ private[deploy] object RPackageUtils extends Logging { } } - /** Zips all the libraries found with SparkR in the R/lib directory for distribution with Yarn. */ + /** Zips all the R libraries built for distribution to the cluster. */ private[deploy] def zipRLibraries(dir: File, name: String): File = { val filesToBundle = listFilesRecursively(dir, Seq(".zip")) // create a zip file from scratch, do not append to existing file. diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala index 105e762769d67..2b91c9edd8149 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala @@ -81,9 +81,10 @@ object RRunner { val env = builder.environment() env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString) val rPackageDir = RUtils.sparkRPackagePath(isDriver = true) - env.put("SPARKR_PACKAGE_DIR", rPackageDir) + // Put the R package directories into an env variable of comma-separated paths + env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(",")) env.put("R_PROFILE_USER", - Seq(rPackageDir, "SparkR", "profile", "general.R").mkString(File.separator)) + Seq(rPackageDir(0), "SparkR", "profile", "general.R").mkString(File.separator)) builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize val process = builder.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index dc555cb7e4334..bd0e3a7833a67 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -83,6 +83,7 @@ object SparkSubmit { private val PYSPARK_SHELL = "pyspark-shell" private val SPARKR_SHELL = "sparkr-shell" private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip" + private val R_PACKAGE_ARCHIVE = "rpkg.zip" private val CLASS_NOT_FOUND_EXIT_STATUS = 101 @@ -362,22 +363,46 @@ object SparkSubmit { } } - // In YARN mode for an R app, add the SparkR package archive to archives - // that can be distributed with the job + // In YARN mode for an R app, add the SparkR package archive and the R package + // archive containing all of the built R libraries to archives so that they can + // be distributed with the job if (args.isR && clusterManager == YARN) { - val rPackagePath = RUtils.localSparkRPackagePath - if (rPackagePath.isEmpty) { + val sparkRPackagePath = RUtils.localSparkRPackagePath + if (sparkRPackagePath.isEmpty) { printErrorAndExit("SPARK_HOME does not exist for R application in YARN mode.") } - val rPackageFile = - RPackageUtils.zipRLibraries(new File(rPackagePath.get), SPARKR_PACKAGE_ARCHIVE) - if (!rPackageFile.exists()) { + val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE) + if (!sparkRPackageFile.exists()) { printErrorAndExit(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.") } - val localURI = Utils.resolveURI(rPackageFile.getAbsolutePath) + val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString + // Distribute the SparkR package. // Assigns a symbol link name "sparkr" to the shipped package. - args.archives = mergeFileLists(args.archives, localURI.toString + "#sparkr") + args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr") + + // Distribute the R package archive containing all the built R packages. + if (!RUtils.rPackages.isEmpty) { + val rPackageFile = + RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE) + if (!rPackageFile.exists()) { + printErrorAndExit("Failed to zip all the built R packages.") + } + + val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString + // Assigns a symbol link name "rpkg" to the shipped package. + args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg") + } + } + + // TODO: Support distributing R packages with standalone cluster + if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) { + printErrorAndExit("Distributing R packages with standalone cluster is not supported.") + } + + // TODO: Support SparkR with mesos cluster + if (args.isR && clusterManager == MESOS) { + printErrorAndExit("SparkR is not supported for Mesos cluster.") } // If we're running a R app, set the main class to our specific R runner diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 1110ca6051a40..c36393b6f2a9a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ import org.apache.spark._ +import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.SparkSubmit._ import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -366,10 +367,9 @@ class SparkSubmitSuite } } - test("correctly builds R packages included in a jar with --packages") { - // TODO(SPARK-9603): Building a package to $SPARK_HOME/R/lib is unavailable on Jenkins. - // It's hard to write the test in SparkR (because we can't create the repository dynamically) - /* + // TODO(SPARK-9603): Building a package is flaky on Jenkins Maven builds. + // See https://gist.github.com/shivaram/3a2fecce60768a603dac for a error log + ignore("correctly builds R packages included in a jar with --packages") { assume(RUtils.isRInstalled, "R isn't installed on this machine.") val main = MavenCoordinate("my.great.lib", "mylib", "0.1") val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) @@ -387,7 +387,6 @@ class SparkSubmitSuite rScriptDir) runSparkSubmit(args) } - */ } test("resolves command line argument paths correctly") { diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md index ab7f0117c0b7f..d58f4f66fd75c 100644 --- a/docs/streaming-kafka-integration.md +++ b/docs/streaming-kafka-integration.md @@ -74,7 +74,7 @@ Next, we discuss how to use this approach in your streaming application. [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kafka-assembly_2.10%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`. ## Approach 2: Direct Approach (No Receivers) -This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API. Spark 1.4 added a Python API, but it is not yet at full feature parity. +This new receiver-less "direct" approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka's simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system). Note that this is an experimental feature introduced in Spark 1.3 for the Scala and Java API, in Spark 1.4 for the Python API. This approach has the following advantages over the receiver-based approach (i.e. Approach 1). diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala index 3ae53e57dbdb8..02ed746954f23 100644 --- a/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala @@ -50,7 +50,7 @@ object MovieLensALS { def parseMovie(str: String): Movie = { val fields = str.split("::") assert(fields.size == 3) - Movie(fields(0).toInt, fields(1), fields(2).split("|")) + Movie(fields(0).toInt, fields(1), fields(2).split("\\|")) } } diff --git a/make-distribution.sh b/make-distribution.sh index 62c0ba6df7d3f..32675774bebac 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -220,6 +220,7 @@ cp -r "$SPARK_HOME/ec2" "$DISTDIR" if [ -d "$SPARK_HOME"/R/lib/SparkR ]; then mkdir -p "$DISTDIR"/R/lib cp -r "$SPARK_HOME/R/lib/SparkR" "$DISTDIR"/R/lib + cp "$SPARK_HOME/R/lib/sparkr.zip" "$DISTDIR"/R/lib fi # Download and copy in tachyon, if requested diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 06e13b717e920..2f8b5e5f687fa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -1101,7 +1101,7 @@ private[python] class PythonMLLibAPI extends Serializable { * Wrapper around RowMatrix constructor. */ def createRowMatrix(rows: JavaRDD[Vector], numRows: Long, numCols: Int): RowMatrix = { - new RowMatrix(rows.rdd, numRows, numCols) + new RowMatrix(rows.rdd.retag(classOf[Vector]), numRows, numCols) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 5768c6087db32..8957df0be6814 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions +import java.util.UUID + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.codegen._ @@ -24,16 +26,23 @@ import org.apache.spark.sql.types._ object NamedExpression { private val curId = new java.util.concurrent.atomic.AtomicLong() - def newExprId: ExprId = ExprId(curId.getAndIncrement()) + private[expressions] val jvmId = UUID.randomUUID() + def newExprId: ExprId = ExprId(curId.getAndIncrement(), jvmId) def unapply(expr: NamedExpression): Option[(String, DataType)] = Some(expr.name, expr.dataType) } /** - * A globally unique (within this JVM) id for a given named expression. + * A globally unique id for a given named expression. * Used to identify which attribute output by a relation is being * referenced in a subsequent computation. + * + * The `id` field is unique within a given JVM, while the `uuid` is used to uniquely identify JVMs. */ -case class ExprId(id: Long) +case class ExprId(id: Long, jvmId: UUID) + +object ExprId { + def apply(id: Long): ExprId = ExprId(id, NamedExpression.jvmId) +} /** * An [[Expression]] that is named. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4df53687a0731..2b514e011da4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -388,7 +388,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case e @ EvaluatePython(udf, child, _) => BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil - case BroadcastHint(child) => apply(child) + case BroadcastHint(child) => planLater(child) :: Nil case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 1f0405f4210b1..3f7a4096643c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -264,7 +264,17 @@ private[sql] object ParquetFilters { makeGtEq.lift(dataTypeOf(name)).map(_(name, value)) case sources.And(lhs, rhs) => - (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and) + // At here, it is not safe to just convert one side if we do not understand the + // other side. Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // Pushing one side of AND down is only safe to do at the top level. + // You can see ParquetRelation's initializeLocalJobFunc method as an example. + for { + lhsFilter <- createFilter(schema, lhs) + rhsFilter <- createFilter(schema, rhs) + } yield FilterApi.and(lhsFilter, rhsFilter) case sources.Or(lhs, rhs) => for { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index e2716d7841d85..718e158c5d6ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -107,5 +107,12 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { // planner should not crash without a join broadcast(df1).queryExecution.executedPlan + + // SPARK-12275: no physical plan for BroadcastHint in some condition + withTempPath { path => + df1.write.parquet(path.getCanonicalPath) + val pf1 = sqlContext.read.parquet(path.getCanonicalPath) + assert(df1.join(broadcast(pf1)).count() === 4) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index f88ddc77a6a4e..05d305a948281 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -336,4 +336,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } } + + test("SPARK-12218: 'Not' is included in Parquet filter pushdown") { + import testImplicits._ + + withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table1" + (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.parquet(path) + + checkAnswer( + sqlContext.read.parquet(path).where("not (a = 2) or not(b in ('1'))"), + (1 to 5).map(i => Row(i, (i % 2).toString))) + + checkAnswer( + sqlContext.read.parquet(path).where("not (a = 2 and b in ('1'))"), + (1 to 5).map(i => Row(i, (i % 2).toString))) + } + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index 27193f54d3a91..ebfb1759b8b96 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -74,22 +74,20 @@ private[orc] object OrcFilters extends Logging { expression match { case And(left, right) => - val tryLeft = buildSearchArgument(left, newBuilder) - val tryRight = buildSearchArgument(right, newBuilder) - - val conjunction = for { - _ <- tryLeft - _ <- tryRight + // At here, it is not safe to just convert one side if we do not understand the + // other side. Here is an example used to explain the reason. + // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to + // convert b in ('1'). If we only convert a = 2, we will end up with a filter + // NOT(a = 2), which will generate wrong results. + // Pushing one side of AND down is only safe to do at the top level. + // You can see ParquetRelation's initializeLocalJobFunc method as an example. + for { + _ <- buildSearchArgument(left, newBuilder) + _ <- buildSearchArgument(right, newBuilder) lhs <- buildSearchArgument(left, builder.startAnd()) rhs <- buildSearchArgument(right, lhs) } yield rhs.end() - // For filter `left AND right`, we can still push down `left` even if `right` is not - // convertible, and vice versa. - conjunction - .orElse(tryLeft.flatMap(_ => buildSearchArgument(left, builder))) - .orElse(tryRight.flatMap(_ => buildSearchArgument(right, builder))) - case Or(left, right) => for { _ <- buildSearchArgument(left, newBuilder) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala index 593e68949ef45..6bcc6441a93fe 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.orc import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql.{Row, SQLConf} import org.apache.spark.sql.sources.HadoopFsRelationTest import org.apache.spark.sql.types._ @@ -61,4 +62,23 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest { "dataSchema" -> dataSchemaWithPartition.json)).format(dataSourceName).load()) } } + + test("SPARK-12218: 'Not' is included in ORC filter pushdown") { + import testImplicits._ + + withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/table1" + (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path) + + checkAnswer( + sqlContext.read.orc(path).where("not (a = 2) or not(b in ('1'))"), + (1 to 5).map(i => Row(i, (i % 2).toString))) + + checkAnswer( + sqlContext.read.orc(path).where("not (a = 2 and b in ('1'))"), + (1 to 5).map(i => Row(i, (i % 2).toString))) + } + } + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index fe6328b1ce727..68f8ea834a641 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -231,7 +231,7 @@ private[streaming] object FileBasedWriteAheadLog { def getCallerName(): Option[String] = { val stackTraceClasses = Thread.currentThread.getStackTrace().map(_.getClassName) - stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split(".").lastOption) + stackTraceClasses.find(!_.contains("WriteAheadLog")).flatMap(_.split("\\.").lastOption) } /** Convert a sequence of files to a sequence of sorted LogInfo objects */