[SPARK-19537] Move pendingPartitions to ShuffleMapStage.
The pendingPartitions instance variable should be moved to ShuffleMapStage,
because it is only used by ShuffleMapStages. This change is purely a
refactoring and does not change functionality.
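As a rough sketch of the shape of the change (class bodies simplified and hypothetical; the real Stage and ShuffleMapStage carry much more state), the field moves from the abstract base class into the one subclass that uses it:

import scala.collection.mutable.HashSet

// Before: every Stage owned the set, even ResultStages that never used it.
abstract class StageBefore(val id: Int) {
  val pendingPartitions = new HashSet[Int]
}

// After: only ShuffleMapStage, the sole user of the set, declares it.
abstract class StageAfter(val id: Int)

class ShuffleMapStageAfter(id: Int) extends StageAfter(id) {
  val pendingPartitions = new HashSet[Int]
}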

I fixed this in an attempt to clarify some of the discussion around apache#16620, which I was having trouble reasoning about.  I stole the helpful comment Imran wrote for pendingPartitions and used it here.

cc squito markhamstra jinxing64

Author: Kay Ousterhout <[email protected]>

Closes apache#16876 from kayousterhout/SPARK-19537.
kayousterhout authored and xuanyuanking committed Sep 15, 2017
1 parent e49c997 commit 0337d23
Showing 3 changed files with 26 additions and 9 deletions.
20 changes: 13 additions & 7 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -931,8 +931,6 @@ class DAGScheduler(
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
-    // Get our pending tasks and remember them in our pendingTasks entry
-    stage.pendingPartitions.clear()
 
     // First figure out the indexes of partition ids to compute.
     val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
@@ -1011,9 +1009,11 @@
     val tasks: Seq[Task[_]] = try {
       stage match {
         case stage: ShuffleMapStage =>
+          stage.pendingPartitions.clear()
           partitionsToCompute.map { id =>
             val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
+            stage.pendingPartitions += id
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
               taskBinary, part, locs, stage.latestInfo.taskMetrics, properties, Option(jobId),
               Option(sc.applicationId), sc.applicationAttemptId)
Expand All @@ -1037,9 +1037,8 @@ class DAGScheduler(
}

if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
@@ -1144,7 +1143,6 @@
     val stage = stageIdToStage(task.stageId)
     event.reason match {
       case Success =>
-        stage.pendingPartitions -= task.partitionId
         task match {
           case rt: ResultTask[_, _] =>
             // Cast to ResultStage here because it's part of the ResultTask
Expand Down Expand Up @@ -1180,6 +1178,7 @@ class DAGScheduler(

case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
shuffleStage.pendingPartitions -= task.partitionId
updateAccumulators(event)
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
@@ -1232,7 +1231,14 @@
 
       case Resubmitted =>
         logInfo("Resubmitted " + task + ", so marking it as still running")
-        stage.pendingPartitions += task.partitionId
+        stage match {
+          case sms: ShuffleMapStage =>
+            sms.pendingPartitions += task.partitionId
+
+          case _ =>
+            assert(false, "TaskSetManagers should only send Resubmitted task statuses for " +
+              "tasks in ShuffleMapStages.")
+        }
 
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
         val failedStage = stageIdToStage(task.stageId)
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.HashSet
+
 import org.apache.spark.ShuffleDependency
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
@@ -47,6 +49,17 @@ private[spark] class ShuffleMapStage(
 
   private[this] var _numAvailableOutputs: Int = 0
 
+  /**
+   * Partitions that either haven't yet been computed, or that were computed on an executor
+   * that has since been lost, so should be re-computed. This variable is used by the
+   * DAGScheduler to determine when a stage has completed. Task successes in either the active
+   * attempt for the stage or in earlier attempts for this stage can cause partition ids to get
+   * removed from pendingPartitions. As a result, this variable may be inconsistent with the pending
+   * tasks in the TaskSetManager for the active attempt for the stage (the partitions stored here
+   * will always be a subset of the partitions that the TaskSetManager thinks are pending).
+   */
+  val pendingPartitions = new HashSet[Int]
+
   /**
    * List of [[MapStatus]] for each partition. The index of the array is the map partition id,
    * and each value in the array is the list of possible [[MapStatus]] for a partition
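To make the comment's subset invariant concrete, here is a toy model of how the scheduler drives this set (names and structure hypothetical; the real logic lives in DAGScheduler.submitMissingTasks and DAGScheduler.handleTaskCompletion):

import scala.collection.mutable.HashSet

class ToyShuffleMapStage {
  val pendingPartitions = new HashSet[Int]

  // On (re)submission, every partition still missing output is registered.
  def submitMissing(missing: Seq[Int]): Unit = {
    pendingPartitions.clear()
    pendingPartitions ++= missing
  }

  // A success from any attempt, active or earlier, clears the partition, so
  // this set can be a strict subset of what the active TaskSetManager still
  // considers pending.
  def onShuffleMapTaskSuccess(partitionId: Int): Unit = {
    pendingPartitions -= partitionId
  }

  // The DAGScheduler can treat the stage as finished once nothing is pending.
  def isComplete: Boolean = pendingPartitions.isEmpty
}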
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -68,8 +68,6 @@ private[scheduler] abstract class Stage(
   /** Set of jobs that this stage belongs to. */
   val jobIds = new HashSet[Int]
 
-  val pendingPartitions = new HashSet[Int]
-
   /** The ID to use for the next new attempt for this stage. */
   private var nextAttemptId: Int = 0
 

