[SPARK-20434][YARN][CORE] Move Hadoop delegation token code from yarn to core #17723

Closed

Changes from 23 commits

40 commits
ce63a9b
[Mesosphere SPARK-126] Move YarnSparkHadoopUtil token helpers into th…
Feb 10, 2016
75d849a
[Mesosphere SPARK-126] Add Mesos Kerberos support
Feb 10, 2016
35002f2
Par down kerberos support
Apr 17, 2017
13981c8
cleanup
Apr 17, 2017
af4a3e4
style
Apr 17, 2017
5cc66dc
Add MesosSecurityManager
Apr 18, 2017
a47c9c0
info logs
Apr 18, 2017
c8ec049
style
Apr 18, 2017
954eeff
Re-add org.apache.spark.deploy.yarn.security.ServiceCredentialProvide…
Apr 18, 2017
2d76928
move YARNHadoopFSCredentialProviderSuite
Apr 18, 2017
d8a968d
Move hive test deps to the core module
Apr 19, 2017
b8093c8
remove test scope
Apr 19, 2017
25d5088
remove test scope
Apr 19, 2017
4c387eb
Removed MesosSecurityManager, added RPC call, removed META-INF Servic…
Apr 20, 2017
e32afee
add InterfaceStability annotation to ServiceCredentialProvider
Apr 20, 2017
be69f5a
Add HadoopAccessManager
Apr 21, 2017
55616da
Remove mesos code
Apr 21, 2017
240df31
re-add mistakenly removed files
Apr 21, 2017
810c6b2
test ConfigurableCredentialManager.obtainUserTokens
Apr 21, 2017
ad4e33b
add tests
Apr 21, 2017
e15f1ab
rat-excludes
Apr 21, 2017
a546aab
fix RAT
Apr 21, 2017
d6d21d1
style
Apr 21, 2017
092aac7
Remove unneeded import
Apr 24, 2017
38adaae
Make ServiceCredentialProvider private
May 18, 2017
92ac3f0
Addressed style comments
May 18, 2017
cd58b6c
review comments
May 22, 2017
bf758e6
style
May 23, 2017
e820b09
Remove YARNHadoopAccessManagerSuite.scala
May 23, 2017
7f4ca86
Move thrifts deps back to yarn/pom.xml
May 31, 2017
cda3538
dependency testing
Jun 2, 2017
376dba0
Fix dependency issues, and address style comments
Jun 2, 2017
0ffe8f0
Fix scalastyle
Jun 2, 2017
7796e14
Add other deps to provided scope
Jun 2, 2017
1479c60
Replicate deps in yarn to fix transitivity issue
Jun 5, 2017
4d57f7b
update comments
Jun 6, 2017
7e2f90d
style
Jun 8, 2017
563b80a
Don't throw an exception when Hive classes are not loaded
Jun 8, 2017
c684d88
rename
Jun 12, 2017
c4149dd
fix docs
Jun 15, 2017
28 changes: 28 additions & 0 deletions core/pom.xml
@@ -357,6 +357,34 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>

<!--
Testing Hive reflection needs hive on the test classpath only.
Member:

Is it possible to avoid adding these dependencies to core? Does it make sense to do the test in org.apache.spark.sql.hive? cc @vanzin @mridulm

Contributor Author:

The code is in core, so the test should be in core.

Contributor:

Can we work around it? It's weird to test code of the sql module in the hive module, but it's weirder to let the sql module depend on the hive module. If we can't work around it, I'd like to move the test to the hive module.

Contributor Author:

I'm not sure what you mean. This is the core module, not the sql module.

Contributor (@cloud-fan, May 30, 2017):

Oh sorry, I made a mistake, but it's much weirder to let the core module depend on the hive module...

Contributor Author:

We went back and forth on this in the main thread, which is admittedly long and hard to follow at this point. Here's the same concern laid out by mridulm: #17723 (comment)

We ultimately decided that it would be acceptable to add these dependencies, so long as we don't expose them in the public Spark interfaces. Here's a comment to that effect: #17723 (comment)

The reason they're required is that this PR moves the Hadoop delegation token providers from the yarn module into the core module, so that other resource managers, such as Mesos, can also fetch delegation tokens to access secure Hadoop services. One of those delegation token providers is Hive: https://github.com/apache/spark/pull/17723/files#diff-8be1059872a069d1bb4c5fc3ca394968

Contributor Author:

Also, to clarify, this is not adding a dependency to the Spark hive module. It's adding a dependency to Apache Hive.

It doesn't need the Spark hive modules, so the -Phive flag is not checked.
-->
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-exec</artifactId>
Contributor Author:

I still don't know how to place these in the test scope, which is where they belong. See my comment here: https://github.com/apache/spark/pull/17665/files#r112337820

Contributor:

So I tried a few things, and the one that got me further was just having this:

    <dependency>
      <groupId>${hive.group}</groupId>
      <artifactId>hive-metastore</artifactId>
      <scope>test</scope>
    </dependency>

And nix the others. Adding the others in test scope caused some weird error in sbt, even with all dependencies (we have the dependencies you had problems with cached locally).

My comment was going to be to add that, then rewrite the code to use the metastore API instead of the Hive class from hive-exec... but then I noticed that the test is not doing much, because there are no metastore servers to talk to. It's even there, hardcoded in the test:

  test("obtain tokens For HiveMetastore") {
    ...
    credentials.getAllTokens.size() should be (0)

All it seems to be doing is making sure the reflection-based code is not completely broken. That is something already, though.

So I have two suggestions, in order of preference:

  • add the dependencies in "provided" scope, and change the code to use actual types and not reflection. Because the classes may not exist at runtime, that means having to handle NoClassDefFoundError in creative ways.

  • keep the reflection code, and remove this test. Or maybe move it to a separate module as others have suggested.

I kinda like the first because it's always good to avoid reflection, and this is a particularly ugly use of it.
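For illustration, here is a minimal sketch of what the first suggestion could look like, assuming hive-exec is on the compile classpath but possibly absent at runtime (the helper object, method name, and log message are hypothetical):

    package org.apache.spark.deploy.security

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hive.conf.HiveConf

    import org.apache.spark.internal.Logging

    // Sketch: use the Hive types directly (no reflection), but guard against the
    // classes being absent at runtime, since hive-exec would only be "provided".
    private[security] object HiveConfSupport extends Logging {
      def tryHiveConf(hadoopConf: Configuration): Option[Configuration] = {
        try {
          Some(new HiveConf(hadoopConf, classOf[HiveConf]))
        } catch {
          case _: NoClassDefFoundError | _: ClassNotFoundException =>
            logDebug("Hive classes not found; skipping Hive delegation tokens.")
            None
        }
      }
    }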

Contributor Author:

Thanks for looking into it.

Do you know why reflection was used in the first place? Why not just add the Hive dependencies to compile scope? I'm thinking that's what we should do now, and drop reflection.

So I'm agreeing with your first bullet point, but proposing that we add the hive deps to compile rather than provided.

Contributor:

Why not just add the Hive dependencies to compile scope?

Because technically Hive is an optional dependency for Spark, and moving it to compile scope would break that.

(Whether that should change or not is a separate discussion, but probably better not to have it as part of this change.)

Contributor Author (@mgummelt, Jun 2, 2017):

Alright I added hive-exec to provided scope, and removed the reflection.

</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-metastore</artifactId>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
Contributor:

The reason why I had introduced SparkHadoopUtil a while back was to prevent core from depending on yarn - of course we also had to support Hadoop 1.x at that time in addition to standalone, mesos, etc.

I would expect core should still not depend on yarn - did anything change in this regard?
This dependency might bring in unnecessary restrictions on the evolution of non-YARN schedulers (through dependencies, etc).

Contributor:

Core depends on hadoop-client which depends on YARN already. So the explicit dependency here isn't really needed.

The distinction was useful when Spark supported Hadoop 1.x (no YARN), but right now, it's mostly theoretical - every Spark build has YARN classes, regardless of whether the YARN module is included.

Contributor:

Thanks for pointing it out, I just noticed that hadoop-client deps have changed in the long interim since I last looked at it :-)

</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -0,0 +1,3 @@
org.apache.spark.deploy.security.HadoopFSCredentialProvider
org.apache.spark.deploy.security.HBaseCredentialProvider
org.apache.spark.deploy.security.HiveCredentialProvider
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.util.Utils

/**
* A ConfigurableCredentialManager to manage all the registered credential providers and offer
Contributor:

This comment is now a little incorrect, because this is not extensible anymore.

Contributor Author:

fixed

* APIs for other modules to obtain credentials as well as renewal time. By default,
* [[HadoopFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
* be loaded unless explicitly disabled. Any plugged-in credential provider that wants to be
* managed by ConfigurableCredentialManager needs to implement the [[ServiceCredentialProvider]]
* interface and be listed in resources/META-INF/services so that it can be loaded by ServiceLoader.
*
* Each credential provider is also controlled by
* spark.security.credentials.{service}.enabled; it will not be loaded if that is set to false.
* For example, Hive's credential provider [[HiveCredentialProvider]] can be enabled/disabled by
* the configuration spark.security.credentials.hive.enabled.
*/
private[spark] class ConfigurableCredentialManager(
sparkConf: SparkConf,
hadoopConf: Configuration,
hadoopAccessManager: HadoopAccessManager)
extends Logging {

private val deprecatedProviderEnabledConfigs = List(
"spark.yarn.security.tokens.%s.enabled",
"spark.yarn.security.credentials.%s.enabled")
private val providerEnabledConfig = "spark.security.credentials.%s.enabled"

// Maintain all the registered credential providers
private val credentialProviders = getCredentialProviders
logDebug(s"Using the following credential providers: ${credentialProviders.keys.mkString(", ")}.")

def this(sparkConf: SparkConf, hadoopConf: Configuration) {
this(sparkConf, hadoopConf, new DefaultHadoopAccessManager(hadoopConf))
Member:

Could we remove this constructor def this(sparkConf: SparkConf, hadoopConf: Configuration)? This is only used for the test cases?

Contributor Author:

I'd be open to removing this one, but see below for why we should keep the other one.

}

def this(sparkConf: SparkConf) {
Member:

Also remove this one?

this(sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
}

private def getCredentialProviders(): Map[String, ServiceCredentialProvider] = {
Contributor:

either remove the () (preferable) or add it to the invocation in the private val credentialProviders declaration above.

Contributor Author:

removed

val providers = loadCredentialProviders

// Filter out credentials in which spark.security.credentials.{service}.enabled is false.
providers
.filter(p => isServiceEnabled(p.serviceName))
Contributor:

nit: style is .filter { p => ... }

I see this in other places so please go through all your changes and fix all instances.

Contributor:

nit: style is .filter { p => ... }, also in other places.

Contributor Author:

Fixed all occurrences I could find.

.map(p => (p.serviceName, p))
.toMap
}

protected def isServiceEnabled(serviceName: String): Boolean = {
val key = providerEnabledConfig.format(serviceName)

deprecatedProviderEnabledConfigs.foreach { pattern =>
Contributor:

This doesn't look correct. What I'd expect:

  • try the new key, if it is set, then don't print out anything.
  • if it's not set, then try the deprecated keys (in some order) and use the first that is set, printing a warning message

This block of code is printing a warning whenever the old settings exist, regardless of the new setting being configured; and in case the new setting is not configured, the message is misleading (since you are not using the new setting).
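For reference, a sketch of that precedence, written as a drop-in for the isServiceEnabled method in the diff above (field names follow the surrounding code; the exact warning text is illustrative):

    protected def isServiceEnabled(serviceName: String): Boolean = {
      val key = providerEnabledConfig.format(serviceName)

      // Prefer the new key. Only fall back to (and warn about) the deprecated
      // keys when the new key is not set.
      sparkConf.getOption(key).map(_.toBoolean).getOrElse {
        deprecatedProviderEnabledConfigs
          .map(_.format(serviceName))
          .find(sparkConf.contains)
          .map { deprecatedKey =>
            logWarning(s"$deprecatedKey is deprecated; please use $key instead.")
            sparkConf.get(deprecatedKey).toBoolean
          }
          .getOrElse(true)
      }
    }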

Contributor:

To add to @vanzin's comment, if both new and old configs are present, it would be good to warn the user.

Contributor Author (@mgummelt, May 18, 2017):

in case the new setting is not configured, the message is misleading (since you are not using the new setting).

Yea, this is actually how it worked previously, too, so I was erring on the side of retaining the old, incorrect behavior. I'll change the message.

Contributor:

Can't you merge this with the other code below? Something like:

deprecatedProviderEnabledConfigs
  .map {
    // get value and print deprecated message
  }
  .somethingThatReturnsTrueOrFalse

Contributor Author:

I prefer to keep functional and non-functional code separate when possible. I can change it if you feel otherwise.

val deprecatedKey = pattern.format(serviceName)
if (sparkConf.contains(deprecatedKey)) {
logWarning(s"${deprecatedKey} is deprecated, using ${key} instead")
}
}

val isEnabledDeprecated = deprecatedProviderEnabledConfigs.forall { pattern =>
Member:

Nit: def

Contributor Author:

Why? It would be a pure, 0-ary function, which is better represented as a val.

Member:

This will be used only when key is not defined in SparkConf.
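A tiny illustrative sketch of that point: with a def or a local lazy val, the deprecated-key lookup runs only when the new key turns out to be unset, because getOrElse takes its default by name:

    // Evaluated only if the getOrElse fallback below is actually needed.
    lazy val isEnabledDeprecated: Boolean = deprecatedProviderEnabledConfigs.forall { pattern =>
      sparkConf.getOption(pattern.format(serviceName)).map(_.toBoolean).getOrElse(true)
    }

    sparkConf.getOption(key).map(_.toBoolean).getOrElse(isEnabledDeprecated)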

sparkConf
.getOption(pattern.format(serviceName))
.map(_.toBoolean)
.getOrElse(true)
}

sparkConf
.getOption(key)
.map(_.toBoolean)
.getOrElse(isEnabledDeprecated)
}

private def loadCredentialProviders: List[ServiceCredentialProvider] = {
ServiceLoader.load(classOf[ServiceCredentialProvider], Utils.getContextOrSparkClassLoader)
.asScala.toList
}

/**
* Get credential provider for the specified service.
*/
def getServiceCredentialProvider(service: String): Option[ServiceCredentialProvider] = {
credentialProviders.get(service)
}

/**
* Writes delegation tokens to creds. Delegation tokens are fetched from all registered
* providers.
*
* @return nearest time of next renewal, Long.MaxValue if all the credentials aren't renewable,
Contributor:

Sentence is a little redundant.

Contributor Author:

Agreed. Fixed.

* otherwise the nearest renewal time of any credentials will be returned.
*/
def obtainCredentials(
hadoopConf: Configuration,
creds: Credentials): Long = {
credentialProviders.values.flatMap { provider =>
if (provider.credentialsRequired(hadoopConf)) {
provider.obtainCredentials(hadoopConf, hadoopAccessManager, creds)
} else {
logDebug(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
None
}
}.foldLeft(Long.MaxValue)(math.min)
}
}
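For orientation, a rough sketch of how a driver-side caller (for example, a non-YARN scheduler backend) might use this class. The object name and wiring are hypothetical, and the code would need to live under the org.apache.spark namespace because these classes are private[spark]:

    package org.apache.spark.deploy.security

    import org.apache.hadoop.security.{Credentials, UserGroupInformation}

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkHadoopUtil

    // Hypothetical usage sketch: fetch delegation tokens from every enabled
    // provider and attach them to the current user's credentials.
    object ObtainTokensSketch {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf()
        val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)

        val manager = new ConfigurableCredentialManager(sparkConf, hadoopConf)
        val creds = new Credentials()

        // Earliest renewal time, or Long.MaxValue if no token is renewable.
        val nextRenewal = manager.obtainCredentials(hadoopConf, creds)
        UserGroupInformation.getCurrentUser.addCredentials(creds)

        println(s"Next renewal time: $nextRenewal")
      }
    }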
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation

class DefaultHadoopAccessManager(hadoopConf: Configuration) extends HadoopAccessManager {
Contributor:

Needs private[spark]? (Or even plain private if just used in this package.)

Contributor Author:

fixed


def getTokenRenewer: String = {
UserGroupInformation.getCurrentUser.getShortUserName
Contributor:

In general, do we want to depend on UGI for Spark security?
I would expect Kubernetes, Spark standalone, or some future resource manager to not necessarily be compatible with it (I was actually surprised that Mesos is).
+CC @rxin

Can we externalize the use of UGI from core?

Yes, it has leaked into core in a few places - which is unfortunate IMO.

Contributor Author:

This is resolved by our previous discussions.

}

def hadoopFSsToAccess: Set[Path] = {
Set(FileSystem.get(hadoopConf).getHomeDirectory)
}

def getTokenRenewalInterval: Option[Long] = {
Member:

Please add override for these three functions too.

Contributor Author:

done

None
Member:

Please write a comment to explain why we return None.

}
}
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import scala.reflect.runtime.universe
import scala.util.control.NonFatal
@@ -34,7 +34,7 @@ private[security] class HBaseCredentialProvider extends ServiceCredentialProvider

override def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
hadoopAccessManager: HadoopAccessManager,
creds: Credentials): Option[Long] = {
try {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

/**
* Methods in [[HadoopAccessManager]] return scheduler-specific information related to how Hadoop
* delegation tokens should be fetched.
*/
private[spark] trait HadoopAccessManager {
Member:

Do we have any design doc about this new trait?

cc @kanzhang who is the original creator of delegation token concepts in Hadoop.

Contributor Author:

No design doc.

Member:

For such big PRs, we normally prefer to review the design doc first. When the PR does not have a design doc, it can slow down the review.

Contributor Author:

Thanks. I'll make sure to do that in the future. We happened to already have most of this code written internally before upstreaming to Apache, which is why I submitted before going through a design review.

As for this trait, I introduced it in order to parameterize HadoopFSCredentialProvider to support different behavior on YARN than on Mesos. If you look at YARNHadoopAccessManager, you'll see that it uses YARN specific configuration variables.

I had originally tried to introduce both a YARNHadoopFSCredentialProvider and a DefaultHadoopFSCredentialProvider in order to support different behaviors, but as vanzin pointed out here: #17665 (comment), we can't service load different providers on YARN than on Mesos.

Member:

Thanks for explaining it. I just need to understand the design of this trait. Why do we pick these three functions here?

Contributor Author:

I just removed this entire trait. The purpose of the trait was to parameterize HadoopFSCredentialProvider with two things: the filesystems to access and the renewer. I've instead done this by adding filesystems to the constructor, and I'll probably do the same for the renewer in the Mesos PR, but for now, we can just leave the YARN behavior.
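A rough sketch of that shape, with the constructor parameter, renewer choice, and method signature all illustrative rather than taken from the final commits:

    package org.apache.spark.deploy.security

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}

    import org.apache.spark.internal.Logging

    // Sketch: parameterize the provider with the filesystems directly instead of
    // going through a HadoopAccessManager trait; the YARN and Mesos backends
    // would each pass in their own set.
    private[deploy] class HadoopFSCredentialProviderSketch(fileSystems: Set[Path]) extends Logging {

      val serviceName: String = "hadoopfs"

      def obtainCredentials(hadoopConf: Configuration, creds: Credentials): Option[Long] = {
        val renewer = UserGroupInformation.getCurrentUser.getShortUserName
        fileSystems.foreach { dst =>
          logInfo(s"Getting delegation token for $dst")
          dst.getFileSystem(hadoopConf).addDelegationTokens(renewer, creds)
        }
        // Renewal-interval handling is elided in this sketch.
        None
      }
    }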


/** The user allowed to renew delegation tokens */
def getTokenRenewer: String

/** The renewal interval, or [[None]] if the token shouldn't be renewed */
def getTokenRenewalInterval: Option[Long]
Member:

A basic question: conceptually, I am wondering why getTokenRenewalInterval is scheduler-specific information.

Member:

Is Mesos behaving differently from Yarn when getting token renewal intervals?

Contributor Author:

That's a good point. They don't. I have it factored out like this because the code which computes it depends on spark.yarn.principal, which of course should be specific to YARN, but once Mesos keytab/principal support gets merged, that variable will likely be deprecated and replaced with spark.kerberos.principal or something.

I'll go ahead and remove this function from this trait and put it back in HadoopFSCredentialProvider. It will use spark.yarn.principal until we rename it later.


/** The set of hadoop file systems to fetch delegation tokens for */
def hadoopFSsToAccess: Set[Path]
}

@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy.security

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.internal.Logging

Contributor:

nit: too many blank lines

Contributor Author:

Fixed (I was in PEP8 mode)

private[deploy] class HadoopFSCredentialProvider
extends ServiceCredentialProvider with Logging {
// Token renewal interval. This value will be set in the first call;
Member:

How about?

The token renewal interval will be set in the first call.

Contributor Author:

fixed

// if it is None, no token renewer was specified or no token can be renewed,
// so the token renewal interval cannot be determined.
private var tokenRenewalInterval: Option[Long] = null

override val serviceName: String = "hadoopfs"

override def obtainCredentials(
hadoopConf: Configuration,
hadoopAccessManager: HadoopAccessManager,
creds: Credentials): Option[Long] = {

// NameNode to access, used to get tokens from different FileSystems
val tmpCreds = new Credentials()
val tokenRenewer = hadoopAccessManager.getTokenRenewer
hadoopAccessManager.hadoopFSsToAccess.foreach { dst =>
val dstFs = dst.getFileSystem(hadoopConf)
logInfo("getting token for: " + dst)
dstFs.addDelegationTokens(tokenRenewer, tmpCreds)
}

// Get the token renewal interval if it is not set. It will only be called once.
if (tokenRenewalInterval == null) {
tokenRenewalInterval = hadoopAccessManager.getTokenRenewalInterval
}

// Get the time of next renewal.
val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
val nextRenewalDates = tmpCreds.getAllTokens.asScala
.filter(_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier])
.map { t =>
val identifier = t.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
identifier.getIssueDate + interval
}
if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
}

creds.addAll(tmpCreds)
nextRenewalDate
}
}
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.deploy.yarn.security
package org.apache.spark.deploy.security

import java.lang.reflect.UndeclaredThrowableException
import java.security.PrivilegedExceptionAction
@@ -62,7 +62,7 @@ private[security] class HiveCredentialProvider extends ServiceCredentialProvider

override def obtainCredentials(
hadoopConf: Configuration,
sparkConf: SparkConf,
hadoopAccessManager: HadoopAccessManager,
creds: Credentials): Option[Long] = {
val conf = hiveConf(hadoopConf)
