[SPARK-20434][YARN][CORE] Move Hadoop delegation token code from yarn to core #17723
@@ -357,6 +357,34 @@
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-crypto</artifactId>
    </dependency>

    <!--
      Testing Hive reflection needs hive on the test classpath only.
      It doesn't need the spark hive modules, so the -Phive flag is not checked.
    -->
    <dependency>
      <groupId>${hive.group}</groupId>
      <artifactId>hive-exec</artifactId>
Review comment: I still don't know how to place these in the […]

Reply: So I tried a few things, and the one that got me further was just having this: […] And nix the others. Adding the others in test scope caused some weird error in sbt, even with all dependencies (we have the dependencies you had problems with cached locally). My comment was going to be to add that, then rewrite the code to use the metastore API instead of the […]. All it seems to be doing is making sure the reflection-based code is not completely broken. That is something already, though. So I have two suggestions, in order of preference: […] I kinda like the first because it's always good to avoid reflection, and this is a particularly ugly use of it.

Reply: Thanks for looking into it. Do you know why reflection was used in the first place? Why not just add the Hive dependencies to compile scope? I'm thinking that's what we should do now, and drop reflection. So I'm agreeing with your first bullet point, but proposing that we add the hive deps to […]

Reply: Because technically Hive is an optional dependency for Spark, and moving it to compile scope would break that. (Whether that should change or not is a separate discussion, but probably better not to have it as part of this change.)

Reply: Alright, I added hive-exec to provided scope and removed the reflection.
    </dependency>
    <dependency>
      <groupId>${hive.group}</groupId>
      <artifactId>hive-metastore</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libthrift</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.thrift</groupId>
      <artifactId>libfb303</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-api</artifactId>
Review comment: The reason why I had introduced […] I would expect […]

Reply: Core depends on […]. The distinction was useful when Spark supported Hadoop 1.x (no YARN), but right now it's mostly theoretical: every Spark build has YARN classes, regardless of whether the YARN module is included.

Reply: Thanks for pointing it out, I just noticed that the hadoop-client deps have changed in the long interim since I last looked at it :-)
    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -0,0 +1,120 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging

/**
 * A ConfigurableCredentialManager to manage all the registered credential providers and offer
 * APIs for other modules to obtain credentials as well as renewal time. By default
 * [[HadoopFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
 * be loaded in if not explicitly disabled, any plugged-in credential provider wants to be
 * managed by ConfigurableCredentialManager needs to implement [[HadoopDelegationTokenProvider]]
 * interface and put into resources/META-INF/services to be loaded by ServiceLoader.
 *
 * Also each credential provider is controlled by
 * spark.security.credentials.{service}.enabled, it will not be loaded in if set to false.
 * For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by
 * the configuration spark.security.credentials.hive.enabled.
 *
 * @param sparkConf Spark configuration
 * @param hadoopConf Hadoop configuration
 * @param fileSystems Delegation tokens will be fetched for these Hadoop filesystems.
 */

Review comment: This comment is now a little incorrect, because this is not extensible anymore.

Reply: fixed
private[spark] class ConfigurableCredentialManager(
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    fileSystems: Set[FileSystem])
  extends Logging {

  private val deprecatedProviderEnabledConfigs = List(
    "spark.yarn.security.tokens.%s.enabled",
    "spark.yarn.security.credentials.%s.enabled")
  private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

  // Maintain all the registered credential providers
  private val credentialProviders = getCredentialProviders
  logDebug(s"Using the following credential providers: ${credentialProviders.keys.mkString(", ")}.")

  private def getCredentialProviders: Map[String, HadoopDelegationTokenProvider] = {
    val providers = List(new HadoopFSCredentialProvider(fileSystems),
      new HiveCredentialProvider,
      new HBaseCredentialProvider)

    // Filter out credentials in which spark.security.credentials.{service}.enabled is false.
    providers
      .filter(p => isServiceEnabled(p.serviceName))
Review comment: nit: style is […] I see this in other places, so please go through all your changes and fix all instances.

Reply: nit: style is […]

Reply: Fixed all occurrences I could find.
      .map(p => (p.serviceName, p))
      .toMap
  }

  def isServiceEnabled(serviceName: String): Boolean = {
    val key = providerEnabledConfig.format(serviceName)

    deprecatedProviderEnabledConfigs.foreach { pattern =>
Review comment: This doesn't look correct. What I'd expect: […] This block of code is printing a warning whenever the old settings exist, regardless of the new setting being configured; and in case the new setting is not configured, the message is misleading (since you are not using the new setting).

Reply: To add to @vanzin's comment, if both the new and old configs are present, it would be good to warn the user.

Reply: Yeah, this is actually how it worked previously, too, so I was erring on the side of retaining the old, incorrect behavior. I'll change the message.

Reply: Can't you merge this with the other code below? Something like: […]

Reply: I prefer to keep functional and non-functional code separate when possible. I can change it if you feel otherwise.
      val deprecatedKey = pattern.format(serviceName)
      if (sparkConf.contains(deprecatedKey)) {
        logWarning(s"${deprecatedKey} is deprecated. Please use ${key} instead.")
      }
    }

    val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
Review comment: Nit: […]

Reply: Why? It would be a pure, 0-ary function, which is better represented as a val.

Reply: This will be used only when […]
      sparkConf
        .getOption(pattern.format(serviceName))
        .map(_.toBoolean)
        .getOrElse(true)
    }

    sparkConf
      .getOption(key)
      .map(_.toBoolean)
      .getOrElse(isEnabledDeprecated)
  }
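For illustration only, here is a minimal sketch of the merged form discussed in the review thread above (it is not the snippet that was elided from the comment): the new key wins when present, and the deprecated keys are consulted, and warned about, only as a fallback. It assumes the enclosing class's sparkConf, providerEnabledConfig, deprecatedProviderEnabledConfigs, and logWarning members shown above.

```scala
def isServiceEnabled(serviceName: String): Boolean = {
  val key = providerEnabledConfig.format(serviceName)

  sparkConf.getOption(key).map(_.toBoolean).getOrElse {
    // The new key is absent, so fall back to the deprecated keys, warning about each one used.
    deprecatedProviderEnabledConfigs.forall { pattern =>
      val deprecatedKey = pattern.format(serviceName)
      sparkConf.getOption(deprecatedKey).map { value =>
        logWarning(s"$deprecatedKey is deprecated. Please use $key instead.")
        value.toBoolean
      }.getOrElse(true)
    }
  }
}
```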
  /**
   * Get credential provider for the specified service.
   */
  def getServiceCredentialProvider(service: String): Option[HadoopDelegationTokenProvider] = {
Review comment: In this […] If possible, we need to change the names and terms used in the class […]

Reply: You're right. I made a few changes to naming: […] and updated a bunch of comments. cc @vanzin, you might be interested in this since these renames are non-trivial.
    credentialProviders.get(service)
  }

  /**
   * Writes delegation tokens to creds. Delegation tokens are fetched from all registered
   * providers.
   *
   * @return Time after which the fetched delegation tokens should be renewed.
   */
Review comment: This needs to be more accurate, to explain that it is the first service provider that needs to renew.

Reply: I think this is the most accurate and succinct explanation of the contract. Since we aren't returning the renewal time of all tokens, it is true that after the returned timeout, all tokens must be renewed. I could say "Time after which one of the returned tokens must be renewed", but this is a circuitous instruction to the user, since they actually must renew all.
  def obtainCredentials(
      hadoopConf: Configuration,
      creds: Credentials): Long = {
    credentialProviders.values.flatMap { provider =>
      if (provider.credentialsRequired(hadoopConf)) {
        provider.obtainCredentials(hadoopConf, creds)
      } else {
        logDebug(s"Service ${provider.serviceName} does not require a token." +
          s" Check your configuration to see if security is disabled or not.")
        None
      }
    }.foldLeft(Long.MaxValue)(math.min)
  }
}
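To make the manager's contract concrete, a hedged usage sketch follows. It is not code from this PR: the scheduling around renewalExecutor and the margin handling are illustrative assumptions; only ConfigurableCredentialManager, obtainCredentials, and the spark.security.credentials.{service}.enabled switch come from the diff above.

```scala
package org.apache.spark.deploy.security

import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

object CredentialManagerUsageSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      // Any provider can be disabled per service, e.g. skip the Hive provider entirely.
      .set("spark.security.credentials.hive.enabled", "false")
    val hadoopConf = new Configuration()
    val fileSystems: Set[FileSystem] = Set(FileSystem.get(hadoopConf))

    val manager = new ConfigurableCredentialManager(sparkConf, hadoopConf, fileSystems)

    // All enabled providers write their tokens into `creds`; the return value is the earliest
    // time (in ms) after which the fetched tokens should be renewed.
    val creds = new Credentials()
    val nextRenewalTime = manager.obtainCredentials(hadoopConf, creds)

    // A caller would typically schedule the next fetch shortly before that deadline.
    val delayMs = math.max(0L, nextRenewalTime - System.currentTimeMillis())
    val renewalExecutor = Executors.newSingleThreadScheduledExecutor()
    renewalExecutor.schedule(new Runnable {
      override def run(): Unit = manager.obtainCredentials(hadoopConf, new Credentials())
    }, delayMs, TimeUnit.MILLISECONDS)
  }
}
```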
@@ -15,7 +15,7 @@
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import scala.reflect.runtime.universe
import scala.util.control.NonFatal

@@ -24,17 +24,16 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

private[security] class HBaseCredentialProvider extends ServiceCredentialProvider with Logging {
private[security] class HBaseCredentialProvider
  extends HadoopDelegationTokenProvider with Logging {
Review comment: Nit: shorten it to one line.

Reply: It's too long now with the rename.
  override def serviceName: String = "hbase"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    try {
      val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
@@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

/**
 * Hadoop delegation token provider.
 */
private[spark] trait HadoopDelegationTokenProvider {
Review comment: […]

Reply: Now that this is private, I've reverted […]

Reply: You need to update the PR description, since it still mentions deprecating the YARN interface.

Reply: fixed
  /**
   * Name of the service to provide credentials. This name should unique, Spark internally will
Review comment: "should be"

Reply: fixed
   * use this name to differentiate credential provider.
   */
  def serviceName: String

  /**
   * Returns true if credentials are required for this service. By default, it is based on whether
   * Hadoop security is enabled.
   */
Review comment: It sounds like you updated the comments in the original file, but you did not update the comment here.

Reply: The comments are identical. Can you be more specific?
  def credentialsRequired(hadoopConf: Configuration): Boolean

  /**
   * Obtain credentials for this service and get the time of the next renewal.
   * @param hadoopConf Configuration of current Hadoop Compatible system.
   * @param creds Credentials to add tokens and security keys to.
   * @return If this Credential is renewable and can be renewed, return the time of the next
   *         renewal, otherwise None should be returned.
   */
  def obtainCredentials(
      hadoopConf: Configuration,
      creds: Credentials): Option[Long]
}
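For orientation, here is a minimal sketch of how a provider implements this trait. DemoCredentialProvider and the service name "demo" are made up; only the trait and its three members come from the diff above, and since the trait is private[spark] such a provider would live inside Spark itself rather than be plugged in externally.

```scala
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

private[spark] class DemoCredentialProvider extends HadoopDelegationTokenProvider {

  // The manager keys providers by this name; it also drives the
  // spark.security.credentials.demo.enabled switch.
  override def serviceName: String = "demo"

  // Mirror the common default: only fetch tokens when Hadoop security (Kerberos) is enabled.
  override def credentialsRequired(hadoopConf: Configuration): Boolean =
    UserGroupInformation.isSecurityEnabled

  // A real provider would add its tokens to `creds`; returning None tells the manager
  // there is no renewal deadline to track for this service.
  override def obtainCredentials(
      hadoopConf: Configuration,
      creds: Credentials): Option[Long] = {
    None
  }
}
```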
@@ -0,0 +1,125 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy.security

import scala.collection.JavaConverters._
import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
Review comment: nit: too many blank lines

Reply: Fixed (I was in PEP8 mode).
private[deploy] class HadoopFSCredentialProvider(fileSystems: Set[FileSystem])
  extends HadoopDelegationTokenProvider with Logging {
  // Token renewal interval, this value will be set in the first call,
Review comment: How about: […]

Reply: fixed
  // if None means no token renewer specified or no token can be renewed,
  // so cannot get token renewal interval.
  private var tokenRenewalInterval: Option[Long] = null

  override val serviceName: String = "hadoopfs"

  override def obtainCredentials(
      hadoopConf: Configuration,
      creds: Credentials): Option[Long] = {

    val newCreds = fetchDelegationTokens(
      getTokenRenewer(hadoopConf),
      fileSystems)

    // Get the token renewal interval if it is not set. It will only be called once.
    if (tokenRenewalInterval == null) {
      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, fileSystems)
    }

    // Get the time of next renewal.
    val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
      val nextRenewalDates = newCreds.getAllTokens.asScala
        .filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
        .map { token =>
          val identifier = token
            .decodeIdentifier()
            .asInstanceOf[AbstractDelegationTokenIdentifier]
          identifier.getIssueDate + interval
        }
      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
    }

    creds.addAll(newCreds)
    nextRenewalDate
  }

  def credentialsRequired(hadoopConf: Configuration): Boolean = {
    UserGroupInformation.isSecurityEnabled
  }

  private def getTokenRenewer(hadoopConf: Configuration): String = {
    val tokenRenewer = Master.getMasterPrincipal(hadoopConf)
    logDebug("Delegation token renewer is: " + tokenRenewer)

    if (tokenRenewer == null || tokenRenewer.length() == 0) {
      val errorMessage = "Can't get Master Kerberos principal for use as renewer."
      logError(errorMessage)
      throw new SparkException(errorMessage)
    }

    tokenRenewer
  }

  private def fetchDelegationTokens(
      renewer: String,
Review comment: nit: indent extra level
      filesystems: Set[FileSystem]): Credentials = {
    val creds = new Credentials()

    filesystems.foreach { fs =>
      logInfo("getting token for: " + fs)
      fs.addDelegationTokens(renewer, creds)
    }

    creds
  }

  private def getTokenRenewalInterval(
      hadoopConf: Configuration,
Review comment: nit: indent extra level
      filesystems: Set[FileSystem]): Option[Long] = {
    // We cannot use the tokens generated with renewer yarn. Trying to renew
    // those will fail with an access control issue. So create new tokens with the logged in
    // user as renewer.
    val creds = fetchDelegationTokens(
      UserGroupInformation.getCurrentUser.getUserName,
      filesystems)

    val renewIntervals = creds.getAllTokens.asScala.filter {
      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
    }.flatMap { token =>
      Try {
        val newExpiration = token.renew(hadoopConf)
        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
        val interval = newExpiration - identifier.getIssueDate
        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
        interval
      }.toOption
    }
    if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
  }
}
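A tiny worked sketch (made-up numbers) of the renewal arithmetic above: the renewal interval is measured once by renewing freshly fetched tokens, each token's next renewal date is its issue date plus that interval, and the provider reports the earliest such date.

```scala
// Hypothetical values, mirroring getTokenRenewalInterval and obtainCredentials above.
val renewalInterval = 86400000L                       // newExpiration - issueDate, measured once
val issueDates = Seq(1500000000000L, 1500000600000L)  // getIssueDate of each fetched token
val nextRenewalDate = issueDates.map(_ + renewalInterval).min
// => 1500086400000L: the earliest deadline across tokens is what obtainCredentials returns.
```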
@@ -15,7 +15,7 @@
 * limitations under the License.
 */

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction

@@ -29,11 +29,10 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

private[security] class HiveCredentialProvider extends ServiceCredentialProvider with Logging {
private[security] class HiveCredentialProvider extends HadoopDelegationTokenProvider with Logging {
Review comment: Is this the only reason we need to make spark core depend on hive? My concern is that previously it was possible for users to build spark without hive, but now it's impossible. Can we move this file to the hive module and use reflection in […]?

Reply: It's still possible to build (as in package) a Spark distribution without Hive. This code already uses reflection, and the new dependencies should be test-scope only (aside from any issues that still need to be solved). So you need Hive to run the tests, but you don't need Hive at runtime.

Reply: Right, those deps should be in test scope, but I ran into trouble doing that: https://github.com/apache/spark/pull/17723/files#r112788867 I'll go back and try again. Suggestions welcome.
  override def serviceName: String = "hive"

@@ -62,7 +61,6 @@ private[security] class HiveCredentialProvider extends ServiceCredentialProvider

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    val conf = hiveConf(hadoopConf)
Review comment: Is it possible to avoid adding these dependencies to core? Does it make sense to do the test in org.apache.spark.sql.hive? cc @vanzin @mridulm

Reply: The code is in core, so the test should be in core.

Reply: Can we work around it? It's weird to test code of the sql module in the hive module, but it's weirder to let the sql module depend on the hive module. If we can't work around it, I'd like to move the test to the hive module.

Reply: I'm not sure what you mean. This is the core module, not the sql module.

Reply: Oh sorry, I made a mistake, but it's much weirder to let the core module depend on the hive module...

Reply: We went back and forth on this in the main thread, which is admittedly long and hard to follow at this point. Here's the same concern laid out by mridulm: #17723 (comment). We ultimately decided that it would be acceptable to add these dependencies, so long as we don't expose them in the public Spark interfaces. Here's a comment to that effect: #17723 (comment). The reason they're required is that this PR moves the Hadoop delegation token providers from the yarn module into the core module, so that other resource managers, such as Mesos, can also fetch delegation tokens to access secure Hadoop services. One of those delegation token providers is Hive: https://github.com/apache/spark/pull/17723/files#diff-8be1059872a069d1bb4c5fc3ca394968

Reply: Also, to clarify, this is not adding a dependency to the Spark hive module. It's adding a dependency to Apache Hive.