No tasks have started yet
@@ -45,23 +47,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
"Details for Stage %s".format(stageId), parent.headerTabs, parent)
}
- val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
+ val stageData = stageDataOption.get
+ val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
val numCompleted = tasks.count(_.taskInfo.finished)
- val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L)
- val hasInput = inputBytes > 0
- val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
- val hasShuffleRead = shuffleReadBytes > 0
- val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
- val hasShuffleWrite = shuffleWriteBytes > 0
- val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
- val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
- val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
-
- var activeTime = 0L
- val now = System.currentTimeMillis
- val tasksActive = listener.stageIdToTasksActive(stageId).values
- tasksActive.foreach(activeTime += _.timeRunning(now))
+ val hasInput = stageData.inputBytes > 0
+ val hasShuffleRead = stageData.shuffleReadBytes > 0
+ val hasShuffleWrite = stageData.shuffleWriteBytes > 0
+ val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0
// scalastyle:off
val summary =
@@ -69,34 +62,34 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index fd8d0b5cdde00..5f45c0ced5ec5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,12 +17,11 @@
package org.apache.spark.ui.jobs
-import java.util.Date
-
-import scala.collection.mutable.HashMap
import scala.xml.Node
-import org.apache.spark.scheduler.{StageInfo, TaskInfo}
+import java.util.Date
+
+import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.util.Utils
@@ -71,14 +70,14 @@ private[ui] class StageTableBase(
}
- private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] =
+ private def makeProgressBar(started: Int, completed: Int, failed: Int, total: Int): Seq[Node] =
{
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)
- {completed}/{total} {failed}
+ {completed}/{total} { if (failed > 0) s"($failed failed)" else "" }
@@ -108,13 +107,23 @@ private[ui] class StageTableBase(
{s.details}
}
- listener.stageIdToDescription.get(s.stageId)
- .map(d =>
{d}
{nameLink} {killLink}
)
- .getOrElse(
{nameLink} {killLink} {details}
)
+ val stageDataOption = listener.stageIdToData.get(s.stageId)
+ // Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
+ if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
+ val desc = stageDataOption.get.description
+
{desc}
{nameLink} {killLink}
+ } else {
+
{killLink} {nameLink} {details}
+ }
}
protected def stageRow(s: StageInfo): Seq[Node] = {
- val poolName = listener.stageIdToPool.get(s.stageId)
+ val stageDataOption = listener.stageIdToData.get(s.stageId)
+ if (stageDataOption.isEmpty) {
+ return
{s.stageId} | No data available for this stage |
+ }
+
+ val stageData = stageDataOption.get
val submissionTime = s.submissionTime match {
case Some(t) => UIUtils.formatDate(new Date(t))
case None => "Unknown"
@@ -124,35 +133,20 @@ private[ui] class StageTableBase(
if (finishTime > t) finishTime - t else System.currentTimeMillis - t
}
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
- val startedTasks =
- listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
- val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)
- val failedTasks = listener.stageIdToTasksFailed.getOrElse(s.stageId, 0) match {
- case f if f > 0 => "(%s failed)".format(f)
- case _ => ""
- }
- val totalTasks = s.numTasks
- val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L)
- val inputRead = inputSortable match {
- case 0 => ""
- case b => Utils.bytesToString(b)
- }
- val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L)
- val shuffleRead = shuffleReadSortable match {
- case 0 => ""
- case b => Utils.bytesToString(b)
- }
- val shuffleWriteSortable = listener.stageIdToShuffleWrite.getOrElse(s.stageId, 0L)
- val shuffleWrite = shuffleWriteSortable match {
- case 0 => ""
- case b => Utils.bytesToString(b)
- }
+
+ val inputRead = stageData.inputBytes
+ val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
+ val shuffleRead = stageData.shuffleReadBytes
+ val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
+ val shuffleWrite = stageData.shuffleWriteBytes
+ val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
+
{s.stageId} | ++
{if (isFairScheduler) {
- {poolName.get}
+ .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}>
+ {stageData.schedulingPool}
|
} else {
@@ -162,11 +156,12 @@ private[ui] class StageTableBase(
{submissionTime} |
{formattedDuration} |
- {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
+ {makeProgressBar(stageData.numActiveTasks, stageData.numCompleteTasks,
+ stageData.numFailedTasks, s.numTasks)}
|
-
{inputRead} |
-
{shuffleRead} |
-
{shuffleWrite} |
+
{inputReadWithUnit} |
+
{shuffleReadWithUnit} |
+
{shuffleWriteWithUnit} |
}
/** Render an HTML row that represents a stage */
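The comment in makeDescription above trades nested Option combinators for an explicit if/else. A small, hypothetical illustration of that readability trade-off (stand-in types and helpers, not the real StageTable code):

```scala
// Illustration only: the two styles produce the same result.
object OptionStyleSketch {
  case class Data(description: Option[String])

  def render(s: String): String = s

  // Nested combinator style: correct, but the intent is buried in map/flatMap/getOrElse.
  def describeWithCombinators(dataOpt: Option[Data], name: String): String =
    dataOpt.flatMap(_.description).map(d => render(d + " " + name)).getOrElse(render(name))

  // Imperative style used in the change above: one visible branch per case.
  def describeImperatively(dataOpt: Option[Data], name: String): String = {
    if (dataOpt.isDefined && dataOpt.get.description.isDefined) {
      render(dataOpt.get.description.get + " " + name)
    } else {
      render(name)
    }
  }
}
```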
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
new file mode 100644
index 0000000000000..be11a11695b01
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.TaskInfo
+
+import scala.collection.mutable.HashMap
+
+private[jobs] object UIData {
+
+ class ExecutorSummary {
+ var taskTime : Long = 0
+ var failedTasks : Int = 0
+ var succeededTasks : Int = 0
+ var inputBytes : Long = 0
+ var shuffleRead : Long = 0
+ var shuffleWrite : Long = 0
+ var memoryBytesSpilled : Long = 0
+ var diskBytesSpilled : Long = 0
+ }
+
+ class StageUIData {
+ var numActiveTasks: Int = _
+ var numCompleteTasks: Int = _
+ var numFailedTasks: Int = _
+
+ var executorRunTime: Long = _
+
+ var inputBytes: Long = _
+ var shuffleReadBytes: Long = _
+ var shuffleWriteBytes: Long = _
+ var memoryBytesSpilled: Long = _
+ var diskBytesSpilled: Long = _
+
+ var schedulingPool: String = ""
+ var description: Option[String] = None
+
+ var taskData = new HashMap[Long, TaskUIData]
+ var executorSummary = new HashMap[String, ExecutorSummary]
+ }
+
+ case class TaskUIData(
+ taskInfo: TaskInfo,
+ taskMetrics: Option[TaskMetrics] = None,
+ errorMessage: Option[String] = None)
+}
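The new StageUIData class consolidates the per-stage counters and maps that the listener previously kept in separate stageIdTo* collections. As a rough, hypothetical sketch (not the actual JobProgressListener code), a listener built on it keeps one record per stage and folds each task-end event into that record:

```scala
// Hypothetical sketch only: fold task results into one entry per stage.
// The simplified StageData class mirrors a few fields of StageUIData above.
import scala.collection.mutable.HashMap

object StageAggregationSketch {
  class StageData {
    var numCompleteTasks: Int = 0
    var numFailedTasks: Int = 0
    var shuffleReadBytes: Long = 0L
  }

  val stageIdToData = new HashMap[Int, StageData]

  // `succeeded` and `shuffleRead` stand in for fields of a real task-end event.
  def recordTaskEnd(stageId: Int, succeeded: Boolean, shuffleRead: Long): Unit = {
    val data = stageIdToData.getOrElseUpdate(stageId, new StageData)
    if (succeeded) data.numCompleteTasks += 1 else data.numFailedTasks += 1
    data.shuffleReadBytes += shuffleRead
  }
}
```

One practical upside of this layout is that retiring a completed stage becomes a single map removal rather than one removal per counter map.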
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 6a95dc06e155d..9dcdafdd6350e 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -196,6 +196,5 @@ private[spark] class FileLogger(
def stop() {
hadoopDataStream.foreach(_.close())
writer.foreach(_.close())
- fileSystem.close()
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 47eb44b530379..2ff8b25a56d10 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -527,8 +527,9 @@ private[spark] object JsonProtocol {
metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long]
metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long]
metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long]
- metrics.shuffleReadMetrics =
- Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
+ Utils.jsonOption(json \ "Shuffle Read Metrics").map { shuffleReadMetrics =>
+ metrics.updateShuffleReadMetrics(shuffleReadMetricsFromJson(shuffleReadMetrics))
+ }
metrics.shuffleWriteMetrics =
Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
metrics.inputMetrics =
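The deserializer above no longer assigns shuffleReadMetrics directly; it hands the parsed value to updateShuffleReadMetrics, which suggests TaskMetrics now controls how shuffle-read metrics are merged in. The sketch below is only an assumed shape for such an accumulating setter, using simplified stand-in classes rather than the real TaskMetrics API:

```scala
// Assumed shape only; the real TaskMetrics/ShuffleReadMetrics classes differ.
class ShuffleReadSketch {
  var remoteBytesRead: Long = 0L
  var remoteBlocksFetched: Int = 0
}

class TaskMetricsSketch {
  private var _shuffleRead: Option[ShuffleReadSketch] = None
  def shuffleReadMetrics: Option[ShuffleReadSketch] = _shuffleRead

  // Merge a newly reported reading into the running totals instead of overwriting them.
  def updateShuffleReadMetrics(newMetrics: ShuffleReadSketch): Unit = {
    val merged = _shuffleRead.getOrElse(new ShuffleReadSketch)
    merged.remoteBytesRead += newMetrics.remoteBytesRead
    merged.remoteBlocksFetched += newMetrics.remoteBlocksFetched
    _shuffleRead = Some(merged)
  }
}
```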
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a2454e120a8ab..5784e974fbb67 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -44,7 +44,7 @@ import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
/** CallSite represents a place in user code. It can have a short and a long form. */
-private[spark] case class CallSite(val short: String, val long: String)
+private[spark] case class CallSite(short: String, long: String)
/**
* Various utility methods used by Spark.
@@ -809,7 +809,12 @@ private[spark] object Utils extends Logging {
*/
def getCallSite: CallSite = {
val trace = Thread.currentThread.getStackTrace()
- .filterNot(_.getMethodName.contains("getStackTrace"))
+ .filterNot { ste: StackTraceElement =>
+ // When running under some profilers, the current stack trace might contain some bogus
+ // frames. This is intended to ensure that we don't crash in these situations by
+ // ignoring any frames that we can't examine.
+ (ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
+ }
// Keep crawling up the stack trace until we find the first function not inside of the spark
// package. We track the last (shallowest) contiguous Spark method. This might be an RDD
@@ -1286,4 +1291,19 @@ private[spark] object Utils extends Logging {
}
}
+ /** Return a nice string representation of the exception, including the stack trace. */
+ def exceptionString(e: Exception): String = {
+ if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
+ }
+
+ /** Return a nice string representation of the exception, including the stack trace. */
+ def exceptionString(
+ className: String,
+ description: String,
+ stackTrace: Array[StackTraceElement]): String = {
+ val desc = if (description == null) "" else description
+ val st = if (stackTrace == null) "" else stackTrace.map(" " + _).mkString("\n")
+ s"$className: $desc\n$st"
+ }
+
}
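A small usage sketch for the exceptionString helpers added above; the formatting logic is copied locally so the snippet runs on its own, without the Spark classpath:

```scala
// Self-contained copy of the three-argument helper above, plus a tiny usage example.
object ExceptionStringDemo {
  def exceptionString(
      className: String,
      description: String,
      stackTrace: Array[StackTraceElement]): String = {
    val desc = if (description == null) "" else description
    val st = if (stackTrace == null) "" else stackTrace.map("        " + _).mkString("\n")
    s"$className: $desc\n$st"
  }

  def main(args: Array[String]): Unit = {
    val e = new IllegalArgumentException("bad input")
    // Prints the class name and message, followed by the indented stack frames.
    println(exceptionString(e.getClass.getName, e.getMessage, e.getStackTrace))
  }
}
```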
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 292d0962f4fdb..765254bf4c36e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -268,10 +268,10 @@ class ExternalAppendOnlyMap[K, V, C](
private def mergeIfKeyExists(key: K, baseCombiner: C, buffer: StreamBuffer): C = {
var i = 0
while (i < buffer.pairs.length) {
- val (k, c) = buffer.pairs(i)
- if (k == key) {
+ val pair = buffer.pairs(i)
+ if (pair._1 == key) {
buffer.pairs.remove(i)
- return mergeCombiners(baseCombiner, c)
+ return mergeCombiners(baseCombiner, pair._2)
}
i += 1
}
@@ -293,9 +293,11 @@ class ExternalAppendOnlyMap[K, V, C](
}
// Select a key from the StreamBuffer that holds the lowest key hash
val minBuffer = mergeHeap.dequeue()
- val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
+ val minPairs = minBuffer.pairs
+ val minHash = minBuffer.minKeyHash
val minPair = minPairs.remove(0)
- var (minKey, minCombiner) = minPair
+ val minKey = minPair._1
+ var minCombiner = minPair._2
assert(getKeyHashCode(minPair) == minHash)
// For all other streams that may have this key (i.e. have the same minimum key hash),
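Both hunks above replace tuple destructuring (val (k, c) = ...) with direct _1/_2 accessors inside the merge loops, presumably to shave the pattern-match step off a hot path; behavior is unchanged. A tiny illustration of the equivalence, outside any Spark code:

```scala
// Illustration only: the two forms read the same data from a Tuple2.
object TupleAccessDemo {
  def main(args: Array[String]): Unit = {
    val pair: (String, Int) = ("key", 1)

    // Destructuring via a pattern match (the old style in the hunks above):
    val (k1, v1) = pair

    // Direct accessors (the new style):
    val k2 = pair._1
    val v2 = pair._2

    assert(k1 == k2 && v1 == v2)
  }
}
```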
diff --git a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
index a79e3ee756fc6..d10141b90e621 100644
--- a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
@@ -17,8 +17,54 @@
package org.apache.spark.util.random
+import scala.reflect.ClassTag
+import scala.util.Random
+
private[spark] object SamplingUtils {
+ /**
+ * Reservoir sampling implementation that also returns the input size.
+ *
+ * @param input input data, as an iterator
+ * @param k reservoir size
+ * @param seed random seed
+ * @return (samples, input size)
+ */
+ def reservoirSampleAndCount[T: ClassTag](
+ input: Iterator[T],
+ k: Int,
+ seed: Long = Random.nextLong())
+ : (Array[T], Int) = {
+ val reservoir = new Array[T](k)
+ // Put the first k elements in the reservoir.
+ var i = 0
+ while (i < k && input.hasNext) {
+ val item = input.next()
+ reservoir(i) = item
+ i += 1
+ }
+
+ // If we have consumed all the elements, return them. Otherwise do the replacement.
+ if (i < k) {
+ // If input size < k, trim the array to return only an array of input size.
+ val trimReservoir = new Array[T](i)
+ System.arraycopy(reservoir, 0, trimReservoir, 0, i)
+ (trimReservoir, i)
+ } else {
+ // If input size > k, continue the sampling process.
+ val rand = new XORShiftRandom(seed)
+ while (input.hasNext) {
+ val item = input.next()
+ val replacementIndex = rand.nextInt(i)
+ if (replacementIndex < k) {
+ reservoir(replacementIndex) = item
+ }
+ i += 1
+ }
+ (reservoir, i)
+ }
+ }
+
/**
* Returns a sampling rate that guarantees a sample of size >= sampleSizeLowerBound 99.99% of
* the time.
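A brief usage sketch for reservoirSampleAndCount; since SamplingUtils is private[spark], the sketch assumes it lives in the same package. It mirrors the SamplingUtilsSuite cases added later in this diff:

```scala
package org.apache.spark.util.random

// Usage sketch (same package, since SamplingUtils is private[spark]).
object ReservoirSampleDemo {
  def main(args: Array[String]): Unit = {
    val input = (1 to 1000).iterator
    // One pass over the iterator: returns a uniform sample of size k and the total count.
    val (sample, count) = SamplingUtils.reservoirSampleAndCount(input, 10, seed = 42L)
    assert(count == 1000)
    assert(sample.length == 10)
  }
}
```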
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index e755d2e309398..2229e6acc425d 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -104,8 +104,9 @@ class FailureSuite extends FunSuite with LocalSparkContext {
results.collect()
}
assert(thrown.getClass === classOf[SparkException])
- assert(thrown.getMessage.contains("NotSerializableException") ||
- thrown.getCause.getClass === classOf[NotSerializableException])
+ assert(thrown.getMessage.contains("serializable") ||
+ thrown.getCause.getClass === classOf[NotSerializableException],
+ "Exception does not contain \"serializable\": " + thrown.getMessage)
FailureSuiteState.clear()
}
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 070e974657860..c70e22cf09433 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -177,6 +177,31 @@ class FileSuite extends FunSuite with LocalSparkContext {
assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
}
+ test("object files of classes from a JAR") {
+ val original = Thread.currentThread().getContextClassLoader
+ val className = "FileSuiteObjectFileTest"
+ val jar = TestUtils.createJarWithClasses(Seq(className))
+ val loader = new java.net.URLClassLoader(Array(jar), Utils.getContextOrSparkClassLoader)
+ Thread.currentThread().setContextClassLoader(loader)
+ try {
+ sc = new SparkContext("local", "test")
+ val objs = sc.makeRDD(1 to 3).map { x =>
+ val loader = Thread.currentThread().getContextClassLoader
+ Class.forName(className, true, loader).newInstance()
+ }
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ objs.saveAsObjectFile(outputDir)
+ // Try reading the output back as an object file
+ val ct = reflect.ClassTag[Any](Class.forName(className, true, loader))
+ val output = sc.objectFile[Any](outputDir)
+ assert(output.collect().size === 3)
+ assert(output.collect().head.getClass.getName === className)
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(original)
+ }
+ }
+
test("write SequenceFile using new Hadoop API") {
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
sc = new SparkContext("local", "test")
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index c4f2f7e34f4d5..237e644b48e49 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -240,7 +240,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
}
assert(thrown.getClass === classOf[SparkException])
- assert(thrown.getMessage.contains("NotSerializableException"))
+ assert(thrown.getMessage.toLowerCase.contains("serializable"))
}
}
diff --git a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
similarity index 98%
rename from core/src/test/scala/org/apache/spark/BroadcastSuite.scala
rename to core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index c9936256a5b95..7c3d0208b195a 100644
--- a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -15,14 +15,12 @@
* limitations under the License.
*/
-package org.apache.spark
+package org.apache.spark.broadcast
+import org.apache.spark.storage.{BroadcastBlockId, _}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException}
import org.scalatest.FunSuite
-import org.apache.spark.storage._
-import org.apache.spark.broadcast.{Broadcast, HttpBroadcast}
-import org.apache.spark.storage.BroadcastBlockId
-
class BroadcastSuite extends FunSuite with LocalSparkContext {
private val httpConf = broadcastConf("HttpBroadcastFactory")
diff --git a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index 68a0ea36aa545..3f882a724b047 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -46,7 +46,13 @@ class CompressionCodecSuite extends FunSuite {
test("default compression codec") {
val codec = CompressionCodec.createCodec(conf)
- assert(codec.getClass === classOf[LZFCompressionCodec])
+ assert(codec.getClass === classOf[SnappyCompressionCodec])
+ testCodec(codec)
+ }
+
+ test("lz4 compression codec") {
+ val codec = CompressionCodec.createCodec(conf, classOf[LZ4CompressionCodec].getName)
+ assert(codec.getClass === classOf[LZ4CompressionCodec])
testCodec(codec)
}
diff --git a/core/src/test/scala/org/apache/spark/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
similarity index 97%
rename from core/src/test/scala/org/apache/spark/ConnectionManagerSuite.scala
rename to core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
index df6b2604c8d8a..415ad8c432c12 100644
--- a/core/src/test/scala/org/apache/spark/ConnectionManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala
@@ -15,15 +15,14 @@
* limitations under the License.
*/
-package org.apache.spark
-
-import org.scalatest.FunSuite
+package org.apache.spark.network
import java.nio._
-import org.apache.spark.network.{ConnectionManager, Message, ConnectionManagerId}
-import scala.concurrent.Await
-import scala.concurrent.TimeoutException
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.scalatest.FunSuite
+
+import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration._
import scala.language.postfixOps
diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
similarity index 95%
rename from core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
rename to core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index db56a4acdd6f5..be972c5e97a7e 100644
--- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -15,25 +15,21 @@
* limitations under the License.
*/
-package org.apache.spark
+package org.apache.spark.rdd
import java.io.File
-import org.scalatest.FunSuite
-
-import org.apache.spark.rdd.{HadoopRDD, PipedRDD, HadoopPartition}
-import org.apache.hadoop.mapred.{JobConf, TextInputFormat, FileSplit}
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.{FileSplit, JobConf, TextInputFormat}
+import org.apache.spark._
+import org.scalatest.FunSuite
import scala.collection.Map
import scala.language.postfixOps
import scala.sys.process._
import scala.util.Try
-import org.apache.hadoop.io.{Text, LongWritable}
-
-import org.apache.spark.executor.TaskMetrics
-
class PipedRDDSuite extends FunSuite with SharedSparkContext {
test("basic pipe") {
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 0f9cbe213ea17..2924de112934c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -351,6 +351,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
}
}
+ // Test for SPARK-2412 -- ensure that the second pass of the algorithm does not throw an exception
+ test("coalesced RDDs with locality, fail first pass") {
+ val initialPartitions = 1000
+ val targetLen = 50
+ val couponCount = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt // = 492
+
+ val blocks = (1 to initialPartitions).map { i =>
+ (i, List(if (i > couponCount) "m2" else "m1"))
+ }
+ val data = sc.makeRDD(blocks)
+ val coalesced = data.coalesce(targetLen)
+ assert(coalesced.partitions.length == targetLen)
+ }
+
test("zipped RDDs") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val zipped = nums.zip(nums.map(_ + 1.0))
@@ -379,6 +393,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("mapWith") {
import java.util.Random
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+ @deprecated("suppress compile time deprecation warning", "1.0.0")
val randoms = ones.mapWith(
(index: Int) => new Random(index + 42))
{(t: Int, prng: Random) => prng.nextDouble * t}.collect()
@@ -397,6 +412,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("flatMapWith") {
import java.util.Random
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+ @deprecated("suppress compile time deprecation warning", "1.0.0")
val randoms = ones.flatMapWith(
(index: Int) => new Random(index + 42))
{(t: Int, prng: Random) =>
@@ -418,6 +434,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("filterWith") {
import java.util.Random
val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
+ @deprecated("suppress compile time deprecation warning", "1.0.0")
val sample = ints.filterWith(
(index: Int) => new Random(index + 42))
{(t: Int, prng: Random) => prng.nextInt(3) == 0}.
diff --git a/core/src/test/scala/org/apache/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala
similarity index 95%
rename from core/src/test/scala/org/apache/spark/ZippedPartitionsSuite.scala
rename to core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala
index 4f87fd8654c4a..72596e86865b2 100644
--- a/core/src/test/scala/org/apache/spark/ZippedPartitionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark
+package org.apache.spark.rdd
+import org.apache.spark.SharedSparkContext
import org.scalatest.FunSuite
object ZippedPartitionsSuite {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 71f48e295ecca..3b0b8e2f68c97 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -258,8 +258,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
taskMetrics.shuffleReadMetrics should be ('defined)
val sm = taskMetrics.shuffleReadMetrics.get
- sm.totalBlocksFetched should be > (0)
- sm.localBlocksFetched should be > (0)
+ sm.totalBlocksFetched should be (128)
+ sm.localBlocksFetched should be (128)
sm.remoteBlocksFetched should be (0)
sm.remoteBytesRead should be (0l)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 9ff2a487005c4..86b443b18f2a6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -54,6 +54,23 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
}
}
+// Get the rack for a given host
+object FakeRackUtil {
+ private val hostToRack = new mutable.HashMap[String, String]()
+
+ def cleanUp() {
+ hostToRack.clear()
+ }
+
+ def assignHostToRack(host: String, rack: String) {
+ hostToRack(host) = rack
+ }
+
+ def getRackForHost(host: String) = {
+ hostToRack.get(host)
+ }
+}
+
/**
* A mock TaskSchedulerImpl implementation that just remembers information about tasks started and
* feedback received from the TaskSetManagers. Note that it's important to initialize this with
@@ -69,6 +86,9 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
val taskSetsFailed = new ArrayBuffer[String]
val executors = new mutable.HashMap[String, String] ++ liveExecutors
+ for ((execId, host) <- liveExecutors; rack <- getRackForHost(host)) {
+ hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
+ }
dagScheduler = new FakeDAGScheduler(sc, this)
@@ -82,7 +102,12 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
def addExecutor(execId: String, host: String) {
executors.put(execId, host)
+ for (rack <- getRackForHost(host)) {
+ hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
+ }
}
+
+ override def getRackForHost(value: String): Option[String] = FakeRackUtil.getRackForHost(value)
}
/**
@@ -419,6 +444,9 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
}
test("new executors get added") {
+ // Assign host2 to rack2
+ FakeRackUtil.cleanUp()
+ FakeRackUtil.assignHostToRack("host2", "rack2")
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc)
val taskSet = FakeTask.createTaskSet(4,
@@ -444,8 +472,39 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
manager.executorAdded()
// No-pref list now only contains task 3
assert(manager.pendingTasksWithNoPrefs.size === 1)
- // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
- assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
+ // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY
+ assert(manager.myLocalityLevels.sameElements(
+ Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
+ }
+
+ test("test RACK_LOCAL tasks") {
+ FakeRackUtil.cleanUp()
+ // Assign host1 to rack1
+ FakeRackUtil.assignHostToRack("host1", "rack1")
+ // Assign host2 to rack1
+ FakeRackUtil.assignHostToRack("host2", "rack1")
+ // Assign host3 to rack2
+ FakeRackUtil.assignHostToRack("host3", "rack2")
+ sc = new SparkContext("local", "test")
+ val sched = new FakeTaskScheduler(sc,
+ ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+ val taskSet = FakeTask.createTaskSet(2,
+ Seq(TaskLocation("host1", "execA")),
+ Seq(TaskLocation("host1", "execA")))
+ val clock = new FakeClock
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+ assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
+ // Set allowed locality to ANY
+ clock.advance(LOCALITY_WAIT * 3)
+ // Offer host3
+ // No task is scheduled if we restrict locality to RACK_LOCAL
+ assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None)
+ // Task 0 can be scheduled with ANY
+ assert(manager.resourceOffer("execC", "host3", ANY).get.index === 0)
+ // Offer host2
+ // Task 1 can be scheduled with RACK_LOCAL
+ assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
}
test("do not emit warning when serialized task is small") {
diff --git a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
index 5d15a68ac7e4f..aad6599589420 100644
--- a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala
@@ -15,15 +15,12 @@
* limitations under the License.
*/
-package org.apache.spark.serializer;
-
-import java.io.NotSerializableException
+package org.apache.spark.serializer
import org.scalatest.FunSuite
+import org.apache.spark.{SharedSparkContext, SparkException}
import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkException
-import org.apache.spark.SharedSparkContext
/* A trivial (but unserializable) container for trivial functions */
class UnserializableClass {
@@ -38,52 +35,50 @@ class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContex
test("throws expected serialization exceptions on actions") {
val (data, uc) = fixture
-
val ex = intercept[SparkException] {
- data.map(uc.op(_)).count
+ data.map(uc.op(_)).count()
}
-
assert(ex.getMessage.contains("Task not serializable"))
}
// There is probably a cleaner way to eliminate boilerplate here, but we're
// iterating over a map from transformation names to functions that perform that
// transformation on a given RDD, creating one test case for each
-
+
for (transformation <-
- Map("map" -> xmap _, "flatMap" -> xflatMap _, "filter" -> xfilter _,
- "mapWith" -> xmapWith _, "mapPartitions" -> xmapPartitions _,
+ Map("map" -> xmap _,
+ "flatMap" -> xflatMap _,
+ "filter" -> xfilter _,
+ "mapPartitions" -> xmapPartitions _,
"mapPartitionsWithIndex" -> xmapPartitionsWithIndex _,
- "mapPartitionsWithContext" -> xmapPartitionsWithContext _,
- "filterWith" -> xfilterWith _)) {
+ "mapPartitionsWithContext" -> xmapPartitionsWithContext _)) {
val (name, xf) = transformation
-
+
test(s"$name transformations throw proactive serialization exceptions") {
val (data, uc) = fixture
-
val ex = intercept[SparkException] {
xf(data, uc)
}
-
assert(ex.getMessage.contains("Task not serializable"),
s"RDD.$name doesn't proactively throw NotSerializableException")
}
}
-
+
private def xmap(x: RDD[String], uc: UnserializableClass): RDD[String] =
x.map(y=>uc.op(y))
- private def xmapWith(x: RDD[String], uc: UnserializableClass): RDD[String] =
- x.mapWith(x => x.toString)((x,y)=>x + uc.op(y))
+
private def xflatMap(x: RDD[String], uc: UnserializableClass): RDD[String] =
x.flatMap(y=>Seq(uc.op(y)))
+
private def xfilter(x: RDD[String], uc: UnserializableClass): RDD[String] =
x.filter(y=>uc.pred(y))
- private def xfilterWith(x: RDD[String], uc: UnserializableClass): RDD[String] =
- x.filterWith(x => x.toString)((x,y)=>uc.pred(y))
+
private def xmapPartitions(x: RDD[String], uc: UnserializableClass): RDD[String] =
x.mapPartitions(_.map(y=>uc.op(y)))
+
private def xmapPartitionsWithIndex(x: RDD[String], uc: UnserializableClass): RDD[String] =
x.mapPartitionsWithIndex((_, it) => it.map(y=>uc.op(y)))
+
private def xmapPartitionsWithContext(x: RDD[String], uc: UnserializableClass): RDD[String] =
x.mapPartitionsWithContext((_, it) => it.map(y=>uc.op(y)))
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index fa43b66c6cb5a..b52f81877d557 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -47,11 +47,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
}
listener.completedStages.size should be (5)
- listener.completedStages.filter(_.stageId == 50).size should be (1)
- listener.completedStages.filter(_.stageId == 49).size should be (1)
- listener.completedStages.filter(_.stageId == 48).size should be (1)
- listener.completedStages.filter(_.stageId == 47).size should be (1)
- listener.completedStages.filter(_.stageId == 46).size should be (1)
+ listener.completedStages.count(_.stageId == 50) should be (1)
+ listener.completedStages.count(_.stageId == 49) should be (1)
+ listener.completedStages.count(_.stageId == 48) should be (1)
+ listener.completedStages.count(_.stageId == 47) should be (1)
+ listener.completedStages.count(_.stageId == 46) should be (1)
}
test("test executor id to summary") {
@@ -59,20 +59,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val listener = new JobProgressListener(conf)
val taskMetrics = new TaskMetrics()
val shuffleReadMetrics = new ShuffleReadMetrics()
-
- // nothing in it
- assert(listener.stageIdToExecutorSummaries.size == 0)
+ assert(listener.stageIdToData.size === 0)
// finish this task, should get updated shuffleRead
shuffleReadMetrics.remoteBytesRead = 1000
- taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
+ taskMetrics.updateShuffleReadMetrics(shuffleReadMetrics)
var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
var task = new ShuffleMapTask(0, null, null, 0, null)
val taskType = Utils.getFormattedClassName(task)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
- .shuffleRead == 1000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+ .shuffleRead === 1000)
// finish a task with unknown executor-id, nothing should happen
taskInfo =
@@ -80,27 +78,23 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.size == 1)
+ assert(listener.stageIdToData.size === 1)
// finish this task, should get updated duration
- shuffleReadMetrics.remoteBytesRead = 1000
- taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
- .shuffleRead == 2000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+ .shuffleRead === 2000)
// finish this task, should get updated duration
- shuffleReadMetrics.remoteBytesRead = 1000
- taskMetrics.shuffleReadMetrics = Some(shuffleReadMetrics)
taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0, null, null, 0, null)
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
- .shuffleRead == 1000)
+ assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
+ .shuffleRead === 1000)
}
test("test task success vs failure counting for different task end reasons") {
@@ -121,13 +115,17 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
TaskKilled,
ExecutorLostFailure,
UnknownReason)
+ var failCount = 0
for (reason <- taskFailedReasons) {
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
- assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+ failCount += 1
+ assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
+ assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
}
// Make sure we count success as success.
listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
- assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+ assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
+ assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
}
}
diff --git a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
similarity index 99%
rename from core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
rename to core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
index 4ab870e751778..c4765e53de17b 100644
--- a/core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.spark
-
-import org.scalatest.FunSuite
+package org.apache.spark.util
import akka.actor._
+import org.apache.spark._
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.AkkaUtils
+import org.scalatest.FunSuite
+
import scala.concurrent.Await
/**
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index ca37d707b06ca..d2bee448d4d3b 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -135,12 +135,11 @@ class FileAppenderSuite extends FunSuite with BeforeAndAfter with Logging {
val testOutputStream = new PipedOutputStream()
val testInputStream = new PipedInputStream(testOutputStream)
val appender = FileAppender(testInputStream, testFile, conf)
- assert(appender.isInstanceOf[ExpectedAppender])
+ //assert(appender.getClass === classTag[ExpectedAppender].getClass)
assert(appender.getClass.getSimpleName ===
classTag[ExpectedAppender].runtimeClass.getSimpleName)
if (appender.isInstanceOf[RollingFileAppender]) {
val rollingPolicy = appender.asInstanceOf[RollingFileAppender].rollingPolicy
- rollingPolicy.isInstanceOf[ExpectedRollingPolicy]
val policyParam = if (rollingPolicy.isInstanceOf[TimeBasedRollingPolicy]) {
rollingPolicy.asInstanceOf[TimeBasedRollingPolicy].rolloverIntervalMillis
} else {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 058d31453081a..11f70a6090d24 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -518,7 +518,7 @@ class JsonProtocolSuite extends FunSuite {
sr.localBlocksFetched = e
sr.fetchWaitTime = a + d
sr.remoteBlocksFetched = f
- t.shuffleReadMetrics = Some(sr)
+ t.updateShuffleReadMetrics(sr)
}
sw.shuffleBytesWritten = a + b + c
sw.shuffleWriteTime = b + c + d
diff --git a/core/src/test/scala/org/apache/spark/util/VectorSuite.scala b/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
index 7006571ef0ef6..794a55d61750b 100644
--- a/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/VectorSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.FunSuite
/**
* Tests org.apache.spark.util.Vector functionality
*/
+@deprecated("suppress compile time deprecation warning", "1.0.0")
class VectorSuite extends FunSuite {
def verifyVector(vector: Vector, expectedLength: Int) = {
diff --git a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
index accfe2e9b7f2a..73a9d029b0248 100644
--- a/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/SamplingUtilsSuite.scala
@@ -17,11 +17,32 @@
package org.apache.spark.util.random
+import scala.util.Random
+
import org.apache.commons.math3.distribution.{BinomialDistribution, PoissonDistribution}
import org.scalatest.FunSuite
class SamplingUtilsSuite extends FunSuite {
+ test("reservoirSampleAndCount") {
+ val input = Seq.fill(100)(Random.nextInt())
+
+ // input size < k
+ val (sample1, count1) = SamplingUtils.reservoirSampleAndCount(input.iterator, 150)
+ assert(count1 === 100)
+ assert(input === sample1.toSeq)
+
+ // input size == k
+ val (sample2, count2) = SamplingUtils.reservoirSampleAndCount(input.iterator, 100)
+ assert(count2 === 100)
+ assert(input === sample2.toSeq)
+
+ // input size > k
+ val (sample3, count3) = SamplingUtils.reservoirSampleAndCount(input.iterator, 10)
+ assert(count3 === 100)
+ assert(sample3.length === 10)
+ }
+
test("computeFraction") {
// test that the computed fraction guarantees enough data points
// in the sample with a failure rate <= 0.0001
diff --git a/mllib/data/als/test.data b/data/mllib/als/test.data
similarity index 100%
rename from mllib/data/als/test.data
rename to data/mllib/als/test.data
diff --git a/data/kmeans_data.txt b/data/mllib/kmeans_data.txt
similarity index 100%
rename from data/kmeans_data.txt
rename to data/mllib/kmeans_data.txt
diff --git a/mllib/data/lr-data/random.data b/data/mllib/lr-data/random.data
similarity index 100%
rename from mllib/data/lr-data/random.data
rename to data/mllib/lr-data/random.data
diff --git a/data/lr_data.txt b/data/mllib/lr_data.txt
similarity index 100%
rename from data/lr_data.txt
rename to data/mllib/lr_data.txt
diff --git a/data/pagerank_data.txt b/data/mllib/pagerank_data.txt
similarity index 100%
rename from data/pagerank_data.txt
rename to data/mllib/pagerank_data.txt
diff --git a/mllib/data/ridge-data/lpsa.data b/data/mllib/ridge-data/lpsa.data
similarity index 100%
rename from mllib/data/ridge-data/lpsa.data
rename to data/mllib/ridge-data/lpsa.data
diff --git a/mllib/data/sample_libsvm_data.txt b/data/mllib/sample_libsvm_data.txt
similarity index 100%
rename from mllib/data/sample_libsvm_data.txt
rename to data/mllib/sample_libsvm_data.txt
diff --git a/mllib/data/sample_naive_bayes_data.txt b/data/mllib/sample_naive_bayes_data.txt
similarity index 100%
rename from mllib/data/sample_naive_bayes_data.txt
rename to data/mllib/sample_naive_bayes_data.txt
diff --git a/mllib/data/sample_svm_data.txt b/data/mllib/sample_svm_data.txt
similarity index 100%
rename from mllib/data/sample_svm_data.txt
rename to data/mllib/sample_svm_data.txt
diff --git a/mllib/data/sample_tree_data.csv b/data/mllib/sample_tree_data.csv
similarity index 100%
rename from mllib/data/sample_tree_data.csv
rename to data/mllib/sample_tree_data.csv
diff --git a/dev/create-release/create-release.sh b/dev/create-release/create-release.sh
index 49bf78f60763a..38830103d1e8d 100755
--- a/dev/create-release/create-release.sh
+++ b/dev/create-release/create-release.sh
@@ -95,7 +95,7 @@ make_binary_release() {
cp -r spark spark-$RELEASE_VERSION-bin-$NAME
cd spark-$RELEASE_VERSION-bin-$NAME
- ./make-distribution.sh $FLAGS --name $NAME --tgz
+ ./make-distribution.sh --name $NAME --tgz $FLAGS
cd ..
cp spark-$RELEASE_VERSION-bin-$NAME/spark-$RELEASE_VERSION-bin-$NAME.tgz .
rm -rf spark-$RELEASE_VERSION-bin-$NAME
@@ -111,9 +111,10 @@ make_binary_release() {
spark-$RELEASE_VERSION-bin-$NAME.tgz.sha
}
-make_binary_release "hadoop1" "--with-hive --hadoop 1.0.4"
-make_binary_release "cdh4" "--with-hive --hadoop 2.0.0-mr1-cdh4.2.0"
-make_binary_release "hadoop2" "--with-hive --with-yarn --hadoop 2.2.0"
+make_binary_release "hadoop1" "-Phive -Dhadoop.version=1.0.4"
+make_binary_release "cdh4" "-Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0"
+make_binary_release "hadoop2" \
+ "-Phive -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0"
# Copy data
echo "Copying release tarballs"
diff --git a/dev/run-tests b/dev/run-tests
index edd17b53b3d8c..51e4def0f835a 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -21,8 +21,7 @@
FWDIR="$(cd `dirname $0`/..; pwd)"
cd $FWDIR
-export SPARK_HADOOP_VERSION=2.3.0
-export SPARK_YARN=true
+export SBT_MAVEN_PROFILES="-Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0"
# Remove work directory
rm -rf ./work
@@ -66,8 +65,8 @@ echo "========================================================================="
# (either resolution or compilation) prompts the user for input either q, r,
# etc to quit or retry. This echo is there to make it not block.
if [ -n "$_RUN_SQL_TESTS" ]; then
- echo -e "q\n" | SPARK_HIVE=true sbt/sbt clean package assembly/assembly test | \
- grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
+ echo -e "q\n" | SBT_MAVEN_PROFILES="$SBT_MAVEN_PROFILES -Phive" sbt/sbt clean package \
+ assembly/assembly test | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
else
echo -e "q\n" | sbt/sbt clean package assembly/assembly test | \
grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
diff --git a/dev/scalastyle b/dev/scalastyle
index 0e8fd5cc8d64c..a02d06912f238 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -17,12 +17,12 @@
# limitations under the License.
#
-echo -e "q\n" | SPARK_HIVE=true sbt/sbt scalastyle > scalastyle.txt
+echo -e "q\n" | sbt/sbt -Phive scalastyle > scalastyle.txt
# Check style with YARN alpha built too
-echo -e "q\n" | SPARK_HADOOP_VERSION=0.23.9 SPARK_YARN=true sbt/sbt yarn-alpha/scalastyle \
+echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-0.23 -Dhadoop.version=0.23.9 yarn-alpha/scalastyle \
>> scalastyle.txt
# Check style with YARN built too
-echo -e "q\n" | SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt yarn/scalastyle \
+echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 yarn/scalastyle \
>> scalastyle.txt
ERRORS=$(cat scalastyle.txt | grep -e "\
")
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
index b280df0c8eeb8..7e55131754a3f 100644
--- a/docs/bagel-programming-guide.md
+++ b/docs/bagel-programming-guide.md
@@ -46,7 +46,7 @@ import org.apache.spark.bagel.Bagel._
Next, we load a sample graph from a text file as a distributed dataset and package it into `PRVertex` objects. We also cache the distributed dataset because Bagel will use it multiple times and we'd like to avoid recomputing it.
{% highlight scala %}
-val input = sc.textFile("data/pagerank_data.txt")
+val input = sc.textFile("data/mllib/pagerank_data.txt")
val numVerts = input.count()
diff --git a/docs/configuration.md b/docs/configuration.md
index b84104cc7e653..a70007c165442 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -336,13 +336,12 @@ Apart from these, the following properties are also available, and may be useful
spark.io.compression.codec |
- org.apache.spark.io. LZFCompressionCodec |
+ org.apache.spark.io. SnappyCompressionCodec |
The codec used to compress internal data such as RDD partitions and shuffle outputs.
- By default, Spark provides two codecs: org.apache.spark.io.LZFCompressionCodec
- and org.apache.spark.io.SnappyCompressionCodec . Of these two choices,
- Snappy offers faster compression and decompression, while LZF offers a better compression
- ratio.
+ By default, Spark provides three codecs: org.apache.spark.io.LZ4CompressionCodec ,
+ org.apache.spark.io.LZFCompressionCodec ,
+ and org.apache.spark.io.SnappyCompressionCodec .
|
@@ -350,7 +349,15 @@ Apart from these, the following properties are also available, and may be useful
32768 |
Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec
- is used.
+ is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
+ |
+
+
+ spark.io.compression.lz4.block.size |
+ 32768 |
+
+ Block size (in bytes) used in LZ4 compression, in the case when LZ4 compression codec
+ is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used.
|
@@ -412,7 +419,7 @@ Apart from these, the following properties are also available, and may be useful
spark.broadcast.factory |
- org.apache.spark.broadcast. HttpBroadcastFactory |
+ org.apache.spark.broadcast. TorrentBroadcastFactory |
Which broadcast implementation to use.
|
@@ -699,6 +706,25 @@ Apart from these, the following properties are also available, and may be useful
(in milliseconds)
+
+ spark.scheduler.minRegisteredExecutorsRatio |
+ 0 |
+
+ The minimum ratio of registered executors (registered executors / total expected executors)
+ to wait for before scheduling begins. Specified as a double between 0 and 1.
+ Regardless of whether the minimum ratio of executors has been reached,
+ the maximum amount of time it will wait before scheduling begins is controlled by config
+ spark.scheduler.maxRegisteredExecutorsWaitingTime
+ |
+
+
+ spark.scheduler.maxRegisteredExecutorsWaitingTime |
+ 30000 |
+
+ Maximum amount of time to wait for executors to register before scheduling begins
+ (in milliseconds).
+ |
+
#### Security
@@ -773,6 +799,15 @@ Apart from these, the following properties are also available, and may be useful
into blocks of data before storing them in Spark.
+
+ spark.streaming.receiver.maxRate |
+ infinite |
+
+ Maximum rate (per second) at which each receiver will push data into blocks. Effectively,
+ each stream will consume at most this number of records per second.
+ Setting this configuration to 0 or a negative number will put no limit on the rate.
+ |
+
spark.streaming.unpersist |
true |
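For reference, the properties documented above can also be set programmatically on a SparkConf; the values below are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

object ConfExample {
  // Illustrative values only.
  val conf = new SparkConf()
    .setAppName("config-example")
    .set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
    .set("spark.io.compression.lz4.block.size", "65536")              // bytes
    .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
    .set("spark.streaming.receiver.maxRate", "10000")                 // records/sec per receiver
}
```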
diff --git a/docs/hadoop-third-party-distributions.md b/docs/hadoop-third-party-distributions.md
index 32403bc6957a2..ab1023b8f1842 100644
--- a/docs/hadoop-third-party-distributions.md
+++ b/docs/hadoop-third-party-distributions.md
@@ -48,9 +48,9 @@ the _exact_ Hadoop version you are running to avoid any compatibility errors.
-In SBT, the equivalent can be achieved by setting the SPARK_HADOOP_VERSION flag:
+In SBT, the equivalent can be achieved by setting the `hadoop.version` property:
- SPARK_HADOOP_VERSION=1.0.4 sbt/sbt assembly
+ sbt/sbt -Dhadoop.version=1.0.4 assembly
# Linking Applications to the Hadoop Version
diff --git a/docs/mllib-basics.md b/docs/mllib-basics.md
index 5796e16e8f99c..f9585251fafac 100644
--- a/docs/mllib-basics.md
+++ b/docs/mllib-basics.md
@@ -193,7 +193,7 @@ import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
-val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt")
+val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
{% endhighlight %}
@@ -207,7 +207,7 @@ import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.api.java.JavaRDD;
JavaRDD