Merge branch 'master' into takeSample
dorx committed Jun 2, 2014
2 parents 7cab53a + 9a5d482 commit e3fd6a6
Showing 62 changed files with 3,412 additions and 2,035 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -7,7 +7,7 @@
sbt/*.jar
.settings
.cache
.mima-excludes
.generated-mima-excludes
/build/
work/
out/
1 change: 1 addition & 0 deletions .rat-excludes
@@ -3,6 +3,7 @@ target
.project
.classpath
.mima-excludes
.generated-mima-excludes
.rat-excludes
.*md
derby.log
29 changes: 0 additions & 29 deletions core/pom.xml
@@ -263,35 +263,6 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<phase>test</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
<target>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
<condition>
<not>
<or>
<isset property="env.SCALA_HOME" />
<isset property="env.SCALA_LIBRARY_PATH" />
</or>
</not>
</condition>
</fail>
</target>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -76,8 +76,8 @@ class SparkContext(config: SparkConf) extends Logging {
* :: DeveloperApi ::
* Alternative constructor for setting preferred locations where Spark will create executors.
*
* @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. Ca
* be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
* Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
* from a list of input files or InputFormats for the application.
*/
@DeveloperApi
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -269,6 +269,26 @@ private object SpecialLengths {
private[spark] object PythonRDD {
val UTF8 = Charset.forName("UTF-8")

/**
* Adapter for calling SparkContext#runJob from Python.
*
* This method will return an iterator of an array that contains all elements in the RDD
* (effectively a collect()), but allows you to run on a certain subset of partitions,
* or to enable local execution.
*/
def runJob(
sc: SparkContext,
rdd: JavaRDD[Array[Byte]],
partitions: JArrayList[Int],
allowLocal: Boolean): Iterator[Array[Byte]] = {
type ByteArray = Array[Byte]
type UnrolledPartition = Array[ByteArray]
val allPartitions: Array[UnrolledPartition] =
sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
flattenedPartition.iterator
}

def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
JavaRDD[Array[Byte]] = {
val file = new DataInputStream(new FileInputStream(filename))
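As a quick illustration, the new adapter could be driven like this from Scala code inside the spark package. This is a minimal sketch only: sc and byteRdd are assumed to be an existing SparkContext and a JavaRDD[Array[Byte]], and PythonRDD is package-private, so the call only compiles from Spark's own code.

    import java.util.{ArrayList => JArrayList}
    import org.apache.spark.api.python.PythonRDD

    // Collect only partitions 0 and 1 of the byte-array RDD, without local
    // execution; roughly what PySpark does when collecting a sliced RDD.
    val partitions = new JArrayList[Int]()
    partitions.add(0)
    partitions.add(1)
    val collected: Iterator[Array[Byte]] =
      PythonRDD.runJob(sc, byteRdd, partitions, allowLocal = false)
    collected.foreach(bytes => println(s"record of ${bytes.length} bytes"))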
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -381,16 +381,19 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
object SparkSubmitArguments {
/** Load properties present in the given file. */
def getPropertiesFromFile(file: File): Seq[(String, String)] = {
require(file.exists(), s"Properties file ${file.getName} does not exist")
require(file.exists(), s"Properties file $file does not exist")
require(file.isFile(), s"Properties file $file is not a normal file")
val inputStream = new FileInputStream(file)
val properties = new Properties()
try {
val properties = new Properties()
properties.load(inputStream)
properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
} catch {
case e: IOException =>
val message = s"Failed when loading Spark properties file ${file.getName}"
val message = s"Failed when loading Spark properties file $file"
throw new SparkException(message, e)
} finally {
inputStream.close()
}
properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
}
}
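A short usage sketch for the refactored helper; the path here is illustrative, any properties file works.

    import java.io.File
    import org.apache.spark.deploy.SparkSubmitArguments

    // Load every property from a defaults file and print it. The require()
    // checks above fail fast if the path is missing or not a regular file.
    val props = SparkSubmitArguments.getPropertiesFromFile(
      new File("conf/spark-defaults.conf"))
    props.foreach { case (key, value) => println(s"$key = $value") }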
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -61,17 +61,23 @@ private[spark] class ExecutorRunner(
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
killProcess()
killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
}

private def killProcess() {
/**
* kill executor process, wait for exit and notify worker to update resource status
*
* @param message the exception message which caused the executor's death
*/
private def killProcess(message: Option[String]) {
if (process != null) {
logInfo("Killing process!")
process.destroy()
process.waitFor()
val exitCode = process.waitFor()
worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
}
}

@@ -82,7 +88,6 @@ private[spark] class ExecutorRunner(
workerThread.interrupt()
workerThread = null
state = ExecutorState.KILLED
worker ! ExecutorStateChanged(appId, execId, state, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
}
@@ -148,14 +153,13 @@ private[spark] class ExecutorRunner(
} catch {
case interrupted: InterruptedException => {
logInfo("Runner thread for executor " + fullId + " interrupted")
killProcess()
state = ExecutorState.KILLED
killProcess(None)
}
case e: Exception => {
logError("Error running executor", e)
killProcess()
state = ExecutorState.FAILED
val message = e.getClass + ": " + e.getMessage
worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
killProcess(Some(e.toString))
}
}
}
34 changes: 34 additions & 0 deletions dev/mima
@@ -0,0 +1,34 @@
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

set -o pipefail

# Go to the Spark project root directory
FWDIR="$(cd `dirname $0`/..; pwd)"
cd $FWDIR

./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore
echo -e "q\n" | sbt/sbt mima-report-binary-issues | grep -v -e "info.*Resolving"
ret_val=$?

if [ $ret_val != 0 ]; then
echo "NOTE: Exceptions to binary compatibility can be added in project/MimaExcludes.scala"
fi

exit $ret_val
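For reference, an exclusion of the kind the note above points to might look roughly like the following in project/MimaExcludes.scala. This is a hedged sketch: the class and method names are made up, and the structure of the real file may differ.

    import com.typesafe.tools.mima.core._

    object MimaExcludes {
      // Hypothetical exclusion: tell MiMa to ignore a method that was removed
      // intentionally from a developer-facing class.
      val excludes = Seq(
        ProblemFilters.exclude[MissingMethodProblem](
          "org.apache.spark.SomeDeveloperApiClass.removedMethod")
      )
    }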
3 changes: 1 addition & 2 deletions dev/run-tests
@@ -81,5 +81,4 @@ fi
echo "========================================================================="
echo "Detecting binary incompatibilites with MiMa"
echo "========================================================================="
./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore
echo -e "q\n" | sbt/sbt mima-report-binary-issues | grep -v -e "info.*Resolving"
dev/mima
18 changes: 12 additions & 6 deletions docs/_layouts/global.html
@@ -9,6 +9,11 @@
<title>{{ page.title }} - Spark {{site.SPARK_VERSION_SHORT}} Documentation</title>
<meta name="description" content="">

{% if page.redirect %}
<meta http-equiv="refresh" content="0; url={{page.redirect}}">
<link rel="canonical" href="{{page.redirect}}" />
{% endif %}

<link rel="stylesheet" href="css/bootstrap.min.css">
<style>
body {
@@ -61,15 +66,13 @@
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Programming Guides<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="quick-start.html">Quick Start</a></li>
<li><a href="scala-programming-guide.html">Spark in Scala</a></li>
<li><a href="java-programming-guide.html">Spark in Java</a></li>
<li><a href="python-programming-guide.html">Spark in Python</a></li>
<li><a href="programming-guide.html">Spark Programming Guide</a></li>
<li class="divider"></li>
<li><a href="streaming-programming-guide.html">Spark Streaming</a></li>
<li><a href="sql-programming-guide.html">Spark SQL</a></li>
<li><a href="mllib-guide.html">MLlib (Machine Learning)</a></li>
<li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
<li><a href="graphx-programming-guide.html">GraphX (Graph Processing)</a></li>
<li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
</ul>
</li>

@@ -86,6 +89,8 @@
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Deploying<b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="cluster-overview.html">Overview</a></li>
<li><a href="submitting-applications.html">Submitting Applications</a></li>
<li class="divider"></li>
<li><a href="ec2-scripts.html">Amazon EC2</a></li>
<li><a href="spark-standalone.html">Standalone Mode</a></li>
<li><a href="running-on-mesos.html">Mesos</a></li>
@@ -99,9 +104,10 @@
<li><a href="configuration.html">Configuration</a></li>
<li><a href="monitoring.html">Monitoring</a></li>
<li><a href="tuning.html">Tuning Guide</a></li>
<li><a href="hadoop-third-party-distributions.html">Running with CDH/HDP</a></li>
<li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
<li><a href="job-scheduling.html">Job Scheduling</a></li>
<li><a href="security.html">Security</a></li>
<li><a href="hardware-provisioning.html">Hardware Provisioning</a></li>
<li><a href="hadoop-third-party-distributions.html">3<sup>rd</sup>-Party Hadoop Distros</a></li>
<li class="divider"></li>
<li><a href="building-with-maven.html">Building Spark with Maven</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark">Contributing to Spark</a></li>
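The redirect block added to docs/_layouts/global.html above reads page.redirect from a page's front matter. A hypothetical stub page that forwards an old guide URL to the consolidated programming guide could therefore look like this; the file name, title, and wording are illustrative and not part of this commit.

    ---
    layout: global
    title: Spark in Scala
    redirect: programming-guide.html
    ---

    This page has moved to the [Spark Programming Guide](programming-guide.html).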
2 changes: 1 addition & 1 deletion docs/bagel-programming-guide.md
@@ -21,7 +21,7 @@ To use Bagel in your program, add the following SBT or Maven dependency:

# Programming Model

Bagel operates on a graph represented as a [distributed dataset](scala-programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
Bagel operates on a graph represented as a [distributed dataset](programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.

For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to.
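To make the compute-function contract quoted above concrete, here is a minimal Scala sketch of a PageRank-style superstep against the Bagel API, assuming hypothetical PRVertex and PRMessage classes and a vertex count known up front; the guide's full example differs in detail.

    import org.apache.spark.bagel.{Message, Vertex}

    // Hypothetical vertex and message types: a rank plus outgoing edge IDs,
    // and a message carrying a share of rank to a target vertex.
    class PRVertex(val rank: Double, val outEdges: Seq[String],
                   val active: Boolean) extends Vertex with Serializable
    class PRMessage(val targetId: String, val rankShare: Double)
      extends Message[String] with Serializable

    // One superstep: sum incoming rank shares, update this vertex's rank, and
    // send new shares along outgoing edges until maxSupersteps is reached.
    def compute(numVertices: Long, maxSupersteps: Int)(
        self: PRVertex, msgs: Option[Array[PRMessage]], superstep: Int)
      : (PRVertex, Array[PRMessage]) = {
      val msgSum = msgs.getOrElse(Array[PRMessage]()).map(_.rankShare).sum
      val newRank =
        if (superstep == 0) self.rank
        else 0.15 / numVertices + 0.85 * msgSum
      val halt = superstep >= maxSupersteps
      val outbox =
        if (halt || self.outEdges.isEmpty) Array[PRMessage]()
        else self.outEdges.map(e => new PRMessage(e, newRank / self.outEdges.size)).toArray
      (new PRVertex(newRank, self.outEdges, active = !halt), outbox)
    }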
