Merge pull request alteryx#23 from jerryshao/multi-user
Add Spark multi-user support for standalone mode and Mesos

This PR adds multi-user support for Spark in both standalone mode and Mesos (coarse- and fine-grained) mode. The user who submits the app can be specified through the environment variable `SPARK_USER`, or a default is used. Executors will communicate with Hadoop using the specified user name.

I also fixed a bug in JobLogger that occurred when a different user wrote the job log to a folder for which it did not have the proper file permissions.

I split the previous [PR750](mesos/spark#750) into two PRs; this PR only addresses multi-user support. I will try to address security auth in a subsequent PR, because security auth is a complicated problem, especially for long-running apps like Shark Server (both the Kerberos TGT and the HDFS delegation token need to be renewed or re-created over the app's run time).
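
For context on what "communicate with Hadoop using the specified user name" means underneath: the changes below rely on Hadoop's `UserGroupInformation` API, where work is wrapped in a `doAs` block for a remote user. A minimal standalone sketch of that pattern (the object name and printed message are made up; only the `UserGroupInformation` calls reflect what the diff uses):

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object DoAsSketch {
  def main(args: Array[String]): Unit = {
    // Pick a user name, e.g. whatever SPARK_USER was exported as.
    val user = Option(System.getenv("SPARK_USER")).getOrElse("nobody")

    // createRemoteUser builds a UGI without credentials (simple auth);
    // doAs runs the closure so Hadoop client code sees that user.
    val ugi = UserGroupInformation.createRemoteUser(user)
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      def run(): Unit = {
        println("Hadoop sees: " + UserGroupInformation.getCurrentUser.getShortUserName)
      }
    })
  }
}
```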

(cherry picked from commit be7e8da)
Signed-off-by: Reynold Xin <[email protected]>
rxin committed Nov 7, 2013
1 parent 1d9412b commit d5ae953
Showing 5 changed files with 417 additions and 379 deletions.
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -145,6 +145,14 @@ class SparkContext(
executorEnvs ++= environment
}

// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER"))
}.getOrElse {
SparkContext.SPARK_UNKNOWN_USER
}
executorEnvs("SPARK_USER") = sparkUser

// Create and start the scheduler
private[spark] var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
@@ -984,6 +992,8 @@ object SparkContext

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

private[spark] val SPARK_UNKNOWN_USER = "<unknown>"

implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
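
Read literally, the resolution above prefers the JVM's `user.name` over `SPARK_USER`. A tiny sketch with invented values makes the precedence concrete (the variables stand in for the system property and environment lookups in the diff):

```scala
// Hypothetical values: the JVM was started by OS user "bob" and the app
// was launched with SPARK_USER=alice in the environment.
val userName: String     = "bob"   // stands in for System.getProperty("user.name")
val sparkUserEnv: String = "alice" // stands in for System.getenv("SPARK_USER")

val sparkUser = Option {
  Option(userName).getOrElse(sparkUserEnv)
}.getOrElse("<unknown>")
// sparkUser == "bob": user.name takes precedence; SPARK_USER is only consulted
// when user.name is null, and "<unknown>" only when both are null.
```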
18 changes: 15 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,8 +17,11 @@

package org.apache.spark.deploy

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.SparkException

@@ -27,6 +30,15 @@ import org.apache.spark.SparkException
*/
private[spark]
class SparkHadoopUtil {
val conf = newConfiguration()
UserGroupInformation.setConfiguration(conf)

def runAsUser(user: String)(func: () => Unit) {
val ugi = UserGroupInformation.createRemoteUser(user)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
}

/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
@@ -42,9 +54,9 @@ class SparkHadoopUtil {

def isYarnMode(): Boolean = { false }
}

object SparkHadoopUtil {
private val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
try {
@@ -56,7 +68,7 @@ object SparkHadoopUtil {
new SparkHadoopUtil
}
}

def get: SparkHadoopUtil = {
hadoop
}
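
As a hypothetical usage sketch (not part of this commit; the wrapper object is invented, and it sits in `org.apache.spark` because `SparkHadoopUtil` is `private[spark]`), the new `runAsUser` helper would be called like this:

```scala
package org.apache.spark

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

object RunAsUserSketch {
  def main(args: Array[String]): Unit = {
    SparkHadoopUtil.get.runAsUser("alice") { () =>
      // Hadoop client calls made inside this closure are attributed to
      // "alice" (simple authentication; Kerberos needs real credentials).
      println(UserGroupInformation.getCurrentUser.getShortUserName) // prints "alice"
    }
  }
}
```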
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -25,8 +25,9 @@ import java.util.concurrent._
import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.Utils

@@ -129,6 +130,8 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
@@ -176,7 +179,7 @@
}
}

override def run() {
override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
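
To tie the executor-side pieces together, here is a condensed, hypothetical stand-in for the pattern above (the object and method names are invented; in the commit, this logic lives in `Executor` and `TaskRunner.run`):

```scala
package org.apache.spark

import org.apache.spark.deploy.SparkHadoopUtil

object ExecutorUserSketch {
  // Mirror of the executor's fallback: honor SPARK_USER if the launcher
  // (standalone Worker or Mesos executor) exported it, else "<unknown>".
  val sparkUser: String =
    Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)

  def runTask(taskBody: () => Unit): Unit =
    // Each task body executes inside a Hadoop doAs for sparkUser, so HDFS
    // reads/writes performed by the task are attributed to that user.
    SparkHadoopUtil.get.runAsUser(sparkUser)(taskBody)
}
```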