@@ -191,7 +187,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
}
- private def appRow(app: ApplicationInfo, active: Boolean): Seq[Node] = {
+ private def appRow(app: ApplicationInfo): Seq[Node] = {
val killLink = if (parent.killEnabled &&
(app.state == ApplicationState.RUNNING || app.state == ApplicationState.WAITING)) {
val killLinkUri = s"app/kill?id=${app.id}&terminate=true"
@@ -201,7 +197,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
       (<a href={killLinkUri}>kill</a>)
}
-
         <a href={"app?appId=" + app.id}>{app.id}</a>
         {killLink}
@@ -210,15 +205,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
       </td>
       <td>
         <a href={app.desc.appUiUrl}>{app.desc.name}</a>
       </td>
-      {
-        if (active) {
-          <td>
-            {app.coresGranted}
-          </td>
-        }
-      }
       <td>
-        {if (app.requestedCores == Int.MaxValue) "*" else app.requestedCores}
+        {app.coresGranted}
       </td>
       <td>
         {Utils.megabytesToString(app.desc.memoryPerSlave)}
@@ -230,14 +218,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
     </tr>
}
- private def activeAppRow(app: ApplicationInfo): Seq[Node] = {
- appRow(app, active = true)
- }
-
- private def completeAppRow(app: ApplicationInfo): Seq[Node] = {
- appRow(app, active = false)
- }
-
private def driverRow(driver: DriverInfo): Seq[Node] = {
val killLink = if (parent.killEnabled &&
(driver.state == DriverState.RUNNING ||
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index 27a9eabb1ede7..e0948e16ef354 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -56,8 +56,14 @@ private[deploy] class DriverRunner(
private var finalExitCode: Option[Int] = None
// Decoupled for testing
- def setClock(_clock: Clock) = clock = _clock
- def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper
+ def setClock(_clock: Clock): Unit = {
+ clock = _clock
+ }
+
+ def setSleeper(_sleeper: Sleeper): Unit = {
+ sleeper = _sleeper
+ }
+
private var clock: Clock = new SystemClock()
private var sleeper = new Sleeper {
def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed})
@@ -155,7 +161,7 @@ private[deploy] class DriverRunner(
private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
builder.directory(baseDir)
- def initialize(process: Process) = {
+ def initialize(process: Process): Unit = {
// Redirect stdout and stderr to files
val stdout = new File(baseDir, "stdout")
CommandUtils.redirectStream(process.getInputStream, stdout)
@@ -169,8 +175,8 @@ private[deploy] class DriverRunner(
runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
- def runCommandWithRetry(command: ProcessBuilderLike, initialize: Process => Unit,
- supervise: Boolean) {
+ def runCommandWithRetry(
+ command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Unit = {
// Time to wait between submission retries.
var waitSeconds = 1
// A run of this many seconds resets the exponential back-off.
@@ -216,8 +222,8 @@ private[deploy] trait ProcessBuilderLike {
}
private[deploy] object ProcessBuilderLike {
- def apply(processBuilder: ProcessBuilder) = new ProcessBuilderLike {
- def start() = processBuilder.start()
- def command = processBuilder.command()
+ def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike {
+ override def start(): Process = processBuilder.start()
+ override def command: Seq[String] = processBuilder.command()
}
}
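
Aside: the retry policy in runCommandWithRetry is easy to miss in the diff — waitSeconds doubles after each fast failure, and a run that survives long enough resets the back-off. A self-contained sketch of that policy, with hypothetical names and thresholds (not the actual DriverRunner fields):

    // Sketch of the supervise/retry loop described by the comments above.
    // run() launches the process and returns its exit code; nowMs() and
    // sleepSec() stand in for the Clock and Sleeper test hooks.
    object RetrySketch {
      def runWithRetry(run: () => Int, nowMs: () => Long, sleepSec: Int => Unit): Unit = {
        var waitSeconds = 1
        val successfulRunDurationMs = 5000L // a run this long resets the back-off
        var exitCode = 1
        do {
          val start = nowMs()
          exitCode = run()
          if (nowMs() - start > successfulRunDurationMs) {
            waitSeconds = 1 // the process ran for a while: reset the back-off
          }
          if (exitCode != 0) {
            sleepSec(waitSeconds) // wait before the next attempt
            waitSeconds = waitSeconds * 2 // exponential back-off
          }
        } while (exitCode != 0)
      }
    }
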
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c1b0a295f9f74..c4c24a7866aa3 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -275,7 +275,7 @@ private[worker] class Worker(
}
}
- override def receiveWithLogging = {
+ override def receiveWithLogging: PartialFunction[Any, Unit] = {
case RegisteredWorker(masterUrl, masterWebUiUrl) =>
logInfo("Successfully registered with master " + masterUrl)
registered = true
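
Aside: every receiveWithLogging override in this patch gets the same annotation, PartialFunction[Any, Unit], which is what Akka's Actor.Receive type alias expands to. A minimal sketch of the pattern, with a hypothetical trait standing in for Spark's ActorLogReceive:

    import akka.actor.Actor

    // Hypothetical stand-in for ActorLogReceive: implementors supply
    // receiveWithLogging, and the trait wraps it with a logging hook.
    trait LogReceiveSketch extends Actor {
      def receiveWithLogging: PartialFunction[Any, Unit]

      override def receive: Actor.Receive = {
        case msg =>
          println(s"received: $msg") // stand-in for logDebug
          receiveWithLogging.applyOrElse(msg, unhandled)
      }
    }
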
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 09d866fb0cd90..e0790274d7d3e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -50,7 +50,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
private def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1)
- override def receiveWithLogging = {
+ override def receiveWithLogging: PartialFunction[Any, Unit] = {
case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
logInfo(s"Successfully connected to $workerUrl")
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index dd19e4947db1e..b5205d4e997ae 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -62,7 +62,7 @@ private[spark] class CoarseGrainedExecutorBackend(
.map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
}
- override def receiveWithLogging = {
+ override def receiveWithLogging: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
val (hostname, _) = Utils.parseHostPort(hostPort)
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
index 41925f7e97e84..3e47d13f7545d 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorActor.scala
@@ -33,7 +33,7 @@ private[spark] case object TriggerThreadDump
private[spark]
class ExecutorActor(executorId: String) extends Actor with ActorLogReceive with Logging {
- override def receiveWithLogging = {
+ override def receiveWithLogging: PartialFunction[Any, Unit] = {
case TriggerThreadDump =>
sender ! Utils.getThreadDump()
}
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 07b152651dedf..06152f16ae618 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -17,13 +17,10 @@
package org.apache.spark.executor
-import java.util.concurrent.atomic.AtomicLong
-
-import org.apache.spark.executor.DataReadMethod.DataReadMethod
-
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.executor.DataReadMethod.DataReadMethod
import org.apache.spark.storage.{BlockId, BlockStatus}
/**
@@ -44,14 +41,14 @@ class TaskMetrics extends Serializable {
* Host's name the task runs on
*/
private var _hostname: String = _
- def hostname = _hostname
+ def hostname: String = _hostname
private[spark] def setHostname(value: String) = _hostname = value
/**
* Time taken on the executor to deserialize this task
*/
private var _executorDeserializeTime: Long = _
- def executorDeserializeTime = _executorDeserializeTime
+ def executorDeserializeTime: Long = _executorDeserializeTime
private[spark] def setExecutorDeserializeTime(value: Long) = _executorDeserializeTime = value
@@ -59,14 +56,14 @@ class TaskMetrics extends Serializable {
* Time the executor spends actually running the task (including fetching shuffle data)
*/
private var _executorRunTime: Long = _
- def executorRunTime = _executorRunTime
+ def executorRunTime: Long = _executorRunTime
private[spark] def setExecutorRunTime(value: Long) = _executorRunTime = value
/**
* The number of bytes this task transmitted back to the driver as the TaskResult
*/
private var _resultSize: Long = _
- def resultSize = _resultSize
+ def resultSize: Long = _resultSize
private[spark] def setResultSize(value: Long) = _resultSize = value
@@ -74,31 +71,31 @@ class TaskMetrics extends Serializable {
* Amount of time the JVM spent in garbage collection while executing this task
*/
private var _jvmGCTime: Long = _
- def jvmGCTime = _jvmGCTime
+ def jvmGCTime: Long = _jvmGCTime
private[spark] def setJvmGCTime(value: Long) = _jvmGCTime = value
/**
* Amount of time spent serializing the task result
*/
private var _resultSerializationTime: Long = _
- def resultSerializationTime = _resultSerializationTime
+ def resultSerializationTime: Long = _resultSerializationTime
private[spark] def setResultSerializationTime(value: Long) = _resultSerializationTime = value
/**
* The number of in-memory bytes spilled by this task
*/
private var _memoryBytesSpilled: Long = _
- def memoryBytesSpilled = _memoryBytesSpilled
- private[spark] def incMemoryBytesSpilled(value: Long) = _memoryBytesSpilled += value
- private[spark] def decMemoryBytesSpilled(value: Long) = _memoryBytesSpilled -= value
+ def memoryBytesSpilled: Long = _memoryBytesSpilled
+ private[spark] def incMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled += value
+ private[spark] def decMemoryBytesSpilled(value: Long): Unit = _memoryBytesSpilled -= value
/**
* The number of on-disk bytes spilled by this task
*/
private var _diskBytesSpilled: Long = _
- def diskBytesSpilled = _diskBytesSpilled
- def incDiskBytesSpilled(value: Long) = _diskBytesSpilled += value
- def decDiskBytesSpilled(value: Long) = _diskBytesSpilled -= value
+ def diskBytesSpilled: Long = _diskBytesSpilled
+ def incDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled += value
+ def decDiskBytesSpilled(value: Long): Unit = _diskBytesSpilled -= value
/**
* If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
@@ -106,7 +103,7 @@ class TaskMetrics extends Serializable {
*/
private var _inputMetrics: Option[InputMetrics] = None
- def inputMetrics = _inputMetrics
+ def inputMetrics: Option[InputMetrics] = _inputMetrics
/**
* This should only be used when recreating TaskMetrics, not when updating input metrics in
@@ -128,7 +125,7 @@ class TaskMetrics extends Serializable {
*/
private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None
- def shuffleReadMetrics = _shuffleReadMetrics
+ def shuffleReadMetrics: Option[ShuffleReadMetrics] = _shuffleReadMetrics
/**
* This should only be used when recreating TaskMetrics, not when updating read metrics in
@@ -177,17 +174,18 @@ class TaskMetrics extends Serializable {
* Once https://issues.apache.org/jira/browse/SPARK-5225 is addressed,
* we can store all the different inputMetrics (one per readMethod).
*/
- private[spark] def getInputMetricsForReadMethod(
- readMethod: DataReadMethod): InputMetrics = synchronized {
- _inputMetrics match {
- case None =>
- val metrics = new InputMetrics(readMethod)
- _inputMetrics = Some(metrics)
- metrics
- case Some(metrics @ InputMetrics(method)) if method == readMethod =>
- metrics
- case Some(InputMetrics(method)) =>
- new InputMetrics(readMethod)
+ private[spark] def getInputMetricsForReadMethod(readMethod: DataReadMethod): InputMetrics = {
+ synchronized {
+ _inputMetrics match {
+ case None =>
+ val metrics = new InputMetrics(readMethod)
+ _inputMetrics = Some(metrics)
+ metrics
+ case Some(metrics @ InputMetrics(method)) if method == readMethod =>
+ metrics
+ case Some(InputMetrics(method)) =>
+ new InputMetrics(readMethod)
+ }
}
}
@@ -256,14 +254,14 @@ case class InputMetrics(readMethod: DataReadMethod.Value) {
*/
private var _bytesRead: Long = _
def bytesRead: Long = _bytesRead
- def incBytesRead(bytes: Long) = _bytesRead += bytes
+ def incBytesRead(bytes: Long): Unit = _bytesRead += bytes
/**
* Total records read.
*/
private var _recordsRead: Long = _
def recordsRead: Long = _recordsRead
- def incRecordsRead(records: Long) = _recordsRead += records
+ def incRecordsRead(records: Long): Unit = _recordsRead += records
/**
* Invoke the bytesReadCallback and mutate bytesRead.
@@ -293,15 +291,15 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) {
* Total bytes written
*/
private var _bytesWritten: Long = _
- def bytesWritten = _bytesWritten
- private[spark] def setBytesWritten(value : Long) = _bytesWritten = value
+ def bytesWritten: Long = _bytesWritten
+ private[spark] def setBytesWritten(value: Long): Unit = _bytesWritten = value
/**
* Total records written
*/
private var _recordsWritten: Long = 0L
- def recordsWritten = _recordsWritten
- private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value
+ def recordsWritten: Long = _recordsWritten
+ private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value
}
/**
@@ -314,7 +312,7 @@ class ShuffleReadMetrics extends Serializable {
* Number of remote blocks fetched in this shuffle by this task
*/
private var _remoteBlocksFetched: Int = _
- def remoteBlocksFetched = _remoteBlocksFetched
+ def remoteBlocksFetched: Int = _remoteBlocksFetched
private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value
private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value
@@ -322,7 +320,7 @@ class ShuffleReadMetrics extends Serializable {
* Number of local blocks fetched in this shuffle by this task
*/
private var _localBlocksFetched: Int = _
- def localBlocksFetched = _localBlocksFetched
+ def localBlocksFetched: Int = _localBlocksFetched
private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value
private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value
@@ -332,7 +330,7 @@ class ShuffleReadMetrics extends Serializable {
* still not finished processing block A, it is not considered to be blocking on block B.
*/
private var _fetchWaitTime: Long = _
- def fetchWaitTime = _fetchWaitTime
+ def fetchWaitTime: Long = _fetchWaitTime
private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value
private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value
@@ -340,7 +338,7 @@ class ShuffleReadMetrics extends Serializable {
* Total number of remote bytes read from the shuffle by this task
*/
private var _remoteBytesRead: Long = _
- def remoteBytesRead = _remoteBytesRead
+ def remoteBytesRead: Long = _remoteBytesRead
private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value
private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value
@@ -348,24 +346,24 @@ class ShuffleReadMetrics extends Serializable {
* Shuffle data that was read from the local disk (as opposed to from a remote executor).
*/
private var _localBytesRead: Long = _
- def localBytesRead = _localBytesRead
+ def localBytesRead: Long = _localBytesRead
private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value
/**
* Total bytes fetched in the shuffle by this task (both remote and local).
*/
- def totalBytesRead = _remoteBytesRead + _localBytesRead
+ def totalBytesRead: Long = _remoteBytesRead + _localBytesRead
/**
* Number of blocks fetched in this shuffle by this task (remote or local)
*/
- def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched
+ def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched
/**
* Total number of records read from the shuffle by this task
*/
private var _recordsRead: Long = _
- def recordsRead = _recordsRead
+ def recordsRead: Long = _recordsRead
private[spark] def incRecordsRead(value: Long) = _recordsRead += value
private[spark] def decRecordsRead(value: Long) = _recordsRead -= value
}
@@ -380,7 +378,7 @@ class ShuffleWriteMetrics extends Serializable {
* Number of bytes written for the shuffle by this task
*/
@volatile private var _shuffleBytesWritten: Long = _
- def shuffleBytesWritten = _shuffleBytesWritten
+ def shuffleBytesWritten: Long = _shuffleBytesWritten
private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
@@ -388,7 +386,7 @@ class ShuffleWriteMetrics extends Serializable {
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
@volatile private var _shuffleWriteTime: Long = _
- def shuffleWriteTime= _shuffleWriteTime
+ def shuffleWriteTime: Long = _shuffleWriteTime
private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
@@ -396,7 +394,7 @@ class ShuffleWriteMetrics extends Serializable {
* Total number of records written to the shuffle by this task
*/
@volatile private var _shuffleRecordsWritten: Long = _
- def shuffleRecordsWritten = _shuffleRecordsWritten
+ def shuffleRecordsWritten: Long = _shuffleRecordsWritten
private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
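
Aside: the TaskMetrics edits above all apply one idiom — a private var backing field, an explicitly typed public getter, and private[spark] mutators. A stripped-down sketch of that idiom (the metric name is made up):

    package org.apache.spark

    // Sketch of the accessor pattern used throughout TaskMetrics: the
    // backing var stays private, readers get a typed getter, and only
    // spark-internal code can mutate the value.
    class MetricSketch extends Serializable {
      private var _bytesProcessed: Long = _
      def bytesProcessed: Long = _bytesProcessed
      private[spark] def incBytesProcessed(value: Long): Unit = _bytesProcessed += value
      private[spark] def decBytesProcessed(value: Long): Unit = _bytesProcessed -= value
    }
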
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 593a62b3e3b32..6cda7772f77bc 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -73,16 +73,16 @@ private[spark] abstract class StreamBasedRecordReader[T](
private var key = ""
private var value: T = null.asInstanceOf[T]
- override def initialize(split: InputSplit, context: TaskAttemptContext) = {}
- override def close() = {}
+ override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {}
+ override def close(): Unit = {}
- override def getProgress = if (processed) 1.0f else 0.0f
+ override def getProgress: Float = if (processed) 1.0f else 0.0f
- override def getCurrentKey = key
+ override def getCurrentKey: String = key
- override def getCurrentValue = value
+ override def getCurrentValue: T = value
- override def nextKeyValue = {
+ override def nextKeyValue: Boolean = {
if (!processed) {
val fileIn = new PortableDataStream(split, context, index)
value = parseStream(fileIn)
@@ -119,7 +119,8 @@ private[spark] class StreamRecordReader(
* The format for the PortableDataStream files
*/
private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
- override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext) = {
+ override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)
+ : CombineFileRecordReader[String, PortableDataStream] = {
new CombineFileRecordReader[String, PortableDataStream](
split.asInstanceOf[CombineFileSplit], taContext, classOf[StreamRecordReader])
}
@@ -204,7 +205,7 @@ class PortableDataStream(
/**
* Close the file (if it is currently open)
*/
- def close() = {
+ def close(): Unit = {
if (isOpen) {
try {
fileIn.close()
diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
index 21b782edd2a9e..87c2aa481095d 100644
--- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala
@@ -52,7 +52,7 @@ trait SparkHadoopMapRedUtil {
jobId: Int,
isMap: Boolean,
taskId: Int,
- attemptId: Int) = {
+ attemptId: Int): TaskAttemptID = {
new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
}
diff --git a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
index 3340673f91156..cfd20392d12f1 100644
--- a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
+++ b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala
@@ -45,7 +45,7 @@ trait SparkHadoopMapReduceUtil {
jobId: Int,
isMap: Boolean,
taskId: Int,
- attemptId: Int) = {
+ attemptId: Int): TaskAttemptID = {
val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID")
try {
// First, attempt to use the old-style constructor that takes a boolean isMap
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 345db36630fd5..9150ad35712a1 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
+import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
@@ -84,7 +85,7 @@ private[spark] class MetricsSystem private (
/**
* Get any UI handlers used by this metrics system; can only be called after start().
*/
- def getServletHandlers = {
+ def getServletHandlers: Array[ServletContextHandler] = {
require(running, "Can only call getServletHandlers on a running MetricsSystem")
metricsServlet.map(_.getHandlers).getOrElse(Array())
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
index 2f65bc8b46609..0c2e212a33074 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -30,8 +30,12 @@ import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.spark.SecurityManager
import org.apache.spark.ui.JettyUtils._
-private[spark] class MetricsServlet(val property: Properties, val registry: MetricRegistry,
- securityMgr: SecurityManager) extends Sink {
+private[spark] class MetricsServlet(
+ val property: Properties,
+ val registry: MetricRegistry,
+ securityMgr: SecurityManager)
+ extends Sink {
+
val SERVLET_KEY_PATH = "path"
val SERVLET_KEY_SAMPLE = "sample"
@@ -45,10 +49,12 @@ private[spark] class MetricsServlet(val property: Properties, val registry: Metr
val mapper = new ObjectMapper().registerModule(
new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
- def getHandlers = Array[ServletContextHandler](
- createServletHandler(servletPath,
- new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr)
- )
+ def getHandlers: Array[ServletContextHandler] = {
+ Array[ServletContextHandler](
+ createServletHandler(servletPath,
+ new ServletParams(request => getMetricsSnapshot(request), "text/json"), securityMgr)
+ )
+ }
def getMetricsSnapshot(request: HttpServletRequest): String = {
mapper.writeValueAsString(registry)
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
index 0d83d8c425ca4..9fad4e7deacb6 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/Sink.scala
@@ -18,7 +18,7 @@
package org.apache.spark.metrics.sink
private[spark] trait Sink {
- def start: Unit
- def stop: Unit
+ def start(): Unit
+ def stop(): Unit
def report(): Unit
}
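
Note on the Sink change: def start: Unit and def start(): Unit are different signatures, and Scala convention reserves the empty parameter list for side-effecting methods, so callers write sink.start() rather than sink.start. A toy implementation against the corrected trait (the class and bodies are illustrative only):

    // Toy sink showing the corrected trait in use; println stands in
    // for real work. Callers now invoke start()/stop() with parens,
    // signalling a side effect rather than a value read.
    class ConsoleSink extends Sink {
      override def start(): Unit = println("sink started")
      override def stop(): Unit = println("sink stopped")
      override def report(): Unit = println("reporting metrics snapshot")
    }
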
diff --git a/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala
index a1a2c00ed1542..1ba25aa74aa02 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala
@@ -32,11 +32,11 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage])
def this() = this(null.asInstanceOf[Seq[BlockMessage]])
- def apply(i: Int) = blockMessages(i)
+ def apply(i: Int): BlockMessage = blockMessages(i)
- def iterator = blockMessages.iterator
+ def iterator: Iterator[BlockMessage] = blockMessages.iterator
- def length = blockMessages.length
+ def length: Int = blockMessages.length
def set(bufferMessage: BufferMessage) {
val startTime = System.currentTimeMillis
diff --git a/core/src/main/scala/org/apache/spark/network/nio/BufferMessage.scala b/core/src/main/scala/org/apache/spark/network/nio/BufferMessage.scala
index 3b245c5c7a4f3..9a9e22b0c2366 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/BufferMessage.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/BufferMessage.scala
@@ -31,9 +31,9 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
val initialSize = currentSize()
var gotChunkForSendingOnce = false
- def size = initialSize
+ def size: Int = initialSize
- def currentSize() = {
+ def currentSize(): Int = {
if (buffers == null || buffers.isEmpty) {
0
} else {
@@ -100,11 +100,11 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
buffers.foreach(_.flip)
}
- def hasAckId() = (ackId != 0)
+ def hasAckId(): Boolean = ackId != 0
- def isCompletelyReceived() = !buffers(0).hasRemaining
+ def isCompletelyReceived: Boolean = !buffers(0).hasRemaining
- override def toString = {
+ override def toString: String = {
if (hasAckId) {
"BufferAckMessage(aid = " + ackId + ", id = " + id + ", size = " + size + ")"
} else {
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
index c2d9578be7ebb..04eb2bf9ba4ab 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala
@@ -101,9 +101,11 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
socketRemoteConnectionManagerId
}
- def key() = channel.keyFor(selector)
+ def key(): SelectionKey = channel.keyFor(selector)
- def getRemoteAddress() = channel.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]
+ def getRemoteAddress(): InetSocketAddress = {
+ channel.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]
+ }
// Returns whether we have to register for further reads or not.
def read(): Boolean = {
@@ -280,7 +282,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
/* channel.socket.setSendBufferSize(256 * 1024) */
- override def getRemoteAddress() = address
+ override def getRemoteAddress(): InetSocketAddress = address
val DEFAULT_INTEREST = SelectionKey.OP_READ
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionId.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionId.scala
index 764dc5e5503ed..b3b281ff465f1 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionId.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionId.scala
@@ -18,7 +18,9 @@
package org.apache.spark.network.nio
private[nio] case class ConnectionId(connectionManagerId: ConnectionManagerId, uniqId: Int) {
- override def toString = connectionManagerId.host + "_" + connectionManagerId.port + "_" + uniqId
+ override def toString: String = {
+ connectionManagerId.host + "_" + connectionManagerId.port + "_" + uniqId
+ }
}
private[nio] object ConnectionId {
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index ee22c6656e69e..741fe3e1ea750 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -188,7 +188,7 @@ private[nio] class ConnectionManager(
private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
private val selectorThread = new Thread("connection-manager-thread") {
- override def run() = ConnectionManager.this.run()
+ override def run(): Unit = ConnectionManager.this.run()
}
selectorThread.setDaemon(true)
// start this thread last, since it invokes run(), which accesses members above
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManagerId.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManagerId.scala
index cbb37ec5ced1f..1cd13d887c6f6 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManagerId.scala
@@ -26,7 +26,7 @@ private[nio] case class ConnectionManagerId(host: String, port: Int) {
Utils.checkHost(host)
assert (port > 0)
- def toSocketAddress() = new InetSocketAddress(host, port)
+ def toSocketAddress(): InetSocketAddress = new InetSocketAddress(host, port)
}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/Message.scala b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
index fb4a979b824c3..85d2fe2bf9c20 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/Message.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/Message.scala
@@ -42,7 +42,9 @@ private[nio] abstract class Message(val typ: Long, val id: Int) {
def timeTaken(): String = (finishTime - startTime).toString + " ms"
- override def toString = this.getClass.getSimpleName + "(id = " + id + ", size = " + size + ")"
+ override def toString: String = {
+ this.getClass.getSimpleName + "(id = " + id + ", size = " + size + ")"
+ }
}
@@ -51,7 +53,7 @@ private[nio] object Message {
var lastId = 1
- def getNewId() = synchronized {
+ def getNewId(): Int = synchronized {
lastId += 1
if (lastId == 0) {
lastId += 1
diff --git a/core/src/main/scala/org/apache/spark/network/nio/MessageChunk.scala b/core/src/main/scala/org/apache/spark/network/nio/MessageChunk.scala
index 278c5ac356ef2..a4568e849fa13 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/MessageChunk.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/MessageChunk.scala
@@ -24,9 +24,9 @@ import scala.collection.mutable.ArrayBuffer
private[nio]
class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) {
- val size = if (buffer == null) 0 else buffer.remaining
+ val size: Int = if (buffer == null) 0 else buffer.remaining
- lazy val buffers = {
+ lazy val buffers: ArrayBuffer[ByteBuffer] = {
val ab = new ArrayBuffer[ByteBuffer]()
ab += header.buffer
if (buffer != null) {
@@ -35,7 +35,7 @@ class MessageChunk(val header: MessageChunkHeader, val buffer: ByteBuffer) {
ab
}
- override def toString = {
+ override def toString: String = {
"" + this.getClass.getSimpleName + " (id = " + header.id + ", size = " + size + ")"
}
}
diff --git a/core/src/main/scala/org/apache/spark/network/nio/MessageChunkHeader.scala b/core/src/main/scala/org/apache/spark/network/nio/MessageChunkHeader.scala
index 6e20f291c5cec..7b3da4bb9d5ee 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/MessageChunkHeader.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/MessageChunkHeader.scala
@@ -50,8 +50,10 @@ private[nio] class MessageChunkHeader(
flip.asInstanceOf[ByteBuffer]
}
- override def toString = "" + this.getClass.getSimpleName + ":" + id + " of type " + typ +
+ override def toString: String = {
+ "" + this.getClass.getSimpleName + ":" + id + " of type " + typ +
" and sizes " + totalSize + " / " + chunkSize + " bytes, securityNeg: " + securityNeg
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
index cadd0c7ed19ba..53c4b32c95ab3 100644
--- a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
+++ b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
@@ -99,7 +99,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
case None => "(partial: " + initialValue + ")"
}
}
- def getFinalValueInternal() = PartialResult.this.getFinalValueInternal().map(f)
+ def getFinalValueInternal(): Option[T] = PartialResult.this.getFinalValueInternal().map(f)
}
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 1cbd684224b7c..9059eb13bb5d8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -70,7 +70,7 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
(rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
}
- override def compute(split: Partition, context: TaskContext) = {
+ override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
val currSplit = split.asInstanceOf[CartesianPartition]
for (x <- rdd1.iterator(currSplit.s1, context);
y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index b073eba8a1574..5117ccfabfcc2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -186,7 +186,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
override val isEmpty = !it.hasNext
// initializes/resets to start iterating from the beginning
- def resetIterator() = {
+ def resetIterator(): Iterator[(String, Partition)] = {
val iterators = (0 to 2).map( x =>
prev.partitions.iterator.flatMap(p => {
if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
@@ -196,10 +196,10 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
}
// hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
- def hasNext(): Boolean = { !isEmpty }
+ override def hasNext: Boolean = { !isEmpty }
// return the next preferredLocation of some partition of the RDD
- def next(): (String, Partition) = {
+ override def next(): (String, Partition) = {
if (it.hasNext) {
it.next()
} else {
@@ -237,7 +237,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
val rotIt = new LocationIterator(prev)
// deal with empty case, just create targetLen partition groups with no preferred location
- if (!rotIt.hasNext()) {
+ if (!rotIt.hasNext) {
(1 to targetLen).foreach(x => groupArr += PartitionGroup())
return
}
@@ -343,7 +343,7 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
private case class PartitionGroup(prefLoc: Option[String] = None) {
var arr = mutable.ArrayBuffer[Partition]()
- def size = arr.size
+ def size: Int = arr.size
}
private object PartitionGroup {
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 03afc289736bb..71e6e300fec5f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -191,25 +191,23 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
}
}
// Determine the bucket function in constant time. Requires that buckets are evenly spaced
- def fastBucketFunction(min: Double, increment: Double, count: Int)(e: Double): Option[Int] = {
+ def fastBucketFunction(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
      // If the input is not a number, or falls outside the bucket range, fail fast
- if (e.isNaN()) {
- return None
- }
- val bucketNumber = (e - min)/(increment)
- // We do this rather than buckets.lengthCompare(bucketNumber)
- // because Array[Double] fails to override it (for now).
- if (bucketNumber > count || bucketNumber < 0) {
+ if (e.isNaN || e < min || e > max) {
None
} else {
- Some(bucketNumber.toInt.min(count - 1))
+ // Compute ratio of e's distance along range to total range first, for better precision
+ val bucketNumber = (((e - min) / (max - min)) * count).toInt
+ // should be less than count, but will equal count if e == max, in which case
+ // it's part of the last end-range-inclusive bucket, so return count-1
+ Some(math.min(bucketNumber, count - 1))
}
}
// Decide which bucket function to pass to histogramPartition. We decide here
- // rather than having a general function so that the decission need only be made
+ // rather than having a general function so that the decision need only be made
// once rather than once per shard
val bucketFunction = if (evenBuckets) {
- fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
+ fastBucketFunction(buckets.head, buckets.last, buckets.length - 1) _
} else {
basicBucketFunction _
}
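
A quick sanity check of the new bucket arithmetic (sample values made up): with min = 0.0, max = 10.0 and count = 5, the input e = 10.0 yields ((10.0 - 0.0) / 10.0) * 5 = 5.0, and math.min(5, 4) folds it into the last, end-inclusive bucket. A standalone copy of the logic for experimentation — it mirrors the patched fastBucketFunction but is not the Spark method itself:

    // Even-bucket histogram function: compute the ratio first, then scale,
    // for better floating-point precision than (e - min) / increment.
    object BucketSketch {
      def bucket(min: Double, max: Double, count: Int)(e: Double): Option[Int] = {
        if (e.isNaN || e < min || e > max) {
          None
        } else {
          val bucketNumber = (((e - min) / (max - min)) * count).toInt
          Some(math.min(bucketNumber, count - 1)) // e == max lands in the last bucket
        }
      }

      def main(args: Array[String]): Unit = {
        val f = bucket(0.0, 10.0, 5) _
        println(f(3.9))        // Some(1)
        println(f(10.0))       // Some(4): end-inclusive last bucket
        println(f(-0.1))       // None: below range
        println(f(Double.NaN)) // None: fail fast on NaN
      }
    }
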
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 486e86ce1bb19..f77abac42b623 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -215,8 +215,7 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()
- val inputMetrics = context.taskMetrics
- .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
+ val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
@@ -240,7 +239,7 @@ class HadoopRDD[K, V](
val key: K = reader.createKey()
val value: V = reader.createValue()
- override def getNext() = {
+ override def getNext(): (K, V) = {
try {
finished = !reader.next(key, value)
} catch {
@@ -337,11 +336,11 @@ private[spark] object HadoopRDD extends Logging {
* The three methods below are helpers for accessing the local map, a property of the SparkEnv of
* the local process.
*/
- def getCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.get(key)
+ def getCachedMetadata(key: String): Any = SparkEnv.get.hadoopJobMetadata.get(key)
- def containsCachedMetadata(key: String) = SparkEnv.get.hadoopJobMetadata.containsKey(key)
+ def containsCachedMetadata(key: String): Boolean =
+ SparkEnv.get.hadoopJobMetadata.containsKey(key)
- def putCachedMetadata(key: String, value: Any) =
+ private def putCachedMetadata(key: String, value: Any): Unit =
SparkEnv.get.hadoopJobMetadata.put(key, value)
/** Add Hadoop configuration specific to a single partition and attempt. */
@@ -371,7 +370,7 @@ private[spark] object HadoopRDD extends Logging {
override def getPartitions: Array[Partition] = firstParent[T].partitions
- override def compute(split: Partition, context: TaskContext) = {
+ override def compute(split: Partition, context: TaskContext): Iterator[U] = {
val partition = split.asInstanceOf[HadoopPartition]
val inputSplit = partition.inputSplit.value
f(inputSplit, firstParent[T].iterator(split, context))
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index e2267861e79df..0c28f045e46e9 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.sql.{Connection, ResultSet}
+import java.sql.{Connection, PreparedStatement, ResultSet}
import scala.reflect.ClassTag
@@ -28,8 +28,9 @@ import org.apache.spark.util.NextIterator
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
- override def index = idx
+ override def index: Int = idx
}
+
// TODO: Expose a jdbcRDD function in SparkContext and mark this as semi-private
/**
* An RDD that executes an SQL query on a JDBC connection and reads results.
@@ -70,7 +71,8 @@ class JdbcRDD[T: ClassTag](
}).toArray
}
- override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
+ override def compute(thePart: Partition, context: TaskContext): Iterator[T] =
+ new NextIterator[T] {
context.addTaskCompletionListener{ context => closeIfNeeded() }
val part = thePart.asInstanceOf[JdbcPartition]
val conn = getConnection()
@@ -88,7 +90,7 @@ class JdbcRDD[T: ClassTag](
stmt.setLong(2, part.upper)
val rs = stmt.executeQuery()
- override def getNext: T = {
+ override def getNext(): T = {
if (rs.next()) {
mapRow(rs)
} else {
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 4883fb828814c..a838aac6e8d1a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -31,6 +31,6 @@ private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
override def getPartitions: Array[Partition] = firstParent[T].partitions
- override def compute(split: Partition, context: TaskContext) =
+ override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 7fb94840df99c..2ab967f4bb313 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -238,7 +238,7 @@ private[spark] object NewHadoopRDD {
override def getPartitions: Array[Partition] = firstParent[T].partitions
- override def compute(split: Partition, context: TaskContext) = {
+ override def compute(split: Partition, context: TaskContext): Iterator[U] = {
val partition = split.asInstanceOf[NewHadoopPartition]
val inputSplit = partition.serializableHadoopSplit.value
f(inputSplit, firstParent[T].iterator(split, context))
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index f12d0cffaba34..e2394e28f8d26 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -98,7 +98,7 @@ private[spark] class ParallelCollectionRDD[T: ClassTag](
slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray
}
- override def compute(s: Partition, context: TaskContext) = {
+ override def compute(s: Partition, context: TaskContext): Iterator[T] = {
new InterruptibleIterator(context, s.asInstanceOf[ParallelCollectionPartition[T]].iterator)
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index f781a8d776f2a..a00f4c1cdff91 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -40,7 +40,7 @@ private[spark] class PruneDependency[T](rdd: RDD[T], @transient partitionFilterF
.filter(s => partitionFilterFunc(s.index)).zipWithIndex
.map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }
- override def getParents(partitionId: Int) = {
+ override def getParents(partitionId: Int): List[Int] = {
List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
}
}
@@ -59,8 +59,10 @@ class PartitionPruningRDD[T: ClassTag](
@transient partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
- override def compute(split: Partition, context: TaskContext) = firstParent[T].iterator(
- split.asInstanceOf[PartitionPruningRDDPartition].parentSplit, context)
+ override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+ firstParent[T].iterator(
+ split.asInstanceOf[PartitionPruningRDDPartition].parentSplit, context)
+ }
override protected def getPartitions: Array[Partition] =
getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
@@ -74,7 +76,7 @@ object PartitionPruningRDD {
* Create a PartitionPruningRDD. This function can be used to create the PartitionPruningRDD
* when its type T is not known at compile time.
*/
- def create[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean) = {
+ def create[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean): PartitionPruningRDD[T] = {
new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassTag)
}
}
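
Aside: the create factory above exists because PartitionPruningRDD's constructor needs an implicit ClassTag[T], which the compiler cannot conjure when T is unknown at the call site; the factory borrows the tag from the source RDD instead. A hedged sketch of the same trick with hypothetical types:

    import scala.reflect.ClassTag

    // A class that demands a ClassTag, plus a factory that works even
    // when T is not statically known, by reusing an existing tag.
    class Tagged[T: ClassTag](val value: T) {
      def tag: ClassTag[T] = implicitly[ClassTag[T]]
    }

    object Tagged {
      def copy[T](t: Tagged[T]): Tagged[T] = new Tagged[T](t.value)(t.tag)
    }
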
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index ed79032893d33..dc60d48927624 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -149,10 +149,10 @@ private[spark] class PipedRDD[T: ClassTag](
}.start()
     // Return an iterator that reads lines from the process's stdout
- val lines = Source.fromInputStream(proc.getInputStream).getLines
+ val lines = Source.fromInputStream(proc.getInputStream).getLines()
new Iterator[String] {
- def next() = lines.next()
- def hasNext = {
+ def next(): String = lines.next()
+ def hasNext: Boolean = {
if (lines.hasNext) {
true
} else {
@@ -162,7 +162,7 @@ private[spark] class PipedRDD[T: ClassTag](
}
// cleanup task working directory if used
- if (workInTaskDirectory == true) {
+ if (workInTaskDirectory) {
scala.util.control.Exception.ignoring(classOf[IOException]) {
Utils.deleteRecursively(new File(taskDirectory))
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a4c74ed03e330..ddbfd5624e741 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -186,7 +186,7 @@ abstract class RDD[T: ClassTag](
}
/** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
- def getStorageLevel = storageLevel
+ def getStorageLevel: StorageLevel = storageLevel
// Our dependencies and partitions will be gotten by calling subclass's methods below, and will
// be overwritten when we're checkpointed
@@ -746,13 +746,13 @@ abstract class RDD[T: ClassTag](
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
new Iterator[(T, U)] {
- def hasNext = (thisIter.hasNext, otherIter.hasNext) match {
+ def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
case (true, true) => true
case (false, false) => false
case _ => throw new SparkException("Can only zip RDDs with " +
"same number of elements in each partition")
}
- def next = (thisIter.next, otherIter.next)
+ def next(): (T, U) = (thisIter.next(), otherIter.next())
}
}
}
@@ -868,8 +868,8 @@ abstract class RDD[T: ClassTag](
// Our partitioner knows how to handle T (which, since we have a partitioner, is
// really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
val p2 = new Partitioner() {
- override def numPartitions = p.numPartitions
- override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
+ override def numPartitions: Int = p.numPartitions
+ override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
}
// Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
// anyway, and when calling .keys, will not have a partitioner set, even though
@@ -1394,7 +1394,7 @@ abstract class RDD[T: ClassTag](
}
/** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
- def context = sc
+ def context: SparkContext = sc
/**
* Private API for changing an RDD's ClassTag.
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index d9fe6847254fa..2dc47f95937cb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -17,14 +17,12 @@
package org.apache.spark.rdd
-import scala.reflect.ClassTag
-
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.Serializer
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
- override val index = idx
+ override val index: Int = idx
override def hashCode(): Int = idx
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index ed24ea22a661c..c27f435eb9c5a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -105,7 +105,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
seq
}
}
- def integrate(dep: CoGroupSplitDep, op: Product2[K, V] => Unit) = dep match {
+ def integrate(dep: CoGroupSplitDep, op: Product2[K, V] => Unit): Unit = dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index aece683ff3199..4239e7e22af89 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -44,7 +44,7 @@ private[spark] class UnionPartition[T: ClassTag](
var parentPartition: Partition = rdd.partitions(parentRddPartitionIndex)
- def preferredLocations() = rdd.preferredLocations(parentPartition)
+ def preferredLocations(): Seq[String] = rdd.preferredLocations(parentPartition)
override val index: Int = idx
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 95b2dd954e9f4..d0be304762e1f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -32,7 +32,7 @@ private[spark] class ZippedPartitionsPartition(
override val index: Int = idx
var partitionValues = rdds.map(rdd => rdd.partitions(idx))
- def partitions = partitionValues
+ def partitions: Seq[Partition] = partitionValues
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index fa83372bb4d11..e0edd7d4ae968 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -39,8 +39,11 @@ class AccumulableInfo (
}
object AccumulableInfo {
- def apply(id: Long, name: String, update: Option[String], value: String) =
+ def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = {
new AccumulableInfo(id, name, update, value)
+ }
- def apply(id: Long, name: String, value: String) = new AccumulableInfo(id, name, None, value)
+ def apply(id: Long, name: String, value: String): AccumulableInfo = {
+ new AccumulableInfo(id, name, None, value)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8feac6cb6b7a1..b405bd3338e7c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -946,7 +946,7 @@ class DAGScheduler(
val stage = stageIdToStage(task.stageId)
- def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
+ def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
val serviceTime = stage.latestInfo.submissionTime match {
case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
case _ => "Unknown"
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 34fa6d27c3a45..c0d889360ae99 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -149,47 +149,60 @@ private[spark] class EventLoggingListener(
}
// Events that do not trigger a flush
- override def onStageSubmitted(event: SparkListenerStageSubmitted) =
- logEvent(event)
- override def onTaskStart(event: SparkListenerTaskStart) =
- logEvent(event)
- override def onTaskGettingResult(event: SparkListenerTaskGettingResult) =
- logEvent(event)
- override def onTaskEnd(event: SparkListenerTaskEnd) =
- logEvent(event)
- override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate) =
- logEvent(event)
+ override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)
+
+ override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)
+
+ override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = logEvent(event)
+
+ override def onTaskEnd(event: SparkListenerTaskEnd): Unit = logEvent(event)
+
+ override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = logEvent(event)
// Events that trigger a flush
- override def onStageCompleted(event: SparkListenerStageCompleted) =
- logEvent(event, flushLogger = true)
- override def onJobStart(event: SparkListenerJobStart) =
- logEvent(event, flushLogger = true)
- override def onJobEnd(event: SparkListenerJobEnd) =
+ override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
logEvent(event, flushLogger = true)
- override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded) =
+ }
+
+ override def onJobStart(event: SparkListenerJobStart): Unit = logEvent(event, flushLogger = true)
+
+ override def onJobEnd(event: SparkListenerJobEnd): Unit = logEvent(event, flushLogger = true)
+
+ override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
logEvent(event, flushLogger = true)
- override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved) =
+ }
+
+ override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = {
logEvent(event, flushLogger = true)
- override def onUnpersistRDD(event: SparkListenerUnpersistRDD) =
+ }
+
+ override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
logEvent(event, flushLogger = true)
- override def onApplicationStart(event: SparkListenerApplicationStart) =
+ }
+
+ override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
logEvent(event, flushLogger = true)
- override def onApplicationEnd(event: SparkListenerApplicationEnd) =
+ }
+
+ override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
logEvent(event, flushLogger = true)
- override def onExecutorAdded(event: SparkListenerExecutorAdded) =
+ }
+
+ override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
logEvent(event, flushLogger = true)
- override def onExecutorRemoved(event: SparkListenerExecutorRemoved) =
+ }
+
+ override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
logEvent(event, flushLogger = true)
+ }
// No-op because logging every update would be overkill
- override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate) { }
+ override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
/**
* Stop logging events. The event log file will be renamed so that it loses the
* ".inprogress" suffix.
*/
- def stop() = {
+ def stop(): Unit = {
writer.foreach(_.close())
val target = new Path(logPath)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 8aa528ac573d0..e55b76c36cc5f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -57,7 +57,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
private val stageIdToJobId = new HashMap[Int, Int]
private val jobIdToStageIds = new HashMap[Int, Seq[Int]]
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
- override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+ override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}
createLogDir()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index 29879b374b801..382b09422a4a0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -34,7 +34,7 @@ private[spark] class JobWaiter[T](
@volatile
private var _jobFinished = totalTasks == 0
- def jobFinished = _jobFinished
+ def jobFinished: Boolean = _jobFinished
// If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero
// partition RDDs), we set the jobResult directly to JobSucceeded.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 759df023a6dcf..a3caa9f000c89 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -160,7 +160,7 @@ private[spark] object OutputCommitCoordinator {
class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)
extends Actor with ActorLogReceive with Logging {
- override def receiveWithLogging = {
+ override def receiveWithLogging: PartialFunction[Any, Unit] = {
case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
sender ! outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt)
case StopCoordinator =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 4a9ff918afe25..e074ce6ebff0b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -64,5 +64,5 @@ private[spark] class ResultTask[T, U](
// This is only callable on the driver side.
override def preferredLocations: Seq[TaskLocation] = preferredLocs
- override def toString = "ResultTask(" + stageId + ", " + partitionId + ")"
+ override def toString: String = "ResultTask(" + stageId + ", " + partitionId + ")"
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 79709089c0da4..fd0d484b45460 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -47,7 +47,7 @@ private[spark] class ShuffleMapTask(
/** A constructor used only in test suites. This does not require passing in an RDD. */
def this(partitionId: Int) {
- this(0, null, new Partition { override def index = 0 }, null)
+ this(0, null, new Partition { override def index: Int = 0 }, null)
}
@transient private val preferredLocs: Seq[TaskLocation] = {
@@ -83,5 +83,5 @@ private[spark] class ShuffleMapTask(
override def preferredLocations: Seq[TaskLocation] = preferredLocs
- override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partitionId)
+ override def toString: String = "ShuffleMapTask(%d, %d)".format(stageId, partitionId)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 52720d48ca67f..b711ff209af94 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -300,7 +300,7 @@ private[spark] object StatsReportListener extends Logging {
}
def showDistribution(heading: String, dOpt: Option[Distribution], format: String) {
- def f(d: Double) = format.format(d)
+ def f(d: Double): String = format.format(d)
showDistribution(heading, dOpt, f _)
}
@@ -346,7 +346,7 @@ private[spark] object StatsReportListener extends Logging {
/**
* Reformat a time interval in milliseconds to a prettier format for output
*/
- def millisToString(ms: Long) = {
+ def millisToString(ms: Long): String = {
val (size, units) =
if (ms > hours) {
(ms.toDouble / hours, "hours")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index cc13f57a49b89..4cbc6e84a6bdd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -133,7 +133,7 @@ private[spark] class Stage(
def attemptId: Int = nextAttemptId
- override def toString = "Stage " + id
+ override def toString: String = "Stage " + id
override def hashCode(): Int = id
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 6fa1f2c880f7a..132a9ced77700 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -81,9 +81,11 @@ class TaskInfo(
def status: String = {
if (running) {
- "RUNNING"
- } else if (gettingResult) {
- "GET RESULT"
+ if (gettingResult) {
+ "GET RESULT"
+ } else {
+ "RUNNING"
+ }
} else if (failed) {
"FAILED"
} else if (successful) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
index 10c685f29d3ac..da07ce2c6ea49 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
@@ -29,23 +29,22 @@ private[spark] sealed trait TaskLocation {
/**
* A location that includes both a host and an executor id on that host.
*/
-private [spark] case class ExecutorCacheTaskLocation(override val host: String,
- val executorId: String) extends TaskLocation {
-}
+private [spark]
+case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
+ extends TaskLocation
/**
* A location on a host.
*/
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
- override def toString = host
+ override def toString: String = host
}
/**
* A location on a host that is cached by HDFS.
*/
-private [spark] case class HDFSCacheTaskLocation(override val host: String)
- extends TaskLocation {
- override def toString = TaskLocation.inMemoryLocationTag + host
+private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
+ override def toString: String = TaskLocation.inMemoryLocationTag + host
}
private[spark] object TaskLocation {
@@ -54,14 +53,16 @@ private[spark] object TaskLocation {
// confusion. See RFC 952 and RFC 1123 for information about the format of hostnames.
val inMemoryLocationTag = "hdfs_cache_"
- def apply(host: String, executorId: String) = new ExecutorCacheTaskLocation(host, executorId)
+ def apply(host: String, executorId: String): TaskLocation = {
+ new ExecutorCacheTaskLocation(host, executorId)
+ }
/**
* Create a TaskLocation from a string returned by getPreferredLocations.
* These strings have the form [hostname] or hdfs_cache_[hostname], depending on whether the
* location is cached.
*/
- def apply(str: String) = {
+ def apply(str: String): TaskLocation = {
val hstr = str.stripPrefix(inMemoryLocationTag)
if (hstr.equals(str)) {
new HostTaskLocation(str)
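With the two apply overloads above, the string form round-trips the optional hdfs_cache_ prefix; the else branch cut off here presumably returns an HDFSCacheTaskLocation built from the stripped string. A quick REPL-style usage sketch (illustrative):

  TaskLocation("host1")               // HostTaskLocation("host1")
  TaskLocation("hdfs_cache_host1")    // presumably HDFSCacheTaskLocation("host1")
  TaskLocation("host1", "executor-3") // ExecutorCacheTaskLocation("host1", "executor-3")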
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f33fd4450b2a6..076b36e86c0ce 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -373,17 +373,17 @@ private[spark] class TaskSchedulerImpl(
}
def handleSuccessfulTask(
- taskSetManager: TaskSetManager,
- tid: Long,
- taskResult: DirectTaskResult[_]) = synchronized {
+ taskSetManager: TaskSetManager,
+ tid: Long,
+ taskResult: DirectTaskResult[_]): Unit = synchronized {
taskSetManager.handleSuccessfulTask(tid, taskResult)
}
def handleFailedTask(
- taskSetManager: TaskSetManager,
- tid: Long,
- taskState: TaskState,
- reason: TaskEndReason) = synchronized {
+ taskSetManager: TaskSetManager,
+ tid: Long,
+ taskState: TaskState,
+ reason: TaskEndReason): Unit = synchronized {
taskSetManager.handleFailedTask(tid, taskState, reason)
if (!taskSetManager.isZombie && taskState != TaskState.KILLED) {
// Need to revive offers again now that the task set manager state has been updated to
@@ -423,7 +423,7 @@ private[spark] class TaskSchedulerImpl(
starvationTimer.cancel()
}
- override def defaultParallelism() = backend.defaultParallelism()
+ override def defaultParallelism(): Int = backend.defaultParallelism()
// Check for speculatable tasks in all our active jobs.
def checkSpeculatableTasks() {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 529237f0d35dc..d509881c74fef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.io.NotSerializableException
import java.nio.ByteBuffer
import java.util.Arrays
+import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@@ -29,6 +30,7 @@ import scala.util.control.NonFatal
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.SchedulingMode._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.util.{Clock, SystemClock, Utils}
@@ -97,7 +99,8 @@ private[spark] class TaskSetManager(
var calculatedTasks = 0
val runningTasksSet = new HashSet[Long]
- override def runningTasks = runningTasksSet.size
+
+ override def runningTasks: Int = runningTasksSet.size
// True once no more tasks should be launched for this task set manager. TaskSetManagers enter
// the zombie state once at least one attempt of each task has completed successfully, or if the
@@ -168,9 +171,9 @@ private[spark] class TaskSetManager(
var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level
- override def schedulableQueue = null
+ override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null
- override def schedulingMode = SchedulingMode.NONE
+ override def schedulingMode: SchedulingMode = SchedulingMode.NONE
var emittedTaskSizeWarning = false
@@ -585,7 +588,7 @@ private[spark] class TaskSetManager(
/**
* Marks the task as getting result and notifies the DAG Scheduler
*/
- def handleTaskGettingResult(tid: Long) = {
+ def handleTaskGettingResult(tid: Long): Unit = {
val info = taskInfos(tid)
info.markGettingResult()
sched.dagScheduler.taskGettingResult(info)
@@ -612,7 +615,7 @@ private[spark] class TaskSetManager(
/**
* Marks the task as successful and notifies the DAGScheduler that a task has ended.
*/
- def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
+ def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
val info = taskInfos(tid)
val index = info.index
info.markSuccessful()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 87ebf31139ce9..5d258d9da4d1a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -85,7 +85,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}
- def receiveWithLogging = {
+ def receiveWithLogging: PartialFunction[Any, Unit] = {
case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index f14aaeea0a25c..5a38ad9f2b12c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -109,7 +109,7 @@ private[spark] abstract class YarnSchedulerBackend(
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
- override def receive = {
+ override def receive: PartialFunction[Any, Unit] = {
case RegisterClusterManager =>
logInfo(s"ApplicationMaster registered as $sender")
amActor = Some(sender)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
index aa3ec0f8cfb9c..8df4f3b554c41 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
@@ -24,7 +24,7 @@ private[spark] object MemoryUtils {
val OVERHEAD_FRACTION = 0.10
val OVERHEAD_MINIMUM = 384
- def calculateTotalMemory(sc: SparkContext) = {
+ def calculateTotalMemory(sc: SparkContext): Int = {
sc.conf.getInt("spark.mesos.executor.memoryOverhead",
math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) + sc.executorMemory
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 06bb527522141..b381436839227 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -387,7 +387,7 @@ private[spark] class MesosSchedulerBackend(
}
// TODO: query Mesos for number of cores
- override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
+ override def defaultParallelism(): Int = sc.conf.getInt("spark.default.parallelism", 8)
override def applicationId(): String =
Option(appId).getOrElse {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index d95426d918e19..eb3f999b5b375 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -59,7 +59,7 @@ private[spark] class LocalActor(
private val executor = new Executor(
localExecutorId, localExecutorHostname, SparkEnv.get, isLocal = true)
- override def receiveWithLogging = {
+ override def receiveWithLogging: PartialFunction[Any, Unit] = {
case ReviveOffers =>
reviveOffers()
@@ -117,7 +117,7 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
localActor ! ReviveOffers
}
- override def defaultParallelism() =
+ override def defaultParallelism(): Int =
scheduler.conf.getInt("spark.default.parallelism", totalCores)
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 1baa0e009f3ae..dfbde7c8a1b0d 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -59,9 +59,10 @@ private[spark] class JavaSerializationStream(
}
private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
-extends DeserializationStream {
+ extends DeserializationStream {
+
private val objIn = new ObjectInputStream(in) {
- override def resolveClass(desc: ObjectStreamClass) =
+ override def resolveClass(desc: ObjectStreamClass): Class[_] =
Class.forName(desc.getName, false, loader)
}
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index dc7aa99738c17..579fb6624e692 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -49,10 +49,20 @@ class KryoSerializer(conf: SparkConf)
with Logging
with Serializable {
- private val bufferSize =
- (conf.getDouble("spark.kryoserializer.buffer.mb", 0.064) * 1024 * 1024).toInt
+ private val bufferSizeMb = conf.getDouble("spark.kryoserializer.buffer.mb", 0.064)
+ if (bufferSizeMb >= 2048) {
+ throw new IllegalArgumentException("spark.kryoserializer.buffer.mb must be less than " +
+ s"2048 mb, got: + $bufferSizeMb mb.")
+ }
+ private val bufferSize = (bufferSizeMb * 1024 * 1024).toInt
+
+ val maxBufferSizeMb = conf.getInt("spark.kryoserializer.buffer.max.mb", 64)
+ if (maxBufferSizeMb >= 2048) {
+ throw new IllegalArgumentException("spark.kryoserializer.buffer.max.mb must be less than " +
+ s"2048 mb, got: + $maxBufferSizeMb mb.")
+ }
+ private val maxBufferSize = maxBufferSizeMb * 1024 * 1024

- private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
private val userRegistrator = conf.getOption("spark.kryo.registrator")
@@ -60,7 +70,7 @@ class KryoSerializer(conf: SparkConf)
.split(',')
.filter(!_.isEmpty)
- def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
+ def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
def newKryo(): Kryo = {
val instantiator = new EmptyScalaKryoInstantiator
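The 2048 mb ceiling exists because the buffer sizes end up as Ints measured in bytes, and 2048 * 1024 * 1024 is exactly 2^31, one past Int.MaxValue. A sketch of what goes wrong without the guard (illustrative arithmetic only):

  val mb = 2048
  println(mb * 1024 * 1024)                  // -2147483648: Int arithmetic wraps at 2^31
  println((mb.toDouble * 1024 * 1024).toInt) // 2147483647: Double-to-Int conversion saturates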
diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
index 7de2f9cbb2866..d0178dfde6935 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala
@@ -106,12 +106,13 @@ class FileShuffleBlockManager(conf: SparkConf)
* when the writers are closed successfully
*/
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
- writeMetrics: ShuffleWriteMetrics) = {
+ writeMetrics: ShuffleWriteMetrics): ShuffleWriterGroup = {
new ShuffleWriterGroup {
shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
private val shuffleState = shuffleStates(shuffleId)
private var fileGroup: ShuffleFileGroup = null
+ val openStartTime = System.nanoTime
val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
fileGroup = getUnusedFileGroup()
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
@@ -135,6 +136,9 @@ class FileShuffleBlockManager(conf: SparkConf)
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
}
}
+ // Creating the file to write to and creating a disk writer both involve interacting with
+ // the disk, so should be included in the shuffle write time.
+ writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
override def releaseWriters(success: Boolean) {
if (consolidateShuffleFiles) {
@@ -268,7 +272,7 @@ object FileShuffleBlockManager {
new PrimitiveVector[Long]()
}
- def apply(bucketId: Int) = files(bucketId)
+ def apply(bucketId: Int): File = files(bucketId)
def recordMapOutput(mapId: Int, offsets: Array[Long], lengths: Array[Long]) {
assert(offsets.length == lengths.length)
diff --git a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
index b292587d37028..87fd161e06c85 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockManager.scala
@@ -80,7 +80,7 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
* end of the output file. This will be used by getBlockLocation to figure out where each block
* begins and ends.
* */
- def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]) = {
+ def writeIndexFile(shuffleId: Int, mapId: Int, lengths: Array[Long]): Unit = {
val indexFile = getIndexFile(shuffleId, mapId)
val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile)))
try {
@@ -121,5 +121,5 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager {
}
}
- override def stop() = {}
+ override def stop(): Unit = {}
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index fa2e617762f55..55ea0f17b156a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -63,6 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C](
sorter.insertAll(records)
}
+ // Don't bother including the time to open the merged output file in the shuffle write time,
+ // because it just opens a single file, which is typically too fast to measure accurately
+ // (see SPARK-3570).
val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 1f012941c85ab..c186fd360fef6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -35,13 +35,13 @@ sealed abstract class BlockId {
def name: String
// convenience methods
- def asRDDId = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
- def isRDD = isInstanceOf[RDDBlockId]
- def isShuffle = isInstanceOf[ShuffleBlockId]
- def isBroadcast = isInstanceOf[BroadcastBlockId]
+ def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
+ def isRDD: Boolean = isInstanceOf[RDDBlockId]
+ def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
+ def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
- override def toString = name
- override def hashCode = name.hashCode
+ override def toString: String = name
+ override def hashCode: Int = name.hashCode
override def equals(other: Any): Boolean = other match {
case o: BlockId => getClass == o.getClass && name.equals(o.name)
case _ => false
@@ -50,54 +50,54 @@ sealed abstract class BlockId {
@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
- def name = "rdd_" + rddId + "_" + splitIndex
+ override def name: String = "rdd_" + rddId + "_" + splitIndex
}
// Format of the shuffle block ids (including data and index) should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
- def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
+ override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}
@DeveloperApi
case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
- def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
+ override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
}
@DeveloperApi
case class ShuffleIndexBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
- def name = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
+ override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
}
@DeveloperApi
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
- def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
+ override def name: String = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}
@DeveloperApi
case class TaskResultBlockId(taskId: Long) extends BlockId {
- def name = "taskresult_" + taskId
+ override def name: String = "taskresult_" + taskId
}
@DeveloperApi
case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
- def name = "input-" + streamId + "-" + uniqueId
+ override def name: String = "input-" + streamId + "-" + uniqueId
}
/** Id associated with temporary local data managed as blocks. Not serializable. */
private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
- def name = "temp_local_" + id
+ override def name: String = "temp_local_" + id
}
/** Id associated with temporary shuffle data managed as blocks. Not serializable. */
private[spark] case class TempShuffleBlockId(id: UUID) extends BlockId {
- def name = "temp_shuffle_" + id
+ override def name: String = "temp_shuffle_" + id
}
// Intended only for testing purposes
private[spark] case class TestBlockId(id: String) extends BlockId {
- def name = "test_" + id
+ override def name: String = "test_" + id
}
@DeveloperApi
@@ -112,7 +112,7 @@ object BlockId {
val TEST = "test_(.*)".r
/** Converts a BlockId "name" String back into a BlockId. */
- def apply(id: String) = id match {
+ def apply(id: String): BlockId = id match {
case RDD(rddId, splitIndex) =>
RDDBlockId(rddId.toInt, splitIndex.toInt)
case SHUFFLE(shuffleId, mapId, reduceId) =>
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 80d66e59132da..1dff09a75d038 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -535,9 +535,14 @@ private[spark] class BlockManager(
/* We'll store the bytes in memory if the block's storage level includes
* "memory serialized", or if it should be cached as objects in memory
* but we only requested its serialized bytes. */
- val copyForMemory = ByteBuffer.allocate(bytes.limit)
- copyForMemory.put(bytes)
- memoryStore.putBytes(blockId, copyForMemory, level)
+ memoryStore.putBytes(blockId, bytes.limit, () => {
+ // https://issues.apache.org/jira/browse/SPARK-6076
+ // If the file size is bigger than the free memory, an OOM will occur. So if we cannot
+ // put the block into MemoryStore, copyForMemory should never be created. That's why this
+ // action is wrapped in a `() => ByteBuffer` and created lazily.
+ val copyForMemory = ByteBuffer.allocate(bytes.limit)
+ copyForMemory.put(bytes)
+ })
bytes.rewind()
}
if (!asBlockResult) {
@@ -991,15 +996,23 @@ private[spark] class BlockManager(
putIterator(blockId, Iterator(value), level, tellMaster)
}
+ def dropFromMemory(
+ blockId: BlockId,
+ data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+ dropFromMemory(blockId, () => data)
+ }
+
/**
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
*
+ * If `data` is not put on disk, it won't be created.
+ *
* Return the block status if the given block has been updated, else None.
*/
def dropFromMemory(
blockId: BlockId,
- data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
+ data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
logInfo(s"Dropping block $blockId from memory")
val info = blockInfo.get(blockId).orNull
@@ -1023,7 +1036,7 @@ private[spark] class BlockManager(
// Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo(s"Writing block $blockId to disk")
- data match {
+ data() match {
case Left(elements) =>
diskStore.putArray(blockId, elements, level, returnValues = false)
case Right(bytes) =>
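Both changes in this file replace an eagerly built ByteBuffer with a `() => ...` thunk, so the large copy is only allocated on the code path that actually consumes it. A standalone sketch of the pattern (names are illustrative, not Spark's API):

  // Sketch: the expensive value is only materialized if the consumer needs it.
  def storeIfItFits(size: Long, free: Long, makeCopy: () => Array[Byte]): Boolean = {
    if (size <= free) {
      val copy = makeCopy() // allocated only on the success path
      copy.length == size
    } else {
      false // makeCopy() is never invoked, so no large allocation occurs
    }
  }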
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index b177a59c721df..a6f1ebf325a7c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -77,11 +77,11 @@ class BlockManagerId private (
@throws(classOf[IOException])
private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
- override def toString = s"BlockManagerId($executorId, $host, $port)"
+ override def toString: String = s"BlockManagerId($executorId, $host, $port)"
override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port
- override def equals(that: Any) = that match {
+ override def equals(that: Any): Boolean = that match {
case id: BlockManagerId =>
executorId == id.executorId && port == id.port && host == id.host
case _ =>
@@ -100,10 +100,10 @@ private[spark] object BlockManagerId {
* @param port Port of the block manager.
* @return A new [[org.apache.spark.storage.BlockManagerId]].
*/
- def apply(execId: String, host: String, port: Int) =
+ def apply(execId: String, host: String, port: Int): BlockManagerId =
getCachedBlockManagerId(new BlockManagerId(execId, host, port))
- def apply(in: ObjectInput) = {
+ def apply(in: ObjectInput): BlockManagerId = {
val obj = new BlockManagerId()
obj.readExternal(in)
getCachedBlockManagerId(obj)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 654796f23c96e..061964826f08b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -79,7 +79,7 @@ class BlockManagerMaster(
* Check if block manager master has a block. Note that this can be used to check for only
* those blocks that are reported to block manager master.
*/
- def contains(blockId: BlockId) = {
+ def contains(blockId: BlockId): Boolean = {
!getLocations(blockId).isEmpty
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 787b0f96bec32..5b5328016124e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -52,7 +52,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
private val akkaTimeout = AkkaUtils.askTimeout(conf)
- override def receiveWithLogging = {
+ override def receiveWithLogging: PartialFunction[Any, Unit] = {
case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
register(blockManagerId, maxMemSize, slaveActor)
sender ! true
@@ -421,7 +421,7 @@ private[spark] class BlockManagerInfo(
// Mapping from block id to its status.
private val _blocks = new JHashMap[BlockId, BlockStatus]
- def getStatus(blockId: BlockId) = Option(_blocks.get(blockId))
+ def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.get(blockId))
def updateLastSeenMs() {
_lastSeenMs = System.currentTimeMillis()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
index 8462871e798a5..52fb896c4e21f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveActor.scala
@@ -38,7 +38,7 @@ class BlockManagerSlaveActor(
import context.dispatcher
// Operations that involve removing blocks may be slow and should be done asynchronously
- override def receiveWithLogging = {
+ override def receiveWithLogging: PartialFunction[Any, Unit] = {
case RemoveBlock(blockId) =>
doAsync[Boolean]("removing block " + blockId, sender) {
blockManager.removeBlock(blockId)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 81164178b9e8e..f703e50b6b0ac 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -82,11 +82,13 @@ private[spark] class DiskBlockObjectWriter(
{
/** Intercepts write calls and tracks total time spent writing. Not thread safe. */
private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
- def write(i: Int): Unit = callWithTiming(out.write(i))
- override def write(b: Array[Byte]) = callWithTiming(out.write(b))
- override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
- override def close() = out.close()
- override def flush() = out.flush()
+ override def write(i: Int): Unit = callWithTiming(out.write(i))
+ override def write(b: Array[Byte]): Unit = callWithTiming(out.write(b))
+ override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+ callWithTiming(out.write(b, off, len))
+ }
+ override def close(): Unit = out.close()
+ override def flush(): Unit = out.flush()
}
/** The file channel, used for repositioning / truncating the file. */
@@ -141,8 +143,9 @@ private[spark] class DiskBlockObjectWriter(
if (syncWrites) {
// Force outstanding writes to disk and track how long it takes
objOut.flush()
- def sync = fos.getFD.sync()
- callWithTiming(sync)
+ callWithTiming {
+ fos.getFD.sync()
+ }
}
objOut.close()
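callWithTiming is defined elsewhere in this file; from its use here it is presumably a by-name helper that accumulates elapsed time around the wrapped call, which is why the temporary `def sync` could be inlined into a block argument. A minimal sketch of such a helper (an assumption about its shape, not the actual definition):

  private var writeTimeNanos = 0L
  // Assumed shape: run the by-name body, accumulate the elapsed time.
  private def callWithTiming[T](body: => T): T = {
    val start = System.nanoTime()
    try body finally { writeTimeNanos += System.nanoTime() - start }
  }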
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 12cd8ea3bdf1f..2883137872600 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -47,6 +47,8 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
}
+ // The outer subDirs array is immutable, but each inner subDirs(i) array is mutable, and
+ // access to subDirs(i) is protected by the lock of subDirs(i) itself.
private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
private val shutdownHook = addShutdownHook()
@@ -61,20 +63,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
// Create the subdirectory if it doesn't already exist
- var subDir = subDirs(dirId)(subDirId)
- if (subDir == null) {
- subDir = subDirs(dirId).synchronized {
- val old = subDirs(dirId)(subDirId)
- if (old != null) {
- old
- } else {
- val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
- if (!newDir.exists() && !newDir.mkdir()) {
- throw new IOException(s"Failed to create local dir in $newDir.")
- }
- subDirs(dirId)(subDirId) = newDir
- newDir
+ val subDir = subDirs(dirId).synchronized {
+ val old = subDirs(dirId)(subDirId)
+ if (old != null) {
+ old
+ } else {
+ val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
+ if (!newDir.exists() && !newDir.mkdir()) {
+ throw new IOException(s"Failed to create local dir in $newDir.")
}
+ subDirs(dirId)(subDirId) = newDir
+ newDir
}
}
@@ -91,7 +90,12 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
/** List all the files currently stored on disk by the disk manager. */
def getAllFiles(): Seq[File] = {
// Get all the files inside the array of array of directories
- subDirs.flatten.filter(_ != null).flatMap { dir =>
+ subDirs.flatMap { dir =>
+ dir.synchronized {
+ // Copy the content of dir because it may be modified in other threads
+ dir.clone()
+ }
+ }.filter(_ != null).flatMap { dir =>
val files = dir.listFiles()
if (files != null) files else Seq.empty
}
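The rewrite drops the unsynchronized first read of subDirs(dirId)(subDirId): reading a mutable array slot outside the lock is a data race, and always entering the cheap per-directory synchronized block is both simpler and safe. The shape of the pattern, as a generic sketch (illustrative):

  // Sketch: lazily initialize a slot of a shared array, always under its lock.
  def getOrCreate[T <: AnyRef](slots: Array[T], i: Int)(create: => T): T =
    slots.synchronized {
      val old = slots(i)
      if (old != null) old
      else {
        val fresh = create
        slots(i) = fresh
        fresh
      }
    }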
diff --git a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
index 132502b75f8cd..95e2d688d9b17 100644
--- a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
@@ -24,5 +24,7 @@ import java.io.File
* based off an offset and a length.
*/
private[spark] class FileSegment(val file: File, val offset: Long, val length: Long) {
- override def toString = "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length)
+ override def toString: String = {
+ "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 1be860aea63d0..ed609772e6979 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -98,6 +98,26 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
+ /**
+ * Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
+ * put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
+ *
+ * The caller should guarantee that `size` is correct.
+ */
+ def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
+ // Work on a duplicate - since the original input might be used elsewhere.
+ lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
+ val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
+ val data =
+ if (putAttempt.success) {
+ assert(bytes.limit == size)
+ Right(bytes.duplicate())
+ } else {
+ null
+ }
+ PutResult(size, data, putAttempt.droppedBlocks)
+ }
+
override def putArray(
blockId: BlockId,
values: Array[Any],
@@ -312,11 +332,22 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
blockId.asRDDId.map(_.rddId)
}
+ private def tryToPut(
+ blockId: BlockId,
+ value: Any,
+ size: Long,
+ deserialized: Boolean): ResultWithDroppedBlocks = {
+ tryToPut(blockId, () => value, size, deserialized)
+ }
+
/**
* Try to put in a set of values, if we can free up enough space. The value should either be
* an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
* must also be passed by the caller.
*
+ * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
+ * created to avoid OOM since it may be a big ByteBuffer.
+ *
* Synchronize on `accountingLock` to ensure that all the put requests and their associated block
* dropping are done by only one thread at a time. Otherwise, while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
@@ -326,7 +357,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
*/
private def tryToPut(
blockId: BlockId,
- value: Any,
+ value: () => Any,
size: Long,
deserialized: Boolean): ResultWithDroppedBlocks = {
@@ -345,7 +376,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
droppedBlocks ++= freeSpaceResult.droppedBlocks
if (enoughFreeSpace) {
- val entry = new MemoryEntry(value, size, deserialized)
+ val entry = new MemoryEntry(value(), size, deserialized)
entries.synchronized {
entries.put(blockId, entry)
currentMemory += size
@@ -357,12 +388,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
- val data = if (deserialized) {
- Left(value.asInstanceOf[Array[Any]])
+ lazy val data = if (deserialized) {
+ Left(value().asInstanceOf[Array[Any]])
} else {
- Right(value.asInstanceOf[ByteBuffer].duplicate())
+ Right(value().asInstanceOf[ByteBuffer].duplicate())
}
- val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
+ val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
// Release the unroll memory used because we no longer need the underlying Array
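The interaction between the `lazy val data` and the `() => data` thunk above is the key detail: no matter how many callees end up forcing the thunk, the Left/Right wrapper and the underlying value are computed at most once. A tiny standalone demonstration (illustrative):

  def demo(): Unit = {
    lazy val payload: Array[Byte] = {
      println("allocating") // printed at most once
      new Array[Byte](16)
    }
    val thunk: () => Array[Byte] = () => payload
    thunk(); thunk() // the second call reuses the already-forced lazy val
  }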
diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
index 120c327a7e580..0186eb30a1905 100644
--- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -36,7 +36,7 @@ class RDDInfo(
def isCached: Boolean = (memSize + diskSize + tachyonSize > 0) && numCachedPartitions > 0
- override def toString = {
+ override def toString: String = {
import Utils.bytesToString
("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
"MemorySize: %s; TachyonSize: %s; DiskSize: %s").format(
@@ -44,7 +44,7 @@ class RDDInfo(
bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
}
- override def compare(that: RDDInfo) = {
+ override def compare(that: RDDInfo): Int = {
this.id - that.id
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index e5e1cf5a69a19..134abea866218 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -50,11 +50,11 @@ class StorageLevel private(
def this() = this(false, true, false, false) // For deserialization
- def useDisk = _useDisk
- def useMemory = _useMemory
- def useOffHeap = _useOffHeap
- def deserialized = _deserialized
- def replication = _replication
+ def useDisk: Boolean = _useDisk
+ def useMemory: Boolean = _useMemory
+ def useOffHeap: Boolean = _useOffHeap
+ def deserialized: Boolean = _deserialized
+ def replication: Int = _replication
assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")
@@ -80,7 +80,7 @@ class StorageLevel private(
false
}
- def isValid = (useMemory || useDisk || useOffHeap) && (replication > 0)
+ def isValid: Boolean = (useMemory || useDisk || useOffHeap) && (replication > 0)
def toInt: Int = {
var ret = 0
@@ -183,7 +183,7 @@ object StorageLevel {
useMemory: Boolean,
useOffHeap: Boolean,
deserialized: Boolean,
- replication: Int) = {
+ replication: Int): StorageLevel = {
getCachedStorageLevel(
new StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication))
}
@@ -197,7 +197,7 @@ object StorageLevel {
useDisk: Boolean,
useMemory: Boolean,
deserialized: Boolean,
- replication: Int = 1) = {
+ replication: Int = 1): StorageLevel = {
getCachedStorageLevel(new StorageLevel(useDisk, useMemory, false, deserialized, replication))
}
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index def49e80a3605..7d75929b96f75 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -19,7 +19,6 @@ package org.apache.spark.storage
import scala.collection.mutable
-import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
@@ -32,7 +31,7 @@ class StorageStatusListener extends SparkListener {
// This maintains only blocks that are cached (i.e. storage level is not StorageLevel.NONE)
private[storage] val executorIdToStorageStatus = mutable.Map[String, StorageStatus]()
- def storageStatusList = executorIdToStorageStatus.values.toSeq
+ def storageStatusList: Seq[StorageStatus] = executorIdToStorageStatus.values.toSeq
/** Update storage status list to reflect updated block statuses */
private def updateStorageStatus(execId: String, updatedBlocks: Seq[(BlockId, BlockStatus)]) {
@@ -56,7 +55,7 @@ class StorageStatusListener extends SparkListener {
}
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
val metrics = taskEnd.taskMetrics
if (info != null && metrics != null) {
@@ -67,7 +66,7 @@ class StorageStatusListener extends SparkListener {
}
}
- override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) = synchronized {
+ override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = synchronized {
updateStorageStatus(unpersistRDD.rddId)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index 2ab6a8f3ec1d4..af873034215a9 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -20,8 +20,8 @@ package org.apache.spark.storage
import java.text.SimpleDateFormat
import java.util.{Date, Random}
-import tachyon.TachyonURI
-import tachyon.client.{TachyonFile, TachyonFS}
+import tachyon.client.TachyonFS
+import tachyon.client.TachyonFile
import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
@@ -40,7 +40,7 @@ private[spark] class TachyonBlockManager(
val master: String)
extends Logging {
- val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null
+ val client = if (master != null && master != "") TachyonFS.get(master) else null
if (client == null) {
logError("Failed to connect to the Tachyon as the master address is not configured")
@@ -60,11 +60,11 @@ private[spark] class TachyonBlockManager(
addShutdownHook()
def removeFile(file: TachyonFile): Boolean = {
- client.delete(new TachyonURI(file.getPath()), false)
+ client.delete(file.getPath(), false)
}
def fileExists(file: TachyonFile): Boolean = {
- client.exist(new TachyonURI(file.getPath()))
+ client.exist(file.getPath())
}
def getFile(filename: String): TachyonFile = {
@@ -81,7 +81,7 @@ private[spark] class TachyonBlockManager(
if (old != null) {
old
} else {
- val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
+ val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId)
client.mkdir(path)
val newDir = client.getFile(path)
subDirs(dirId)(subDirId) = newDir
@@ -89,7 +89,7 @@ private[spark] class TachyonBlockManager(
}
}
}
- val filePath = new TachyonURI(s"$subDir/$filename")
+ val filePath = subDir + "/" + filename
if(!client.exist(filePath)) {
client.createFile(filePath)
}
@@ -101,7 +101,7 @@ private[spark] class TachyonBlockManager(
// TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore.
private def createTachyonDirs(): Array[TachyonFile] = {
- logDebug(s"Creating tachyon directories at root dirs '$rootDirs'")
+ logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'")
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
rootDirs.split(",").map { rootDir =>
var foundLocalDir = false
@@ -113,21 +113,22 @@ private[spark] class TachyonBlockManager(
tries += 1
try {
tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
- val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
+ val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId
if (!client.exist(path)) {
foundLocalDir = client.mkdir(path)
tachyonDir = client.getFile(path)
}
} catch {
case e: Exception =>
- logWarning(s"Attempt $tries to create tachyon dir $tachyonDir failed", e)
+ logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
}
}
if (!foundLocalDir) {
- logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create tachyon dir in $rootDir")
+ logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
+ rootDir)
System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
}
- logInfo(s"Created tachyon directory at $tachyonDir")
+ logInfo("Created tachyon directory at " + tachyonDir)
tachyonDir
}
}
@@ -144,7 +145,7 @@ private[spark] class TachyonBlockManager(
}
} catch {
case e: Exception =>
- logError(s"Exception while deleting tachyon spark dir: $tachyonDir", e)
+ logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
}
client.close()
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
index b86abbda1d3e7..65fa81704c365 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonFileSegment.scala
@@ -24,5 +24,7 @@ import tachyon.client.TachyonFile
* a length.
*/
private[spark] class TachyonFileSegment(val file: TachyonFile, val offset: Long, val length: Long) {
- override def toString = "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length)
+ override def toString: String = {
+ "(name=%s, offset=%d, length=%d)".format(file.getPath(), offset, length)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 0c24ad2760e08..adfa6bbada256 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -60,7 +60,7 @@ private[spark] class SparkUI private (
}
initialize()
- def getAppName = appName
+ def getAppName: String = appName
/** Set the app name for this UI. */
def setAppName(name: String) {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index b5022fe853c49..f07864141a21c 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -149,9 +149,11 @@ private[spark] object UIUtils extends Logging {
}
}
- def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
+ def prependBaseUri(basePath: String = "", resource: String = ""): String = {
+ uiRoot + basePath + resource
+ }
- def commonHeaderNodes = {
+ def commonHeaderNodes: Seq[Node] = {
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index fc1844600f1cb..5fbcd6bb8ad94 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -17,6 +17,8 @@
package org.apache.spark.ui
+import java.util.concurrent.Semaphore
+
import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}
@@ -51,7 +53,7 @@ private[spark] object UIWorkloadGenerator {
val nJobSet = args(2).toInt
val sc = new SparkContext(conf)
- def setProperties(s: String) = {
+ def setProperties(s: String): Unit = {
if(schedulingMode == SchedulingMode.FAIR) {
sc.setLocalProperty("spark.scheduler.pool", s)
}
@@ -59,7 +61,7 @@ private[spark] object UIWorkloadGenerator {
}
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
- def nextFloat() = new Random().nextFloat()
+ def nextFloat(): Float = new Random().nextFloat()
val jobs = Seq[(String, () => Long)](
("Count", baseData.count),
@@ -88,6 +90,8 @@ private[spark] object UIWorkloadGenerator {
("Job with delays", baseData.map(x => Thread.sleep(100)).count)
)
+ val barrier = new Semaphore(-nJobSet * jobs.size + 1)
+
(1 to nJobSet).foreach { _ =>
for ((desc, job) <- jobs) {
new Thread {
@@ -99,12 +103,17 @@ private[spark] object UIWorkloadGenerator {
} catch {
case e: Exception =>
println("Job Failed: " + desc)
+ } finally {
+ barrier.release()
}
}
}.start
Thread.sleep(INTER_JOB_WAIT_MS)
}
}
+
+ // Wait for all worker threads to finish.
+ barrier.acquire()
sc.stop()
}
}
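The Semaphore is seeded with a negative permit count so that the single acquire() at the end only succeeds after every one of the nJobSet * jobs.size worker threads has called release(): starting from -N + 1, N releases bring the count to exactly 1. A standalone sketch of the trick (illustrative):

  import java.util.concurrent.Semaphore

  val n = 4
  val barrier = new Semaphore(-n + 1) // reaches 1 only after n releases
  (1 to n).foreach { i =>
    new Thread(new Runnable {
      override def run(): Unit =
        try println(s"worker $i done") finally barrier.release()
    }).start()
  }
  barrier.acquire() // blocks until all n workers have finished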
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 3afd7ef07d7c9..69053fe44d7e4 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark.ExceptionFailure
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
-import org.apache.spark.storage.StorageStatusListener
+import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
import org.apache.spark.ui.{SparkUI, SparkUITab}
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
@@ -55,19 +55,19 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToShuffleWrite = HashMap[String, Long]()
val executorToLogUrls = HashMap[String, Map[String, String]]()
- def storageStatusList = storageStatusListener.storageStatusList
+ def storageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
- override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized {
+ override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
val eid = executorAdded.executorId
executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
}
- override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
if (info != null) {
val eid = info.executorId
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 937d95a934b59..625596885faa1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -44,6 +44,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// These type aliases are public because they're used in the types of public fields:
type JobId = Int
+ type JobGroupId = String
type StageId = Int
type StageAttemptId = Int
type PoolName = String
@@ -54,6 +55,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val completedJobs = ListBuffer[JobUIData]()
val failedJobs = ListBuffer[JobUIData]()
val jobIdToData = new HashMap[JobId, JobUIData]
+ val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]
// Stages:
val pendingStages = new HashMap[StageId, StageInfo]
@@ -73,7 +75,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// Misc:
val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
- def blockManagerIds = executorIdToBlockManagerId.values.toSeq
+ def blockManagerIds: Seq[BlockManagerId] = executorIdToBlockManagerId.values.toSeq
var schedulingMode: Option[SchedulingMode] = None
@@ -119,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
Map(
"jobIdToData" -> jobIdToData.size,
"stageIdToData" -> stageIdToData.size,
- "stageIdToStageInfo" -> stageIdToInfo.size
+ "stageIdToStageInfo" -> stageIdToInfo.size,
+ "jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum,
+ // Since jobGroupToJobIds is a map of sets, check that we don't leak keys with empty values:
+ "jobGroupToJobIds keySet" -> jobGroupToJobIds.keys.size
)
}
@@ -140,13 +145,25 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
if (jobs.size > retainedJobs) {
val toRemove = math.max(retainedJobs / 10, 1)
jobs.take(toRemove).foreach { job =>
- jobIdToData.remove(job.jobId)
+ // Remove the job's UI data, if it exists
+ jobIdToData.remove(job.jobId).foreach { removedJob =>
+ // A null jobGroupId is used for jobs that are run without a job group
+ val jobGroupId = removedJob.jobGroup.orNull
+ // Remove the job group -> job mapping entry, if it exists
+ jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup =>
+ jobsInGroup.remove(job.jobId)
+ // If this was the last job in this job group, remove the map entry for the job group
+ if (jobsInGroup.isEmpty) {
+ jobGroupToJobIds.remove(jobGroupId)
+ }
+ }
+ }
}
jobs.trimStart(toRemove)
}
}
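Note the shape of the cleanup above: jobGroupToJobIds is an index from group to a set of job ids, so removing a job must also prune the group's entry once its set drains, or the outer map leaks keys forever, which is exactly what the new "jobGroupToJobIds keySet" metric watches for. A reduced sketch of the pattern (illustrative):

  import scala.collection.mutable.{HashMap, HashSet}

  val index = new HashMap[String, HashSet[Int]]
  def removeJob(group: String, jobId: Int): Unit = {
    index.get(group).foreach { ids =>
      ids.remove(jobId)
      if (ids.isEmpty) index.remove(group) // don't leak the empty set's key
    }
  }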
- override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
val jobGroup = for (
props <- Option(jobStart.properties);
group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
@@ -158,6 +175,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageIds = jobStart.stageIds,
jobGroup = jobGroup,
status = JobExecutionStatus.RUNNING)
+ // A null jobGroupId is used for jobs that are run without a job group
+ jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
// Compute (a potential underestimate of) the number of tasks that will be run by this job.
// This may be an underestimate because the job start event references all of the result
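
For context, the property consulted here is the one SparkContext.setJobGroup stores on the submitting thread, so from user code the indexing behaves as below (usage sketch; local-mode setup assumed):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
    sc.setJobGroup("etl", "nightly ETL jobs")
    sc.parallelize(1 to 100).count()  // filed under jobGroupToJobIds("etl")
    sc.clearJobGroup()
    sc.parallelize(1 to 100).count()  // ungrouped: filed under the null key
    sc.stop()
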
@@ -182,7 +201,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}
}
- override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
new JobUIData(jobId = jobEnd.jobId)
@@ -219,7 +238,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}
}
- override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
+ override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
val stage = stageCompleted.stageInfo
stageIdToInfo(stage.stageId) = stage
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
@@ -260,7 +279,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}
/** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = synchronized {
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val stage = stageSubmitted.stageInfo
activeStages(stage.stageId) = stage
pendingStages.remove(stage.stageId)
@@ -288,7 +307,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}
}
- override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
+ override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
@@ -312,7 +331,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// stageToTaskInfos already has the updated status.
}
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
// If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
// completion event is for. Let's just drop it here. This means we might have some speculation
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index b2bbfdee56946..7ffcf291b5cc6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -24,7 +24,7 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
val sc = parent.sc
val killEnabled = parent.killEnabled
- def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
+ def isFairScheduler: Boolean = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
val listener = parent.jobProgressListener
attachPage(new AllJobsPage(this))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 110f8780a9a12..797c9404bc449 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
import java.util.Date
import javax.servlet.http.HttpServletRequest
-import scala.xml.{Node, Unparsed}
+import scala.xml.{Elem, Node, Unparsed}
import org.apache.commons.lang3.StringEscapeUtils
@@ -170,7 +170,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
- def accumulableRow(acc: AccumulableInfo) =