Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/spark
Browse files Browse the repository at this point in the history
  • Loading branch information
zhzhan committed Oct 5, 2014
2 parents cedcc6f + cf1d32e commit 301eb4a
Show file tree
Hide file tree
Showing 144 changed files with 3,026 additions and 1,271 deletions.
10 changes: 10 additions & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -349,5 +349,15 @@
</plugins>
</build>
</profile>
<profile>
<id>kinesis-asl</id>
<dependencies>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${commons.httpclient.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
8 changes: 7 additions & 1 deletion bin/compute-classpath.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"

rem Build up classpath
set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf
set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%

if "x%SPARK_CONF_DIR%"!="x" (
set CLASSPATH=%CLASSPATH%;%SPARK_CONF_DIR%
) else (
set CLASSPATH=%CLASSPATH%;%FWDIR%conf
)

if exist "%FWDIR%RELEASE" (
for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
Expand Down
8 changes: 7 additions & 1 deletion bin/compute-classpath.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,14 @@ FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

. "$FWDIR"/bin/load-spark-env.sh

CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH"

# Build up classpath
CLASSPATH="$SPARK_CLASSPATH:$SPARK_SUBMIT_CLASSPATH:$FWDIR/conf"
if [ -n "$SPARK_CONF_DIR" ]; then
CLASSPATH="$CLASSPATH:$SPARK_CONF_DIR"
else
CLASSPATH="$CLASSPATH:$FWDIR/conf"
fi

ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

Expand Down
24 changes: 12 additions & 12 deletions bin/pyspark
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,20 @@ fi

# Figure out which Python executable to use
if [[ -z "$PYSPARK_PYTHON" ]]; then
PYSPARK_PYTHON="python"
if [[ "$IPYTHON" = "1" || -n "$IPYTHON_OPTS" ]]; then
# for backward compatibility
PYSPARK_PYTHON="ipython"
else
PYSPARK_PYTHON="python"
fi
fi
export PYSPARK_PYTHON

if [[ -z "$PYSPARK_PYTHON_OPTS" && -n "$IPYTHON_OPTS" ]]; then
# for backward compatibility
PYSPARK_PYTHON_OPTS="$IPYTHON_OPTS"
fi

# Add the PySpark classes to the Python path:
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
Expand All @@ -64,11 +74,6 @@ export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
export PYTHONSTARTUP="$FWDIR/python/pyspark/shell.py"

# If IPython options are specified, assume user wants to run IPython
if [[ -n "$IPYTHON_OPTS" ]]; then
IPYTHON=1
fi

# Build up arguments list manually to preserve quotes and backslashes.
# We export Spark submit arguments as an environment variable because shell.py must run as a
# PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks.
Expand Down Expand Up @@ -106,10 +111,5 @@ if [[ "$1" =~ \.py$ ]]; then
else
# PySpark shell requires special handling downstream
export PYSPARK_SHELL=1
# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" ]]; then
exec ${PYSPARK_PYTHON:-ipython} $IPYTHON_OPTS
else
exec "$PYSPARK_PYTHON"
fi
exec "$PYSPARK_PYTHON" $PYSPARK_PYTHON_OPTS
fi
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*
)
if [%FOUND_JAR%] == [0] (
echo Failed to find Spark assembly JAR.
echo You need to build Spark with sbt\sbt assembly before running this program.
echo You need to build Spark before running this program.
goto exit
)
:skip_build_test
Expand Down
2 changes: 1 addition & 1 deletion bin/run-example2.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ if exist "%FWDIR%RELEASE" (
)
if "x%SPARK_EXAMPLES_JAR%"=="x" (
echo Failed to find Spark examples assembly JAR.
echo You need to build Spark with sbt\sbt assembly before running this program.
echo You need to build Spark before running this program.
goto exit
)

Expand Down
2 changes: 1 addition & 1 deletion bin/spark-class
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ fi
if [[ "$1" =~ org.apache.spark.tools.* ]]; then
if test -z "$SPARK_TOOLS_JAR"; then
echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SCALA_VERSION/" 1>&2
echo "You need to build spark before running $1." 1>&2
echo "You need to build Spark before running $1." 1>&2
exit 1
fi
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
Expand Down
2 changes: 1 addition & 1 deletion bin/spark-class2.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*
)
if "%FOUND_JAR%"=="0" (
echo Failed to find Spark assembly JAR.
echo You need to build Spark with sbt\sbt assembly before running this program.
echo You need to build Spark before running this program.
goto exit
)
:skip_build_test
Expand Down
2 changes: 1 addition & 1 deletion bin/utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# limitations under the License.
#

# Gather all all spark-submit options into SUBMISSION_OPTS
# Gather all spark-submit options into SUBMISSION_OPTS
function gatherSparkSubmitOpts() {

if [ -z "$SUBMIT_USAGE_FUNCTION" ]; then
Expand Down
19 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ trait FutureAction[T] extends Future[T] {
*/
@throws(classOf[Exception])
def get(): T = Await.result(this, Duration.Inf)

/**
* Returns the job IDs run by the underlying async operation.
*
* This returns the current snapshot of the job list. Certain operations may run multiple
* jobs, so multiple calls to this method may return different lists.
*/
def jobIds: Seq[Int]

}


Expand Down Expand Up @@ -150,8 +159,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
}
}

/** Get the corresponding job id for this action. */
def jobId = jobWaiter.jobId
def jobIds = Seq(jobWaiter.jobId)
}


Expand All @@ -171,6 +179,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
// is cancelled before the action was even run (and thus we have no thread to interrupt).
@volatile private var _cancelled: Boolean = false

@volatile private var jobs: Seq[Int] = Nil

// A promise used to signal the future.
private val p = promise[T]()

Expand Down Expand Up @@ -219,6 +229,8 @@ class ComplexFutureAction[T] extends FutureAction[T] {
}
}

this.jobs = jobs ++ job.jobIds

// Wait for the job to complete. If the action is cancelled (with an interrupt),
// cancel the job and stop the execution. This is not in a synchronized block because
// Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
Expand Down Expand Up @@ -255,4 +267,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
override def isCompleted: Boolean = p.isCompleted

override def value: Option[Try[T]] = p.future.value

def jobIds = jobs

}
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
* and a Server, so for a particular connection is has to determine what to do.
* A ConnectionId was added to be able to track connections and is used to
* match up incoming messages with connections waiting for authentication.
* If its acting as a client and trying to send a message to another ConnectionManager,
* it blocks the thread calling sendMessage until the SASL negotiation has occurred.
* The ConnectionManager tracks all the sendingConnections using the ConnectionId
* and waits for the response from the server and does the handshake.
* and waits for the response from the server and does the handshake before sending
* the real message.
*
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work
Expand Down
52 changes: 36 additions & 16 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging {
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")

private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
private[spark] val eventLogDir: Option[String] = {
if (isEventLogEnabled) {
Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
} else {
None
}
}

// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = "spark-" + randomUUID.toString()
Expand All @@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val listenerBus = new LiveListenerBus

// Create the Spark execution environment (cache, map output tracker, etc)
conf.set("spark.executor.id", "driver")
private[spark] val env = SparkEnv.create(
conf,
"<driver>",
Expand Down Expand Up @@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
} else None
}

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

val startTime = System.currentTimeMillis()

// Add each JAR given through the constructor
Expand Down Expand Up @@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging {
// constructor
taskScheduler.start()

val applicationId: String = taskScheduler.applicationId()
conf.set("spark.app.id", applicationId)

val metricsSystem = env.metricsSystem

// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
metricsSystem.start()

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
} else None
}

// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()

private[spark] val cleaner: Option[ContextCleaner] = {
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
Expand Down Expand Up @@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging {
// Post init
taskScheduler.postStartHook()

private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)

private def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
Expand Down Expand Up @@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging {
private def postApplicationStart() {
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser))
}

Expand Down
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,15 @@ object SparkEnv extends Logging {
}

val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
MetricsSystem.createMetricsSystem("executor", conf, securityManager)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start()
ms
}
metricsSystem.start()

// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ object PythonRunner {
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)

// Launch Python process
val builder = new ProcessBuilder(Seq(pythonExec, "-u", formattedPythonFile) ++ otherArgs)
val builder = new ProcessBuilder(Seq(pythonExec, formattedPythonFile) ++ otherArgs)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
Expand Down
Loading

0 comments on commit 301eb4a

Please sign in to comment.