Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
sarutak committed Sep 19, 2014
2 parents 4776f9e + e76ef5c commit 4a93c7f
Showing 34 changed files with 1,038 additions and 107 deletions.
13 changes: 7 additions & 6 deletions .gitignore
@@ -1,4 +1,6 @@
*~
*.#*
*#*#
*.swp
*.ipr
*.iml
@@ -15,11 +17,11 @@ out/
third_party/libmesos.so
third_party/libmesos.dylib
conf/java-opts
conf/spark-env.sh
conf/streaming-env.sh
conf/log4j.properties
conf/spark-defaults.conf
conf/hive-site.xml
conf/*.sh
conf/*.cmd
conf/*.properties
conf/*.conf
conf/*.xml
docs/_site
docs/api
target/
@@ -50,7 +52,6 @@ unit-tests.log
/lib/
rat-results.txt
scalastyle.txt
conf/*.conf
scalastyle-output.xml

# For Hive
2 changes: 2 additions & 0 deletions .rat-excludes
@@ -20,6 +20,7 @@ log4j.properties.template
metrics.properties.template
slaves
spark-env.sh
spark-env.cmd
spark-env.sh.template
log4j-defaults.properties
bootstrap-tooltip.js
@@ -58,3 +59,4 @@ dist/*
.*iws
logs
.*scalastyle-output.xml
.*dependency-reduced-pom.xml
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -8,5 +8,5 @@ submitting any copyrighted material via pull request, email, or other means
you agree to license the material under the project's open source license and
warrant that you have the legal authority to do so.

Please see [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
Please see the [Contributing to Spark wiki page](https://cwiki.apache.org/SPARK/Contributing+to+Spark)
for more information.
2 changes: 1 addition & 1 deletion bin/spark-sql
@@ -24,7 +24,7 @@
set -o posix

CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
CLASS_NOT_FOUND_EXIT_STATUS=1
CLASS_NOT_FOUND_EXIT_STATUS=101

# Figure out where Spark is installed
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -54,7 +54,7 @@ object SparkSubmit {
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"

private val CLASS_NOT_FOUND_EXIT_STATUS = 1
private val CLASS_NOT_FOUND_EXIT_STATUS = 101

// Exposed for testing
private[spark] var exitFn: () => Unit = () => System.exit(-1)
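
Both bin/spark-sql and SparkSubmit now exit with 101 instead of 1 when the requested class cannot be loaded, so callers can tell that case apart from a generic failure exit. A minimal sketch of the pattern, assuming a hypothetical launcher object (only the 101 value comes from the diff):

    // Hypothetical sketch, not the actual SparkSubmit code.
    object LauncherSketch {
      // Mirrors CLASS_NOT_FOUND_EXIT_STATUS = 101 above; distinct from a plain exit(1).
      val ClassNotFoundExitStatus = 101

      def runMain(className: String, args: Array[String]): Unit = {
        try {
          val clazz = Class.forName(className)
          val main = clazz.getMethod("main", classOf[Array[String]])
          main.invoke(null, args)
        } catch {
          case _: ClassNotFoundException =>
            System.err.println(s"Could not find class $className")
            System.exit(ClassNotFoundExitStatus)
        }
      }
    }
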
@@ -172,7 +172,7 @@ object SparkSubmit {
// All cluster managers
OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.master"),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.app.name"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, sysProp = "spark.jars"),
OptionAssigner(args.jars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
sysProp = "spark.driver.memory"),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -183,6 +183,7 @@
sysProp = "spark.driver.extraLibraryPath"),

// Standalone cluster only
OptionAssigner(args.jars, STANDALONE, CLUSTER, sysProp = "spark.jars"),
OptionAssigner(args.driverMemory, STANDALONE, CLUSTER, clOption = "--memory"),
OptionAssigner(args.driverCores, STANDALONE, CLUSTER, clOption = "--cores"),

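spark.jars is now forwarded as a system property only in client mode for every cluster manager, plus the separate standalone-cluster rule added above; in yarn-cluster mode the jars are handled by yarn.Client instead, which appears to be what the added SparkSubmitSuite assertion below checks. A rough sketch of how such (cluster manager, deploy mode) rules select system properties; the flag values and names here are assumptions, not SparkSubmit's real internals:

    // Rough sketch only; constants and types are invented for illustration.
    object JarsRuleSketch {
      val STANDALONE = 1; val YARN = 2; val ALL_MANAGERS = STANDALONE | YARN
      val CLIENT = 1; val CLUSTER = 2

      case class Rule(value: Option[String], managers: Int, modes: Int, sysProp: String)

      // A rule contributes its system property only when both bit masks match.
      def selectSysProps(rules: Seq[Rule], manager: Int, mode: Int): Map[String, String] =
        rules.collect {
          case Rule(Some(v), ms, ds, prop) if (ms & manager) != 0 && (ds & mode) != 0 => prop -> v
        }.toMap

      val rules = Seq(
        Rule(Some("a.jar"), ALL_MANAGERS, CLIENT, "spark.jars"),  // any manager, client mode
        Rule(Some("a.jar"), STANDALONE, CLUSTER, "spark.jars")    // standalone cluster only
      )

      def main(args: Array[String]): Unit = {
        println(selectSysProps(rules, YARN, CLUSTER))        // Map(): not set for yarn-cluster
        println(selectSysProps(rules, STANDALONE, CLUSTER))  // Map(spark.jars -> a.jar)
      }
    }
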
@@ -261,7 +262,7 @@
}

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (clusterManager == YARN && deployMode == CLUSTER) {
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
@@ -67,6 +67,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
}

private val appHeader = Seq(
"App ID",
"App Name",
"Started",
"Completed",
@@ -81,7 +82,8 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
val duration = UIUtils.formatDuration(info.endTime - info.startTime)
val lastUpdated = UIUtils.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{info.name}</a></td>
<td><a href={uiAddress}>{info.id}</a></td>
<td>{info.name}</td>
<td>{startTime}</td>
<td>{endTime}</td>
<td>{duration}</td>
11 changes: 6 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -489,23 +489,24 @@ private[spark] class Master(
// First schedule drivers, they take strict precedence over applications
// Randomization helps balance drivers
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val aliveWorkerNum = shuffledAliveWorkers.size
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0

for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
curPos = (curPos + 1) % aliveWorkerNum
val startPos = curPos
var launched = false
while (curPos != startPos && !launched) {
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver)
waitingDrivers -= driver
launched = true
}
curPos = (curPos + 1) % aliveWorkerNum
curPos = (curPos + 1) % numWorkersAlive
}
}
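
In the replaced code curPos was advanced and startPos was then set to that same value, so the condition curPos != startPos was false on entry and no worker was ever examined; counting visited workers bounds each driver's scan to one full pass over the alive workers. A standalone sketch of the corrected round-robin pass, with worker and driver types and the resource check reduced to a plain predicate (not the real Master code):

    // Simplified sketch of the round-robin assignment; not the actual Master logic.
    object DriverSchedulingSketch {
      def assignDrivers(drivers: List[String], workers: IndexedSeq[String],
                        canLaunch: (String, String) => Boolean): Seq[(String, String)] = {
        val assignments = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
        var curPos = 0
        for (driver <- drivers) {
          var numWorkersVisited = 0
          var launched = false
          // Visit each alive worker at most once, resuming where the last scan stopped.
          while (numWorkersVisited < workers.size && !launched) {
            val worker = workers(curPos)
            numWorkersVisited += 1
            if (canLaunch(worker, driver)) {
              assignments += (worker -> driver)
              launched = true
            }
            curPos = (curPos + 1) % workers.size
          }
        }
        assignments.toSeq
      }
    }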

@@ -460,7 +460,7 @@ private[spark] class ReceivingConnection(
if (currId != null) currId else super.getRemoteConnectionManagerId()
}

// The reciever's remote address is the local socket on remote side : which is NOT
// The receiver's remote address is the local socket on remote side : which is NOT
// the connection manager id of the receiver.
// We infer that from the messages we receive on the receiver socket.
private def processConnectionManagerId(header: MessageChunkHeader) {
@@ -501,7 +501,7 @@ private[nio] class ConnectionManager(

def changeConnectionKeyInterest(connection: Connection, ops: Int) {
keyInterestChangeRequests += ((connection.key, ops))
// so that registerations happen !
// so that registrations happen !
wakeupSelector()
}

@@ -832,7 +832,7 @@ private[nio] class ConnectionManager(
}

/**
* Send a message and block until an acknowldgment is received or an error occurs.
* Send a message and block until an acknowledgment is received or an error occurs.
* @param connectionManagerId the message's destination
* @param message the message being sent
* @return a Future that either returns the acknowledgment message or captures an exception.
@@ -34,7 +34,7 @@ private[spark] class SparkDeploySchedulerBackend(
var client: AppClient = null
var stopping = false
var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
var appId: String = _
@volatile var appId: String = _

val registrationLock = new Object()
var registrationDone = false
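
appId is written by the AppClient callback thread when the application registers and read from other threads, so without @volatile a reader may keep seeing a stale null; the annotation makes the write visible across threads. A minimal illustration of that publication pattern, using a hypothetical holder class rather than the real backend:

    // Hypothetical example of the @volatile publication pattern.
    class RegistrationHolder {
      // Written by a callback thread, read by others; @volatile guarantees visibility.
      @volatile var appId: String = _

      def connected(id: String): Unit = { appId = id }

      def describe(): String =
        Option(appId).map(id => s"registered as $id").getOrElse("not yet registered")
    }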
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1382,15 +1382,15 @@ private[spark] object Utils extends Logging {
}

/**
* Default number of retries in binding to a port.
* Default maximum number of retries when binding to a port before giving up.
*/
val portMaxRetries: Int = {
if (sys.props.contains("spark.testing")) {
// Set a higher number of retries for tests...
sys.props.get("spark.ports.maxRetries").map(_.toInt).getOrElse(100)
sys.props.get("spark.port.maxRetries").map(_.toInt).getOrElse(100)
} else {
Option(SparkEnv.get)
.flatMap(_.conf.getOption("spark.ports.maxRetries"))
.flatMap(_.conf.getOption("spark.port.maxRetries"))
.map(_.toInt)
.getOrElse(16)
}
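
Besides correcting the property name from spark.ports.maxRetries to spark.port.maxRetries, the value still caps how many successive ports are tried when a bind fails. A small sketch of that retry-on-bind idea, assuming a plain java.net.ServerSocket; the helper name is invented and this is not the retry code Utils actually contains:

    import java.net.ServerSocket

    // Invented helper illustrating what portMaxRetries bounds.
    object PortRetrySketch {
      def bindWithRetries(startPort: Int, maxRetries: Int): ServerSocket = {
        var attempt = 0
        while (true) {
          try {
            return new ServerSocket(startPort + attempt)  // try the next candidate port
          } catch {
            case _: java.io.IOException if attempt < maxRetries =>
              attempt += 1  // bind failed (e.g. port in use); move to the next port
          }
        }
        throw new IllegalStateException("unreachable")
      }
    }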
@@ -115,11 +115,13 @@ class JsonProtocolSuite extends FunSuite {
workerInfo.lastHeartbeat = JsonConstants.currTimeInMillis
workerInfo
}

def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, ExecutorState.RUNNING)
}

def createDriverRunner(): DriverRunner = {
new DriverRunner(new SparkConf(), "driverId", new File("workDir"), new File("sparkHome"),
createDriverDesc(), null, "akka://worker")
@@ -154,6 +154,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
sysProps("spark.app.name") should be ("beauty")
sysProps("spark.shuffle.spill") should be ("false")
sysProps("SPARK_SUBMIT") should be ("true")
sysProps.keys should not contain ("spark.jars")
}

test("handles YARN client mode") {