Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Gummelt committed Apr 21, 2017
1 parent 810c6b2 commit ad4e33b
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
org.apache.spark.deploy.security.HadoopFSCredentialProvider
org.apache.spark.deploy.security.HBaseCredentialProvider
org.apache.spark.deploy.security.HiveCredentialProvider
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ package org.apache.spark.deploy.security
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

/** [[HadoopAccessManager]] returns information related to how Hadoop delegation tokens should be
* fetched.
/**
* Methods in [[HadoopAccessManager]] return scheduler-specific information related to how Hadoop
* delegation tokens should be fetched.
*/
private[spark] trait HadoopAccessManager {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package org.apache.spark.deploy.security

import org.scalatest.{BeforeAndAfter, Matchers}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token
import org.scalatest.{BeforeAndAfter, Matchers}

import org.apache.spark.{SparkConf, SparkFunSuite}

class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
Expand Down Expand Up @@ -136,9 +138,9 @@ class TestCredentialProvider extends ServiceCredentialProvider {
override def credentialsRequired(conf: Configuration): Boolean = true

override def obtainCredentials(
hadoopConf: Configuration,
hadoopAccessManager: HadoopAccessManager,
creds: Credentials): Option[Long] = {
hadoopConf: Configuration,
hadoopAccessManager: HadoopAccessManager,
creds: Credentials): Option[Long] = {
if (creds == null) {
// Guard out other unit test failures.
return None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ trait ServiceCredentialProvider {
* renewal, otherwise None should be returned.
*/
def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long]
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long]
}


Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private[yarn] class YARNConfigurableCredentialManager(
hadoopConf,
new YARNHadoopAccessManager(hadoopConf, sparkConf)) {

private val deprecatedCredentialProviders = getDeprecatedCredentialProviders
val deprecatedCredentialProviders = getDeprecatedCredentialProviders

def getDeprecatedCredentialProviders:
Map[String, org.apache.spark.deploy.yarn.security.ServiceCredentialProvider] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.spark.deploy.yarn.security.YARNTestCredentialProvider
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package org.apache.spark.deploy.yarn.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.Token
import org.scalatest.{BeforeAndAfter, Matchers}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.security.ConfigurableCredentialManager

class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
class YARNConfigurableCredentialManagerSuite
extends SparkFunSuite with Matchers with BeforeAndAfter {
private var credentialManager: YARNConfigurableCredentialManager = null
private var sparkConf: SparkConf = null
private var hadoopConf: Configuration = null
Expand All @@ -33,21 +36,39 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers wit

sparkConf = new SparkConf()
hadoopConf = new Configuration()
System.setProperty("SPARK_YARN_MODE", "true")
}

override def afterAll(): Unit = {
System.clearProperty("SPARK_YARN_MODE")
test("Correctly loads deprecated credential providers") {
credentialManager = new YARNConfigurableCredentialManager(sparkConf, hadoopConf)

super.afterAll()
credentialManager.deprecatedCredentialProviders.get("yarn-test") should not be (None)
}
}

test("Correctly load ") {
credentialManager = new YARNConfigurableCredentialManager(sparkConf, hadoopConf)
class YARNTestCredentialProvider extends ServiceCredentialProvider {
val tokenRenewalInterval = 86400 * 1000L
var timeOfNextTokenRenewal = 0L

override def serviceName: String = "yarn-test"

override def credentialsRequired(conf: Configuration): Boolean = true

override def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
creds: Credentials): Option[Long] = {
if (creds == null) {
// Guard out other unit test failures.
return None
}

val emptyToken = new Token()
emptyToken.setService(new Text(serviceName))
creds.addToken(emptyToken.getService, emptyToken)

val currTime = System.currentTimeMillis()
timeOfNextTokenRenewal = (currTime - currTime % tokenRenewalInterval) + tokenRenewalInterval

assert(credentialManager
.getServiceCredentialProvider("hadoopfs")
.get
.isInstanceOf[YARNHadoopAccessManager])
Some(timeOfNextTokenRenewal)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,32 @@
package org.apache.spark.deploy.yarn.security

import org.apache.hadoop.conf.Configuration
import org.scalatest.{Matchers, PrivateMethodTester}
import org.scalatest.Matchers

import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.deploy.yarn.security.YARNHadoopAccessManager
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}

class YARNHadoopFSCredentialProviderSuite
extends SparkFunSuite
with PrivateMethodTester
with Matchers {
private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)

private def getTokenRenewer(
fsCredentialProvider: YARNHadoopAccessManager, conf: Configuration): String = {
fsCredentialProvider invokePrivate _getTokenRenewer(conf)
}

private var hadoopFsCredentialProvider: YARNHadoopAccessManager = null

override def beforeAll() {
super.beforeAll()

if (hadoopFsCredentialProvider == null) {
hadoopFsCredentialProvider = new YARNHadoopAccessManager()
}
}

override def afterAll() {
if (hadoopFsCredentialProvider != null) {
hadoopFsCredentialProvider = null
}

super.afterAll()
}
class YARNHadoopAccessManagerSuite extends SparkFunSuite with Matchers {

test("check token renewer") {
val hadoopConf = new Configuration()
hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:[email protected]")
val renewer = getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)

val sparkConf = new SparkConf()
val yarnHadoopAccessManager = new YARNHadoopAccessManager(hadoopConf, sparkConf)

val renewer = yarnHadoopAccessManager.getTokenRenewer
renewer should be ("yarn/myrm:[email protected]")
}

test("check token renewer default") {
val hadoopConf = new Configuration()
val sparkConf = new SparkConf()
val yarnHadoopAccessManager = new YARNHadoopAccessManager(hadoopConf, sparkConf)

val caught =
intercept[SparkException] {
getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
yarnHadoopAccessManager.getTokenRenewer
}
assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
}
Expand Down

0 comments on commit ad4e33b

Please sign in to comment.