Merge remote-tracking branch 'upstream/master'
sagacifyTestUser committed Mar 25, 2015
2 parents 2338da5 + c5cc414, commit 5f09434
Showing 119 changed files with 1,348 additions and 881 deletions.
@@ -91,7 +91,7 @@ private[deploy] class ApplicationInfo(
}
}

private[master] val requestedCores = desc.maxCores.getOrElse(defaultCores)
private val requestedCores = desc.maxCores.getOrElse(defaultCores)

private[master] def coresLeft: Int = requestedCores - coresGranted

@@ -75,16 +75,12 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
val workers = state.workers.sortBy(_.id)
val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)

val activeAppHeaders = Seq("Application ID", "Name", "Cores in Use",
"Cores Requested", "Memory per Node", "Submitted Time", "User", "State", "Duration")
val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
"User", "State", "Duration")
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val activeAppsTable = UIUtils.listingTable(activeAppHeaders, activeAppRow, activeApps)

val completedAppHeaders = Seq("Application ID", "Name", "Cores Requested", "Memory per Node",
"Submitted Time", "User", "State", "Duration")
val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
val completedApps = state.completedApps.sortBy(_.endTime).reverse
val completedAppsTable = UIUtils.listingTable(completedAppHeaders, completeAppRow,
completedApps)
val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)

val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
"Memory", "Main Class")
@@ -191,7 +187,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
</tr>
}

private def appRow(app: ApplicationInfo, active: Boolean): Seq[Node] = {
private def appRow(app: ApplicationInfo): Seq[Node] = {
val killLink = if (parent.killEnabled &&
(app.state == ApplicationState.RUNNING || app.state == ApplicationState.WAITING)) {
val killLinkUri = s"app/kill?id=${app.id}&terminate=true"
@@ -201,7 +197,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
(<a href={killLinkUri} onclick={confirm}>kill</a>)
</span>
}

<tr>
<td>
<a href={"app?appId=" + app.id}>{app.id}</a>
@@ -210,15 +205,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<td>
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
</td>
{
if (active) {
<td>
{app.coresGranted}
</td>
}
}
<td>
{if (app.requestedCores == Int.MaxValue) "*" else app.requestedCores}
{app.coresGranted}
</td>
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
@@ -230,14 +218,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
</tr>
}

private def activeAppRow(app: ApplicationInfo): Seq[Node] = {
appRow(app, active = true)
}

private def completeAppRow(app: ApplicationInfo): Seq[Node] = {
appRow(app, active = false)
}

private def driverRow(driver: DriverInfo): Seq[Node] = {
val killLink = if (parent.killEnabled &&
(driver.state == DriverState.RUNNING ||
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -81,9 +81,11 @@ class TaskInfo(

def status: String = {
if (running) {
"RUNNING"
} else if (gettingResult) {
"GET RESULT"
if (gettingResult) {
"GET RESULT"
} else {
"RUNNING"
}
} else if (failed) {
"FAILED"
} else if (successful) {
@@ -112,6 +112,7 @@ class FileShuffleBlockManager(conf: SparkConf)
private val shuffleState = shuffleStates(shuffleId)
private var fileGroup: ShuffleFileGroup = null

val openStartTime = System.nanoTime
val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
fileGroup = getUnusedFileGroup()
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
@@ -135,6 +136,9 @@ class FileShuffleBlockManager(conf: SparkConf)
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
}
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, so should be included in the shuffle write time.
writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)

override def releaseWriters(success: Boolean) {
if (consolidateShuffleFiles) {
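The new comment explains that creating the shuffle files and constructing their disk writers is itself disk work, so it is charged to the shuffle write time; the same pattern appears again in ExternalSorter further down. A minimal, self-contained sketch of that timing pattern (OpenTimeSketch and the plain counter are illustrative stand-ins, not Spark's ShuffleWriteMetrics API):

object OpenTimeSketch {
  def main(args: Array[String]): Unit = {
    var shuffleWriteTimeNanos = 0L                             // stand-in for ShuffleWriteMetrics
    val openStartTime = System.nanoTime                        // clock starts before any disk work
    val writers = Array.tabulate(4) { bucketId =>
      val file = java.io.File.createTempFile(s"bucket-$bucketId-", ".tmp")
      file.deleteOnExit()
      new java.io.FileOutputStream(file)                       // opening the stream touches the disk
    }
    shuffleWriteTimeNanos += System.nanoTime - openStartTime   // charge the open cost to write time
    writers.foreach(_.close())
    println(s"nanoseconds charged to shuffle write for opening files: $shuffleWriteTimeNanos")
  }
}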
@@ -63,6 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C](
sorter.insertAll(records)
}

// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
25 changes: 18 additions & 7 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -269,11 +269,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +: getFormattedTimeQuantiles(serializationTimes)

val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
if (info.gettingResultTime > 0) {
(info.finishTime - info.gettingResultTime).toDouble
} else {
0.0
}
getGettingResultTime(info).toDouble
}
val gettingResultQuantiles =
<td>
@@ -464,7 +460,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
val gettingResultTime = info.gettingResultTime
val gettingResultTime = getGettingResultTime(info)

val maybeAccumulators = info.accumulables
val accumulatorsReadable = maybeAccumulators.map{acc => s"${acc.name}: ${acc.update.get}"}
@@ -627,6 +623,19 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<td>{errorSummary}{details}</td>
}

private def getGettingResultTime(info: TaskInfo): Long = {
if (info.gettingResultTime > 0) {
if (info.finishTime > 0) {
info.finishTime - info.gettingResultTime
} else {
// The task is still fetching the result.
System.currentTimeMillis - info.gettingResultTime
}
} else {
0L
}
}

private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = {
val totalExecutionTime =
if (info.gettingResult) {
@@ -638,6 +647,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
}
val executorOverhead = (metrics.executorDeserializeTime +
metrics.resultSerializationTime)
math.max(0, totalExecutionTime - metrics.executorRunTime - executorOverhead)
math.max(
0,
totalExecutionTime - metrics.executorRunTime - executorOverhead - getGettingResultTime(info))
}
}
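With getGettingResultTime now subtracted as well, scheduler delay becomes whatever part of a task's wall-clock time is not explained by deserialization, execution, result serialization, or fetching the result, clamped at zero. A small worked example with hypothetical timings (SchedulerDelaySketch is not Spark code):

object SchedulerDelaySketch {
  def main(args: Array[String]): Unit = {
    val totalExecutionTime = 5000L       // finishTime - launchTime, in ms (hypothetical)
    val executorDeserializeTime = 200L
    val executorRunTime = 4200L
    val resultSerializationTime = 100L
    val gettingResultTime = 300L
    val executorOverhead = executorDeserializeTime + resultSerializationTime
    val schedulerDelay = math.max(
      0L,
      totalExecutionTime - executorRunTime - executorOverhead - gettingResultTime)
    println(schedulerDelay)              // 200: the unaccounted-for 200 ms is charged to scheduling
  }
}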
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1876,6 +1876,10 @@ private[spark] object Utils extends Logging {
startService: Int => (T, Int),
conf: SparkConf,
serviceName: String = ""): (T, Int) = {

require(startPort == 0 || (1024 <= startPort && startPort < 65536),
"startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")

val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
val maxRetries = portMaxRetries(conf)
for (offset <- 0 to maxRetries) {
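The added require rejects an illegal startPort before any bind is attempted: only 0 (pick a random free port) or a value in the non-privileged range 1024-65535 passes. A tiny sketch of the same check, pulled into a hypothetical helper so it can be run on its own (checkStartPort is not part of Utils):

object PortCheckSketch {
  def checkStartPort(startPort: Int): Unit = {
    require(startPort == 0 || (1024 <= startPort && startPort < 65536),
      "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
  }

  def main(args: Array[String]): Unit = {
    checkStartPort(0)        // accepted: 0 asks for a random free port
    checkStartPort(8080)     // accepted: inside the non-privileged range
    try {
      checkStartPort(80)     // rejected: privileged ports are outside the allowed range
    } catch {
      case e: IllegalArgumentException => println(e.getMessage)
    }
  }
}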
@@ -352,13 +352,18 @@ private[spark] class ExternalSorter[K, V, C](
// Create our file writers if we haven't done so yet
if (partitionWriters == null) {
curWriteMetrics = new ShuffleWriteMetrics()
val openStartTime = System.nanoTime
partitionWriters = Array.fill(numPartitions) {
// Because these files may be read during shuffle, their compression must be controlled by
// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
// createTempShuffleBlock here; see SPARK-3426 for more context.
val (blockId, file) = diskBlockManager.createTempShuffleBlock()
blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open()
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
curWriteMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
}

// No need to sort stuff, just write each element out
@@ -53,6 +53,15 @@ class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag](

override def size: Int = if (haveNullValue) _keySet.size + 1 else _keySet.size

/** Tests whether this map contains a binding for a key. */
def contains(k: K): Boolean = {
if (k == null) {
haveNullValue
} else {
_keySet.getPos(k) != OpenHashSet.INVALID_POS
}
}

/** Get the value for a given key */
def apply(k: K): V = {
if (k == null) {
@@ -122,7 +122,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
*/
def addWithoutResize(k: T): Int = {
var pos = hashcode(hasher.hash(k)) & _mask
var i = 1
var delta = 1
while (true) {
if (!_bitset.get(pos)) {
// This is a new key.
@@ -134,14 +134,12 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
// Found an existing key.
return pos
} else {
val delta = i
// quadratic probing with values increase by 1, 2, 3, ...
pos = (pos + delta) & _mask
i += 1
delta += 1
}
}
// Never reached here
assert(INVALID_POS != INVALID_POS)
INVALID_POS
throw new RuntimeException("Should never reach here.")
}

/**
@@ -163,21 +161,19 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
*/
def getPos(k: T): Int = {
var pos = hashcode(hasher.hash(k)) & _mask
var i = 1
val maxProbe = _data.size
while (i < maxProbe) {
var delta = 1
while (true) {
if (!_bitset.get(pos)) {
return INVALID_POS
} else if (k == _data(pos)) {
return pos
} else {
val delta = i
// quadratic probing with values increase by 1, 2, 3, ...
pos = (pos + delta) & _mask
i += 1
delta += 1
}
}
// Never reached here
INVALID_POS
throw new RuntimeException("Should never reach here.")
}

/** Return the value at the specified position. */
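In both probing loops above the step now grows by 1 on every collision, so successive probes land at offsets 1, 3, 6, 10, ... (triangular numbers) from the original slot, and the unreachable fall-through now throws rather than silently returning INVALID_POS. A small standalone sketch of that probing order (ProbeSketch and probeSequence are illustrative names, not Spark code); on a power-of-two table this sequence visits every slot once, a known property of triangular probing, which is what lets getPos drop its explicit maxProbe bound:

object ProbeSketch {
  // Positions visited when probing a power-of-two table from `start` with deltas 1, 2, 3, ...
  def probeSequence(start: Int, tableSize: Int): Seq[Int] = {
    require(tableSize > 0 && (tableSize & (tableSize - 1)) == 0, "table size must be a power of two")
    val mask = tableSize - 1
    var pos = start & mask
    var delta = 1
    (0 until tableSize).map { _ =>
      val current = pos
      pos = (pos + delta) & mask   // same update as addWithoutResize / getPos above
      delta += 1
      current
    }
  }

  def main(args: Array[String]): Unit = {
    val probes = probeSequence(start = 5, tableSize = 8)
    println(probes)                       // Vector(5, 6, 0, 3, 7, 4, 2, 1)
    println(probes.distinct.size == 8)    // true: every slot visited exactly once
  }
}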
@@ -48,6 +48,11 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,

override def size: Int = _keySet.size

/** Tests whether this map contains a binding for a key. */
def contains(k: K): Boolean = {
_keySet.getPos(k) != OpenHashSet.INVALID_POS
}

/** Get the value for a given key */
def apply(k: K): V = {
val pos = _keySet.getPos(k)
@@ -176,4 +176,14 @@ class OpenHashMapSuite extends FunSuite with Matchers {
assert(map(i.toString) === i.toString)
}
}

test("contains") {
val map = new OpenHashMap[String, Int](2)
map("a") = 1
assert(map.contains("a"))
assert(!map.contains("b"))
assert(!map.contains(null))
map(null) = 0
assert(map.contains(null))
}
}
@@ -118,4 +118,11 @@ class PrimitiveKeyOpenHashMapSuite extends FunSuite with Matchers {
assert(map(i.toLong) === i.toString)
}
}

test("contains") {
val map = new PrimitiveKeyOpenHashMap[Int, Int](1)
map(0) = 0
assert(map.contains(0))
assert(!map.contains(1))
}
}
18 changes: 9 additions & 9 deletions dev/run-tests
@@ -178,6 +178,15 @@ CURRENT_BLOCK=$BLOCK_BUILD
fi
}

echo ""
echo "========================================================================="
echo "Detecting binary incompatibilities with MiMa"
echo "========================================================================="

CURRENT_BLOCK=$BLOCK_MIMA

./dev/mima

echo ""
echo "========================================================================="
echo "Running Spark unit tests"
@@ -227,12 +236,3 @@ echo "========================================================================="
CURRENT_BLOCK=$BLOCK_PYSPARK_UNIT_TESTS

./python/run-tests

echo ""
echo "========================================================================="
echo "Detecting binary incompatibilities with MiMa"
echo "========================================================================="

CURRENT_BLOCK=$BLOCK_MIMA

./dev/mima
6 changes: 3 additions & 3 deletions dev/run-tests-codes.sh
@@ -22,6 +22,6 @@ readonly BLOCK_RAT=11
readonly BLOCK_SCALA_STYLE=12
readonly BLOCK_PYTHON_STYLE=13
readonly BLOCK_BUILD=14
readonly BLOCK_SPARK_UNIT_TESTS=15
readonly BLOCK_PYSPARK_UNIT_TESTS=16
readonly BLOCK_MIMA=17
readonly BLOCK_MIMA=15
readonly BLOCK_SPARK_UNIT_TESTS=16
readonly BLOCK_PYSPARK_UNIT_TESTS=17
4 changes: 2 additions & 2 deletions dev/run-tests-jenkins
@@ -199,12 +199,12 @@ done
failing_test="Python style tests"
elif [ "$test_result" -eq "$BLOCK_BUILD" ]; then
failing_test="to build"
elif [ "$test_result" -eq "$BLOCK_MIMA" ]; then
failing_test="MiMa tests"
elif [ "$test_result" -eq "$BLOCK_SPARK_UNIT_TESTS" ]; then
failing_test="Spark unit tests"
elif [ "$test_result" -eq "$BLOCK_PYSPARK_UNIT_TESTS" ]; then
failing_test="PySpark unit tests"
elif [ "$test_result" -eq "$BLOCK_MIMA" ]; then
failing_test="MiMa tests"
else
failing_test="some tests"
fi