[SPARK-6754] Remove unnecessary TaskContextHelper
TaskContextHelper was originally necessary because TaskContext was written in Java, which does not have a way to mark classes as package-private; TaskContextHelper existed to work around this. Now that TaskContext has been rewritten in Scala, the helper class is no longer necessary.
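
For illustration, here is a minimal sketch (not the literal Spark source) of the pattern the Scala rewrite enables: the scoped access modifier private[spark] makes the setters visible throughout the org.apache.spark package while keeping them out of the public API.

package org.apache.spark

// Sketch only: approximates the shape of the rewritten TaskContext.
abstract class TaskContext extends Serializable {
  def stageId(): Int
  def partitionId(): Int
}

object TaskContext {
  // One context per task thread.
  private[this] val taskContext = new ThreadLocal[TaskContext]

  // Visible to Spark internals (e.g. org.apache.spark.scheduler.Task)
  // but not to user code outside the org.apache.spark package.
  private[spark] def setTaskContext(tc: TaskContext): Unit = taskContext.set(tc)
  private[spark] def unset(): Unit = taskContext.remove()

  // Public accessor used by code running inside a task.
  def get(): TaskContext = taskContext.get
}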

rxin can you look at this? It looks like you missed this bit of cleanup when you moved TaskContext from Java to Scala in apache#4324

cc ScrapCodes and pwendell, who added this originally.

Author: Kay Ousterhout <[email protected]>

Closes apache#5402 from kayousterhout/SPARK-6754 and squashes the following commits:

f089800 [Kay Ousterhout] [SPARK-6754] Remove unnecessary TaskContextHelper
kayousterhout authored and rxin committed Apr 8, 2015
1 parent d138aa8 commit 8d2a36c
Showing 3 changed files with 5 additions and 34 deletions.
29 changes: 0 additions & 29 deletions core/src/main/scala/org/apache/spark/TaskContextHelper.scala

This file was deleted.
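
Roughly, the deleted file was a thin Scala shim (reconstructed here from its call sites in this diff, not a verbatim copy) that re-exported TaskContext's setters with Spark-internal visibility:

package org.apache.spark

// Reconstruction from call sites, not the verbatim deleted source: the
// helper only forwarded to TaskContext so that classes elsewhere in the
// Spark package tree (e.g. the scheduler) could set and clear the
// thread-local context.
private[spark] object TaskContextHelper {
  def setTaskContext(tc: TaskContext): Unit = TaskContext.setTaskContext(tc)
  def unset(): Unit = TaskContext.unset()
}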

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -645,13 +645,13 @@ class DAGScheduler(
       val split = rdd.partitions(job.partitions(0))
       val taskContext = new TaskContextImpl(job.finalStage.id, job.partitions(0), taskAttemptId = 0,
         attemptNumber = 0, runningLocally = true)
-      TaskContextHelper.setTaskContext(taskContext)
+      TaskContext.setTaskContext(taskContext)
       try {
         val result = job.func(taskContext, rdd.iterator(split, taskContext))
         job.listener.taskSucceeded(0, result)
       } finally {
         taskContext.markTaskCompleted()
-        TaskContextHelper.unset()
+        TaskContext.unset()
       }
     } catch {
       case e: Exception =>
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.HashMap
 
-import org.apache.spark.{TaskContextHelper, TaskContextImpl, TaskContext}
+import org.apache.spark.{TaskContextImpl, TaskContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.ByteBufferInputStream
@@ -54,7 +54,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
   final def run(taskAttemptId: Long, attemptNumber: Int): T = {
     context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
       taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
-    TaskContextHelper.setTaskContext(context)
+    TaskContext.setTaskContext(context)
     context.taskMetrics.setHostname(Utils.localHostName())
     taskThread = Thread.currentThread()
     if (_killed) {
@@ -64,7 +64,7 @@ private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) ex
       runTask(context)
     } finally {
       context.markTaskCompleted()
-      TaskContextHelper.unset()
+      TaskContext.unset()
     }
   }
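
The setTaskContext/unset pair that run() wraps around runTask is what backs the public TaskContext.get() accessor. A hypothetical user-side example (not part of this commit; the job and names are made up) of reading that thread-local context from inside a task:

import org.apache.spark.{SparkContext, TaskContext}

// Hypothetical usage: inside a task, TaskContext.get() returns the
// context that Task.run installed on the current thread above.
def tagWithPartition(sc: SparkContext): Array[(Int, Int)] = {
  sc.parallelize(1 to 100, numSlices = 4)
    .mapPartitions { iter =>
      val ctx = TaskContext.get()
      iter.map(record => (ctx.partitionId(), record))
    }
    .collect()
}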

