Commit

Some changes after working with Andrew Or
pwendell committed Apr 8, 2014
1 parent f7d124f commit 037755c
Showing 10 changed files with 41 additions and 29 deletions.
27 changes: 18 additions & 9 deletions core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -21,47 +21,56 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId

/**
* <span class="developer badge">Developer API</span>
* Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
* tasks several times for "ephemeral" failures, and only report back failures that require some
* old stages to be resubmitted, such as shuffle map fetch failures.
*/
private[spark] sealed trait TaskEndReason

private[spark] case object Success extends TaskEndReason
sealed trait TaskEndReason

private[spark]
/** <span class="developer badge">Developer API</span> */
case object Success extends TaskEndReason

/** <span class="developer badge">Developer API</span> */
case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it

private[spark] case class FetchFailed(
/** <span class="developer badge">Developer API</span> */
case class FetchFailed(
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int)
extends TaskEndReason

private[spark] case class ExceptionFailure(
/** <span class="developer badge">Developer API</span> */
case class ExceptionFailure(
className: String,
description: String,
stackTrace: Array[StackTraceElement],
metrics: Option[TaskMetrics])
extends TaskEndReason

/**
* <span class="developer badge">Developer API</span>
* The task finished successfully, but the result was lost from the executor's block manager before
* it was fetched.
*/
private[spark] case object TaskResultLost extends TaskEndReason
case object TaskResultLost extends TaskEndReason

private[spark] case object TaskKilled extends TaskEndReason
/** <span class="developer badge">Developer API</span> */
case object TaskKilled extends TaskEndReason

/**
* <span class="developer badge">Developer API</span>
* The task failed because the executor that it was running on was lost. This may happen because
* the task crashed the JVM.
*/
private[spark] case object ExecutorLostFailure extends TaskEndReason
case object ExecutorLostFailure extends TaskEndReason

/**
* <span class="developer badge">Developer API</span>
* We don't know why the task ended -- for example, because of a ClassNotFound exception when
* deserializing the task result.
*/
private[spark] case object UnknownReason extends TaskEndReason
case object UnknownReason extends TaskEndReason
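
With `TaskEndReason` and its cases made public, listener-side code can match on them directly. A minimal sketch of such a match follows; `describeEndReason` is a hypothetical helper, and the descriptions are illustrative rather than the scheduler's actual wording or retry policy.

```scala
import org.apache.spark.{TaskEndReason, Success, Resubmitted, FetchFailed, ExceptionFailure,
  TaskResultLost, TaskKilled, ExecutorLostFailure, UnknownReason}

// Hypothetical helper: render a task's end reason as a log message.
def describeEndReason(reason: TaskEndReason): String = reason match {
  case Success                       => "task succeeded"
  case Resubmitted                   => "task finished earlier but its output was lost"
  case FetchFailed(bm, shuffleId, mapId, reduceId) =>
    s"failed to fetch shuffle $shuffleId (map $mapId, reduce $reduceId) from $bm"
  case ExceptionFailure(className, description, _, _) =>
    s"exception $className: $description"
  case TaskResultLost                => "result lost from the block manager before it was fetched"
  case TaskKilled                    => "task was killed"
  case ExecutorLostFailure           => "executor running the task was lost"
  case UnknownReason                 => "task ended for an unknown reason"
}
```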
@@ -81,8 +81,8 @@ class TaskMetrics extends Serializable {
var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None
}

object TaskMetrics {
private[spark] def empty(): TaskMetrics = new TaskMetrics
private[spark] object TaskMetrics {
def empty(): TaskMetrics = new TaskMetrics
}


@@ -18,11 +18,12 @@
package org.apache.spark.scheduler

/**
* <span class="developer badge">Developer API</span>
* A result of a job in the DAGScheduler.
*/
private[spark] sealed trait JobResult
sealed trait JobResult

private[spark] case object JobSucceeded extends JobResult
case object JobSucceeded extends JobResult

// A failed stage ID of -1 means there is not a particular stage that caused the failure
private[spark] case class JobFailed(exception: Exception, failedStageId: Int) extends JobResult
case class JobFailed(exception: Exception, failedStageId: Int) extends JobResult
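
For illustration (not part of this diff), code observing job completion can fold over the now-public `JobResult`; the `describeJobResult` helper below is hypothetical.

```scala
import org.apache.spark.scheduler.{JobResult, JobSucceeded, JobFailed}

// Hypothetical helper: turn a JobResult into a log line.
def describeJobResult(result: JobResult): String = result match {
  case JobSucceeded => "job succeeded"
  case JobFailed(exception, failedStageId) =>
    // A failedStageId of -1 means no particular stage caused the failure.
    val where = if (failedStageId == -1) "no particular stage" else s"stage $failedStageId"
    s"job failed at $where: ${exception.getMessage}"
}
```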
@@ -20,9 +20,9 @@ package org.apache.spark.scheduler
import org.apache.spark.storage.RDDInfo

/**
* <span class="developer badge">Developer API</span>
* Stores information about a stage to pass from the scheduler to SparkListeners.
*/
private[spark]
class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfo: RDDInfo) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
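
As a sketch of the constructor shown above (the values are invented, and a real `StageInfo` is built by the DAGScheduler, not by user code), using the `RDDInfo` constructor made public later in this commit:

```scala
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.{RDDInfo, StorageLevel}

// Invented example values for illustration only.
val rddInfo   = new RDDInfo(0, "input", 8, StorageLevel.MEMORY_ONLY)
val stageInfo = new StageInfo(0, "count at Example.scala:10", 8, rddInfo)

// submissionTime stays None until the stage is handed to a TaskScheduler.
assert(stageInfo.submissionTime.isEmpty)
```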
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -18,9 +18,9 @@
package org.apache.spark.scheduler

/**
* <span class="developer badge">Developer API</span>
* Information about a running task attempt inside a TaskSet.
*/
private[spark]
class TaskInfo(
val taskId: Long,
val index: Int,
@@ -46,15 +46,15 @@ class TaskInfo(

var serializedSize: Int = 0

def markGettingResult(time: Long = System.currentTimeMillis) {
private[spark] def markGettingResult(time: Long = System.currentTimeMillis) {
gettingResultTime = time
}

def markSuccessful(time: Long = System.currentTimeMillis) {
private[spark] def markSuccessful(time: Long = System.currentTimeMillis) {
finishTime = time
}

def markFailed(time: Long = System.currentTimeMillis) {
private[spark] def markFailed(time: Long = System.currentTimeMillis) {
finishTime = time
failed = true
}
@@ -83,11 +83,11 @@ class TaskInfo(

def duration: Long = {
if (!finished) {
throw new UnsupportedOperationException("duration() called on unfinished tasks")
throw new UnsupportedOperationException("duration() called on unfinished task")
} else {
finishTime - launchTime
}
}

def timeRunning(currentTime: Long): Long = currentTime - launchTime
private[spark] def timeRunning(currentTime: Long): Long = currentTime - launchTime
}
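
A small hypothetical consumer of the accessors visible above (`taskId`, `index`, `duration`); it assumes `finished` is a public flag, which is only implied here by the guard inside `duration`.

```scala
import org.apache.spark.scheduler.TaskInfo

// Hypothetical helper: summarize a task attempt without calling duration on a running task,
// since duration throws UnsupportedOperationException before the task finishes.
def summarize(info: TaskInfo): String = {
  if (info.finished) s"task ${info.taskId} (index ${info.index}) took ${info.duration} ms"
  else s"task ${info.taskId} (index ${info.index}) is still running"
}
```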
@@ -17,7 +17,8 @@

package org.apache.spark.scheduler

private[spark] object TaskLocality extends Enumeration {
/** <span class="developer badge">Developer API</span> */
object TaskLocality extends Enumeration {
// Process local is expected to be used ONLY within TaskSetManager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value

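
Because `TaskLocality` is a plain `Enumeration` declared from most to least local, its values compare with the ordering Scala gives enumeration members. A small sketch; the scheduling interpretation in the comment is an assumption, not something stated in this diff.

```scala
import org.apache.spark.scheduler.TaskLocality

// Declaration order is PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY, so "smaller" means "more local".
val achieved   = TaskLocality.PROCESS_LOCAL
val constraint = TaskLocality.NODE_LOCAL
val satisfies  = achieved <= constraint  // true: process-local placement also satisfies node-local
```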
@@ -47,7 +47,8 @@ class StorageStatus(
}
}

private[spark]

/** <span class="developer badge">Developer API</span> */
class RDDInfo(val id: Int, val name: String, val numPartitions: Int, val storageLevel: StorageLevel)
extends Ordered[RDDInfo] {

@@ -20,6 +20,7 @@ package org.apache.spark.util.collection
import java.util.{Arrays, Comparator}

/**
* <span class="developer badge">Developer API</span>
* A simple open hash table optimized for the append-only use case, where keys
* are never removed, but the value for each key may be changed.
*
@@ -29,9 +30,8 @@ import java.util.{Arrays, Comparator}
*
* TODO: Cache the hash values of each key? java.util.HashMap does that.
*/
private[spark]
class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K,
V)] with Serializable {
class AppendOnlyMap[K, V](initialCapacity: Int = 64)
extends Iterable[(K, V)] with Serializable {
require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
require(initialCapacity >= 1, "Invalid initial capacity")

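
A short usage sketch (not from the commit), assuming the `update`, `apply`, and `changeValue(key, (hadValue, oldValue) => newValue)` members this class exposes; word counting then looks roughly like this.

```scala
import org.apache.spark.util.collection.AppendOnlyMap

// Rough sketch of append-only usage: keys are only ever inserted or updated in place.
val counts = new AppendOnlyMap[String, Int]()        // default initialCapacity = 64
counts("spark") = 1                                  // insert via update
counts.changeValue("spark", (hadValue, old) => if (hadValue) old + 1 else 1)  // upsert
assert(counts("spark") == 2)
counts.foreach { case (word, n) => println(s"$word -> $n") }  // it is an Iterable[(K, V)]
```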
@@ -31,6 +31,7 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BlockId, BlockManager}

/**
* <span class="developer badge">Developer API</span>
* An append-only map that spills sorted content to disk when there is insufficient space for it
* to grow.
*
@@ -55,8 +56,7 @@ import org.apache.spark.storage.{BlockId, BlockManager}
* `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
* this threshold, in case map size estimation is not sufficiently accurate.
*/

private[spark] class ExternalAppendOnlyMap[K, V, C](
class ExternalAppendOnlyMap[K, V, C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
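
For orientation (not part of the diff), the three functions in the constructor above are ordinary combiner callbacks; a sketch for collecting `Int` values into lists follows. Construction and `insert` are shown only in comments, since the remaining constructor parameters are assumed to default to a live SparkEnv on an executor.

```scala
import org.apache.spark.util.collection.ExternalAppendOnlyMap

// Combiner callbacks in the shape the constructor above expects (here C = List[Int]).
def createCombiner(v: Int): List[Int] = List(v)                          // first value for a key
def mergeValue(c: List[Int], v: Int): List[Int] = v :: c                 // fold another value in
def mergeCombiners(c1: List[Int], c2: List[Int]): List[Int] = c1 ::: c2  // merge spilled maps

// Inside a running executor one could then write, roughly (assumed API):
//   val map = new ExternalAppendOnlyMap[String, Int, List[Int]](
//     createCombiner, mergeValue, mergeCombiners)
//   map.insert("key", 1)
//   map.iterator.foreach(println)
```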
@@ -20,13 +20,13 @@ package org.apache.spark.util.collection
import scala.reflect.ClassTag

/**
* <span class="developer badge">Developer API</span>
* A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
* but not deletions. This map is about 5X faster than java.util.HashMap, while using much less
* space overhead.
*
* Under the hood, it uses our OpenHashSet implementation.
*/
private[spark]
class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag](
initialCapacity: Int)
extends Iterable[(K, V)]
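
A usage sketch (not in the commit), assuming `OpenHashMap` exposes `update` and `apply` like the other maps in this package and implements the `Iterable[(K, V)]` interface shown above. The key type must be nullable (`K >: Null`), so `String` works here.

```scala
import org.apache.spark.util.collection.OpenHashMap

// Insertions and in-place updates are supported; deletions are not.
val sizes = new OpenHashMap[String, Long](64)
sizes("part-0") = 1024L
sizes("part-0") = 2048L               // overwriting an existing key is fine
val latest = sizes("part-0")          // 2048
sizes.foreach { case (k, v) => println(s"$k -> $v") }
```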
