
Commit

Merge remote-tracking branch 'upstream/master'
Augustin Borsu committed Feb 17, 2015
2 parents 196cd7a + c06e42f commit 2e89719
Showing 96 changed files with 1,982 additions and 721 deletions.
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1363,6 +1363,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
cleaner.foreach(_.stop())
dagScheduler.stop()
dagScheduler = null
progressBar.foreach(_.stop())
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
64 changes: 64 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.python

import java.io.DataOutputStream
import java.net.Socket

import py4j.GatewayServer

import org.apache.spark.Logging
import org.apache.spark.util.Utils

/**
* Process that starts a Py4J GatewayServer on an ephemeral port and communicates the bound port
* back to its caller via a callback port specified by the caller.
*
* This process is launched (via SparkSubmit) by the PySpark driver (see java_gateway.py).
*/
private[spark] object PythonGatewayServer extends Logging {
def main(args: Array[String]): Unit = Utils.tryOrExit {
// Start a GatewayServer on an ephemeral port
val gatewayServer: GatewayServer = new GatewayServer(null, 0)
gatewayServer.start()
val boundPort: Int = gatewayServer.getListeningPort
if (boundPort == -1) {
logError("GatewayServer failed to bind; exiting")
System.exit(1)
} else {
logDebug(s"Started PythonGatewayServer on port $boundPort")
}

// Communicate the bound port back to the caller via the caller-specified callback port
val callbackHost = sys.env("_PYSPARK_DRIVER_CALLBACK_HOST")
val callbackPort = sys.env("_PYSPARK_DRIVER_CALLBACK_PORT").toInt
logDebug(s"Communicating GatewayServer port to Python driver at $callbackHost:$callbackPort")
val callbackSocket = new Socket(callbackHost, callbackPort)
val dos = new DataOutputStream(callbackSocket.getOutputStream)
dos.writeInt(boundPort)
dos.close()
callbackSocket.close()

// Exit on EOF or broken pipe to ensure that this process dies when the Python driver dies:
while (System.in.read() != -1) {
// Do nothing
}
logDebug("Exiting due to broken pipe from Python driver")
System.exit(0)
}
}
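The caller side of this handshake lives in PySpark's java_gateway.py; a minimal Scala sketch of the same callback protocol is shown below (the object name, port handling, and log text are illustrative, not part of this commit):

import java.io.DataInputStream
import java.net.ServerSocket

// Hypothetical sketch of the caller: bind an ephemeral callback port, export it to the
// child process (as _PYSPARK_DRIVER_CALLBACK_HOST / _PYSPARK_DRIVER_CALLBACK_PORT)
// before launching PythonGatewayServer via spark-submit, then read back the gateway port.
object GatewayCallbackSketch {
  def main(args: Array[String]): Unit = {
    val callbackServer = new ServerSocket(0) // ephemeral callback port
    println(s"Callback port: ${callbackServer.getLocalPort}")

    val conn = callbackServer.accept() // PythonGatewayServer connects back
    val in = new DataInputStream(conn.getInputStream)
    val gatewayPort = in.readInt() // the 4-byte port written by writeInt(boundPort)
    println(s"Py4J GatewayServer is listening on port $gatewayPort")

    in.close()
    conn.close()
    callbackServer.close()
  }
}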
25 changes: 19 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -144,11 +144,24 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}

// Check whether the worker is ready to be re-used.
if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
if (reuse_worker) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
released = true
if (reuse_worker) {
// The ending mark is most likely already available, and the current task
// should not block while checking for it.

if (stream.available() >= 4) {
val ending = stream.readInt()
if (ending == SpecialLengths.END_OF_STREAM) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
released = true
logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
} else {
logInfo(s"Communication with worker did not end cleanly " +
s"(ending with $ending), close it: $worker")
}
} else {
logInfo(s"The ending mark from worker is not available, close it: $worker")
}
}
null
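A standalone sketch of the non-blocking check introduced above (not Spark code; the object name and the END_OF_STREAM value are stand-ins): only read the ending mark if its four bytes are already buffered, so the task never blocks on a worker that died without writing it.

import java.io.{ByteArrayInputStream, DataInputStream}
import java.nio.ByteBuffer

object EndMarkSketch {
  val END_OF_STREAM = -4 // stand-in for SpecialLengths.END_OF_STREAM

  // True only if the marker is already available and matches END_OF_STREAM.
  def workerEndedCleanly(stream: DataInputStream): Boolean =
    stream.available() >= 4 && stream.readInt() == END_OF_STREAM

  def main(args: Array[String]): Unit = {
    val bytes = ByteBuffer.allocate(4).putInt(END_OF_STREAM).array()
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    println(workerEndedCleanly(in)) // true: the marker was already buffered
  }
}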
@@ -248,13 +261,13 @@ private[spark] class PythonRDD(
} catch {
case e: Exception if context.isCompleted || context.isInterrupted =>
logDebug("Exception thrown after task completion (likely due to cleanup)", e)
worker.shutdownOutput()
Utils.tryLog(worker.shutdownOutput())

case e: Exception =>
// We must avoid throwing exceptions here, because the thread uncaught exception handler
// will kill the whole executor (see org.apache.spark.executor.Executor).
_exception = e
worker.shutdownOutput()
Utils.tryLog(worker.shutdownOutput())
} finally {
// Release memory used by this thread for shuffles
env.shuffleMemoryManager.releaseMemoryForThisThread()
@@ -39,7 +39,6 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.executor._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}

/**
@@ -284,8 +283,7 @@ object SparkSubmit {
// If we're running a python app, set the main class to our specific python runner
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
} else {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
@@ -17,8 +17,6 @@

package org.apache.spark.deploy.rest

import scala.util.Try

import com.fasterxml.jackson.annotation._
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility
import com.fasterxml.jackson.annotation.JsonInclude.Include
@@ -111,12 +109,14 @@ private[spark] object SubmitRestProtocolMessage {
* If the action field is not found, throw a [[SubmitRestMissingFieldException]].
*/
def parseAction(json: String): String = {
parse(json).asInstanceOf[JObject].obj
.find { case (f, _) => f == "action" }
.map { case (_, v) => v.asInstanceOf[JString].s }
.getOrElse {
throw new SubmitRestMissingFieldException(s"Action field not found in JSON:\n$json")
}
val value: Option[String] = parse(json) match {
case JObject(fields) =>
fields.collectFirst { case ("action", v) => v }.collect { case JString(s) => s }
case _ => None
}
value.getOrElse {
throw new SubmitRestMissingFieldException(s"Action field not found in JSON:\n$json")
}
}
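A hypothetical usage sketch of the rewritten parseAction (the payloads are made up; the sketch sits in the same package because the object is private[spark]):

package org.apache.spark.deploy.rest

object ParseActionSketch {
  def main(args: Array[String]): Unit = {
    val good = """{"action" : "CreateSubmissionRequest"}"""
    println(SubmitRestProtocolMessage.parseAction(good)) // prints CreateSubmissionRequest

    // Non-object JSON (compare the new notJson test case below) now falls through the
    // pattern match to None and raises SubmitRestMissingFieldException, rather than the
    // ClassCastException the old asInstanceOf[JObject] call could throw.
    try {
      SubmitRestProtocolMessage.parseAction("\"hello, world\"")
    } catch {
      case e: SubmitRestMissingFieldException => println(s"rejected: ${e.getMessage}")
    }
  }
}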

/**
@@ -49,7 +49,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))

addShutdownHook()
private val shutdownHook = addShutdownHook()

/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
@@ -134,17 +134,22 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
}
}

private def addShutdownHook() {
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
private def addShutdownHook(): Thread = {
val shutdownHook = new Thread("delete Spark local dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
DiskBlockManager.this.stop()
}
})
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
shutdownHook
}

/** Cleanup local dirs and stop shuffle sender. */
private[spark] def stop() {
// Remove the shutdown hook. It causes memory leaks if we leave it around.
Runtime.getRuntime.removeShutdownHook(shutdownHook)

// Only perform cleanup if an external service is not serving our shuffle files.
if (!blockManager.externalShuffleServiceEnabled || blockManager.blockManagerId.isDriver) {
localDirs.foreach { localDir =>
@@ -28,7 +28,6 @@ import org.apache.spark._
* of them will be combined together, showed in one line.
*/
private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {

// Carriage return
val CR = '\r'
// Update period of progress bar, in milliseconds
@@ -121,4 +120,10 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
clear()
lastFinishTime = System.currentTimeMillis()
}

/**
* Tear down the timer thread. The timer thread is a GC root, and it retains the entire
* SparkContext if it's not terminated.
*/
def stop(): Unit = timer.cancel()
}
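For context, a standalone sketch (not Spark code; names are illustrative) of the leak the new stop() method avoids: a java.util.Timer's background thread is a GC root, and a scheduled task that closes over its enclosing instance keeps that instance reachable until cancel() is called.

import java.util.{Timer, TimerTask}

class Owner {
  private val timer = new Timer("refresh progress", true) // daemon timer thread

  timer.schedule(new TimerTask {
    // The task closes over Owner.this, so the Owner instance stays reachable.
    override def run(): Unit = println(s"tick from ${Owner.this}")
  }, 0L, 200L)

  def stop(): Unit = timer.cancel() // stops the thread and releases the reference
}

object OwnerDemo {
  def main(args: Array[String]): Unit = {
    val o = new Owner
    Thread.sleep(500) // let a few ticks fire
    o.stop()          // without this, the timer thread keeps o alive
  }
}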
@@ -245,13 +245,15 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
val goodJson = constructSubmitRequest(masterUrl).toJson
val badJson1 = goodJson.replaceAll("action", "fraction") // invalid JSON
val badJson2 = goodJson.substring(goodJson.size / 2) // malformed JSON
val notJson = "\"hello, world\""
val (response1, code1) = sendHttpRequestWithResponse(submitRequestPath, "POST") // missing JSON
val (response2, code2) = sendHttpRequestWithResponse(submitRequestPath, "POST", badJson1)
val (response3, code3) = sendHttpRequestWithResponse(submitRequestPath, "POST", badJson2)
val (response4, code4) = sendHttpRequestWithResponse(killRequestPath, "POST") // missing ID
val (response5, code5) = sendHttpRequestWithResponse(s"$killRequestPath/", "POST")
val (response6, code6) = sendHttpRequestWithResponse(statusRequestPath, "GET") // missing ID
val (response7, code7) = sendHttpRequestWithResponse(s"$statusRequestPath/", "GET")
val (response8, code8) = sendHttpRequestWithResponse(submitRequestPath, "POST", notJson)
// these should all fail as error responses
getErrorResponse(response1)
getErrorResponse(response2)
@@ -260,13 +262,15 @@ class StandaloneRestSubmitSuite extends FunSuite with BeforeAndAfterEach {
getErrorResponse(response5)
getErrorResponse(response6)
getErrorResponse(response7)
getErrorResponse(response8)
assert(code1 === HttpServletResponse.SC_BAD_REQUEST)
assert(code2 === HttpServletResponse.SC_BAD_REQUEST)
assert(code3 === HttpServletResponse.SC_BAD_REQUEST)
assert(code4 === HttpServletResponse.SC_BAD_REQUEST)
assert(code5 === HttpServletResponse.SC_BAD_REQUEST)
assert(code6 === HttpServletResponse.SC_BAD_REQUEST)
assert(code7 === HttpServletResponse.SC_BAD_REQUEST)
assert(code8 === HttpServletResponse.SC_BAD_REQUEST)
}

test("bad request paths") {
6 changes: 3 additions & 3 deletions docs/building-spark.md
@@ -111,9 +111,9 @@ To produce a Spark package compiled with Scala 2.11, use the `-Dscala-2.11` prop
dev/change-version-to-2.11.sh
mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package

Scala 2.11 support in Spark is experimental and does not support a few features.
Specifically, Spark's external Kafka library and JDBC component are not yet
supported in Scala 2.11 builds.
Spark's Scala 2.11 build lacks a few features because some of its dependencies
are not themselves Scala 2.11 ready. Specifically, Spark's external
Kafka library and JDBC component are not yet supported in Scala 2.11 builds.

# Spark Tests in Maven

2 changes: 1 addition & 1 deletion docs/streaming-flume-integration.md
@@ -64,7 +64,7 @@ configuring Flume agents.

3. **Deploying:** Package `spark-streaming-flume_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).

## Approach 2 (Experimental): Pull-based Approach using a Custom Sink
## Approach 2: Pull-based Approach using a Custom Sink
Instead of Flume pushing data directly to Spark Streaming, this approach runs a custom Flume sink that allows the following.

- Flume pushes data into the sink, and the data stays buffered.
3 changes: 1 addition & 2 deletions docs/streaming-programming-guide.md
@@ -908,8 +908,7 @@ JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFu
The update function will be called for each word, with `newValues` having a sequence of 1's (from
the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
Java code, take a look at the example
[JavaStatefulNetworkWordCount.java]({{site
.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming
[JavaStatefulNetworkWordCount.java]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming
/JavaStatefulNetworkWordCount.java).

</div>
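For comparison with the Java example linked above, a minimal Scala sketch of the same update function (here, pairs is assumed to be a DStream[(String, Int)] of (word, 1) records, as in the surrounding example):

// Add the new 1's to the previous count (0 if the word has not been seen yet).
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  Some(runningCount.getOrElse(0) + newValues.sum)
}

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)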
@@ -19,7 +19,6 @@ package org.apache.spark.streaming.flume

import java.net.InetSocketAddress

import org.apache.spark.annotation.Experimental
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext}
@@ -121,7 +120,6 @@ object FlumeUtils {
* @param port Port of the host at which the Spark Sink is listening
* @param storageLevel Storage level to use for storing the received objects
*/
@Experimental
def createPollingStream(
ssc: StreamingContext,
hostname: String,
@@ -138,7 +136,6 @@
* @param addresses List of InetSocketAddresses representing the hosts to connect to.
* @param storageLevel Storage level to use for storing the received objects
*/
@Experimental
def createPollingStream(
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
@@ -159,7 +156,6 @@
* result in this stream using more threads
* @param storageLevel Storage level to use for storing the received objects
*/
@Experimental
def createPollingStream(
ssc: StreamingContext,
addresses: Seq[InetSocketAddress],
@@ -178,7 +174,6 @@
* @param hostname Hostname of the host on which the Spark Sink is running
* @param port Port of the host at which the Spark Sink is listening
*/
@Experimental
def createPollingStream(
jssc: JavaStreamingContext,
hostname: String,
@@ -195,7 +190,6 @@
* @param port Port of the host at which the Spark Sink is listening
* @param storageLevel Storage level to use for storing the received objects
*/
@Experimental
def createPollingStream(
jssc: JavaStreamingContext,
hostname: String,
@@ -212,7 +206,6 @@
* @param addresses List of InetSocketAddresses on which the Spark Sink is running.
* @param storageLevel Storage level to use for storing the received objects
*/
@Experimental
def createPollingStream(
jssc: JavaStreamingContext,
addresses: Array[InetSocketAddress],
@@ -233,7 +226,6 @@
* result in this stream using more threads
* @param storageLevel Storage level to use for storing the received objects
*/
@Experimental
def createPollingStream(
jssc: JavaStreamingContext,
addresses: Array[InetSocketAddress],
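The pull-based approach described in streaming-flume-integration.md above relies on the createPollingStream overloads shown in this file; a minimal, hypothetical usage sketch (application name, host, port, batch interval, and processing are placeholders, not part of the commit):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumePollingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FlumePollingSketch")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Pull events that the custom Spark Sink has buffered on flume-sink-host:9999.
    val stream = FlumeUtils.createPollingStream(
      ssc, "flume-sink-host", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

    stream.map(e => new String(e.event.getBody.array())).print()

    ssc.start()
    ssc.awaitTermination()
  }
}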

