Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
sarutak committed Sep 15, 2014
2 parents 08e627e + f493f79 commit ead8966
Showing 47 changed files with 953 additions and 469 deletions.
14 changes: 14 additions & 0 deletions assembly/pom.xml
@@ -88,6 +88,20 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<!-- Use the shade plugin to create a big JAR with all the dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -108,6 +108,14 @@ class SparkEnv (
pythonWorkers.get(key).foreach(_.stopWorker(worker))
}
}

private[spark]
def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
synchronized {
val key = (pythonExec, envVars)
pythonWorkers.get(key).foreach(_.releaseWorker(worker))
}
}
}

object SparkEnv extends Logging {
62 changes: 50 additions & 12 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -23,6 +23,7 @@ import java.nio.charset.Charset
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.{Try, Success, Failure}
@@ -52,6 +53,7 @@ private[spark] class PythonRDD(
extends RDD[Array[Byte]](parent) {

val bufferSize = conf.getInt("spark.buffer.size", 65536)
val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)

override def getPartitions = parent.partitions

@@ -63,19 +65,26 @@
val localdir = env.blockManager.diskBlockManager.localDirs.map(
f => f.getPath()).mkString(",")
envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread
if (reuse_worker) {
envVars += ("SPARK_REUSE_WORKER" -> "1")
}
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)

// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)

var complete_cleanly = false
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()

// Cleanup the worker socket. This will also cause the Python worker to exit.
try {
worker.close()
} catch {
case e: Exception => logWarning("Failed to close worker socket", e)
if (reuse_worker && complete_cleanly) {
env.releasePythonWorker(pythonExec, envVars.toMap, worker)
} else {
try {
worker.close()
} catch {
case e: Exception =>
logWarning("Failed to close worker socket", e)
}
}
}

@@ -115,6 +124,10 @@ private[spark] class PythonRDD(
val total = finishTime - startTime
logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,
init, finish))
val memoryBytesSpilled = stream.readLong()
val diskBytesSpilled = stream.readLong()
context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += diskBytesSpilled
read()
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
// Signals that an exception has been thrown in python
@@ -133,6 +146,7 @@
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
complete_cleanly = true
null
}
} catch {
@@ -195,29 +209,45 @@
PythonRDD.writeUTF(include, dataOut)
}
// Broadcast variables
dataOut.writeInt(broadcastVars.length)
val oldBids = PythonRDD.getWorkerBroadcasts(worker)
val newBids = broadcastVars.map(_.id).toSet
// number of different broadcasts
val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size
dataOut.writeInt(cnt)
for (bid <- oldBids) {
if (!newBids.contains(bid)) {
// remove the broadcast from worker
dataOut.writeLong(- bid - 1) // bid >= 0
oldBids.remove(bid)
}
}
for (broadcast <- broadcastVars) {
dataOut.writeLong(broadcast.id)
dataOut.writeInt(broadcast.value.length)
dataOut.write(broadcast.value)
if (!oldBids.contains(broadcast.id)) {
// send new broadcast
dataOut.writeLong(broadcast.id)
dataOut.writeInt(broadcast.value.length)
dataOut.write(broadcast.value)
oldBids.add(broadcast.id)
}
}
dataOut.flush()
// Serialized command:
dataOut.writeInt(command.length)
dataOut.write(command)
// Data values
PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
dataOut.flush()
} catch {
case e: Exception if context.isCompleted || context.isInterrupted =>
logDebug("Exception thrown after task completion (likely due to cleanup)", e)
worker.shutdownOutput()

case e: Exception =>
// We must avoid throwing exceptions here, because the thread uncaught exception handler
// will kill the whole executor (see org.apache.spark.executor.Executor).
_exception = e
} finally {
Try(worker.shutdownOutput()) // kill Python worker process
worker.shutdownOutput()
}
}
}
@@ -278,6 +308,14 @@ private object SpecialLengths {
private[spark] object PythonRDD extends Logging {
val UTF8 = Charset.forName("UTF-8")

// remember the broadcasts sent to each worker
private val workerBroadcasts = new mutable.WeakHashMap[Socket, mutable.Set[Long]]()
private def getWorkerBroadcasts(worker: Socket) = {
synchronized {
workerBroadcasts.getOrElseUpdate(worker, new mutable.HashSet[Long]())
}
}

/**
* Adapter for calling SparkContext#runJob from Python.
*
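The broadcast handling added above only ships data that a reused worker does not already hold: it counts how many broadcast ids differ from the worker's last-known set, writes a negative id (`-bid - 1`) for each removal, and sends a payload only for ids the worker has not yet seen. Below is a standalone sketch of that bookkeeping; the `Broadcast` case class is a hypothetical stand-in (not Spark's class) and `println` stands in for the real stream writes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a broadcast payload; not Spark's Broadcast class.
case class Broadcast(id: Long, value: Array[Byte])

object BroadcastDiffSketch {
  /** Compute and "send" the delta between what the worker holds and what the task needs. */
  def sync(oldBids: mutable.Set[Long], broadcastVars: Seq[Broadcast]): Unit = {
    val newBids = broadcastVars.map(_.id).toSet

    // Number of broadcasts that will be added or removed on the worker.
    val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size
    println(s"changed broadcasts: $cnt")

    // Removals: encoded as (-bid - 1), always negative because bid >= 0.
    for (bid <- oldBids.toList if !newBids.contains(bid)) {
      println(s"remove: ${-bid - 1}")
      oldBids.remove(bid)
    }

    // Additions: only ship payloads the worker has not seen before.
    for (b <- broadcastVars if !oldBids.contains(b.id)) {
      println(s"send: id=${b.id}, ${b.value.length} bytes")
      oldBids.add(b.id)
    }
  }

  def main(args: Array[String]): Unit = {
    // Worker already holds ids 1 and 2; the next task needs 2 and 3.
    val held = mutable.Set(1L, 2L)
    sync(held, Seq(Broadcast(2L, Array[Byte](1)), Broadcast(3L, Array[Byte](1, 2))))
    // Prints: changed broadcasts: 2, then remove: -2, then send: id=3, 2 bytes
  }
}
```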
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -40,7 +40,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
var daemon: Process = null
val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
var daemonPort: Int = 0
var daemonWorkers = new mutable.WeakHashMap[Socket, Int]()
val daemonWorkers = new mutable.WeakHashMap[Socket, Int]()
val idleWorkers = new mutable.Queue[Socket]()
var lastActivity = 0L
new MonitorThread().start()

var simpleWorkers = new mutable.WeakHashMap[Socket, Process]()

@@ -51,6 +54,11 @@

def create(): Socket = {
if (useDaemon) {
synchronized {
if (idleWorkers.size > 0) {
return idleWorkers.dequeue()
}
}
createThroughDaemon()
} else {
createSimpleWorker()
@@ -199,9 +207,44 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}
}

/**
* Monitor all the idle workers, kill them after timeout.
*/
private class MonitorThread extends Thread(s"Idle Worker Monitor for $pythonExec") {

setDaemon(true)

override def run() {
while (true) {
synchronized {
if (lastActivity + IDLE_WORKER_TIMEOUT_MS < System.currentTimeMillis()) {
cleanupIdleWorkers()
lastActivity = System.currentTimeMillis()
}
}
Thread.sleep(10000)
}
}
}

private def cleanupIdleWorkers() {
while (idleWorkers.length > 0) {
val worker = idleWorkers.dequeue()
try {
// the worker will exit after closing the socket
worker.close()
} catch {
case e: Exception =>
logWarning("Failed to close worker socket", e)
}
}
}

private def stopDaemon() {
synchronized {
if (useDaemon) {
cleanupIdleWorkers()

// Request shutdown of existing daemon by sending SIGTERM
if (daemon != null) {
daemon.destroy()
@@ -220,23 +263,43 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}

def stopWorker(worker: Socket) {
if (useDaemon) {
if (daemon != null) {
daemonWorkers.get(worker).foreach { pid =>
// tell daemon to kill worker by pid
val output = new DataOutputStream(daemon.getOutputStream)
output.writeInt(pid)
output.flush()
daemon.getOutputStream.flush()
synchronized {
if (useDaemon) {
if (daemon != null) {
daemonWorkers.get(worker).foreach { pid =>
// tell daemon to kill worker by pid
val output = new DataOutputStream(daemon.getOutputStream)
output.writeInt(pid)
output.flush()
daemon.getOutputStream.flush()
}
}
} else {
simpleWorkers.get(worker).foreach(_.destroy())
}
} else {
simpleWorkers.get(worker).foreach(_.destroy())
}
worker.close()
}

def releaseWorker(worker: Socket) {
if (useDaemon) {
synchronized {
lastActivity = System.currentTimeMillis()
idleWorkers.enqueue(worker)
}
} else {
// Cleanup the worker socket. This will also cause the Python worker to exit.
try {
worker.close()
} catch {
case e: Exception =>
logWarning("Failed to close worker socket", e)
}
}
}
}

private object PythonWorkerFactory {
val PROCESS_WAIT_TIMEOUT_MS = 10000
val IDLE_WORKER_TIMEOUT_MS = 60000 // kill idle workers after 1 minute
}
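The reuse machinery added to the factory has three parts: `create()` prefers a previously released socket, `releaseWorker()` puts a socket back and refreshes `lastActivity`, and a daemon `MonitorThread` closes everything once the pool has been idle for `IDLE_WORKER_TIMEOUT_MS`. A self-contained sketch of that pool-plus-reaper shape follows; the `newWorkerSocket` helper is a placeholder, since in Spark the socket is handed out by the Python daemon.

```scala
import java.net.Socket
import scala.collection.mutable

object IdleWorkerPoolSketch {
  private val IdleTimeoutMs = 60000L   // analogous to IDLE_WORKER_TIMEOUT_MS
  private val idleWorkers = mutable.Queue[Socket]()
  private var lastActivity = 0L

  // Placeholder: in Spark this socket comes from the Python daemon process.
  private def newWorkerSocket(): Socket = new Socket()

  /** Prefer a previously released worker; otherwise create a fresh one. */
  def create(): Socket = synchronized {
    if (idleWorkers.nonEmpty) idleWorkers.dequeue() else newWorkerSocket()
  }

  /** Return a worker to the pool and refresh the idle timestamp. */
  def release(worker: Socket): Unit = synchronized {
    lastActivity = System.currentTimeMillis()
    idleWorkers.enqueue(worker)
  }

  /** Background reaper: once the pool has been quiet long enough, close pooled sockets. */
  private val monitor = new Thread("idle-worker-monitor") {
    setDaemon(true)
    override def run(): Unit = {
      while (true) {
        IdleWorkerPoolSketch.synchronized {
          if (lastActivity + IdleTimeoutMs < System.currentTimeMillis()) {
            while (idleWorkers.nonEmpty) idleWorkers.dequeue().close()
            lastActivity = System.currentTimeMillis()
          }
        }
        Thread.sleep(10000)   // poll every ten seconds, like the MonitorThread above
      }
    }
  }
  monitor.start()
}
```

Keeping the timestamp update and the queue operations under one lock mirrors the factory code above, where create, release, and cleanup all synchronize on the same instance.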
10 changes: 10 additions & 0 deletions docs/configuration.md
@@ -206,6 +206,16 @@ Apart from these, the following properties are also available, and may be useful
used during aggregation goes above this amount, it will spill the data into disks.
</td>
</tr>
<tr>
<td><code>spark.python.worker.reuse</code></td>
<td>true</td>
<td>
Whether to reuse Python workers. If yes, it will use a fixed number of Python workers
and does not need to fork() a Python process for every task. This is especially useful
when there is a large broadcast, since the broadcast then does not need to be transferred
from the JVM to a Python worker for every task (see the example after this excerpt).
</td>
</tr>
<tr>
<td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
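Since `PythonRDD` reads this flag from the job's `SparkConf` (`conf.getBoolean("spark.python.worker.reuse", true)` above), it can be set like any other Spark property. A minimal sketch; the application name and master are placeholders, and the property only has an effect for PySpark jobs.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder app name and master; only the last .set(...) relates to this commit.
// Setting the property to "false" restores the old fork-per-task behaviour.
val conf = new SparkConf()
  .setAppName("worker-reuse-example")
  .setMaster("local[2]")
  .set("spark.python.worker.reuse", "true")

val sc = new SparkContext(conf)
```

The same property can also be set in `conf/spark-defaults.conf`.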
3 changes: 1 addition & 2 deletions docs/sql-programming-guide.md
@@ -918,7 +918,6 @@ options.
## Migration Guide for Shark User

### Scheduling
s
To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a JDBC client session,
users can set the `spark.sql.thriftserver.scheduler.pool` variable:

@@ -1110,7 +1109,7 @@ evaluated by the SQL execution engine. A full list of the functions supported c
The range of numbers is from `-9223372036854775808` to `9223372036854775807`.
- `FloatType`: Represents 4-byte single-precision floating point numbers.
- `DoubleType`: Represents 8-byte double-precision floating point numbers.
- `DecimalType`:
- `DecimalType`: Represents arbitrary-precision signed decimal numbers. Backed internally by `java.math.BigDecimal`. A `BigDecimal` consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.
* String type
- `StringType`: Represents character string values.
* Binary type
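As a quick illustration of the (unscaled value, scale) pair that the new `DecimalType` description above refers to:

```scala
import java.math.{BigDecimal => JBigDecimal, BigInteger}

// An unscaled value of 12345 with a scale of 2 represents 12345 * 10^-2.
val d = new JBigDecimal(BigInteger.valueOf(12345L), 2)

println(d)                 // 123.45
println(d.unscaledValue()) // 12345
println(d.scale())         // 2
```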
14 changes: 14 additions & 0 deletions examples/pom.xml
@@ -203,6 +203,20 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
14 changes: 14 additions & 0 deletions extras/java8-tests/pom.xml
@@ -74,6 +74,20 @@
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
(Diff truncated; the remaining changed files are not shown.)