diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index de41053176313..e1bc3413819b6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -851,7 +851,6 @@ class DAGScheduler(
     stage.pendingTasks.clear()
 
     // First figure out the indexes of partition ids to compute.
-    println(s"finding partitions to compute for $stage")
     val partitionsToCompute: Seq[Int] = {
       stage match {
         case stage: ShuffleMapStage =>
@@ -945,7 +944,6 @@ class DAGScheduler(
           s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
       }
       logDebug(debugString)
-      println(debugString)
     }
   }
 
@@ -1064,7 +1062,6 @@ class DAGScheduler(
 
             if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
               markStageAsFinished(shuffleStage)
-              println(s"marking $shuffleStage as finished")
               logInfo("looking for newly runnable stages")
               logInfo("running: " + runningStages)
               logInfo("waiting: " + waitingStages)
@@ -1091,7 +1088,6 @@ class DAGScheduler(
                     .map(_._2).mkString(", "))
                 submitStage(shuffleStage)
               } else {
-                println(s"looking for newly runnable stage")
                 val newlyRunnable = new ArrayBuffer[Stage]
                 for (shuffleStage <- waitingStages) {
                   logInfo("Missing parents for " + shuffleStage + ": " +
@@ -1102,7 +1098,6 @@ class DAGScheduler(
                   newlyRunnable += shuffleStage
                 }
                 val newlyRunnableWithJob = newlyRunnable.map{x => x -> activeJobForStage(x)}
-                println(s"newly runnable stages = $newlyRunnableWithJob")
                 waitingStages --= newlyRunnable
                 runningStages ++= newlyRunnable
                 for {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3f65996ea5fe8..63610add8e747 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -88,7 +88,6 @@ class DAGSchedulerSuite
       // normally done by TaskSetManager
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
-      println(s"submitting taskSet $taskSet. taskSets = $taskSets")
     }
     override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       cancelledStages += stageId
@@ -558,18 +557,11 @@ class DAGSchedulerSuite
   /** This tests the case where another FetchFailed comes in while the map stage is getting
    * re-run. */
   test("late fetch failures don't cause multiple concurrent attempts for the same map stage") {
-    println("begin late fetch failure")
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
-    val jobId = submit(reduceRdd, Array(0, 1))
-    println(s"late fetch failure: jobId = $jobId")
-    println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
-    println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
-    println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
-    println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
-    println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
+    submit(reduceRdd, Array(0, 1))
 
     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
@@ -579,26 +571,14 @@ class DAGSchedulerSuite
     // The map stage should have been submitted.
     assert(countSubmittedMapStageAttempts() === 1)
-    println("late fetch failure: taskSets = " + taskSets)
-    println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
-    println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
-    println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
-    println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
-    println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
 
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
     // The MapOutputTracker should know about both map output locations.
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
       Array("hostA", "hostB"))
-
-    println("late fetch failure: taskSets = " + taskSets)
-    println("late fetch failure: submittedStages = " + sparkListener.submittedStageInfos)
-    println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
-    println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
-    println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
-    println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
-    println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
+    assert(mapOutputTracker.getServerStatuses(shuffleId, 1).map(_._1.host) ===
+      Array("hostA", "hostB"))
 
     // The first result task fails, with a fetch failure for the output from the first mapper.
     runEvent(CompletionEvent(
@@ -643,7 +623,6 @@ class DAGSchedulerSuite
    */
  test("extremely late fetch failures don't cause multiple concurrent attempts for " +
      "the same stage") {
-    println("begin extremely late fetch failure")
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
@@ -661,17 +640,15 @@ class DAGSchedulerSuite
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)
 
-    println("extremely late fetch failure: taskSets = " + taskSets)
     // Complete the map stage.
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
 
     // The reduce stage should have been submitted.
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedReduceStageAttempts() === 1)
 
-    println("extremely late fetch failure: taskSets = " + taskSets)
     // The first result task fails, with a fetch failure for the output from the first mapper.
     runEvent(CompletionEvent(
       taskSets(1).tasks(0),
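
The one substantive change in the test hunks above is the second argument of makeMapStatus, raised from 1 to 2. Judging from the surrounding code, that argument is the number of reduce partitions a map output reports sizes for: reduceRdd has two partitions, and the first test now also asserts on getServerStatuses(shuffleId, 1), so each map status must carry an entry for reduce partition 1. The minimal sketch below only illustrates that indexing argument with hypothetical stand-ins (ToyMapStatus, makeToyMapStatus); it is not Spark's MapStatus API.

import scala.util.Try

// Toy stand-in for a map output status: one size entry per reduce partition.
final case class ToyMapStatus(host: String, sizes: Array[Long]) {
  def sizeForReduce(reduceId: Int): Long = sizes(reduceId)
}

object ReducePartitionSketch {
  // Mirrors the shape of the suite's makeMapStatus(host, reduces) call: the second
  // argument decides how many reduce partitions get a size entry.
  def makeToyMapStatus(host: String, reduces: Int): ToyMapStatus =
    ToyMapStatus(host, Array.fill[Long](reduces)(2L))

  def main(args: Array[String]): Unit = {
    val tooNarrow = makeToyMapStatus("hostA", 1)   // covers only reduce partition 0
    val wideEnough = makeToyMapStatus("hostA", 2)  // covers reduce partitions 0 and 1

    println(wideEnough.sizeForReduce(1))                // fine: an entry for partition 1 exists
    println(Try(tooNarrow.sizeForReduce(1)).isFailure)  // true: index 1 is out of range
  }
}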