Commit

Merge remote-tracking branch 'upstream/master' into SPARK-3278
zapletal-martin committed Jan 23, 2015
2 parents 7aca4cc + ea74365 commit 12151e6
Showing 173 changed files with 5,274 additions and 3,494 deletions.
27 changes: 15 additions & 12 deletions bin/compute-classpath.sh
@@ -72,22 +72,25 @@ else
assembly_folder="$ASSEMBLY_DIR"
fi

num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" ]; then
echo "Failed to find Spark assembly in $assembly_folder"
echo "You need to build Spark before running this program."
exit 1
fi
num_jars=0

for f in ${assembly_folder}/spark-assembly*hadoop*.jar; do
if [[ ! -e "$f" ]]; then
echo "Failed to find Spark assembly in $assembly_folder" 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
ASSEMBLY_JAR="$f"
num_jars=$((num_jars+1))
done

if [ "$num_jars" -gt "1" ]; then
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar$")
echo "Found multiple Spark assembly jars in $assembly_folder:"
echo "$jars_list"
echo "Please remove all but one jar."
echo "Found multiple Spark assembly jars in $assembly_folder:" 1>&2
ls ${assembly_folder}/spark-assembly*hadoop*.jar 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi

ASSEMBLY_JAR="$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)"

# Verify that versions of java used to build the jars and run Spark are compatible
jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
27 changes: 21 additions & 6 deletions bin/run-example
@@ -35,17 +35,32 @@ else
fi

if [ -f "$FWDIR/RELEASE" ]; then
export SPARK_EXAMPLES_JAR="`ls "$FWDIR"/lib/spark-examples-*hadoop*.jar`"
elif [ -e "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar ]; then
export SPARK_EXAMPLES_JAR="`ls "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar`"
JAR_PATH="${FWDIR}/lib"
else
JAR_PATH="${EXAMPLES_DIR}/target/scala-${SPARK_SCALA_VERSION}"
fi

if [[ -z "$SPARK_EXAMPLES_JAR" ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
echo "You need to build Spark before running this program" 1>&2
JAR_COUNT=0

for f in ${JAR_PATH}/spark-examples-*hadoop*.jar; do
if [[ ! -e "$f" ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
echo "You need to build Spark before running this program" 1>&2
exit 1
fi
SPARK_EXAMPLES_JAR="$f"
JAR_COUNT=$((JAR_COUNT+1))
done

if [ "$JAR_COUNT" -gt "1" ]; then
echo "Found multiple Spark examples assembly jars in ${JAR_PATH}" 1>&2
ls ${JAR_PATH}/spark-examples-*hadoop*.jar 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi

export SPARK_EXAMPLES_JAR

EXAMPLE_MASTER=${MASTER:-"local[*]"}

if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
2 changes: 2 additions & 0 deletions bin/spark-class
@@ -71,6 +71,8 @@ case "$1" in
'org.apache.spark.executor.MesosExecutorBackend')
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
export PYTHONPATH="$FWDIR/python:$PYTHONPATH"
export PYTHONPATH="$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
;;

# Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
@@ -10,3 +10,4 @@ log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN
8 changes: 8 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -121,6 +121,14 @@ pre {
border: none;
}

.description-input {
overflow: hidden;
text-overflow: ellipsis;
width: 100%;
white-space: nowrap;
display: block;
}

.stacktrace-details {
max-height: 300px;
overflow-y: auto;
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -61,8 +61,8 @@ case class Aggregator[K, V, C] (
// Update task metrics if context is not null
// TODO: Make context non optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}
@@ -95,8 +95,8 @@ case class Aggregator[K, V, C] (
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}
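
The Aggregator hunk above replaces direct writes to public TaskMetrics fields (+= on memoryBytesSpilled and diskBytesSpilled) with incrementer methods. A minimal Scala sketch of that encapsulation pattern, using a made-up SpillMetrics class rather than Spark's real TaskMetrics:

    // Sketch only: a hypothetical metrics holder illustrating the incrementer
    // pattern in this diff. Spark's actual TaskMetrics has more fields and logic.
    class SpillMetrics {
      // Mutable state is private; callers can no longer do `memoryBytesSpilled += n`.
      private var _memoryBytesSpilled: Long = 0L
      private var _diskBytesSpilled: Long = 0L

      def memoryBytesSpilled: Long = _memoryBytesSpilled
      def diskBytesSpilled: Long = _diskBytesSpilled

      // Incrementers are the only way to mutate, keeping updates in one place.
      def incMemoryBytesSpilled(value: Long): Unit = { _memoryBytesSpilled += value }
      def incDiskBytesSpilled(value: Long): Unit = { _diskBytesSpilled += value }
    }

    object SpillMetricsExample {
      def main(args: Array[String]): Unit = {
        val m = new SpillMetrics
        m.incMemoryBytesSpilled(1024L)
        m.incDiskBytesSpilled(2048L)
        println(s"memory=${m.memoryBytesSpilled} disk=${m.diskBytesSpilled}")
      }
    }
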
@@ -158,7 +158,7 @@ private[spark] class ExecutorAllocationManager(
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutor == 0) {
throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.cores")
throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
}
}

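
The corrected message above belongs to a sanity check in ExecutorAllocationManager: with integer division, an executor configured with fewer cores than a single task requires could run zero tasks. A standalone sketch of that guard, with names simplified and the real config plumbing omitted:

    // Sketch only: mirrors the tasksPerExecutor guard referenced in the diff.
    def validateCores(executorCores: Int, taskCpus: Int): Unit = {
      val tasksPerExecutor = executorCores / taskCpus // integer division
      if (tasksPerExecutor == 0) {
        throw new IllegalArgumentException(
          "spark.executor.cores must not be less than spark.task.cpus.")
      }
    }

    // validateCores(1, 2) throws; validateCores(4, 2) allows 2 concurrent tasks.
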
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import scala.collection.JavaConverters._
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.{HashMap, LinkedHashSet}
import org.apache.spark.serializer.KryoSerializer

@@ -46,7 +47,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)

private[spark] val settings = new HashMap[String, String]()
private[spark] val settings = new TrieMap[String, String]()

if (loadDefaults) {
// Load any spark.* system properties
@@ -177,7 +178,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}

/** Get all parameters as a list of pairs */
def getAll: Array[(String, String)] = settings.clone().toArray
def getAll: Array[(String, String)] = settings.toArray

/** Get a parameter as an integer, falling back to a default if not set */
def getInt(key: String, defaultValue: Int): Int = {
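
The SparkConf hunk swaps the settings map from a mutable HashMap to scala.collection.concurrent.TrieMap, a lock-free, thread-safe map, which is why getAll can drop the defensive clone(). A self-contained sketch (not Spark code) of the behaviour this buys:

    import scala.collection.concurrent.TrieMap

    object TrieMapSketch {
      def main(args: Array[String]): Unit = {
        val settings = new TrieMap[String, String]()

        // Several threads writing concurrently, with no external locking.
        val writers = (1 to 4).map { i =>
          new Thread(new Runnable {
            def run(): Unit =
              for (j <- 1 to 1000) settings.put(s"spark.test.key.$i.$j", j.toString)
          })
        }
        writers.foreach(_.start())
        writers.foreach(_.join())

        // Iteration works on a consistent snapshot, so no clone() is needed.
        val all: Array[(String, String)] = settings.toArray
        println(s"stored ${all.length} entries") // expected: 4000
      }
    }
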
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -38,6 +38,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils

/**
* Defines operations common to several Java RDD implementations.
* Note that this trait is not intended to be implemented by user code.
*/
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This

@@ -435,6 +439,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def first(): T = rdd.first()

/**
* @return true if and only if the RDD contains no elements at all. Note that an RDD
* may be empty even when it has at least 1 partition.
*/
def isEmpty(): Boolean = rdd.isEmpty()

/**
* Save this RDD as a text file, using string representations of elements.
*/
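
The isEmpty() added to JavaRDDLike simply delegates to the underlying RDD, and the scaladoc above stresses that an RDD can have partitions yet contain no elements. A usage sketch, assuming Spark is on the classpath and a local master is acceptable (object and app names are made up):

    import org.apache.spark.{SparkConf, SparkContext}

    object IsEmptySketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("isEmpty-sketch").setMaster("local[*]")
        val sc = new SparkContext(conf)
        try {
          // Four partitions, zero elements: isEmpty() is still true.
          val empty = sc.parallelize(Seq.empty[Int], numSlices = 4)
          val nonEmpty = sc.parallelize(Seq(1, 2, 3))

          println(empty.isEmpty())    // true
          println(nonEmpty.isEmpty()) // false
        } finally {
          sc.stop()
        }
      }
    }
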
@@ -125,8 +125,8 @@ private[spark] class PythonRDD(
init, finish))
val memoryBytesSpilled = stream.readLong()
val diskBytesSpilled = stream.readLong()
context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += diskBytesSpilled
context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
read()
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
// Signals that an exception has been thrown in python
19 changes: 10 additions & 9 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -203,10 +203,10 @@ private[spark] class Executor(
val afterSerialization = System.currentTimeMillis()

for (m <- task.metrics) {
m.executorDeserializeTime = taskStart - deserializeStartTime
m.executorRunTime = taskFinish - taskStart
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = afterSerialization - beforeSerialization
m.setExecutorDeserializeTime(taskStart - deserializeStartTime)
m.setExecutorRunTime(taskFinish - taskStart)
m.setJvmGCTime(gcTime - startGCTime)
m.setResultSerializationTime(afterSerialization - beforeSerialization)
}

val accumUpdates = Accumulators.values
@@ -257,8 +257,8 @@ private[spark] class Executor(
val serviceTime = System.currentTimeMillis() - taskStart
val metrics = attemptedTask.flatMap(t => t.metrics)
for (m <- metrics) {
m.executorRunTime = serviceTime
m.jvmGCTime = gcTime - startGCTime
m.setExecutorRunTime(serviceTime)
m.setJvmGCTime(gcTime - startGCTime)
}
val reason = new ExceptionFailure(t, metrics)
execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
@@ -376,11 +376,12 @@ private[spark] class Executor(
val curGCTime = gcTime

for (taskRunner <- runningTasks.values()) {
if (!taskRunner.attemptedTask.isEmpty) {
if (taskRunner.attemptedTask.nonEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
metrics.updateShuffleReadMetrics()
metrics.updateInputMetrics()
metrics.jvmGCTime = curGCTime - taskRunner.startGCTime
metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)

if (isLocal) {
// JobProgressListener will hold an reference of it during
// onExecutorMetricsUpdate(), then JobProgressListener can not see