Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
msiddalingaiah committed Dec 17, 2014
2 parents 38faca4 + 3d0c37b commit 51d14b9
Show file tree
Hide file tree
Showing 393 changed files with 8,104 additions and 3,155 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*.ipr
*.iml
*.iws
*.pyc
.idea/
.idea_modules/
sbt/*.jar
Expand Down Expand Up @@ -49,6 +50,8 @@ dependency-reduced-pom.xml
checkpoint
derby.log
dist/
dev/create-release/*txt
dev/create-release/*final
spark-*-bin-*.tgz
unit-tests.log
/lib/
Expand Down
1 change: 1 addition & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ dist/*
logs
.*scalastyle-output.xml
.*dependency-reduced-pom.xml
known_translations
3 changes: 2 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,8 @@ THE SOFTWARE.

========================================================================
For Scala Interpreter classes (all .scala files in repl/src/main/scala
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala):
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
and for SerializableMapWrapper in JavaUtils.scala:
========================================================================

Copyright (c) 2002-2013 EPFL
Expand Down
10 changes: 0 additions & 10 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,6 @@
</build>

<profiles>
<profile>
<id>yarn-alpha</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn-alpha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>yarn</id>
<dependencies>
Expand Down
21 changes: 21 additions & 0 deletions bin/beeline.cmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
@echo off

rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

set SPARK_HOME=%~dp0..
cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.hive.beeline.BeeLine %*
6 changes: 3 additions & 3 deletions bin/compute-classpath.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,14 @@ else
assembly_folder="$ASSEMBLY_DIR"
fi

num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)"
num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" ]; then
echo "Failed to find Spark assembly in $assembly_folder"
echo "You need to build Spark before running this program."
exit 1
fi
if [ "$num_jars" -gt "1" ]; then
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar")
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar$")
echo "Found multiple Spark assembly jars in $assembly_folder:"
echo "$jars_list"
echo "Please remove all but one jar."
Expand Down Expand Up @@ -108,7 +108,7 @@ else
datanucleus_dir="$FWDIR"/lib_managed/jars
fi

datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar")"
datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar$")"
datanucleus_jars="$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)"

if [ -n "$datanucleus_jars" ]; then
Expand Down
7 changes: 7 additions & 0 deletions bin/spark-shell
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ source "$FWDIR"/bin/utils.sh
SUBMIT_USAGE_FUNCTION=usage
gatherSparkSubmitOpts "$@"

# SPARK-4161: scala does not assume use of the java classpath,
# so we need to add the "-Dscala.usejavacp=true" flag mnually. We
# do this specifically for the Spark shell because the scala REPL
# has its own class loader, and any additional classpath specified
# through spark.driver.extraClassPath is not automatically propagated.
SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"

function main() {
if $cygwin; then
# Workaround for issue involving JLine and Cygwin
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/spark/SparkJobInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.spark;

import java.io.Serializable;

/**
* Exposes information about Spark Jobs.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkJobInfo {
public interface SparkJobInfo extends Serializable {
int jobId();
int[] stageIds();
JobExecutionStatus status();
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/spark/SparkStageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.spark;

import java.io.Serializable;

/**
* Exposes information about Spark Stages.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkStageInfo {
public interface SparkStageInfo extends Serializable {
int stageId();
int currentAttemptId();
long submissionTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,6 @@ span.additional-metric-title {

/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .gc_time, .deserialization_time, .serialization_time, .getting_result_time {
.scheduler_delay, .deserialization_time, .serialization_time, .getting_result_time {
display: none;
}
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark

import java.io.{ObjectInputStream, Serializable}
import java.util.concurrent.atomic.AtomicLong
import java.lang.ThreadLocal

import scala.collection.generic.Growable
import scala.collection.mutable.Map
Expand Down Expand Up @@ -278,10 +279,12 @@ object AccumulatorParam {

// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private object Accumulators {
private[spark] object Accumulators {
// TODO: Use soft references? => need to make readObject work properly then
val originals = Map[Long, Accumulable[_, _]]()
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}
var lastId: Long = 0

def newId(): Long = synchronized {
Expand All @@ -293,22 +296,21 @@ private object Accumulators {
if (original) {
originals(a.id) = a
} else {
val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
accums(a.id) = a
localAccums.get()(a.id) = a
}
}

// Clear the local (non-original) accumulators for the current thread
def clear() {
synchronized {
localAccums.remove(Thread.currentThread)
localAccums.get.clear
}
}

// Get the values of the local accumulators for the current thread (by ID)
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
for ((id, accum) <- localAccums.get) {
ret(id) = accum.localValue
}
return ret
Expand Down
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {

private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
// When spilling is enabled sorting will happen externally, but not necessarily with an
// ExternalSorter.
private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)

@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
combineValuesByKey(iter, null)

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
if (!externalSorting) {
if (!isSpillEnabled) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
Expand Down Expand Up @@ -71,9 +73,9 @@ case class Aggregator[K, V, C] (
combineCombinersByKey(iter, null)

def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
: Iterator[(K, C)] =
: Iterator[(K, C)] =
{
if (!externalSorting) {
if (!isSpillEnabled) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
val update = (hadValue: Boolean, oldValue: C) => {
Expand Down
34 changes: 16 additions & 18 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
/**
* Class that keeps track of the location of the map output of
* a stage. This is abstract because different versions of MapOutputTracker
* (driver and worker) use different HashMap to store its metadata.
* (driver and executor) use different HashMap to store its metadata.
*/
private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging {
private val timeout = AkkaUtils.askTimeout(conf)
Expand All @@ -81,11 +81,11 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
var trackerActor: ActorRef = _

/**
* This HashMap has different behavior for the master and the workers.
* This HashMap has different behavior for the driver and the executors.
*
* On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
* master's corresponding HashMap.
* On the driver, it serves as the source of map outputs recorded from ShuffleMapTasks.
* On the executors, it simply serves as a cache, in which a miss triggers a fetch from the
* driver's corresponding HashMap.
*
* Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
* thread-safe map.
Expand All @@ -99,7 +99,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
protected var epoch: Long = 0
protected val epochLock = new AnyRef

/** Remembers which map output locations are currently being fetched on a worker. */
/** Remembers which map output locations are currently being fetched on an executor. */
private val fetching = new HashSet[Int]

/**
Expand Down Expand Up @@ -136,14 +136,12 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
var fetchedStatuses: Array[MapStatus] = null
fetching.synchronized {
if (fetching.contains(shuffleId)) {
// Someone else is fetching it; wait for them to be done
while (fetching.contains(shuffleId)) {
try {
fetching.wait()
} catch {
case e: InterruptedException =>
}
// Someone else is fetching it; wait for them to be done
while (fetching.contains(shuffleId)) {
try {
fetching.wait()
} catch {
case e: InterruptedException =>
}
}

Expand Down Expand Up @@ -198,8 +196,8 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging

/**
* Called from executors to update the epoch number, potentially clearing old outputs
* because of a fetch failure. Each worker task calls this with the latest epoch
* number on the master at the time it was created.
* because of a fetch failure. Each executor task calls this with the latest epoch
* number on the driver at the time it was created.
*/
def updateEpoch(newEpoch: Long) {
epochLock.synchronized {
Expand Down Expand Up @@ -231,7 +229,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
private var cacheEpoch = epoch

/**
* Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the master,
* Timestamp based HashMap for storing mapStatuses and cached serialized statuses in the driver,
* so that statuses are dropped only by explicit de-registering or by TTL-based cleaning (if set).
* Other than these two scenarios, nothing should be dropped from this HashMap.
*/
Expand Down Expand Up @@ -341,7 +339,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
}

/**
* MapOutputTracker for the workers, which fetches map output information from the driver's
* MapOutputTracker for the executors, which fetches map output information from the driver's
* MapOutputTrackerMaster.
*/
private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
Expand Down
18 changes: 9 additions & 9 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,19 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* Note that SASL is pluggable as to what mechanism it uses. We currently use
* DIGEST-MD5 but this could be changed to use Kerberos or other in the future.
* Spark currently supports "auth" for the quality of protection, which means
* the connection is not supporting integrity or privacy protection (encryption)
* the connection does not support integrity or privacy protection (encryption)
* after authentication. SASL also supports "auth-int" and "auth-conf" which
* SPARK could be support in the future to allow the user to specify the quality
* SPARK could support in the future to allow the user to specify the quality
* of protection they want. If we support those, the messages will also have to
* be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
*
* Since the NioBlockTransferService does asynchronous messages passing, the SASL
* authentication is a bit more complex. A ConnectionManager can be both a client
* and a Server, so for a particular connection is has to determine what to do.
* and a Server, so for a particular connection it has to determine what to do.
* A ConnectionId was added to be able to track connections and is used to
* match up incoming messages with connections waiting for authentication.
* The ConnectionManager tracks all the sendingConnections using the ConnectionId
* and waits for the response from the server and does the handshake before sending
* The ConnectionManager tracks all the sendingConnections using the ConnectionId,
* waits for the response from the server, and does the handshake before sending
* the real message.
*
* The NettyBlockTransferService ensures that SASL authentication is performed
Expand All @@ -114,14 +114,14 @@ import org.apache.spark.network.sasl.SecretKeyHolder
*
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work
* properly. For non-Yarn deployments, users can write a filter to go through a
* companies normal login service. If an authentication filter is in place then the
* properly. For non-Yarn deployments, users can write a filter to go through their
* organization's normal login service. If an authentication filter is in place then the
* SparkUI can be configured to check the logged in user against the list of users who
* have view acls to see if that user is authorized.
* The filters can also be used for many different purposes. For instance filters
* could be used for logging, encryption, or compression.
*
* The exact mechanisms used to generate/distributed the shared secret is deployment specific.
* The exact mechanisms used to generate/distribute the shared secret are deployment-specific.
*
* For Yarn deployments, the secret is automatically generated using the Akka remote
* Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
Expand All @@ -138,7 +138,7 @@ import org.apache.spark.network.sasl.SecretKeyHolder
* All the nodes (Master and Workers) and the applications need to have the same shared secret.
* This again is not ideal as one user could potentially affect another users application.
* This should be enhanced in the future to provide better protection.
* If the UI needs to be secured the user needs to install a javax servlet filter to do the
* If the UI needs to be secure, the user needs to install a javax servlet filter to do the
* authentication. Spark will then use that user to compare against the view acls to do
* authorization. If not filter is in place the user is generally null and no authorization
* can take place.
Expand Down
Loading

0 comments on commit 51d14b9

Please sign in to comment.