This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

[WIP] Use HDFS Delegation Token in driver/executor pods as part of Secure HDFS Support #379

Conversation

@kimoonkim (Member) commented Jul 18, 2017

What changes were proposed in this pull request?

This is stage 3 of Secure HDFS Support, the ongoing work of setting up secure HDFS interaction with Spark-on-K8S. This change should be integrated with Stages 1 and 2, which are being implemented in other PRs.

The architecture is discussed in this community-wide Google doc.
The initiative can be broken down into 4 stages:

STAGE 1

  • Detecting the HADOOP_CONF_DIR environment variable and using ConfigMaps to store all Hadoop config files, while also setting HADOOP_CONF_DIR locally in the driver / executors

STAGE 2

  • Grabbing the TGT from the LTC or using keytabs + principal, and creating a DT that will be mounted as a secret

STAGE 3

  • Driver + Executor Logic

STAGE 4

  • Service Pod that handles renewals

How was this patch tested?

Manually tested with the following steps. (Note: steps 1-3 and the use of the pre-created secret below will go away once the submission client generates the secret automatically.)

  1. Obtain an HDFS delegation token after signing on with Kerberos. (See this Google doc for the source code of MyHdfsCredentialsUser; a rough sketch of such a tool is shown after the commands below.)
$ kinit user1
$ hadoop com.pepperdata.MyHdfsCredentialsUser /tmp/hadoop-token-file
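
For reference, a rough sketch of what a token-fetching helper like MyHdfsCredentialsUser might look like. The real source is in the Google doc linked above; the class name comes from the command above, while the method calls are standard Hadoop APIs and the "yarn" renewer is an illustrative assumption:

package com.pepperdata

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Hypothetical sketch: log in from the Kerberos ticket cache (populated by kinit)
// and write an HDFS delegation token to the local file given as the first argument.
object MyHdfsCredentialsUser {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()                 // picks up HADOOP_CONF_DIR settings
    UserGroupInformation.setConfiguration(conf)    // enables Kerberos auth from the kinit cache
    val credentials = new Credentials()
    // Ask the secure namenode for a delegation token; "yarn" as renewer is an assumption here.
    FileSystem.get(conf).addDelegationTokens("yarn", credentials)
    // Persist the token(s) in Hadoop's token-storage format.
    credentials.writeTokenStorageFile(new Path(args(0)), conf)
  }
}
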
  2. Copy /tmp/hadoop-token-file to your MacBook.

  3. Create a K8s secret from the file:

$ kubectl create secret generic kimoon-hadoop-token-file --from-file hadoop-token-file
$ kubectl get secret -o json kimoon-hadoop-token-file
{
    "apiVersion": "v1",
    "data": {
        "hadoop-token-file": "SERUUwABDjEwLjMyLjAuNDo4MDIwLAAUdXNlcjFAUEVQUEVSREFUQS5DT00EeWFybgCKAV1Xc4d7igFde4ALewcVFEVJSbfCLNmgyRYhAewiuuMmHF8TFUhERlNfREVMRUdBVElPTl9UT0tFTg4xMC4zMi4wLjQ6ODAyMAA="
    },
    "kind": "Secret",
    "metadata": {
        "creationTimestamp": "2017-07-18T20:46:55Z",
        "name": "kimoon-hadoop-token-file",
        "namespace": "default",
        "resourceVersion": "22675361",
        "selfLink": "/api/v1/namespaces/default/secrets/kimoon-hadoop-token-file",
        "uid": "3c36588f-6bfa-11e7-9e51-02f2c310e88c"
    },
    "type": "Opaque"
}
  4. Finally, submit a job while specifying the secret name as a config key:
$ /usr/local/spark-on-k8s/bin/spark-submit  \
  --class org.apache.spark.examples.HdfsTest  \
  --conf spark.app.name=spark-hdfs  \
  --conf spark.executor.instances=1  \
  --conf spark.hadoop.fs.defaultFS=hdfs://10.32.0.4:8020  \
  --conf spark.kubernetes.mounted.hadoopSecret=kimoon-hadoop-token-file  \
   local:///opt/spark/examples/jars/spark-examples_2.11-2.1.0-k8s-0.2.0-SNAPSHOT.jar  \
  /tmp/etc-hosts

From the debug-enabled log, we can see that "TOKEN" is used as the authentication mechanism for the secure namenode.

17/07/18 22:54:45 DEBUG SaslRpcClient: Sending sasl message state: NEGOTIATE
17/07/18 22:54:45 DEBUG SaslRpcClient: Received SASL message state: NEGOTIATE
auths {
method: "TOKEN"
mechanism: "DIGEST-MD5"
protocol: ""
serverId: "default"
challenge: "realm="default",nonce="HwXTpS3npEOeYBV/MK9D97oPqKg+ita8HfttmdMk",qop="auth",charset=utf-8,algorithm=md5-sess"
}
auths {
method: "KERBEROS"
mechanism: "GSSAPI"
protocol: "hdfs"
serverId: "cdh58-kimoon-rc-3839w"
}
17/07/18 22:54:45 DEBUG SaslRpcClient: Get token info proto:interface org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB info:@org.apache.hadoop.security.token.TokenInfo(value=class org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector)
17/07/18 22:54:45 DEBUG SaslRpcClient: Creating SASL DIGEST-MD5(TOKEN) client to authenticate to service at default
17/07/18 22:54:45 DEBUG SaslRpcClient: Use TOKEN authentication for protocol ClientNamenodeProtocolPB
17/07/18 22:54:45 DEBUG SaslRpcClient: SASL client callback: setting username: ABR1c2VyMUBQRVBQRVJEQVRBLkNPTQR5YXJuAIoBXVdzh3uKAV17gAt7BxU=
17/07/18 22:54:45 DEBUG SaslRpcClient: SASL client callback: setting userPassword
17/07/18 22:54:45 DEBUG SaslRpcClient: SASL client callback: setting realm: default
17/07/18 22:54:45 DEBUG SaslRpcClient: Sending sasl message state: INITIATE
token: "charset=utf-8,username="ABR1c2VyMUBQRVBQRVJEQVRBLkNPTQR5YXJuAIoBXVdzh3uKAV17gAt7BxU=",realm="default",nonce="HwXTpS3npEOeYBV/MK9D97oPqKg+ita8HfttmdMk",nc=00000001,cnonce="kgMSsxud74r2fQr9/rURFPIW6gP++5IMj+0WxgEz",digest-uri="/default",maxbuf=65536,response=7a79b9ba186f3e84f2c5c90f3e7d49d8,qop=auth"
auths {
method: "TOKEN"
mechanism: "DIGEST-MD5"
protocol: ""
serverId: "default"
}
17/07/18 22:54:45 DEBUG SaslRpcClient: Received SASL message state: SUCCESS
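
For what it's worth, Hadoop's UserGroupInformation loads pre-fetched tokens from the file named by the HADOOP_TOKEN_FILE_LOCATION environment variable, which is presumably how the mounted secret ends up backing the TOKEN handshake above. Below is a small hypothetical helper for inspecting the mounted token file; the path assumes the /mnt/secrets/hadoop-credentials mount discussed in the review further down, and none of this is code from the PR itself:

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.Credentials

// Hypothetical debugging helper: read the token file mounted from the Kubernetes
// secret and print the kind and service of each token it contains.
object InspectTokenFile {
  def main(args: Array[String]): Unit = {
    val tokenFile = new Path("file:///mnt/secrets/hadoop-credentials/hadoop-token-file")
    val credentials = Credentials.readTokenStorageFile(tokenFile, new Configuration())
    credentials.getAllTokens.asScala.foreach { token =>
      // e.g. HDFS_DELEGATION_TOKEN for 10.32.0.4:8020
      println(s"${token.getKind} for ${token.getService}")
    }
  }
}
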

The job fails without the DT secret. Here's the failing job's log, where the driver can use neither TOKEN nor KERBEROS authentication. Note that the driver pod is not configured to enable Kerberos, which is fine as long as there is a valid DT, as in the successful run above:

Exception in thread "main" org.apache.hadoop.security.AccessControlException: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2110)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1317)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:381)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:506)
at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:486)
at org.apache.spark.examples.HdfsTest$.main(HdfsTest.scala:36)
at org.apache.spark.examples.HdfsTest.main(HdfsTest.scala)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
at org.apache.hadoop.ipc.Client.call(Client.java:1475)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy31.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy32.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
... 18 more

@ifilonenko self-requested a review on July 18, 2017 23:36
maybeMountedHadoopSecret, driverSpec.driverContainer)
driverSpec.copy(
driverPod = podWithMountedHadoopToken,
otherKubernetesResources = driverSpec.otherKubernetesResources,

Member: not necessary to add since you are doing a .copy()

Member Author: I see. Will update the code in the next diff.

private val maybeMountedHadoopSecret = submissionSparkConf.getOption(MOUNTED_HADOOP_SECRET_CONF)

override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val podWithMountedHadoopToken = HadoopSecretUtil.configurePod(maybeMountedHadoopSecret,

Member: NIT: arguments should be one on each line

Member Author: Will address this in the next diff.

@@ -69,6 +69,8 @@ package object config extends Logging {
private[spark] val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
private[spark] val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"

private[spark] val MOUNTED_HADOOP_SECRET_CONF = "spark.kubernetes.mounted.hadoopSecret"

Member: To keep consistent with mine. I called this spark.kubernetes.kerberos._

Member Author: I would not use "kerberos" in the name because Hadoop delegation tokens are relatively independent of Kerberos. Also, we should indicate this is related to "hadoop", as opposed to Spark core.

@@ -43,6 +43,13 @@ package object constants {
s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME"
private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials"

// Hadoop credentials secrets for the Spark app.
private[spark] val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"

Member: I put these into /etc/hadoop/ so that only root access will be allowed to this folder. Thoughts and reasoning for the location? Also, should this be customizable in config.scala?

Member Author: In YARN, the token file is placed into a work dir for the executors, so I don't think /etc/hadoop is a good location. And root access or not is irrelevant in pods, since everything runs as root.

I chose this dir because we have other code already using it, and I think it's better to follow that convention here. From line 30 above:

private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
     "/mnt/secrets/spark-kubernetes-credentials"

Comment: /mnt/secrets is good for the credentials. Use /etc/hadoop for configuration files.

@@ -94,6 +94,7 @@ private[spark] class DriverConfigurationStepsOrchestrator(
submissionSparkConf)
val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
submissionSparkConf, kubernetesResourceNamePrefix)
val hadoopCredentialsStep = new DriverHadoopCredentialsStep(submissionSparkConf)

Member: This will need to be slightly refactored after merging this in after PR 373, because of the hadoopStepsOrchestrator that is leveraged there.

Member Author: Good to know. Thanks.


import org.apache.spark.deploy.kubernetes.constants._

object HadoopSecretUtil {

@ifilonenko (Member), Jul 19, 2017: Any reason why this needs to be a separate util and can't just be part of a step? Where is this being re-used?

@ifilonenko (Member), Jul 19, 2017: On another note, the pod and container will be shared by the PodWithDetachedMainContainer(), so it will need to be refactored when the PRs are merged.

Member Author: This is used by both the driver and executors.

Comment: See #373 for how one can share a module between the driver and the executors. But we can also think about how to do this better, since the abstraction is a little difficult to reason about right now. We do a similar operation with setting the init-container bootstrap on the executors.
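
For context, here is a minimal sketch of what a shared HadoopSecretUtil could look like, using fabric8's builder API and the /mnt/secrets/hadoop-credentials base dir from the constants above. The volume name, the "hadoop-token-file" key, and the HADOOP_TOKEN_FILE_LOCATION wiring are assumptions for illustration, not the PR's verbatim code:

import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

object HadoopSecretUtil {

  // Adds a volume backed by the Hadoop-token secret to the pod, if a secret name was configured.
  def configurePod(hadoopSecretName: Option[String], pod: Pod): Pod =
    hadoopSecretName.map { secretName =>
      new PodBuilder(pod)
        .editSpec()
          .addNewVolume()
            .withName("hadoop-token-volume")
            .withNewSecret()
              .withSecretName(secretName)
            .endSecret()
          .endVolume()
        .endSpec()
        .build()
    }.getOrElse(pod)

  // Mounts the volume into the container and points Hadoop at the token file, so
  // UserGroupInformation can load the delegation token without a Kerberos login.
  def configureContainer(hadoopSecretName: Option[String], container: Container): Container =
    hadoopSecretName.map { _ =>
      new ContainerBuilder(container)
        .addNewVolumeMount()
          .withName("hadoop-token-volume")
          .withMountPath("/mnt/secrets/hadoop-credentials")
        .endVolumeMount()
        .addNewEnv()
          .withName("HADOOP_TOKEN_FILE_LOCATION")
          .withValue("/mnt/secrets/hadoop-credentials/hadoop-token-file")
        .endEnv()
        .build()
    }.getOrElse(container)
}

Keeping both methods in one object is what lets the driver step and the executor factory reuse the same mounting logic, which is the reuse being discussed above.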

val podWithMountedHadoopToken = HadoopSecretUtil.configurePod(maybeMountedHadoopSecret,
driverSpec.driverPod)
val containerWithMountedHadoopToken = HadoopSecretUtil.configureContainer(
maybeMountedHadoopSecret, driverSpec.driverContainer)

Member: As above, it would be cleaner to write a PodWithDetachedContainer() that shares the container and pod, so you wouldn't need to call this twice this way. I wrote one in PR 373 called PodWithMainContainer().

Member Author: I'm fine with the suggested plan.

val executorPodWithMountedHadoopToken = HadoopSecretUtil.configurePod(maybeMountedHadoopSecret,
executorPodWithNodeAffinity)
val containerWithMountedHadoopToken = HadoopSecretUtil.configureContainer(
maybeMountedHadoopSecret, initBootstrappedExecutorContainer)

Member: Same as above with detachedContainer logic

Member: I would recommend leveraging a Bootstrap method, similar to how SparkPodInitBootStrap functions, that could be passed in as an Option.

Member Author: Yes. This builder code would deserve some refactoring. But I'd like to put it off until the prototype works end-to-end and we see clearly what is needed.

// unless other use is found.
private[spark] val MOUNTED_HADOOP_SECRET_CONF =
ConfigBuilder("spark.kubernetes.mounted.hadoopSecret")
.doc("Use a Kubernetes secret containing Hadoop tokens such as an HDFS delegation token." +

Comment: We could make the key configurable as well so that it doesn't always have to be "hadoop-token-file".
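
A sketch of what such a companion setting might look like in config.scala, following the ConfigBuilder pattern already used for MOUNTED_HADOOP_SECRET_CONF above; the name spark.kubernetes.mounted.hadoopSecretKey is hypothetical and not part of this PR:

// Hypothetical follow-up: make the key within the secret configurable instead of
// hard-coding "hadoop-token-file".
private[spark] val MOUNTED_HADOOP_SECRET_KEY_CONF =
  ConfigBuilder("spark.kubernetes.mounted.hadoopSecretKey")
    .doc("Key within the Kubernetes secret that holds the Hadoop token file.")
    .stringConf
    .createWithDefault("hadoop-token-file")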

@kimoonkim (Member Author): rerun unit tests please

@ifilonenko (Member): After rebasing to apache-spark-on-k8s:hdfs-kerberos-support, this is fine to merge, as #391 will refactor this to bring the first three stages together.

@kimoonkim changed the base branch from branch-2.1-kubernetes to hdfs-kerberos-support on August 1, 2017 22:37
@kimoonkim (Member Author): Rebased to hdfs-kerberos-support.

@ifilonenko merged commit a004888 into apache-spark-on-k8s:hdfs-kerberos-support on Aug 1, 2017
ifilonenko pushed a commit to ifilonenko/spark that referenced this pull request on Feb 26, 2019:
Follow-up to the Jackson 2.9.5 upgrade, where RDDOperationScope can't find the serializer for the Option (parent variable) and serializes it using defaults. This forces Jackson to use static typing, i.e. to use the class types rather than values determined via reflection at runtime.