[SPARK-20434][YARN][CORE] Move Hadoop delegation token code from yarn to core

## What changes were proposed in this pull request?

Move Hadoop delegation token code from `spark-yarn` to `spark-core`, so that other schedulers (such as Mesos) may use it. To avoid exposing Hadoop interfaces in `spark-core`, the new Hadoop delegation token classes are kept private. To provide backward compatibility, and to allow YARN users to continue loading their own delegation token providers via Java service loading, the old YARN interfaces, as well as the client code that uses them, have been retained.

Summary:

- Moved the registered `yarn.security.ServiceCredentialProvider` classes from `spark-yarn` to `spark-core`, into a new, private hierarchy under `HadoopDelegationTokenProvider`. Client code in `HadoopDelegationTokenManager` now loads credentials from a whitelist of three providers (`HadoopFSDelegationTokenProvider`, `HiveDelegationTokenProvider`, `HBaseDelegationTokenProvider`) instead of service loading, which means that users cannot implement their own delegation token providers in core, as they can in the `spark-yarn` module.
- The `yarn.security.ServiceCredentialProvider` interface has been kept for backwards compatibility, and to continue to allow YARN users to implement their own delegation token providers. Client code in YARN now fetches tokens via the new `YARNHadoopDelegationTokenManager` class, which fetches tokens from the core providers through `HadoopDelegationTokenManager` and also service-loads them from `yarn.security.ServiceCredentialProvider`.

Old hierarchy:

```
yarn.security.ServiceCredentialProvider (service loaded)
  HadoopFSCredentialProvider
  HiveCredentialProvider
  HBaseCredentialProvider
yarn.security.ConfigurableCredentialManager
```

New hierarchy:

```
HadoopDelegationTokenManager
HadoopDelegationTokenProvider (not service loaded)
  HadoopFSDelegationTokenProvider
  HiveDelegationTokenProvider
  HBaseDelegationTokenProvider
yarn.security.ServiceCredentialProvider (service loaded)
yarn.security.YARNHadoopDelegationTokenManager
```

## How was this patch tested?

Unit tests.

Author: Michael Gummelt <[email protected]>
Author: Dr. Stefan Schimanski <[email protected]>

Closes #17723 from mgummelt/SPARK-20434-refactor-kerberos.
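For context, a minimal sketch of what the retained YARN extension point lets a third party do. The class `MyServiceCredentialProvider` and the service it stands in for are hypothetical, and the `ServiceCredentialProvider` method signatures below are my reading of the interface as of this change:

```scala
package com.example.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

// Hypothetical third-party provider; still discovered via Java service
// loading on YARN, which this change deliberately preserves.
class MyServiceCredentialProvider extends ServiceCredentialProvider {

  // Unique service name; also keys spark.security.credentials.myservice.enabled.
  override def serviceName: String = "myservice"

  // Fetch tokens unconditionally here; the trait's default checks
  // whether Hadoop security is enabled.
  override def credentialsRequired(hadoopConf: Configuration): Boolean = true

  // Add this service's tokens to creds; return the next renewal time if renewable.
  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // ... fetch a delegation token from the external service, add it to creds ...
    None
  }
}
```

Such a provider is registered through a `META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` file naming the implementing class.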
Michael Gummelt authored and Marcelo Vanzin committed on Jun 15, 2017
1 parent 7dc3e69 · commit a18d637
Showing 24 changed files with 689 additions and 532 deletions.
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
(new file: 119 additions, 0 deletions)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

/**
 * Manages all the registered HadoopDelegationTokenProviders and offer APIs for other modules to
 * obtain delegation tokens and their renewal time. By default [[HadoopFSDelegationTokenProvider]],
 * [[HiveDelegationTokenProvider]] and [[HBaseDelegationTokenProvider]] will be loaded in if not
 * explicitly disabled.
 *
 * Also, each HadoopDelegationTokenProvider is controlled by
 * spark.security.credentials.{service}.enabled, and will not be loaded if this config is set to
 * false. For example, Hive's delegation token provider [[HiveDelegationTokenProvider]] can be
 * enabled/disabled by the configuration spark.security.credentials.hive.enabled.
 *
 * @param sparkConf Spark configuration
 * @param hadoopConf Hadoop configuration
 * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
 */
private[spark] class HadoopDelegationTokenManager(
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    fileSystems: Set[FileSystem])
  extends Logging {

  private val deprecatedProviderEnabledConfigs = List(
    "spark.yarn.security.tokens.%s.enabled",
    "spark.yarn.security.credentials.%s.enabled")
  private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

  // Maintain all the registered delegation token providers
  private val delegationTokenProviders = getDelegationTokenProviders
  logDebug(s"Using the following delegation token providers: " +
    s"${delegationTokenProviders.keys.mkString(", ")}.")

  private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
    val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
      new HiveDelegationTokenProvider,
      new HBaseDelegationTokenProvider)

    // Filter out providers for which spark.security.credentials.{service}.enabled is false.
    providers
      .filter { p => isServiceEnabled(p.serviceName) }
      .map { p => (p.serviceName, p) }
      .toMap
  }

  def isServiceEnabled(serviceName: String): Boolean = {
    val key = providerEnabledConfig.format(serviceName)

    deprecatedProviderEnabledConfigs.foreach { pattern =>
      val deprecatedKey = pattern.format(serviceName)
      if (sparkConf.contains(deprecatedKey)) {
        logWarning(s"${deprecatedKey} is deprecated.  Please use ${key} instead.")
      }
    }

    val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
      sparkConf
        .getOption(pattern.format(serviceName))
        .map(_.toBoolean)
        .getOrElse(true)
    }

    sparkConf
      .getOption(key)
      .map(_.toBoolean)
      .getOrElse(isEnabledDeprecated)
  }

  /**
   * Get delegation token provider for the specified service.
   */
  def getServiceDelegationTokenProvider(service: String): Option[HadoopDelegationTokenProvider] = {
    delegationTokenProviders.get(service)
  }

  /**
   * Writes delegation tokens to creds. Delegation tokens are fetched from all registered
   * providers.
   *
   * @return Time after which the fetched delegation tokens should be renewed.
   */
  def obtainDelegationTokens(
      hadoopConf: Configuration,
      creds: Credentials): Long = {
    delegationTokenProviders.values.flatMap { provider =>
      if (provider.delegationTokensRequired(hadoopConf)) {
        provider.obtainDelegationTokens(hadoopConf, creds)
      } else {
        logDebug(s"Service ${provider.serviceName} does not require a token." +
          s" Check your configuration to see if security is disabled or not.")
        None
      }
    }.foldLeft(Long.MaxValue)(math.min)
  }
}
```
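As a usage sketch (not part of the diff, and only compilable from within Spark's own packages since the class is `private[spark]`), a cluster-manager backend might drive the manager roughly like this, assuming a Kerberos-secured HDFS:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
// Providers can be toggled per service, e.g. disable the Hive provider:
sparkConf.set("spark.security.credentials.hive.enabled", "false")

val hadoopConf = new Configuration()
val fileSystems = Set(FileSystem.get(hadoopConf))

val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, fileSystems)

// Tokens for every enabled provider land in `creds`; the return value is the
// earliest next-renewal time across providers (Long.MaxValue if none renew).
val creds = new Credentials()
val nextRenewal = manager.obtainDelegationTokens(hadoopConf, creds)
```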
core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenProvider.scala
(new file: 50 additions, 0 deletions)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

/**
 * Hadoop delegation token provider.
 */
private[spark] trait HadoopDelegationTokenProvider {

  /**
   * Name of the service to provide delegation tokens. This name should be unique. Spark will
   * internally use this name to differentiate delegation token providers.
   */
  def serviceName: String

  /**
   * Returns true if delegation tokens are required for this service. By default, it is based on
   * whether Hadoop security is enabled.
   */
  def delegationTokensRequired(hadoopConf: Configuration): Boolean

  /**
   * Obtain delegation tokens for this service and get the time of the next renewal.
   * @param hadoopConf Configuration of current Hadoop Compatible system.
   * @param creds Credentials to add tokens and security keys to.
   * @return If the returned tokens are renewable and can be renewed, return the time of the next
   *         renewal, otherwise None should be returned.
   */
  def obtainDelegationTokens(
      hadoopConf: Configuration,
      creds: Credentials): Option[Long]
}
```
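To make the contract concrete, here is a minimal hypothetical implementation of this trait. The service name and token-fetching logic are illustrative only; since `HadoopDelegationTokenManager` whitelists its three providers, core does not actually load third-party implementations like this one:

```scala
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Hypothetical provider skeleton showing the shape of the contract.
private[spark] class ExampleDelegationTokenProvider extends HadoopDelegationTokenProvider {

  // Keys the spark.security.credentials.example.enabled toggle.
  override def serviceName: String = "example"

  // Mirror the default policy: tokens are only needed when Hadoop security is on.
  override def delegationTokensRequired(hadoopConf: Configuration): Boolean =
    UserGroupInformation.isSecurityEnabled

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      creds: Credentials): Option[Long] = {
    // ... obtain a token from the external service and add it to creds ...
    // Return Some(nextRenewalTimeMillis) for renewable tokens, else None.
    None
  }
}
```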
core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
(new file: 126 additions, 0 deletions)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.security

import scala.collection.JavaConverters._
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging

private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Set[FileSystem])
    extends HadoopDelegationTokenProvider with Logging {

  // This tokenRenewalInterval will be set in the first call to obtainDelegationTokens.
  // If None, no token renewer is specified or no token can be renewed,
  // so we cannot get the token renewal interval.
  private var tokenRenewalInterval: Option[Long] = null

  override val serviceName: String = "hadoopfs"

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      creds: Credentials): Option[Long] = {

    val newCreds = fetchDelegationTokens(
      getTokenRenewer(hadoopConf),
      fileSystems)

    // Get the token renewal interval if it is not set. It will only be called once.
    if (tokenRenewalInterval == null) {
      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems)
    }

    // Get the time of next renewal.
    val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
      val nextRenewalDates = newCreds.getAllTokens.asScala
        .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
        .map { token =>
          val identifier = token
            .decodeIdentifier()
            .asInstanceOf[AbstractDelegationTokenIdentifier]
          identifier.getIssueDate + interval
        }
      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
    }

    creds.addAll(newCreds)
    nextRenewalDate
  }

  def delegationTokensRequired(hadoopConf: Configuration): Boolean = {
    UserGroupInformation.isSecurityEnabled
  }

  private def getTokenRenewer(hadoopConf: Configuration): String = {
    val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
    logDebug("Delegation token renewer is: " + tokenRenewer)

    if (tokenRenewer == null || tokenRenewer.length() == 0) {
      val errorMessage = "Can't get Master Kerberos principal for use as renewer."
      logError(errorMessage)
      throw new SparkException(errorMessage)
    }

    tokenRenewer
  }

  private def fetchDelegationTokens(
      renewer: String,
      filesystems: Set[FileSystem]): Credentials = {

    val creds = new Credentials()

    filesystems.foreach { fs =>
      logInfo("getting token for: " + fs)
      fs.addDelegationTokens(renewer, creds)
    }

    creds
  }

  private def getTokenRenewalInterval(
      hadoopConf: Configuration,
      filesystems: Set[FileSystem]): Option[Long] = {
    // We cannot use the tokens generated with renewer yarn. Trying to renew
    // those will fail with an access control issue. So create new tokens with the logged in
    // user as renewer.
    val creds = fetchDelegationTokens(
      UserGroupInformation.getCurrentUser.getUserName,
      filesystems)

    val renewIntervals = creds.getAllTokens.asScala.filter {
      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
    }.flatMap { token =>
      Try {
        val newExpiration = token.renew(hadoopConf)
        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
        val interval = newExpiration - identifier.getIssueDate
        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
        interval
      }.toOption
    }
    if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
  }
}
```
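The renewal bookkeeping above boils down to `issueDate + renewalInterval` per token, with `HadoopDelegationTokenManager` then folding the per-provider results with `math.min`. A small self-contained illustration of that arithmetic, with made-up values:

```scala
// Hypothetical per-token issue dates (ms since epoch) and a renewal interval.
val issueDates = Seq(1497500000000L, 1497500060000L)
val interval = 86400000L // 24h, as token.renew() minus the issue date would yield

// Per provider: the earliest issueDate + interval is the next renewal date.
val nextRenewalPerProvider: Option[Long] =
  if (issueDates.isEmpty) None else Some(issueDates.map(_ + interval).min)

// Across providers, the manager folds Options with math.min starting from
// Long.MaxValue, so providers that return None simply drop out.
val perProvider: Seq[Option[Long]] = Seq(nextRenewalPerProvider, None)
val nextRenewal = perProvider.flatten.foldLeft(Long.MaxValue)(math.min)
```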