
Commit eb2b3ba

Merge remote-tracking branch 'upstream/master'
giwa committed Jul 9, 2014
2 parents d8e51f9 + ac9cdc1 commit eb2b3ba
Showing 120 changed files with 2,011 additions and 579 deletions.
8 changes: 4 additions & 4 deletions bin/compute-classpath.sh
@@ -81,10 +81,10 @@ ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)
 # Verify that versions of java used to build the jars and run Spark are compatible
 jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
 if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
-  echo "Loading Spark jar with '$JAR_CMD' failed. "
-  echo "This is likely because Spark was compiled with Java 7 and run "
-  echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark "
-  echo "or build Spark with Java 6."
+  echo "Loading Spark jar with '$JAR_CMD' failed. " 1>&2
+  echo "This is likely because Spark was compiled with Java 7 and run " 1>&2
+  echo "with Java 6. (see SPARK-1703). Please use Java 7 to run Spark " 1>&2
+  echo "or build Spark with Java 6." 1>&2
   exit 1
 fi

6 changes: 3 additions & 3 deletions bin/pyspark
@@ -26,7 +26,7 @@ export SPARK_HOME="$FWDIR"
 SCALA_VERSION=2.10

 if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./bin/pyspark [options]"
+  echo "Usage: ./bin/pyspark [options]" 1>&2
   $FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
   exit 0
 fi
@@ -36,8 +36,8 @@ if [ ! -f "$FWDIR/RELEASE" ]; then
   # Exit if the user hasn't compiled Spark
   ls "$FWDIR"/assembly/target/scala-$SCALA_VERSION/spark-assembly*hadoop*.jar >& /dev/null
   if [[ $? != 0 ]]; then
-    echo "Failed to find Spark assembly in $FWDIR/assembly/target" >&2
-    echo "You need to build Spark before running this program" >&2
+    echo "Failed to find Spark assembly in $FWDIR/assembly/target" 1>&2
+    echo "You need to build Spark before running this program" 1>&2
     exit 1
   fi
 fi
10 changes: 5 additions & 5 deletions bin/run-example
@@ -27,9 +27,9 @@ if [ -n "$1" ]; then
   EXAMPLE_CLASS="$1"
   shift
 else
-  echo "Usage: ./bin/run-example <example-class> [example-args]"
-  echo "  - set MASTER=XX to use a specific master"
-  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)"
+  echo "Usage: ./bin/run-example <example-class> [example-args]" 1>&2
+  echo "  - set MASTER=XX to use a specific master" 1>&2
+  echo "  - can use abbreviated example class name (e.g. SparkPi, mllib.LinearRegression)" 1>&2
   exit 1
 fi

@@ -40,8 +40,8 @@ elif [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.ja
 fi

 if [[ -z $SPARK_EXAMPLES_JAR ]]; then
-  echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" >&2
-  echo "You need to build Spark before running this program" >&2
+  echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
+  echo "You need to build Spark before running this program" 1>&2
   exit 1
 fi

13 changes: 6 additions & 7 deletions bin/spark-class
@@ -33,13 +33,13 @@ export SPARK_HOME="$FWDIR"
 . $FWDIR/bin/load-spark-env.sh

 if [ -z "$1" ]; then
-  echo "Usage: spark-class <class> [<args>]" >&2
+  echo "Usage: spark-class <class> [<args>]" 1>&2
   exit 1
 fi

 if [ -n "$SPARK_MEM" ]; then
-  echo "Warning: SPARK_MEM is deprecated, please use a more specific config option"
-  echo "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)."
+  echo -e "Warning: SPARK_MEM is deprecated, please use a more specific config option" 1>&2
+  echo -e "(e.g., spark.executor.memory or SPARK_DRIVER_MEMORY)." 1>&2
 fi

 # Use SPARK_MEM or 512m as the default memory, to be overridden by specific options

@@ -147,10 +147,9 @@ fi
 export CLASSPATH

 if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
-  echo -n "Spark Command: "
-  echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
-  echo "========================================"
-  echo
+  echo -n "Spark Command: " 1>&2
+  echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@" 1>&2
+  echo -e "========================================\n" 1>&2
 fi

 exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
6 changes: 6 additions & 0 deletions core/pom.xml
@@ -34,6 +34,12 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>servlet-api</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>net.java.dev.jets3t</groupId>
4 changes: 4 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -95,6 +95,10 @@ span.expand-details {
   float: right;
 }

+pre {
+  font-size: 0.8em;
+}
+
 .stage-details {
   max-height: 100px;
   overflow-y: auto;
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark

 import scala.collection.mutable.{ArrayBuffer, HashSet}

+import org.apache.spark.executor.InputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._

@@ -41,9 +42,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     val key = RDDBlockId(rdd.id, partition.index)
     logDebug(s"Looking for partition $key")
     blockManager.get(key) match {
-      case Some(values) =>
+      case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+        context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

       case None =>
         // Acquire a lock for loading this partition
@@ -110,7 +112,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
loading.add(id)
}
values.map(_.asInstanceOf[Iterator[T]])
values.map(_.data.asInstanceOf[Iterator[T]])
}
}
}
@@ -132,7 +134,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
    * exceptions that can be avoided. */
   updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
   blockManager.get(key) match {
-    case Some(v) => v.asInstanceOf[Iterator[T]]
+    case Some(v) => v.data.asInstanceOf[Iterator[T]]
     case None =>
       logInfo(s"Failure to store $key")
       throw new BlockException(key, s"Block manager failed to return cached value for $key!")
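
Note: the CacheManager change reflects a new BlockManager API — get now returns a BlockResult that bundles the data iterator with its InputMetrics, so a cache hit can be recorded in the task's metrics before the iterator is consumed. A minimal sketch of that bundling pattern (all names here are hypothetical, not Spark's API):

    // Illustrative sketch: a cache whose hits return the data together with
    // metrics about where the bytes came from, mirroring the BlockResult change.
    final case class ReadMetrics(bytesRead: Long, source: String)
    final case class CacheResult[T](data: Iterator[T], metrics: ReadMetrics)

    class TinyCache[T] {
      private val store = scala.collection.mutable.Map.empty[String, (Seq[T], Long)]

      def put(key: String, values: Seq[T], bytes: Long): Unit =
        store(key) = (values, bytes)

      // A hit hands back the iterator and its read metrics in one object, so
      // the caller can record the metrics before handing out the data.
      def get(key: String): Option[CacheResult[T]] =
        store.get(key).map { case (vs, bytes) =>
          CacheResult(vs.iterator, ReadMetrics(bytes, source = "memory"))
        }
    }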
52 changes: 45 additions & 7 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -17,11 +17,13 @@

 package org.apache.spark

+import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+
 import scala.reflect.ClassTag

 import org.apache.spark.rdd.RDD
-import org.apache.spark.util.CollectionsUtils
-import org.apache.spark.util.Utils
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.{CollectionsUtils, Utils}

 /**
  * An object that defines how the elements in a key-value pair RDD are partitioned by key.

@@ -96,15 +98,15 @@ class HashPartitioner(partitions: Int) extends Partitioner {
  * the value of `partitions`.
  */
 class RangePartitioner[K : Ordering : ClassTag, V](
-    partitions: Int,
+    @transient partitions: Int,
     @transient rdd: RDD[_ <: Product2[K,V]],
-    private val ascending: Boolean = true)
+    private var ascending: Boolean = true)
   extends Partitioner {

-  private val ordering = implicitly[Ordering[K]]
+  private var ordering = implicitly[Ordering[K]]

   // An array of upper bounds for the first (partitions - 1) partitions
-  private val rangeBounds: Array[K] = {
+  private var rangeBounds: Array[K] = {
     if (partitions == 1) {
       Array()
     } else {
@@ -127,7 +129,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](

   def numPartitions = rangeBounds.length + 1

-  private val binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]
+  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

   def getPartition(key: Any): Int = {
     val k = key.asInstanceOf[K]

@@ -173,4 +175,40 @@ class RangePartitioner[K : Ordering : ClassTag, V](
     result = prime * result + ascending.hashCode
     result
   }
+
+  @throws(classOf[IOException])
+  private def writeObject(out: ObjectOutputStream) {
+    val sfactory = SparkEnv.get.serializer
+    sfactory match {
+      case js: JavaSerializer => out.defaultWriteObject()
+      case _ =>
+        out.writeBoolean(ascending)
+        out.writeObject(ordering)
+        out.writeObject(binarySearch)
+
+        val ser = sfactory.newInstance()
+        Utils.serializeViaNestedStream(out, ser) { stream =>
+          stream.writeObject(scala.reflect.classTag[Array[K]])
+          stream.writeObject(rangeBounds)
+        }
+    }
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream) {
+    val sfactory = SparkEnv.get.serializer
+    sfactory match {
+      case js: JavaSerializer => in.defaultReadObject()
+      case _ =>
+        ascending = in.readBoolean()
+        ordering = in.readObject().asInstanceOf[Ordering[K]]
+        binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]
+
+        val ser = sfactory.newInstance()
+        Utils.deserializeViaNestedStream(in, ser) { ds =>
+          implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
+          rangeBounds = ds.readObject[Array[K]]()
+        }
+    }
+  }
 }
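
Note: the val-to-var changes above exist so the custom writeObject/readObject hooks can reassign the fields on deserialization — under the default JavaSerializer the normal path is taken, while any other serializer (e.g. Kryo) writes rangeBounds through a nested stream. A self-contained sketch of the hook mechanism itself, simplified to plain Java serialization (the class and its fields are hypothetical):

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException,
      ObjectInputStream, ObjectOutputStream}

    // Sketch: private writeObject/readObject hooks that the JVM invokes during
    // Java serialization. Fields must be vars so readObject can reassign them,
    // which is why RangePartitioner's fields changed from val to var above.
    class Bounds(@transient private val raw: Array[Int]) extends Serializable {
      @transient private var upperBounds: Array[Int] = raw.sorted

      @throws(classOf[IOException])
      private def writeObject(out: ObjectOutputStream): Unit = {
        out.defaultWriteObject()     // non-transient fields (none here)
        out.writeObject(upperBounds) // write the derived state explicitly
      }

      @throws(classOf[IOException])
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        upperBounds = in.readObject().asInstanceOf[Array[Int]]
      }

      override def toString = upperBounds.mkString(",")
    }

    object BoundsDemo {
      def main(args: Array[String]): Unit = {
        val buf = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(buf)
        oos.writeObject(new Bounds(Array(3, 1, 2)))
        oos.close()
        val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
        println(in.readObject()) // prints 1,2,3 - state survived the round trip
      }
    }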
12 changes: 10 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1203,9 +1203,17 @@ class SparkContext(config: SparkConf) extends Logging {
   /**
    * Clean a closure to make it ready to be serialized and sent to tasks
    * (removes unreferenced variables in $outer's, updates REPL variables).
+   * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
+   * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
+   * if not.
+   *
+   * @param f the closure to clean
+   * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
+   * @throws <tt>SparkException</tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
+   *   serializable
    */
-  private[spark] def clean[F <: AnyRef](f: F): F = {
-    ClosureCleaner.clean(f)
+  private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
     f
   }

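
Note: with checkSerializable defaulting to true, clean can now fail at the point where the closure is defined rather than when the first task is serialized. The sketch below shows the general fail-fast technique — serialize immediately and rethrow with context. It is an illustration of the idea, not ClosureCleaner's actual code:

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    // Sketch of the fail-fast idea behind checkSerializable: serialize the
    // closure right away so a NotSerializableException points at the definition
    // site instead of a later task launch.
    object SerializabilityCheck {
      def ensureSerializable(f: AnyRef): Unit = {
        val oos = new ObjectOutputStream(new ByteArrayOutputStream())
        try oos.writeObject(f)
        catch {
          case e: NotSerializableException =>
            throw new RuntimeException(s"Closure is not serializable: $e", e)
        } finally oos.close()
      }

      def main(args: Array[String]): Unit = {
        ensureSerializable((x: Int) => x + 1) // fine: function literals serialize

        class Holder { val f = (x: Int) => x + hashCode } // f captures `this`
        try ensureSerializable(new Holder().f)
        catch { case e: RuntimeException => println(s"caught early: ${e.getMessage}") }
      }
    }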
15 changes: 11 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -79,7 +79,7 @@ class SparkEnv (

   private[spark] def stop() {
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
-    httpFileServer.stop()
+    Option(httpFileServer).foreach(_.stop())
     mapOutputTracker.stop()
     shuffleManager.stop()
     broadcastManager.stop()
@@ -183,6 +183,7 @@ object SparkEnv extends Logging {

     val serializer = instantiateClass[Serializer](
       "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+    logDebug(s"Using serializer: ${serializer.getClass}")

     val closureSerializer = instantiateClass[Serializer](
       "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")
@@ -227,9 +228,15 @@ object SparkEnv extends Logging {

     val cacheManager = new CacheManager(blockManager)

-    val httpFileServer = new HttpFileServer(securityManager)
-    httpFileServer.initialize()
-    conf.set("spark.fileserver.uri", httpFileServer.serverUri)
+    val httpFileServer =
+      if (isDriver) {
+        val server = new HttpFileServer(securityManager)
+        server.initialize()
+        conf.set("spark.fileserver.uri", server.serverUri)
+        server
+      } else {
+        null
+      }

     val metricsSystem = if (isDriver) {
       MetricsSystem.createMetricsSystem("driver", conf, securityManager)
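
Note: HttpFileServer is now created only on the driver, so the executor-side stop() must tolerate a null field — Option(...) turns the possible null into a safe no-op. A tiny sketch of that pattern (names are illustrative):

    // Sketch of the null-tolerant shutdown pattern: Option(x) is None when x
    // is null, so .foreach(_.stop()) does nothing on processes that never
    // created the component.
    class FileServer { def stop(): Unit = println("file server stopped") }

    class Env(isDriver: Boolean) {
      // Only the driver owns a file server; executors hold null, as in this patch.
      val fileServer: FileServer = if (isDriver) new FileServer else null

      def stop(): Unit = Option(fileServer).foreach(_.stop())
    }

    object EnvDemo extends App {
      new Env(isDriver = true).stop()  // prints "file server stopped"
      new Env(isDriver = false).stop() // safely does nothing
    }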
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkException.scala
@@ -22,3 +22,11 @@ class SparkException(message: String, cause: Throwable)

   def this(message: String) = this(message, null)
 }
+
+/**
+ * Exception thrown when execution of user code in the driver process fails, e.g.
+ * when an accumulator update fails, or when takeOrdered fails because the
+ * user-supplied Ordering implementation misbehaves.
+ */
+private[spark] class SparkDriverExecutionException(cause: Throwable)
+  extends SparkException("Execution error", cause)
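
Note: SparkDriverExecutionException gives driver-side failures of user code a distinct type, so callers can tell "your Ordering or accumulator code blew up" apart from engine errors. A hedged sketch of the wrapping pattern (all names hypothetical):

    // Sketch: wrap failures of user-supplied code executed in the driver in a
    // dedicated exception type, as SparkDriverExecutionException does.
    class DriverExecutionError(cause: Throwable)
      extends RuntimeException("Execution error", cause)

    object WrapDemo {
      def runUserCode[T](body: => T): T =
        try body
        catch { case e: Exception => throw new DriverExecutionError(e) }

      def main(args: Array[String]): Unit = {
        // A deliberately misbehaving Ordering, like the takeOrdered case above.
        val bad = Ordering.fromLessThan[Int]((_, _) => throw new IllegalStateException("bad compare"))
        try runUserCode(List(3, 1, 2).sorted(bad))
        catch {
          case e: DriverExecutionError => println(s"user code failed: ${e.getCause}")
        }
      }
    }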
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -599,6 +599,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
     } else {
       // This happens on the master, where we pass the updates to Python through a socket
       val socket = new Socket(serverHost, serverPort)
+      // SPARK-2282: Immediately reuse closed sockets because we create one per task.
+      socket.setReuseAddress(true)
       val in = socket.getInputStream
       val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
       out.writeInt(val2.size)
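
Note: the accumulator path opens one short-lived socket per task, so closed connections linger in TIME_WAIT and can exhaust local ports under load; SO_REUSEADDR relaxes that. A small sketch of the same setting on a throwaway client socket (host, port, and framing are placeholders):

    import java.io.DataOutputStream
    import java.net.Socket

    // Sketch: enable SO_REUSEADDR on a short-lived client socket, as the patch
    // does for the per-task accumulator connection.
    object ReuseAddrDemo {
      def sendUpdate(host: String, port: Int, payload: Array[Byte]): Unit = {
        val socket = new Socket(host, port)
        // Allow local addresses to be reused right away instead of waiting out
        // TIME_WAIT; this matters when a new connection is made for every task.
        socket.setReuseAddress(true)
        val out = new DataOutputStream(socket.getOutputStream)
        try {
          out.writeInt(payload.length)
          out.write(payload)
          out.flush()
        } finally socket.close()
      }
    }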
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SignalLogger, Utils}

 /**
  * A web server that renders SparkUIs of completed applications.

@@ -169,10 +169,11 @@
  *
  * This launches the HistoryServer as a Spark daemon.
  */
-object HistoryServer {
+object HistoryServer extends Logging {
   private val conf = new SparkConf

   def main(argStrings: Array[String]) {
+    SignalLogger.register(log)
     initSecurity()
     val args = new HistoryServerArguments(conf, argStrings)
     val securityManager = new SecurityManager(conf)
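
Note: HistoryServer, Master, and Worker (below) all gain SignalLogger.register(log) as the first line of main, so a killing signal shows up in the daemon's log before the process dies. A rough sketch of what such a logger can look like using sun.misc.Signal — an assumed mechanism for illustration; Spark's actual SignalLogger differs in detail:

    import sun.misc.{Signal, SignalHandler}

    // Assumed-mechanism sketch of a signal logger: record interesting signals
    // before terminating. A production version (like Spark's) also takes care
    // to preserve or re-raise the default signal disposition.
    object SignalLoggerSketch {
      def register(log: String => Unit): Unit =
        for (name <- Seq("TERM", "HUP", "INT")) {
          Signal.handle(new Signal(name), new SignalHandler {
            def handle(sig: Signal): Unit = {
              log(s"RECEIVED SIGNAL ${sig.getNumber}: SIG${sig.getName}")
              sys.exit(128 + sig.getNumber) // mimic the default exit status
            }
          })
        }
    }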
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -41,7 +41,7 @@ import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}

 private[spark] class Master(
     host: String,

@@ -481,7 +481,7 @@ private[spark] class Master(
     // First schedule drivers, they take strict precedence over applications
     val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
     for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
-      for (driver <- waitingDrivers) {
+      for (driver <- List(waitingDrivers: _*)) { // iterate over a copy of waitingDrivers
         if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
@@ -755,12 +755,13 @@ private[spark] class Master(
     }
   }

-private[spark] object Master {
+private[spark] object Master extends Logging {
   val systemName = "sparkMaster"
   private val actorName = "Master"
   val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r

   def main(argStrings: Array[String]) {
+    SignalLogger.register(log)
     val conf = new SparkConf
     val args = new MasterArguments(argStrings, conf)
     val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
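
Note: the scheduling loop mutates waitingDrivers (waitingDrivers -= driver) while iterating it, so the fix walks a snapshot taken with List(waitingDrivers: _*). A sketch demonstrating the hazard and the fix:

    import scala.collection.mutable.ArrayBuffer

    // Sketch: removing from a mutable buffer while iterating it can skip
    // elements; iterating over an immutable snapshot is safe.
    object SnapshotIteration {
      def main(args: Array[String]): Unit = {
        val waiting = ArrayBuffer(1, 2, 3, 4)

        // Safe: List(waiting: _*) copies the buffer, so removals from `waiting`
        // cannot disturb the traversal, mirroring the Master.scala fix.
        for (driver <- List(waiting: _*)) {
          if (driver % 2 == 1) waiting -= driver // "launch", then dequeue
        }
        println(waiting) // ArrayBuffer(2, 4)
      }
    }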
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}

 /**
  * @param masterUrls Each url should look like spark://host:port.

@@ -365,8 +365,9 @@ private[spark] class Worker(
     }
   }

-private[spark] object Worker {
+private[spark] object Worker extends Logging {
   def main(argStrings: Array[String]) {
+    SignalLogger.register(log)
     val args = new WorkerArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
       args.memory, args.masters, args.workDir)
(The remaining changed files in this commit are not shown.)