diff --git a/.gitignore b/.gitignore
index ad72588b472d6..b54a3058de659 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,7 +7,7 @@ sbt/*.jar
 .settings
 .cache
-.mima-excludes
+.generated-mima-excludes
 /build/
 work/
 out/
diff --git a/.rat-excludes b/.rat-excludes
index 15589702c5599..52b2dfac5cf2b 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -3,6 +3,7 @@ target
 .project
 .classpath
 .mima-excludes
+.generated-mima-excludes
 .rat-excludes
 .*md
 derby.log
diff --git a/core/pom.xml b/core/pom.xml
index 2b9f750e07d97..e928cc556b550 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -263,35 +263,6 @@
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
     <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-antrun-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>test</phase>
-            <goals>
-              <goal>run</goal>
-            </goals>
-            <configuration>
-              <exportAntProperties>true</exportAntProperties>
-              <!-- remaining antrun task configuration elided; element contents were not preserved in this extract -->
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
       <plugin>
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 03ceff8bf1fb0..d941aea9d7eb2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -76,8 +76,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * :: DeveloperApi ::
    * Alternative constructor for setting preferred locations where Spark will create executors.
    *
-   * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Ca
-   * be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
+   * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
+   * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
    * from a list of input files or InputFormats for the application.
    */
   @DeveloperApi
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 57b28b9972366..d1df99300c5b1 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -269,6 +269,26 @@ private object SpecialLengths {
 private[spark] object PythonRDD {
   val UTF8 = Charset.forName("UTF-8")
 
+  /**
+   * Adapter for calling SparkContext#runJob from Python.
+   *
+   * This method will return an iterator of an array that contains all elements in the RDD
+   * (effectively a collect()), but allows you to run on a certain subset of partitions,
+   * or to enable local execution.
+   */
+  def runJob(
+      sc: SparkContext,
+      rdd: JavaRDD[Array[Byte]],
+      partitions: JArrayList[Int],
+      allowLocal: Boolean): Iterator[Array[Byte]] = {
+    type ByteArray = Array[Byte]
+    type UnrolledPartition = Array[ByteArray]
+    val allPartitions: Array[UnrolledPartition] =
+      sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
+    val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
+    flattenedPartition.iterator
+  }
+
   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
   JavaRDD[Array[Byte]] = {
     val file = new DataInputStream(new FileInputStream(filename))
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index bf449afae695f..153eee3bc5889 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -381,16 +381,19 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
 object SparkSubmitArguments {
   /** Load properties present in the given file. */
   def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file ${file.getName} does not exist")
+    require(file.exists(), s"Properties file $file does not exist")
+    require(file.isFile(), s"Properties file $file is not a normal file")
     val inputStream = new FileInputStream(file)
-    val properties = new Properties()
     try {
+      val properties = new Properties()
       properties.load(inputStream)
+      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
     } catch {
       case e: IOException =>
-        val message = s"Failed when loading Spark properties file ${file.getName}"
+        val message = s"Failed when loading Spark properties file $file"
         throw new SparkException(message, e)
+    } finally {
+      inputStream.close()
     }
-    properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 2051403682737..d27e0e1f15c65 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -61,17 +61,23 @@ private[spark] class ExecutorRunner(
     // Shutdown hook that kills actors on shutdown.
     shutdownHook = new Thread() {
       override def run() {
-        killProcess()
+        killProcess(Some("Worker shutting down"))
       }
     }
     Runtime.getRuntime.addShutdownHook(shutdownHook)
   }
 
-  private def killProcess() {
+  /**
+   * kill executor process, wait for exit and notify worker to update resource status
+   *
+   * @param message the exception message which caused the executor's death
+   */
+  private def killProcess(message: Option[String]) {
     if (process != null) {
       logInfo("Killing process!")
       process.destroy()
-      process.waitFor()
+      val exitCode = process.waitFor()
+      worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
     }
   }
 
@@ -82,7 +88,6 @@ private[spark] class ExecutorRunner(
       workerThread.interrupt()
       workerThread = null
       state = ExecutorState.KILLED
-      worker ! ExecutorStateChanged(appId, execId, state, None, None)
       Runtime.getRuntime.removeShutdownHook(shutdownHook)
     }
   }
 
@@ -148,14 +153,13 @@ private[spark] class ExecutorRunner(
     } catch {
       case interrupted: InterruptedException => {
         logInfo("Runner thread for executor " + fullId + " interrupted")
-        killProcess()
+        state = ExecutorState.KILLED
+        killProcess(None)
       }
       case e: Exception => {
         logError("Error running executor", e)
-        killProcess()
         state = ExecutorState.FAILED
-        val message = e.getClass + ": " + e.getMessage
-        worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
+        killProcess(Some(e.toString))
       }
     }
   }
diff --git a/dev/mima b/dev/mima
new file mode 100755
index 0000000000000..d4099990254cc
--- /dev/null
+++ b/dev/mima
@@ -0,0 +1,34 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -o pipefail
+
+# Go to the Spark project root directory
+FWDIR="$(cd `dirname $0`/..; pwd)"
+cd $FWDIR
+
+./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore
+echo -e "q\n" | sbt/sbt mima-report-binary-issues | grep -v -e "info.*Resolving"
+ret_val=$?
+
+if [ $ret_val != 0 ]; then
+  echo "NOTE: Exceptions to binary compatibility can be added in project/MimaExcludes.scala"
+fi
+
+exit $ret_val
diff --git a/dev/run-tests b/dev/run-tests
index 6043f859ae463..93d6692f83ca8 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -81,5 +81,4 @@ fi
 echo "========================================================================="
 echo "Detecting binary incompatibilites with MiMa"
 echo "========================================================================="
-./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore
-echo -e "q\n" | sbt/sbt mima-report-binary-issues | grep -v -e "info.*Resolving"
+dev/mima
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index fb808129bb65d..4ba20e590f2c2 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -9,6 +9,11 @@
         <title>{{ page.title }} - Spark {{site.SPARK_VERSION_SHORT}} Documentation</title>
 
+        {% if page.redirect %}
+          <meta http-equiv="refresh" content="0; url={{page.redirect}}">
+          <link rel="canonical" href="{{page.redirect}}" />
+        {% endif %}
+
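
Note (not part of the patch): the new `PythonRDD.runJob` adapter above just runs `SparkContext#runJob` over the requested partitions and flattens the per-partition arrays into a single iterator. A minimal Scala sketch of how it could be exercised directly is below; the object name, sample data, and `local[2]` master are illustrative assumptions, and the sketch sits in the `org.apache.spark` package only because `PythonRDD` is declared `private[spark]`.

```scala
// Hypothetical usage sketch for the PythonRDD.runJob adapter added in this diff.
// Placed in org.apache.spark so the private[spark] object is visible.
package org.apache.spark

import java.util.{ArrayList => JArrayList}

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.python.PythonRDD

object RunJobSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("runJob-sketch"))

    // Four single-byte records spread across two partitions, standing in for the
    // serialized byte-array records a Python worker would normally produce.
    val rdd: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(
      sc.parallelize(Seq(Array[Byte](1), Array[Byte](2), Array[Byte](3), Array[Byte](4)), 2))

    // Collect only partition 0 (allowing local execution) and iterate the unrolled result.
    val partitions = new JArrayList[Int]()
    partitions.add(0)
    val it = PythonRDD.runJob(sc, rdd, partitions, allowLocal = true)
    it.foreach(bytes => println(bytes.mkString(",")))

    sc.stop()
  }
}
```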